Dagster & Snowflake with components
Dagster provides a ready-to-use TemplatedSqlComponent that can be used alongside the SnowflakeConnectionComponent from the dagster-snowflake library to execute SQL queries that rebuild data assets in your Snowflake instance. This guide walks through how to use these components to execute your SQL.
1. Prepare a Dagster project
To begin, you'll need a Dagster project. You can use an existing components-ready project or create a new one:
create-dagster project my-project && cd my-project/src
Activate the project virtual environment:
source ../.venv/bin/activate
Finally, add the dagster-snowflake library to the project:
uv add dagster-snowflake
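As an optional sanity check, the following minimal sketch reports whether the packages are importable from the active virtual environment. The helper name is_importable is hypothetical and not part of Dagster; the sketch assumes only that the libraries install under the module names dagster and dagster_snowflake.

```python
import importlib.util


def is_importable(module_name: str) -> bool:
    """Return True if the top-level module can be located in this environment."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # Run this with the project's virtual environment activated.
    for name in ("dagster", "dagster_snowflake"):
        status = "ok" if is_importable(name) else "missing"
        print(f"{name}: {status}")
```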
2. Scaffold a SQL component
Now that you have a Dagster project, you can scaffold a templated SQL component. You'll need to provide a name for your component. In this example, we'll create a component that will execute a SQL query to calculate the daily revenue from a table of sales transactions.
dg scaffold defs dagster.TemplatedSqlComponent daily_revenue
Creating defs at /.../my-project/src/my_project/defs/daily_revenue.
The scaffold call will generate a defs.yaml file:
tree my_project/defs
my_project/defs
├── __init__.py
└── daily_revenue
    └── defs.yaml
2 directories, 2 files
3. Configure Snowflake connection
You'll need to configure a Snowflake connection component to enable the SQL component to connect to your Snowflake instance. For more information on Snowflake configuration, see the Using Snowflake with Dagster guide.
First, scaffold a Snowflake connection component:
dg scaffold defs dagster_snowflake.SnowflakeConnectionComponent snowflake_connection
Creating defs at /.../my-project/src/my_project/defs/snowflake_connection.
The scaffold call will generate a connection component configuration:
type: dagster_snowflake.SnowflakeConnectionComponent
attributes:
  account: "{{ env.SNOWFLAKE_ACCOUNT }}"
  user: "{{ env.SNOWFLAKE_USER }}"
  password: "{{ env.SNOWFLAKE_PASSWORD }}"
  database: "{{ env.SNOWFLAKE_DATABASE }}"
  schema: "{{ env.SNOWFLAKE_SCHEMA }}"
tree my_project/defs
my_project/defs
├── __init__.py
├── daily_revenue
│   └── defs.yaml
└── snowflake_connection
    └── defs.yaml
3 directories, 3 files
You only need one connection component per Snowflake instance you connect to. Multiple SQL components can share the same connection component.
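Because the connection configuration reads its values from environment variables, it can help to confirm they are set before loading definitions. The sketch below is one possible check, assuming the five variable names shown in the configuration above; the helper missing_snowflake_vars is a hypothetical name, not a Dagster API.

```python
import os

# Variable names taken from the connection component configuration.
REQUIRED_VARS = (
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
)


def missing_snowflake_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_snowflake_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All Snowflake environment variables are set.")
```

Passing a plain dictionary instead of relying on os.environ keeps the check easy to exercise in tests.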
4. Write custom SQL
You can customize the SQL template and define the assets that will be created. Update the defs.yaml file in the daily_revenue directory with a SQL template and template variables. You can also specify properties for the asset in Dagster, such as a group name and kind tag:
type: dagster.TemplatedSqlComponent
attributes:
  sql_template: |
    SELECT
      DATE_TRUNC('day', {{ date_column }}) as date,
      SUM({{ amount_column }}) as daily_revenue
    FROM {{ table_name }}
    WHERE {{ date_column }} >= '{{ start_date }}'
    GROUP BY DATE_TRUNC('day', {{ date_column }})
    ORDER BY date
  sql_template_vars:
    table_name: SALES_TRANSACTIONS
    date_column: TRANSACTION_DATE
    amount_column: SALE_AMOUNT
    start_date: "2024-01-01"
  connection: "{{ load_component_at_path('snowflake_connection') }}"
  assets:
    - key: ANALYTICS/DAILY_REVENUE
      group_name: analytics
      kinds: [snowflake]
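To make the substitution concrete, the sketch below fills the placeholders in the template with the values from sql_template_vars and prints the resulting query. It is a simplified stand-in written with Python's re module, not the component's actual rendering code, which this guide does not describe. The names render and PLACEHOLDER are hypothetical.

```python
import re

SQL_TEMPLATE = """\
SELECT
  DATE_TRUNC('day', {{ date_column }}) as date,
  SUM({{ amount_column }}) as daily_revenue
FROM {{ table_name }}
WHERE {{ date_column }} >= '{{ start_date }}'
GROUP BY DATE_TRUNC('day', {{ date_column }})
ORDER BY date
"""

TEMPLATE_VARS = {
    "table_name": "SALES_TRANSACTIONS",
    "date_column": "TRANSACTION_DATE",
    "amount_column": "SALE_AMOUNT",
    "start_date": "2024-01-01",
}

# Matches simple placeholders such as {{ table_name }} (assumption: no
# filters, expressions, or nested attributes are used in the template).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, variables: dict) -> str:
    """Replace each placeholder with its value; raise KeyError if one is undefined."""
    return PLACEHOLDER.sub(lambda m: str(variables[m.group(1)]), template)


if __name__ == "__main__":
    print(render(SQL_TEMPLATE, TEMPLATE_VARS))
```

Values are inserted as plain text, so template variables should come from trusted configuration rather than user input.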
You can run dg list defs to see the asset corresponding to your component:
dg list defs
┏━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Section ┃ Definitions                                                              ┃
┡━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Assets  │ ┏━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━┓ │
│         │ ┃ Key                     ┃ Group     ┃ Deps ┃ Kinds     ┃ Description ┃ │
│         │ ┡━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━┩ │
│         │ │ ANALYTICS/DAILY_REVENUE │ analytics │      │ snowflake │             │ │
│         │ └─────────────────────────┴───────────┴──────┴───────────┴─────────────┘ │
└─────────┴──────────────────────────────────────────────────────────────────────────┘
Using an external SQL file
Instead of embedding SQL directly in your component configuration, you can store SQL in separate files. This approach provides better organization and enables SQL syntax highlighting in your editor.
First, create a SQL file named daily_revenue.sql in the daily_revenue component directory, containing your query:
SELECT
  DATE_TRUNC('day', {{ date_column }}) as date,
  SUM({{ amount_column }}) as daily_revenue
FROM {{ table_name }}
WHERE {{ date_column }} >= '{{ start_date }}'
GROUP BY DATE_TRUNC('day', {{ date_column }})
ORDER BY date
tree my_project/defs
my_project/defs
├── __init__.py
├── __pycache__
│   └── __init__.cpython-311.pyc
├── daily_revenue
│   ├── daily_revenue.sql
│   └── defs.yaml
└── snowflake_connection
    ├── __pycache__
    │   └── test_snowflake_utils.cpython-311.pyc
    ├── defs.yaml
    └── test_snowflake_utils.py
5 directories, 7 files
Then update your component configuration to reference the external file:
type: dagster.TemplatedSqlComponent
attributes:
  sql_template:
    path: daily_revenue.sql
  sql_template_vars:
    table_name: SALES_TRANSACTIONS
    date_column: TRANSACTION_DATE
    amount_column: SALE_AMOUNT
    start_date: "2024-01-01"
  connection: "{{ load_component_at_path('snowflake_connection') }}"
  assets:
    - key: ANALYTICS/DAILY_REVENUE
      group_name: analytics
      kinds: [snowflake]
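Once the SQL lives in its own file, the template and its variables can drift apart. The sketch below is one way to catch that early: it lists any placeholder used in a SQL file that has no matching entry in sql_template_vars. The helpers undefined_placeholders and check_sql_file are hypothetical names, and the pattern assumes only simple {{ name }} placeholders like those in this guide.

```python
import re
from pathlib import Path

# Matches simple placeholders such as {{ table_name }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def undefined_placeholders(template_text: str, variables: dict) -> list:
    """Return sorted placeholder names used in the template but not defined."""
    used = set(PLACEHOLDER.findall(template_text))
    return sorted(used - set(variables))


def check_sql_file(path, variables: dict) -> list:
    """Read a SQL template file and report its undefined placeholders."""
    text = Path(path).read_text(encoding="utf-8")
    return undefined_placeholders(text, variables)


if __name__ == "__main__":
    template_vars = {
        "table_name": "SALES_TRANSACTIONS",
        "date_column": "TRANSACTION_DATE",
        "amount_column": "SALE_AMOUNT",
        "start_date": "2024-01-01",
    }
    # Path is relative to the directory this is run from (assumed: src/).
    sql_path = Path("my_project/defs/daily_revenue/daily_revenue.sql")
    if sql_path.exists():
        print(check_sql_file(sql_path, template_vars) or "No undefined placeholders.")
    else:
        print(f"{sql_path} not found")
```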
5. Launch your assets
Once your component is configured, you can launch your assets to execute the SQL queries:
dg launch --assets '*'