Ask AI

Using Dagster with Snowflake#

This tutorial focuses on how to store and load Dagster's software-defined assets (SDAs) in Snowflake.

By the end of the tutorial, you will:

  • Configure a Snowflake I/O manager
  • Create a table in Snowflake using a Dagster asset
  • Make a Snowflake table available in Dagster
  • Load Snowflake tables in downstream assets

This guide focuses on storing and loading Pandas DataFrames in Snowflake. Dagster also supports using PySpark DataFrames with Snowflake. The concepts from this guide apply to working with PySpark DataFrames, and you can learn more about setting up and using the Snowflake I/O manager with PySpark DataFrames in the reference guide.


Prerequisites#

To complete this tutorial, you'll need:

  • To install the dagster-snowflake and dagster-snowflake-pandas libraries:

    pip install dagster-snowflake dagster-snowflake-pandas
    
  • To gather the following information, which is required to use the Snowflake I/O manager:

    • Snowflake account name: You can find this by logging into Snowflake and getting the account name from the URL:

    • Snowflake credentials: You can authenticate with Snowflake two ways: with a username and password, or with a username and private key.

      The Snowflake I/O manager can read all of these authentication values from environment variables. In this guide, we use password authentication and store the username and password as SNOWFLAKE_USER and SNOWFLAKE_PASSWORD, respectively.

      export SNOWFLAKE_USER=<your username>
      export SNOWFLAKE_PASSWORD=<your password>
      

      Refer to the Using environment variables and secrets guide for more info.

      For more information on authenticating with a private key, see Authenticating with a private key in the Snowflake reference guide.


Step 1: Configure the Snowflake I/O manager#

The Snowflake I/O manager requires some configuration to connect to your Snowflake instance. The account, user are required to connect with Snowflake. One method of authentication is required. You can use a password or a private key. Additionally, you need to specify a database to where all the tables should be stored.

You can also provide some optional configuration to further customize the Snowflake I/O manager. You can specify a warehouse and schema where data should be stored, and a role for the I/O manager.

from dagster_snowflake_pandas import SnowflakePandasIOManager

from dagster import Definitions, EnvVar

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account="abc1234.us-east-1",  # required
            user=EnvVar("SNOWFLAKE_USER"),  # required
            password=EnvVar("SNOWFLAKE_PASSWORD"),  # password or private key required
            database="FLOWERS",  # required
            role="writer",  # optional, defaults to the default role for the account
            warehouse="PLANTS",  # optional, defaults to default warehouse for the account
            schema="IRIS",  # optional, defaults to PUBLIC
        )
    },
)

With this configuration, if you materialized an asset called iris_dataset, the Snowflake I/O manager would be permissioned with the role writer and would store the data in the FLOWERS.IRIS.IRIS_DATASET table in the PLANTS warehouse.

Finally, in the Definitions object, we assign the SnowflakePandasIOManager to the io_manager key. io_manager is a reserved key to set the default I/O manager for your assets.

For more info about each of the configuration values, refer to the SnowflakePandasIOManager API documentation.


Step 2: Create tables in Snowflake#

The Snowflake I/O manager can create and update tables for your Dagster defined assets, but you can also make existing Snowflake tables available to Dagster.

Store a Dagster asset as a table in Snowflake#

To store data in Snowflake using the Snowflake I/O manager, the definitions of your assets don't need to change. You can tell Dagster to use the Snowflake I/O manager, like in Step 1: Configure the Snowflake I/O manager, and Dagster will handle storing and loading your assets in Snowflake.

import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

In this example, we first define our asset. Here, we are fetching the Iris dataset as a Pandas DataFrame and renaming the columns. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.

When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the Snowflake I/O manager, the Snowflake I/O manager will create the table FLOWERS.IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.


Step 3: Load Snowflake tables in downstream assets#

Once you have created an asset or source asset that represents a table in Snowflake, you will likely want to create additional assets that work with the data. Dagster and the Snowflake I/O manager allow you to load the data stored in Snowflake tables into downstream assets.

import pandas as pd

from dagster import asset

# this example uses the iris_dataset asset from Step 2


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()

In this example, we want to provide the iris_dataset asset from the Store a Dagster asset as a table in Snowflake example to the iris_cleaned asset.

In iris_cleaned, the iris_dataset parameter tells Dagster that the value for the iris_dataset asset should be provided as input to iris_cleaned. If this feels too magical for you, refer to the docs for explicitly specifying dependencies.

When materializing these assets, Dagster will use the SnowflakePandasIOManager to fetch the FLOWERS.IRIS.IRIS_DATASET as a Pandas DataFrame and pass this DataFrame as the iris_dataset parameter to iris_cleaned. When iris_cleaned returns a Pandas DataFrame, Dagster will use the SnowflakePandasIOManager to store the DataFrame as the FLOWERS.IRIS.IRIS_CLEANED table in Snowflake.


Completed code example#

When finished, your code should look like the following:

import pandas as pd
from dagster_snowflake_pandas import SnowflakePandasIOManager

from dagster import Definitions, EnvVar, SourceAsset, asset

iris_harvest_data = SourceAsset(key="iris_harvest_data")


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()


defs = Definitions(
    assets=[iris_dataset, iris_harvest_data, iris_cleaned],
    resources={
        "io_manager": SnowflakePandasIOManager(
            account="abc1234.us-east-1",
            user=EnvVar("SNOWFLAKE_USER"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
            database="FLOWERS",
            role="writer",
            warehouse="PLANTS",
            schema="IRIS",
        )
    },
)

For more Snowflake features, refer to the Snowflake reference.

For more information on software-defined assets, refer to the tutorial or the Assets concept documentation.

For more information on I/O managers, refer to the I/O manager concept documentation.