# Integrating Snowflake with Dagster using resources
This tutorial focuses on creating and interacting with Snowflake tables from Dagster asset definitions by using Dagster's SnowflakeResource. A resource allows you to run SQL queries directly against Snowflake tables within an asset's compute function.
By the end of the tutorial, you will:

- Configure a Snowflake resource
- Use the Snowflake resource to execute a SQL query that creates a table
- Load Snowflake tables in downstream assets
- Add the assets and Snowflake resource to a Definitions object
Prefer to use an I/O manager? Unlike resources, an I/O manager transfers the responsibility of storing and loading DataFrames as Snowflake tables to Dagster. Refer to the Snowflake I/O manager guide for more info.
## Step 1: Configure the Snowflake resource

Gather the following information, which is required to use the Snowflake resource:
- Snowflake account name: You can find this by logging into Snowflake and getting the account name from the URL.
- Snowflake credentials: You can authenticate with Snowflake two ways: with a username and password, or with a username and private key.
The Snowflake resource can read these authentication values from environment variables. In this guide, we use password authentication and store the username and password as SNOWFLAKE_USER and SNOWFLAKE_PASSWORD, respectively.

One method of authentication is required: either a password or a private key.
Optional: Using the warehouse, schema, and role attributes, you can specify where data should be stored and a role for the resource to use.
```python
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import Definitions, EnvVar, MaterializeResult, asset

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),  # required
    user=EnvVar("SNOWFLAKE_USER"),  # required
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
    database="FLOWERS",  # required
    warehouse="PLANTS",
    schema="IRIS",
    role="WRITER",
)
```
With this configuration, if you materialized an asset named iris_dataset, SnowflakeResource would use the role WRITER and store the data in the FLOWERS.IRIS.IRIS_DATASET table using the PLANTS warehouse.
For more info about each of the configuration values, refer to the SnowflakeResource API documentation.
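## Step 2: Create tables in Snowflake

Using the Snowflake resource, you can create Snowflake tables directly from Dagster assets. Below is a minimal sketch of such an asset; the CSV path is a placeholder for wherever your copy of the classic Iris dataset lives, and write_pandas comes from the Snowflake connector:

```python
import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import MaterializeResult, asset


@asset
def iris_dataset(snowflake: SnowflakeResource) -> MaterializeResult:
    # Fetch the Iris dataset as a Pandas DataFrame.
    # "iris.csv" is a placeholder path; substitute your own copy of the dataset.
    iris_df = pd.read_csv(
        "iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    # write_pandas creates the table if needed and uploads the DataFrame.
    with snowflake.get_connection() as conn:
        success, number_chunks, rows_inserted, output = write_pandas(
            conn,
            iris_df,
            table_name="iris_dataset",
            database="FLOWERS",
            schema="IRIS",
            auto_create_table=True,
            overwrite=True,
        )

    return MaterializeResult(metadata={"rows_inserted": rows_inserted})
```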
In this example, we've defined an asset that fetches the Iris dataset as a Pandas DataFrame. Then, using the Snowflake resource, the DataFrame is stored in Snowflake as the FLOWERS.IRIS.IRIS_DATASET table.
If you have existing tables in Snowflake and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies.
Making Dagster aware of these tables allows you to track the full data lineage in Dagster. You can accomplish this by defining external assets for these tables. For example:
```python
from dagster import AssetSpec

iris_harvest_data = AssetSpec(key="iris_harvest_data")
```
In this example, we created an AssetSpec for a pre-existing table called iris_harvest_data.
Since we supplied the database and the schema in the resource configuration in Step 1, we only need to provide the table name. We did this by using the key parameter in our AssetSpec. When a downstream asset depends on iris_harvest_data, it can query the data in the FLOWERS.IRIS.IRIS_HARVEST_DATA table through the resource.
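For instance, here's a minimal sketch of a downstream asset that reads the table back through the resource. The iris_harvest_summary asset name is hypothetical, and fetch_pandas_all is the Snowflake connector's helper for returning a query result as a Pandas DataFrame:

```python
from dagster_snowflake import SnowflakeResource

from dagster import asset


# Hypothetical downstream asset that reads the pre-existing table.
@asset(deps=["iris_harvest_data"])
def iris_harvest_summary(snowflake: SnowflakeResource) -> None:
    with snowflake.get_connection() as conn:
        # fetch_pandas_all returns the query result as a Pandas DataFrame.
        harvest_df = (
            conn.cursor()
            .execute("SELECT * FROM IRIS.IRIS_HARVEST_DATA")
            .fetch_pandas_all()
        )
    print(f"Loaded {len(harvest_df)} harvest rows")
```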
## Step 3: Define downstream assets

Once you've created an asset that represents a table in Snowflake, you may want to create additional assets that work with the data. In the following example, we've defined an asset that creates a second table, which contains only the data for the Iris Setosa species:
```python
from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset(deps=["iris_dataset"])
def iris_setosa(snowflake: SnowflakeResource) -> None:
    query = """
        create or replace table iris.iris_setosa as (
            SELECT *
            FROM iris.iris_dataset
            WHERE species = 'Iris-setosa'
        );
    """

    with snowflake.get_connection() as conn:
        conn.cursor().execute(query)
```
To accomplish this, we defined a dependency on the iris_dataset asset using the deps parameter. Then, the SQL query runs and creates the table of Iris Setosa data.
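To complete the final item from the list at the top of this guide, add the assets and the Snowflake resource to a Definitions object. The sketch below assumes all of the definitions above live in a single module; the "snowflake" resource key must match the parameter name the asset functions use to request the resource:

```python
from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset, iris_harvest_data, iris_setosa],
    resources={"snowflake": snowflake},
)
```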