Advanced: Intermediates

You can find the code for this tutorial on GitHub.

We've already seen how solids can describe their persistent artifacts to the system using materializations.

Dagster also has a facility for automatically materializing the intermediate values that actually pass between solids.

This can be very useful for debugging, when you want to inspect the value output by a solid and ensure that it is as you expect; for audit, when you want to understand how a particular downstream output was created; and for re-executing downstream solids with cached results from expensive upstream computations.

To turn intermediate storage on, just set another key in the pipeline config:

intermediates.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"
storage:
  filesystem:

When you execute the pipeline using this config, you'll see new structured entries in the Dagit log viewer indicating that intermediates have been stored on the filesystem.

intermediates.png
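The paths in these log entries follow a predictable layout: storage root, then run ID, then intermediates, then step key, then output name. If you want to locate a stored intermediate yourself, a small helper along these lines can build the expected path. Note that intermediate_path is a hypothetical function written for illustration, not part of the Dagster API, and the storage root will differ per deployment.

import os


# Hypothetical helper (not part of Dagster): build the path where a
# filesystem-stored intermediate is expected to live, following the layout
# <storage_root>/<run_id>/intermediates/<step_key>/<output_name>
def intermediate_path(storage_root, run_id, step_key, output_name="result"):
    return os.path.join(
        storage_root, run_id, "intermediates", step_key, output_name
    )


# e.g., the intermediate produced by read_csv in a particular run
print(
    intermediate_path(
        "/dagster_home/storage",
        "2584d954-a30f-4be6-bbfc-c919e4bee84b",
        "read_csv.compute",
    )
)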

Re-execution

Once intermediates are being stored, you can individually re-execute steps whose outputs are satisfied by previously stored intermediates through Dagit or the Python API.

Re-execute a pipeline in Dagit

Click on the sort_by_calories.compute execution step, and you'll see the option appear to re-execute the selected step subset, using the automatically materialized intermediate output of the previous solid.

reexecution.png

Re-executing the selected subset, sort_by_calories.compute in this case, will skip the read_csv.compute step and use the previously stored intermediate instead. You may also notice the Re-execution section on the right-hand side, which lets you view and switch between related pipeline runs created by re-execution.

reexecution_results.png

Re-executing step subsets can be very helpful while you're writing solids, or while you're actively debugging only part of a pipeline.

You can also manually specify intermediates from previous runs as inputs to solids. Recall the syntax we used to set input values using the config system:

inputs_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"

Instead of setting the value key (i.e., providing a CSV file), we can set the pickle key, as follows:

reexecution_env.yaml
solids:
  sort_by_calories:
    inputs:
      cereals:
        - pickle:
            path: "/dagster_home/storage/2584d954-a30f-4be6-bbfc-c919e4bee84b/intermediates/read_csv.compute/result"

(Of course, you'll need to use the path to an intermediate that is actually present on your filesystem.)

If you directly substitute this config into Dagit, you'll see an error, because the system still expects the input to sort_by_calories to be satisfied by the output from read_csv.

reexecution_errors.png

To make this config valid, we'll need to tell Dagit to execute only a subset of the pipeline: just the sort_by_calories solid. Click on the subset-selector button in the top left of the Playground, to the left of the Mode selector (which reads "*" when no subset has been specified):

subset_selection.png

Now this config will pass validation, and the individual solid can be re-executed.

This facility is especially valuable during testing, since it allows you to validate newly written solids against values generated during previous runs of a known good pipeline.
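For example, a test can execute just the newly written solid against a stored intermediate. The sketch below is illustrative only: it assumes execute_pipeline accepts a solid_selection argument in the Dagster version you are running, that reexecution_pipeline is the pipeline defined in reexecution.py later in this section, and that the pickle path in reexecution_env.yaml points at an intermediate that actually exists on your filesystem.

import yaml

from dagster import execute_pipeline

from reexecution import reexecution_pipeline  # defined later in this section


def test_sort_by_calories_against_stored_intermediate():
    # reuse the config from reexecution_env.yaml shown above; the pickle path
    # must point at an intermediate present on your filesystem
    with open("reexecution_env.yaml") as f:
        run_config = yaml.safe_load(f)

    # execute only the solid under test; its input is satisfied by the stored
    # intermediate rather than by re-running read_csv
    result = execute_pipeline(
        reexecution_pipeline,
        run_config=run_config,
        solid_selection=["sort_by_calories"],
    )
    assert result.success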

Re-execute a pipeline through the Python API

Similar to execute_pipeline(), Dagster provides a Python API for re-executing pipelines from code: reexecute_pipeline(). Pass the ID of the run being re-executed as the parent_run_id argument, and optionally use step_keys_to_execute to specify the list of step keys you would like to re-execute.

reexecution.py
# read_csv and sort_by_calories are the solids defined earlier in this file
# (not shown here)
from dagster import (
    DagsterInstance,
    execute_pipeline,
    pipeline,
    reexecute_pipeline,
)


@pipeline
def reexecution_pipeline():
    sort_by_calories(read_csv())


if __name__ == "__main__":
    run_config = {
        "solids": {
            "read_csv": {
                "inputs": {"csv_path": {"value": "../../cereal.csv"}}
            }
        },
        "storage": {"filesystem": {}},
    }
    instance = DagsterInstance.ephemeral()
    result = execute_pipeline(
        reexecution_pipeline, run_config=run_config, instance=instance
    )

    assert result.success

    # skip 'read_csv' and only re-execute step 'sort_by_calories.compute'
    reexecution_result = reexecute_pipeline(
        reexecution_pipeline,
        parent_run_id=result.run_id,
        step_keys_to_execute=["sort_by_calories.compute"],
        instance=instance,
        run_config=run_config,
    )

When you specify step_keys_to_execute as above, the logs will show an OBJECT_STORE_OPERATION event for the read_csv.compute step reporting that the intermediate object for input result was copied from the previously stored storage_location/intermediates/read_csv.compute/result. This indicates that the step was skipped during re-execution and its previously computed result was reused instead.

2020-06-15 22:29:55 - dagster - DEBUG - reexecution_pipeline - 75561568-1f1f-49ab-ab70-30460cc257ad - OBJECT_STORE_OPERATION - Copied intermediate object for input result from /var/folders/fz/klcrnttj13v_8cv3m6_4hlsh0000gn/T/tmpyx2oin8v/storage/9b6d2373-5570-4525-a8b5-257860ec2a6f/intermediates/read_csv.compute/result to /var/folders/fz/klcrnttj13v_8cv3m6_4hlsh0000gn/T/tmpyx2oin8v/storage/75561568-1f1f-49ab-ab70-30460cc257ad/intermediates/read_csv.compute/result
 event_specific_data = {"metadata_entries": [["key", null, ["/var/folders/fz/klcrnttj13v_8cv3m6_4hlsh0000gn/T/tmpyx2oin8v/storage/9b6d2373-5570-4525-a8b5-257860ec2a6f/intermediates/read_csv.compute/result"]]], "op": "CP_OBJECT", "value_name": "result"}
               solid = "read_csv"
    solid_definition = "read_csv"
            step_key = "read_csv.compute"
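You can also confirm the outcome of a re-execution programmatically. The following sketch continues from reexecution.py above and assumes the result object exposes result_for_solid() and output_value(), as in the Dagster version this tutorial targets.

# continuing from reexecution.py above
assert reexecution_result.success

# inspect the output recomputed from the stored intermediate
sorted_cereals = reexecution_result.result_for_solid(
    "sort_by_calories"
).output_value()
print(sorted_cereals[0])  # first row of the sorted output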

Intermediate Storage for Custom Data Types

By default, Dagster will try to pickle intermediate values to store them on the filesystem. Some custom data types cannot be pickled (for instance, a Spark RDD), so you will need to tell Dagster how to serialize them.

Our toy LessSimpleDataFrame is, of course, pickleable, but suppose it were not: let's set a custom SerializationStrategy on it to tell Dagster how to store intermediates of this type.

serialization_strategy.py
# imports (not shown here) include csv and the Dagster APIs used below:
# SerializationStrategy, dagster_type_loader, and usable_as_dagster_type
class CsvSerializationStrategy(SerializationStrategy):
    def __init__(self):
        super(CsvSerializationStrategy, self).__init__(
            "csv_strategy", read_mode="r", write_mode="w"
        )

    def serialize(self, value, write_file_obj):
        # write the LessSimpleDataFrame (a list of dicts) out as CSV
        fieldnames = value[0]
        writer = csv.DictWriter(write_file_obj, fieldnames)
        writer.writeheader()
        writer.writerows(value)

    def deserialize(self, read_file_obj):
        # read CSV rows back into a LessSimpleDataFrame
        reader = csv.DictReader(read_file_obj)
        return LessSimpleDataFrame([row for row in reader])


@dagster_type_loader({"pickle_path": str})
def less_simple_data_frame_loader(context, config):
    with open(config["pickle_path"], "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return LessSimpleDataFrame(lines)


@usable_as_dagster_type(
    name="LessSimpleDataFrame",
    description=(
        "A naive representation of a data frame, e.g., as returned by "
        "csv.DictReader."
    ),
    serialization_strategy=CsvSerializationStrategy(),
    loader=less_simple_data_frame_loader,
)
class LessSimpleDataFrame(list):
    pass

Now, when we set the storage key in the pipeline config and run this pipeline, we'll see that our intermediate is automatically persisted as a human-readable .csv:

serialization_strategy.png
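As a sketch of what this looks like from the Python API, the run config below mirrors the one used earlier in this tutorial, with the storage key set. Here serialization_strategy_pipeline is a hypothetical name standing in for whichever pipeline wires read_csv into sort_by_calories using the LessSimpleDataFrame type above.

from dagster import execute_pipeline

run_config = {
    "solids": {
        "read_csv": {"inputs": {"csv_path": {"value": "cereal.csv"}}}
    },
    # with filesystem storage enabled, the LessSimpleDataFrame intermediate is
    # written via CsvSerializationStrategy as a .csv file
    "storage": {"filesystem": {}},
}

result = execute_pipeline(
    serialization_strategy_pipeline, run_config=run_config
)
assert result.success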