
Advanced: Resources #

You can find the code for this example on GitHub.

Jobs often interact with external resources like Hadoop/Spark clusters or data warehouses like Snowflake or BigQuery. Dagster provides facilities to avoid hard-coding interactions with such systems, so that your business logic can remain the same across different environments (local/test, dev, prod, etc.). Resources represent these external systems, and you can construct multiple jobs that share the same graph of ops but each use different resources.

Parameterizing Jobs with Resources #

Dagster models interactions with features of the external environment as resources. Dagster's library modules such as dagster_aws, dagster_gcp, and dagster_slack provide out-of-the-box implementations for many common external services.

Typically, your data processing jobs will want to store their results in a data warehouse somewhere separate from the raw data sources. We'll adjust our toy job so that it does a little more work on our cereal dataset, stores the finished product in a swappable data warehouse, and lets the team know when we're finished.

You might have noticed that our cereal dataset isn't normalized—that is, the serving sizes for some cereals are as small as a quarter of a cup, and for others are as large as a cup and a half. This grossly understates the nutritional difference between our different cereals.
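To make the rescaling concrete, here is a minimal sketch of the per-cup arithmetic on a single record. The helper name per_cup and the sample values are made up for illustration; only the "cups" column and the idea of dividing by the serving size come from the dataset described above.

```python
def per_cup(cereal, columns):
    """Return a copy of one record with the given columns rescaled to a one-cup serving."""
    scale = 1.0 / float(cereal["cups"])
    scaled = dict(cereal)
    for column in columns:
        scaled[column] = float(cereal[column]) * scale
    return scaled


# Made-up record for illustration; values are strings, as they would be from a CSV reader.
sample = {"name": "Toy Flakes", "calories": "90", "sugars": "6", "cups": "0.75"}
normalized = per_cup(sample, ["calories", "sugars"])
print(normalized["calories"], normalized["sugars"])
```

A three-quarter-cup serving is scaled up by 1 / 0.75, so every selected column grows by about a third, while the input record itself is left untouched.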

Let's transform our dataset and then store it in a normalized table in the warehouse:

from copy import deepcopy

from dagster import op


@op(required_resource_keys={"warehouse"})
def normalize_calories(context, cereals):
    columns_to_normalize = [
        "calories",
        "protein",
        "fat",
        "sodium",
        "fiber",
        "carbo",
        "sugars",
        "potass",
        "vitamins",
        "weight",
    ]
    quantities = [cereal["cups"] for cereal in cereals]
    reweights = [1.0 / float(quantity) for quantity in quantities]

    normalized_cereals = deepcopy(cereals)
    for idx in range(len(normalized_cereals)):
        cereal = normalized_cereals[idx]
        for column in columns_to_normalize:
            cereal[column] = float(cereal[column]) * reweights[idx]

    context.resources.warehouse.update_normalized_cereals(normalized_cereals)

Resources are another facility that Dagster makes available on the context object passed to op logic. Note that we've completely encapsulated access to the database behind the call to context.resources.warehouse.update_normalized_cereals. This means that we can easily swap resource implementations—for instance, to test against a local SQLite database instead of a production Snowflake database; to abstract software changes, such as swapping raw SQL for SQLAlchemy; or to accommodate changes in business logic, like moving from an overwriting scheme to append-only, date-partitioned tables.
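As a rough illustration of why this encapsulation helps, the sketch below swaps in a test double that records what it is given instead of writing to a database. RecordingWarehouse, store, and the SimpleNamespace stand-in for the context are all hypothetical names for this example and are not Dagster APIs; the only thing taken from the op above is the update_normalized_cereals method name.

```python
from copy import deepcopy
from types import SimpleNamespace


class RecordingWarehouse:
    """Hypothetical test double exposing the same method as a real warehouse."""

    def __init__(self):
        self.saved = None

    def update_normalized_cereals(self, records):
        # Keep a private copy so later mutation by the caller cannot change it.
        self.saved = deepcopy(records)


def store(context, cereals):
    # Mirrors the last line of normalize_calories: the caller only knows the method name.
    context.resources.warehouse.update_normalized_cereals(cereals)


warehouse = RecordingWarehouse()
# SimpleNamespace stands in for Dagster's context object here; it is not a Dagster API.
fake_context = SimpleNamespace(resources=SimpleNamespace(warehouse=warehouse))
store(fake_context, [{"name": "Toy Flakes", "calories": 100.0}])
print(warehouse.saved)
```

Any object with a matching method can fill the warehouse slot, which is what makes the swap cheap. For unit tests against a real context, Dagster also provides build_op_context, which accepts a resources dictionary.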

To implement a resource and specify its config schema, we use the @resource decorator. The decorated function should return whatever object you wish to make available under the specific resource's slot in context.resources. Resource constructor functions have access to their own context argument, which gives access to resource-specific config. (Unlike the contexts we've seen so far, which are instances of OpExecutionContext, this context is an instance of InitResourceContext.)

import sqlite3

from dagster import Field, String, resource


class LocalSQLiteWarehouse:
    def __init__(self, conn_str):
        self._conn_str = conn_str

    # In practice, you'll probably want to write more generic, reusable logic on your resources
    # than this tutorial example
    def update_normalized_cereals(self, records):
        conn = sqlite3.connect(self._conn_str)
        curs = conn.cursor()
        try:
            curs.execute("DROP TABLE IF EXISTS normalized_cereals")
            curs.execute(
                """CREATE TABLE IF NOT EXISTS normalized_cereals
                (name text, mfr text, type text, calories real,
                 protein real, fat real, sodium real, fiber real,
                 carbo real, sugars real, potass real, vitamins real,
                 shelf real, weight real, cups real, rating real)"""
            )
            curs.executemany(
                """INSERT INTO normalized_cereals VALUES
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                [tuple(record.values()) for record in records],
            )
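            # Persist the inserted rows; without a commit they are rolled back
            # when the connection is closed.
            conn.commit()
        finally:
            curs.close()
            conn.close()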


@resource(config_schema={"conn_str": Field(String)})
def local_sqlite_warehouse_resource(context):
    return LocalSQLiteWarehouse(context.resource_config["conn_str"])

The last thing we need to do is to attach the resource to our job, so that it's properly initialized when the run begins and made available to our op logic as context.resources.warehouse.

@job(resource_defs={"warehouse": local_sqlite_warehouse_resource})
def resources_job():
    normalize_calories(download_csv())

We can put it all together with the following config:

resources:
  warehouse:
    config:
      conn_str: ":memory:"

Here we pass the special string ":memory:" in config as the connection string for our database—this is how SQLite designates an in-memory database.
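One consequence of this choice is that every call to sqlite3.connect(":memory:") opens a fresh, private database, so rows written through one connection are not visible from another. Because LocalSQLiteWarehouse opens a new connection inside update_normalized_cereals, the in-memory setting is convenient for a smoke test but leaves nothing to query afterwards. A minimal standard-library sketch of that behavior (the table name t is made up for illustration):

```python
import sqlite3

# Write a row through one in-memory connection.
first = sqlite3.connect(":memory:")
first.execute("CREATE TABLE t (x integer)")
first.execute("INSERT INTO t VALUES (1)")
first.commit()

# A second connection to ":memory:" gets its own, empty database.
second = sqlite3.connect(":memory:")
tables_in_second = second.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"
).fetchall()
rows_in_first = first.execute("SELECT x FROM t").fetchall()

print(tables_in_second, rows_in_first)

first.close()
second.close()
```

To inspect the stored table after a run, point conn_str at a file path instead.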

Different Resources for the Same Graph #

Dagster is built around the observation that any data DAG typically contains a logical core of data transformation, which is reusable across a set of environments (e.g. prod, local, staging). The graph usually needs to be "customized" for each environment, by plugging in configuration and services that are specific to that environment.

For example, you might want to run against an in-memory SQLite database in unit tests, against a local Postgres instance when developing, and against a cloud-hosted Postgres instance in production.

You can model these differences by building multiple jobs that use the same underlying graph of ops, but have different resources to represent the different databases. The graph of ops represents the logical core of data transformation, and the resources on each job customize the behavior of that job for its environment.

Here's an implementation of a SQLAlchemy warehouse resource:

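import sqlalchemy


class SqlAlchemyPostgresWarehouse:
    def __init__(self, conn_str):
        self._conn_str = conn_str
        self._engine = sqlalchemy.create_engine(self._conn_str)

    def update_normalized_cereals(self, records):
        # Base and NormalizedCereal are the declarative base and model, which
        # are assumed to be defined elsewhere in the example code.
        Base.metadata.drop_all(self._engine)
        Base.metadata.create_all(self._engine)
        # Run the insert on an explicit connection; begin() commits on success.
        with self._engine.begin() as conn:
            conn.execute(NormalizedCereal.__table__.insert(), records)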


@resource(config_schema={"conn_str": Field(String)})
def sqlalchemy_postgres_warehouse_resource(context):
    return SqlAlchemyPostgresWarehouse(context.resource_config["conn_str"])

To build multiple jobs that use the same underlying graph of ops, we first define the graph on its own using the @graph decorator. Then we invoke GraphDefinition.to_job on the graph to construct the jobs with the relevant resources.

@graph
def calories():
    normalize_calories(download_csv())


calories_test_job = calories.to_job(
    resource_defs={"warehouse": local_sqlite_warehouse_resource}
)
calories_dev_job = calories.to_job(
    resource_defs={"warehouse": sqlalchemy_postgres_warehouse_resource}
)

We'll execute our test job using the Python API:

run_config = {
    "resources": {"warehouse": {"config": {"conn_str": ":memory:"}}}
}
result = calories_test_job.execute_in_process(run_config=run_config)

Different Config for the Same Graph #

Useful as the Dagit config editor and the ability to stitch together YAML fragments are, once jobs have been deployed and their config is unlikely to change, it's often more convenient to distribute jobs with embedded config. For example, you might point ops at different S3 buckets in different environments, or pull database credentials from different environment variables.

To do this, you can supply a config dictionary to the config argument when constructing a job:

from dagster import config_from_files, file_relative_path, graph


@graph
def calories():
    normalize_calories(download_csv())


calories_test_job = calories.to_job(
    resource_defs={"warehouse": local_sqlite_warehouse_resource},
    config={"resources": {"warehouse": {"config": {"conn_str": ":memory:"}}}},
)
calories_dev_job = calories.to_job(
    resource_defs={"warehouse": sqlalchemy_postgres_warehouse_resource},
    config=config_from_files(
        [file_relative_path(__file__, "presets_dev_warehouse.yaml")]
    ),
)

Note that, to construct the config dictionary for the calories_dev_job, we invoke the config_from_files API to load config from a YAML file.
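The config dictionary can equally be assembled in plain Python, for instance when the connection string comes from an environment variable. The sketch below is one way to do that; the helper name warehouse_run_config and the variable name WAREHOUSE_CONN_STR are assumptions for illustration, not names used by Dagster or by this example's repository.

```python
import os


def warehouse_run_config(env_var="WAREHOUSE_CONN_STR", default=":memory:"):
    """Build the warehouse config dict, reading the connection string from the environment."""
    # Fall back to an in-memory SQLite database when the variable is not set.
    conn_str = os.environ.get(env_var, default)
    return {"resources": {"warehouse": {"config": {"conn_str": conn_str}}}}


run_config = warehouse_run_config()
print(run_config)
```

The resulting dictionary has the same shape as the one passed to the config argument above, so it could be supplied there or as run_config at execution time.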

Because the config is now embedded in the job, we can execute our test job from the Python API without passing run_config:

result = calories_test_job.execute_in_process()