Upgrading to asset definitions#

Familiar with ops and graphs? Want to understand when, why, and how to use asset definitions in Dagster? If so, this guide is for you. We'll also demonstrate what some common Dagster jobs look like before and after using asset definitions.

Before we jump in, here's a quick refresher:

  • An asset is a persistent object in storage, such as a table, machine learning (ML) model, or file.
  • An op is the core unit of computation in Dagster. For example, an op might accept tabular data as its input and produce transformed tabular data as its output.
  • A graph is a directed acyclic graph of ops or other graphs, which execute in order and pass data to each other.
  • An asset definition is a declaration of an asset that should exist and a description of how to compute it: the op or graph that needs to run and the upstream assets that it should run on.

Asset definitions aren't a replacement for Dagster's core computational concepts. Ops remain the core unit of computation that occurs within an asset. Think of asset definitions as a top layer that links ops, graphs, and jobs to the long-lived objects they interact with.


Why use asset definitions?#

Using asset definitions means building Dagster jobs in a way that declares ahead of time the assets they produce and consume. This differs from using the AssetMaterialization API, which only informs Dagster at runtime about the assets a job interacted with.

Declaring assets ahead of time offers distinct advantages, including:

Lineage#

Because asset definitions know which other assets they depend on, you can easily view an asset's lineage in the Dagster UI.

Assets help track and define cross-job dependencies. For example, when viewing a job that materializes assets, you can navigate to the jobs that produce the assets that it depends on. Additionally, when an upstream asset has been updated more recently than a downstream asset, Dagster will indicate that the downstream asset might be out of date.

Direct operation#

Using asset definitions enables you to directly operate your assets in the UI. On an asset's Details page, you can:

  • View the materialization history of the asset
  • Check when the next materialization will occur
  • Launch runs that materialize or re-materialize the asset, including its ancestors or descendants
  • View the sensors or schedules for jobs targeting the asset

Improved code ergonomics#

Asset definitions provide sizeable improvements when it comes to code ergonomics:

  • You'll usually write less code. Specifying the inputs to an asset definition defines the assets it depends on. This means you don't need to use @graph and @job to wire dependencies between ops.

    This approach scales better because it halves the number of times an asset's name appears in your codebase. Refer to the I/O manager-based example below to see this in action.

  • You no longer have to choose between easy dependency tracking and manageable organization. Without asset definitions, you're often forced to:

    • Contain everything in a single mega-job, which allows for easy dependency tracking but creates maintenance difficulties, OR
    • Split your pipeline into smaller jobs, which allows for easy maintenance but makes dependency tracking difficult

    Because assets track their own dependencies, you can avoid interruptions in dependency graphs and eliminate the need for input managers.


When should I use asset definitions?#

You should use asset definitions when:

  • You’re using Dagster to produce or maintain assets, AND
  • You know what those assets will be before you launch any runs.

Note that using asset definitions in one job doesn’t mean they need to be used in all your jobs. If your use case doesn't meet these criteria, you can still use graphs and ops.

Still not sure? Check out these examples to see what's a good fit and what isn't:

| Use case | Good fit? | Explanation |
|---|---|---|
| Every day, drop and recreate the users table and the user_recommender_model model that depends on it | Yes | Assets are known before a run and are being updated |
| Every hour, add a partition to the events table | Yes | Assets are known before a run and are being updated |
| Clicking a button refreshes the recommender model | Yes | Assets are known before a run and are being updated |
| Every day, send emails to a set of users | No | No assets are being updated |
| Every day, read a file of user IDs and change the value of a particular attribute for each user | No | The set of assets to update is not known before running the job |
| Every day, scan my warehouse for tables that haven't been used in months and delete them | No | The set of assets to update is not known before running the job |

How do I upgrade jobs to use asset definitions?#

Let's say you've written jobs that you want to enrich using asset definitions. Assuming assets are known and being updated, what would upgrading look like?

Generally, every op output in a job that corresponds to a long-lived object in storage should have an asset definition. The following examples demonstrate some realistic Dagster jobs, both with and without asset definitions:

This isn't an exhaustive list! We're adding the ability to define jobs that materialize assets and then run arbitrary ops. Interested? We'd love to hear from you in Slack or a GitHub discussion.

Materialize two interdependent tables#

This example is a vanilla, op-based job that follows the idiomatic practice of delegating all I/O to I/O managers and input managers.

The goal of each op in the job is to produce an asset. However, because the job doesn't use the asset definition APIs, Dagster is unaware of this:

from pandas import DataFrame

from dagster import Definitions, In, Out, job, op

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model


@op(
    ins={"raw_users": In(input_manager_key="warehouse")},
    out={"users": Out(io_manager_key="warehouse")},
)
def build_users(raw_users: DataFrame) -> DataFrame:
    users_df = raw_users.dropna()
    return users_df


@op(out={"users_recommender_model": Out(io_manager_key="object_store")})
def build_user_recommender_model(users: DataFrame):
    users_recommender_model = train_recommender_model(users)
    return users_recommender_model


@job(resource_defs={"warehouse": snowflake_io_manager, "object_store": s3_io_manager})
def users_recommender_job():
    build_user_recommender_model(build_users())


defs = Definitions(
    jobs=[users_recommender_job],
)

Materialize two interdependent tables without an I/O manager#

This example does the same thing as the previous one, with one difference. The job performs I/O inside the ops instead of delegating it to I/O managers and input managers:

from pandas import read_sql

from dagster import Definitions, In, Nothing, job, op

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@op
def build_users():
    raw_users_df = read_sql("select * from raw_users", con=create_db_connection())
    users_df = raw_users_df.dropna()
    users_df.to_sql(name="users", con=create_db_connection())


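# `In(Nothing)` declares an ordering-only dependency: this op runs after
# build_users but receives no data from it.
@op(ins={"users": In(Nothing)})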
def build_user_recommender_model():
    users_df = read_sql("select * from users", con=create_db_connection())
    users_recommender_model = train_recommender_model(users_df)
    pickle_to_s3(users_recommender_model, key="users_recommender_model")


@job
def users_recommender_job():
    build_user_recommender_model(build_users())


defs = Definitions(
    jobs=[users_recommender_job],
)

Not all ops produce assets#

This example demonstrates a job where some of the ops (extract_products and get_categories) don't produce assets of their own. Instead, they produce transient data that downstream ops will use to produce assets:

from pandas import DataFrame

from dagster import Definitions, job, op

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
    return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
    return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
    products.to_sql(name="products", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
    categories.to_sql(name="categories", con=create_db_connection())


@job
def ingest_products_and_categories():
    products = extract_products()
    product_categories = get_categories(products)
    return write_products_table(products), write_categories_table(product_categories)


defs = Definitions(
    jobs=[ingest_products_and_categories],
)

How do asset definitions work with other Dagster concepts?#

Still not sure how asset definitions fit into your current Dagster usage? In this section, we'll touch on how asset definitions work with some of Dagster's core concepts.

Ops and graphs#

| Without asset definitions | With asset definitions |
|---|---|
| An op is the basic unit of computation | Every asset definition includes a graph or an op |
| A graph is a composite unit of computation that connects multiple ops | Every asset definition includes a graph or an op |
| Ops can have multiple outputs | Multiple assets can be produced by a single op when defined using the @multi_asset decorator |
| Ops can use config | Assets can use config |
| Ops can access OpExecutionContext | Assets can access OpExecutionContext |
| Ops can require resources | Asset definitions can require resources |
| Ops can be tested by directly invoking them | Assets can be tested by directly invoking them |