
Using Snowflake with Dagster resources

This tutorial focuses on how to store and load the data behind Dagster asset definitions in Snowflake by using Dagster's SnowflakeResource. A resource allows you to run SQL queries directly against tables within an asset's compute function.

By the end of the tutorial, you will:

  • Configure a Snowflake resource
  • Use the Snowflake resource to execute a SQL query that creates a table
  • Load Snowflake tables in downstream assets
  • Add the assets and Snowflake resource to a Definitions object

Prefer to use an I/O manager? Unlike resources, an I/O manager transfers the responsibility of storing and loading DataFrames as Snowflake tables to Dagster. Refer to the Snowflake I/O manager guide for more info.

Prerequisites

To complete this tutorial, you'll need:

  • To install the following libraries:

    pip install dagster dagster-snowflake pandas
  • To gather the following information, which is required to use the Snowflake resource:

    • Snowflake account name: You can find this by logging into Snowflake and getting the account name from the URL:

      [Screenshot: Snowflake account name shown in the URL]

    • Snowflake credentials: You can authenticate with Snowflake in two ways: with a username and password, or with a username and private key.

      The Snowflake resource can read these authentication values from environment variables. In this guide, we use password authentication and store the username and password as SNOWFLAKE_USER and SNOWFLAKE_PASSWORD, respectively:

      export SNOWFLAKE_USER=<your username>
      export SNOWFLAKE_PASSWORD=<your password>

      Refer to the Using environment variables and secrets guide for more info.

      For more information on authenticating with a private key, see Authenticating with a private key in the Snowflake reference guide.
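      If you plan to use key-pair authentication, the resource can be configured with a private key instead of a password. The following is a minimal sketch, assuming the private_key_path and private_key_password fields documented for SnowflakeResource and a hypothetical key location:

      # Minimal key-pair sketch (assumes SnowflakeResource's private_key_path and
      # private_key_password fields; the key path is a hypothetical placeholder)
      from dagster import EnvVar
      from dagster_snowflake import SnowflakeResource

      snowflake_keypair = SnowflakeResource(
          account=EnvVar("SNOWFLAKE_ACCOUNT"),
          user=EnvVar("SNOWFLAKE_USER"),
          private_key_path="/path/to/rsa_key.p8",  # PEM-encoded private key file
          private_key_password=EnvVar("SNOWFLAKE_PRIVATE_KEY_PASSWORD"),  # only needed if the key is encrypted
      )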

Step 1: Configure the Snowflake resource

To connect to Snowflake, we'll use the dagster-snowflake SnowflakeResource. The SnowflakeResource requires some configuration:

  • The account and user values are required.
  • One method of authentication is required, either by using a password or a private key.
  • Optional: Using the warehouse, schema, and role attributes, you can specify where data should be stored and a role for the resource to use.
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import Definitions, EnvVar, MaterializeResult, asset

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),  # required
    user=EnvVar("SNOWFLAKE_USER"),  # required
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
    warehouse="PLANTS",
    schema="IRIS",
    role="WRITER",
)

With this configuration, if you materialized an asset named iris_dataset, SnowflakeResource would use the WRITER role and the PLANTS warehouse and store the data in the FLOWERS.IRIS.IRIS_DATASET table. (The FLOWERS database is supplied when the table is written in Step 2.)

For more info about each of the configuration values, refer to the SnowflakeResource API documentation.

Step 2: Create tables in Snowflake

Using the Snowflake resource, you can create Snowflake tables with the write_pandas function from the Snowflake Connector for Python:

import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import MaterializeResult, asset


@asset
def iris_dataset(snowflake: SnowflakeResource):
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with snowflake.get_connection() as conn:
        table_name = "iris_dataset"
        database = "flowers"
        schema = "iris"
        success, number_chunks, rows_inserted, output = write_pandas(
            conn,
            iris_df,
            table_name=table_name,
            database=database,
            schema=schema,
            auto_create_table=True,
            overwrite=True,
            quote_identifiers=False,
        )

    return MaterializeResult(
        metadata={"rows_inserted": rows_inserted},
    )

In this example, we've defined an asset that fetches the Iris dataset as a Pandas DataFrame. Then, using the Snowflake resource, the DataFrame is stored in Snowflake as the FLOWERS.IRIS.IRIS_DATASET table.
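To try the asset outside of the Dagster UI, you can materialize it directly from a script or test. The following is a minimal sketch using Dagster's materialize function, supplying the resource under the snowflake key so that it matches the asset's parameter name:

# Minimal sketch: materialize the asset from a script or test. Assumes the
# `snowflake` resource from Step 1 and the `iris_dataset` asset are in scope.
from dagster import materialize

result = materialize(
    [iris_dataset],
    resources={"snowflake": snowflake},
)
assert result.success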

Step 3: Define downstream assets

Once you've created an asset that represents a table in Snowflake, you may want to create additional assets that work with the data. In the following example, we've defined an asset that creates a second table, which contains only the data for the Iris Setosa species:

from dagster_snowflake import SnowflakeResource

from dagster import asset


@asset(deps=["iris_dataset"])
def iris_setosa(snowflake: SnowflakeResource) -> None:
    query = """
        create or replace table iris.iris_setosa as (
            SELECT *
            FROM iris.iris_dataset
            WHERE species = 'Iris-setosa'
        );
    """

    with snowflake.get_connection() as conn:
        conn.cursor().execute(query)

To accomplish this, we defined a dependency on the iris_dataset asset using the deps parameter. Then, the SQL query runs and creates the table of Iris Setosa data.
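The SQL-only approach above never brings the rows into Python. If a downstream asset needs the data in memory, you can read a table back into a Pandas DataFrame through the same connection. The following is a minimal sketch with a hypothetical asset name, assuming the Snowflake connector's fetch_pandas_all cursor method (available when the connector is installed with Pandas support):

import pandas as pd
from dagster_snowflake import SnowflakeResource

from dagster import asset


# Hypothetical downstream asset that loads the iris_setosa table into a DataFrame
@asset(deps=["iris_setosa"])
def iris_setosa_summary(snowflake: SnowflakeResource) -> None:
    with snowflake.get_connection() as conn:
        # cursor().execute() returns the cursor, so fetch_pandas_all() can be chained
        setosa_df: pd.DataFrame = (
            conn.cursor()
            .execute("SELECT * FROM iris.iris_setosa")
            .fetch_pandas_all()
        )
    print(f"Loaded {len(setosa_df)} Iris Setosa rows")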

Step 4: Add the assets and Snowflake resource to a Definitions object

The last step is to add the SnowflakeResource and the assets to the project's Definitions object:

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset, iris_setosa], resources={"snowflake": snowflake}
)

This makes the resource and assets available to Dagster tools like the UI and CLI.

Completed code example

When finished, your code should look like the following:

import pandas as pd
from dagster_snowflake import SnowflakeResource
from snowflake.connector.pandas_tools import write_pandas

from dagster import Definitions, EnvVar, MaterializeResult, asset

snowflake = SnowflakeResource(
    account=EnvVar("SNOWFLAKE_ACCOUNT"),  # required
    user=EnvVar("SNOWFLAKE_USER"),  # required
    password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
    warehouse="PLANTS",
    schema="IRIS",
    role="WRITER",
)


@asset
def iris_dataset(snowflake: SnowflakeResource):
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with snowflake.get_connection() as conn:
        table_name = "iris_dataset"
        database = "flowers"
        schema = "iris"
        success, number_chunks, rows_inserted, output = write_pandas(
            conn,
            iris_df,
            table_name=table_name,
            database=database,
            schema=schema,
            auto_create_table=True,
            overwrite=True,
            quote_identifiers=False,
        )

    return MaterializeResult(
        metadata={"rows_inserted": rows_inserted},
    )


@asset(deps=["iris_dataset"])
def iris_setosa(snowflake: SnowflakeResource) -> None:
    query = """
        create or replace table iris.iris_setosa as (
            SELECT *
            FROM iris.iris_dataset
            WHERE species = 'Iris-setosa'
        );
    """

    with snowflake.get_connection() as conn:
        conn.cursor().execute(query)


defs = Definitions(
    assets=[iris_dataset, iris_setosa], resources={"snowflake": snowflake}
)