Software-Defined Assets
Experimental

You can find the code for this example on GitHub.

The "Software-defined asset" APIs sit on top of the graph/job/op APIs and enable a novel way of constructing Dagster jobs that puts assets at the forefront. As a reminder, to Dagster, an "asset" is a data product: an object produced by a data pipeline, e.g. a table, ML model, or report.

Conceptually, software-defined assets invert the typical relationship between assets and computation. Instead of defining a graph of ops and recording which assets those ops end up materializing, you define a set of assets, each of which knows how to compute its contents from upstream assets.

Taking a software-defined asset approach has a few main benefits:

  • Write less code - because each asset knows about the assets it depends on, you don't need to use @graph / @job to wire up dependencies between your ops.
  • Track cross-job dependencies via asset lineage - Dagit allows you to find the parents and children of any asset, even if they live in different jobs. This is useful for finding the sources of problems and for understanding the consequences of changing or removing an asset.
  • Know when you need to take action on an asset - In a unified view, Dagster compares the assets you've defined in code to the assets you've materialized in storage. You can catch that you've deployed code for generating a new table, but that you haven't yet materialized it. Or that you've deployed code that adds a column to a table, but that your stored table is still missing that column. Or that you've removed an asset definition, but the table still exists in storage.

In this example, we'll define some tables and generate a Dagster job that updates them. We have a table of temperature samples collected in five-minute increments, and we want to compute a table of the highest temperatures for each day.

Assets computed with Pandas and stored as CSVs

Defining the assets

Here are our asset (aka table) definitions.

import pandas as pd
from dagster import AssetKey
from dagster.core.asset_defs import ForeignAsset, asset
from pandas import DataFrame

sfo_q2_weather_sample = ForeignAsset(key=AssetKey("sfo_q2_weather_sample"))


@asset
def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@asset
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
    """Computes the 10 hottest dates"""
    return daily_temperature_highs.nlargest(10, "max_tmpf")

sfo_q2_weather_sample represents our base temperature table. It's a ForeignAsset, meaning that it's generated outside Dagster.

daily_temperature_highs represents a computed asset. It's derived by taking the sfo_q2_weather_sample table and applying the decorated function to it. Notice that it's defined using a pure function - i.e. a function with no side effects, just logical data transformation. The code for storing and retrieving the data in persistent storage will be supplied later on - that allows swapping in different implementations in different environments. E.g. we might want to store data in a local CSV file for easy testing, but store data in a data warehouse in production.

hottest_dates is a computed asset that depends on another computed asset - the daily_temperature_highs asset.

The framework infers asset dependencies by looking at the names of the arguments to the decorated functions. E.g. the function that defines the daily_temperature_highs asset has an argument named sfo_q2_weather_sample - corresponding to the asset of the same name.
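
This inference extends to assets with multiple upstream dependencies: each argument name is matched to the asset of that name. As an illustration - this asset is hypothetical and not part of the example's job - the following would depend on both assets defined above:

@asset
def hottest_dates_labeled(daily_temperature_highs: DataFrame, hottest_dates: DataFrame) -> DataFrame:
    """Hypothetical asset that depends on both upstream assets via its argument names."""
    return daily_temperature_highs.assign(
        is_hottest=daily_temperature_highs.index.isin(hottest_dates.index)
    )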

Building a job

Having defined some assets, we can build a job that, when it runs, materializes them - i.e. it computes their contents and writes them to a location in persistent storage.

from dagster import IOManagerDefinition
from dagster.core.asset_defs import build_assets_job

weather_job = build_assets_job(
    "weather",
    assets=[daily_temperature_highs, hottest_dates],
    source_assets=[sfo_q2_weather_sample],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)

The order in which we supply the assets to build_assets_job doesn't matter. build_assets_job determines the execution dependencies based on the dependencies declared in the assets. sfo_q2_weather_sample is included in source_assets because it's not computed as part of the job - just consumed by other assets in the job.

The functions we used to define our assets describe how to compute their contents, but not how to read them from and write them to persistent storage. For reading and writing, we define an IOManager. In this case, our LocalFileSystemIOManager stores DataFrames as CSVs on the local filesystem:

import os

from dagster import IOManager


class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        rpath = os.path.join(*asset_key.path) + ".csv"
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: DataFrame):
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        obj.to_csv(fpath)

    def load_input(self, context):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        return pd.read_csv(fpath)
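
With the IO manager in place, we can check the whole thing locally. The snippet below is a minimal sketch rather than part of the example code: it assumes a sfo_q2_weather_sample.csv file already exists where LocalFileSystemIOManager expects to read it (that source asset is produced outside Dagster), and it uses JobDefinition.execute_in_process, which runs the job in the current process.

# Hypothetical local run of the job built above.
result = weather_job.execute_in_process()
assert result.success

Because storage is encapsulated in the IO manager, pointing the same job at a data warehouse would be a matter of supplying a different io_manager in resource_defs, not changing the asset definitions.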

Adding in Spark assets

Not all the assets in a job need to have the same Python type. Here's an asset whose computation is defined using Spark DataFrames and that depends on the daily_temperature_highs asset we defined above using Pandas.

from dagster.core.asset_defs import asset
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql import Window
from pyspark.sql import functions as f


@asset
def daily_temperature_high_diffs(daily_temperature_highs: SparkDF) -> SparkDF:
    """Computes the difference between each day's high and the previous day's high"""
    window = Window.orderBy("valid_date")
    return daily_temperature_highs.select(
        "valid_date",
        (
            daily_temperature_highs["max_tmpf"]
            - f.lag(daily_temperature_highs["max_tmpf"]).over(window)
        ).alias("day_high_diff"),
    )

Here's an extended version of weather_job that contains the new asset:

spark_weather_job = build_assets_job(
    "spark_weather",
    assets=[daily_temperature_highs, hottest_dates, daily_temperature_high_diffs],
    source_assets=[sfo_q2_weather_sample],
    resource_defs={
        "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
    },
)

Defining a multi-type IOManager

Because the same assets will be written and read as different Python types in different situations, we need to define an IOManager that can handle both of those types. Here's an extended version of the IOManager we defined before:

import glob
from typing import Union

from dagster import check
from pandas import DataFrame as PandasDF
from pyspark.sql import SparkSession


class LocalFileSystemIOManager(IOManager):
    def _get_fs_path(self, asset_key: AssetKey) -> str:
        return os.path.abspath(os.path.join(*asset_key.path))

    def handle_output(self, context, obj: Union[PandasDF, SparkDF]):
        """This saves the dataframe as a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", a directory
        will be created with two files inside it:

            /a/b/c/
                part-00000.csv
                _SUCCESS
        """
        if isinstance(obj, PandasDF):
            directory = self._get_fs_path(context.asset_key)
            os.makedirs(directory, exist_ok=True)
            open(os.path.join(directory, "_SUCCESS"), "wb").close()
            csv_path = os.path.join(directory, "part-00000.csv")
            obj.to_csv(csv_path)
        elif isinstance(obj, SparkDF):
            obj.write.format("csv").options(header="true").save(
                self._get_fs_path(context.asset_key), mode="overwrite"
            )
        else:
            raise ValueError("Unexpected output type")

    def load_input(self, context) -> Union[PandasDF, SparkDF]:
        """This reads a dataframe from a CSV using the layout written and expected by Spark/Hadoop.

        E.g. if the given storage maps the asset's path to the filesystem path "/a/b/c", and that
        directory contains:

            /a/b/c/
                part-00000.csv
                part-00001.csv
                _SUCCESS

        then the produced dataframe will contain the concatenated contents of the two CSV files.
        """
        if context.dagster_type.typing_type == PandasDF:
            fs_path = os.path.abspath(self._get_fs_path(context.asset_key))
            paths = glob.glob(os.path.join(fs_path, "*.csv"))
            check.invariant(len(paths) > 0, f"No csv files found under {fs_path}")
            return pd.concat(map(pd.read_csv, paths))
        elif context.dagster_type.typing_type == SparkDF:
            return (
                SparkSession.builder.getOrCreate()
                .read.format("csv")
                .options(header="true")
                .load(self._get_fs_path(context.asset_key))
            )
        else:
            raise ValueError("Unexpected input type")
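
Finally, to browse these assets and their cross-job lineage in Dagit, the jobs need to be collected into a repository. The sketch below is an assumption on our part rather than part of the example snippets: it uses Dagster's @repository decorator and supposes the jobs defined above are importable from this module; the repository name is arbitrary.

from dagster import repository


@repository
def weather_repository():
    """Hypothetical repository that makes both jobs loadable by Dagit."""
    return [weather_job, spark_weather_job]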