
Advanced: Materializations

You can find the code for this example on GitHub.

Steps in a job often produce persistent artifacts, for instance, graphs or tables describing the result of some computation. Typically these artifacts are saved to disk (or to cloud storage) with a name that reflects their origin. But it can be hard to organize and cross-reference artifacts produced by many different runs of a job, or to identify all of the files that a job's logic might have created.

Dagster ops can describe their persistent artifacts to the system by yielding AssetMaterialization events. Like TypeCheck and ExpectationResult, asset materializations are side channels for metadata: they aren't passed to downstream ops, and they aren't used to define the data dependencies that structure a job's DAG.
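To make the side-channel idea concrete, here is a plain-Python sketch of how a runner could separate the two kinds of yielded events. Everything in it is hypothetical: `Materialization`, `Output`, and `run_compute` are stand-ins invented for illustration, not Dagster's classes or its actual execution machinery. The point is only that metadata events end up in an event log while the output value alone flows to the downstream step.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Tuple


# Hypothetical stand-ins for illustration only; these are NOT Dagster's classes.
@dataclass
class Materialization:
    asset_key: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Output:
    value: Any


def run_compute(
    compute_fn: Callable[..., Iterator[Any]], *inputs: Any
) -> Tuple[Any, List[Materialization]]:
    """Hypothetical runner: returns (output value, event log)."""
    event_log: List[Materialization] = []
    output_value = None
    for event in compute_fn(*inputs):
        if isinstance(event, Output):
            # Only the output value is handed to downstream steps.
            output_value = event.value
        elif isinstance(event, Materialization):
            # Metadata is recorded on the side and never passed downstream.
            event_log.append(event)
        else:
            raise TypeError(f"Unexpected event: {event!r}")
    return output_value, event_log


def sort_numbers(numbers):
    result = sorted(numbers)
    yield Materialization("sorted_numbers", {"count": len(result)})
    yield Output(result)


def total(numbers):
    yield Output(sum(numbers))


sorted_value, log = run_compute(sort_numbers, [3, 1, 2])
downstream_value, _ = run_compute(total, sorted_value)
print(sorted_value, downstream_value, [e.asset_key for e in log])
# [1, 2, 3] 6 ['sorted_numbers']
```

The downstream `total` step receives only the sorted list; the materialization record never enters its inputs.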

Suppose that we rewrite our sort_by_calories op so that it saves the newly sorted data to a CSV file on disk.

import csv
import os

from dagster import op

@op
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    least_caloric = sorted_cereals[0]["name"]
    most_caloric = sorted_cereals[-1]["name"]
    context.log.info(f"Least caloric cereal: {least_caloric}")
    context.log.info(f"Most caloric cereal: {most_caloric}")
    fieldnames = list(sorted_cereals[0].keys())
    # Include the run ID in the filename so each run writes its own file.
    sorted_cereals_csv_path = os.path.abspath(
        f"output/calories_sorted_{context.run_id}.csv"
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    # newline="" is what the csv module expects for files it writes.
    with open(sorted_cereals_csv_path, "w", newline="") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
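The sorting and CSV-writing logic in this op doesn't depend on Dagster, so it can be pulled out and checked on its own. The sketch below assumes two hypothetical helpers, `sort_cereals_by_calories` and `write_sorted_csv`, plus made-up sample rows; it writes into a temporary directory and confirms that two different run IDs produce two different files.

```python
import csv
import os
import tempfile


# Hypothetical helper mirroring the op's sorting step.
def sort_cereals_by_calories(cereals):
    # Calories arrive as strings when read with csv.DictReader, so convert to int.
    return sorted(cereals, key=lambda cereal: int(cereal["calories"]))


# Hypothetical helper mirroring the op's file-writing step.
def write_sorted_csv(sorted_cereals, output_dir, run_id):
    # Embedding the run ID keeps one run from overwriting another run's file.
    path = os.path.abspath(os.path.join(output_dir, f"calories_sorted_{run_id}.csv"))
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as fd:
        writer = csv.DictWriter(fd, fieldnames=list(sorted_cereals[0].keys()))
        writer.writeheader()
        writer.writerows(sorted_cereals)
    return path


# Made-up sample rows for illustration.
cereals = [
    {"name": "Cereal A", "calories": "120"},
    {"name": "Cereal B", "calories": "70"},
]

with tempfile.TemporaryDirectory() as tmp:
    sorted_cereals = sort_cereals_by_calories(cereals)
    path_1 = write_sorted_csv(sorted_cereals, tmp, run_id="run-1")
    path_2 = write_sorted_csv(sorted_cereals, tmp, run_id="run-2")
    with open(path_1, newline="") as fd:
        rows = list(csv.DictReader(fd))
    print(path_1 != path_2, [row["name"] for row in rows])
    # True ['Cereal B', 'Cereal A']
```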

We've taken the basic precaution of ensuring that the saved CSV file has a different filename for each run of the job. But Dagit has no way of knowing about this persistent artifact. So we'll extend the op to describe the file to Dagster:

import csv
import os

from dagster import AssetMaterialization, EventMetadata, Output, op

@op
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    least_caloric = sorted_cereals[0]["name"]
    most_caloric = sorted_cereals[-1]["name"]
    context.log.info(f"Least caloric cereal: {least_caloric}")
    context.log.info(f"Most caloric cereal: {most_caloric}")
    fieldnames = list(sorted_cereals[0].keys())
    sorted_cereals_csv_path = os.path.abspath(
        f"output/calories_sorted_{context.run_id}.csv"
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    with open(sorted_cereals_csv_path, "w", newline="") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    # Tell Dagster about the file we just wrote; this is metadata only.
    yield AssetMaterialization(
        asset_key="sorted_cereals_csv",
        description="Cereals data frame sorted by caloric content",
        metadata={
            "sorted_cereals_csv_path": EventMetadata.path(
                sorted_cereals_csv_path
            )
        },
    )
    # Because the op now yields events, its output must be yielded too.
    yield Output(None)

Note that we've had to add the last line, which yields an Output. Until now, all of our ops have relied on Dagster's implicit conversion of the return value of an op's compute function into its output. Once an op explicitly yields other types of events, its compute function becomes a generator, so we also need to yield the output explicitly for the framework to recognize it.
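The need for the explicit Output comes from ordinary Python semantics: any function whose body contains `yield` is a generator function, and a caller that iterates over it sees only the yielded values. The plain-Python sketch below illustrates this; `compute_with_return`, `compute_with_yielded_output`, `collect_events`, and the string "events" are hypothetical illustrations, not Dagster code.

```python
def compute_with_return():
    # Because this body contains `yield`, calling the function returns a generator.
    yield "materialization event"
    return "sorted data"  # Not seen by a consumer that simply iterates.


def compute_with_yielded_output():
    yield "materialization event"
    yield "sorted data"  # Yielded explicitly, so an iterating consumer sees it.


def collect_events(compute_fn):
    # Consume the generator the way an event-driven framework might.
    return list(compute_fn())


print(collect_events(compute_with_return))
# ['materialization event']
print(collect_events(compute_with_yielded_output))
# ['materialization event', 'sorted data']

# A generator's return value is only reachable through StopIteration.value.
gen = compute_with_return()
next(gen)
try:
    next(gen)
except StopIteration as stop:
    returned_value = stop.value
print(returned_value)
# sorted data
```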

Now, if we run this job in Dagit:

[Screenshot: materializations.png]