Dagster Types & Expectations

You can find the code for this tutorial on GitHub.

Dagster Types

We've seen how we can type the inputs and outputs of solids using Python 3's typing system, and how to use Dagster's built-in config types, such as dagster.String, to define config schemas for our solids.

But what about when you want to define your own types?

This section will talk about the basics of Dagster's user defined types. You can also learn more about typing in Dagster by reading Types Overview.

Let's look back at our simple read_csv solid.

inputs_typed.py
@solid
def read_csv(context, csv_path: str):
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

The lines object returned by Python's built-in csv.DictReader is a list of collections.OrderedDict, each of which represents one row of the dataset:

[
    OrderedDict([
        ('name', '100% Bran'), ('mfr', 'N'), ('type', 'C'), ('calories', '70'), ('protein', '4'),
        ('fat', '1'), ('sodium', '130'), ('carbo', '5'), ('sugars', '6'), ('potass', '280'),
        ('vitamins', '25'), ('shelf', '3'), ('weight', '1'), ('cups', '0.33'),
        ('rating', '68.402973')
    ]),
    OrderedDict([
        ('name', '100% Natural Bran'), ('mfr', 'Q'), ('type', 'C'), ('calories', '120'),
        ('protein', '3'), ('fat', '5'), ('sodium', '15'), ('fiber', '2'), ('carbo', '8'),
        ('sugars', '8'), ('potass', '135'), ('vitamins', '0'), ('shelf', '3'), ('weight', '1'),
        ('cups', '1'), ('rating', '33.983679')
    ]),
    ...
]

This is a simple representation of a "data frame", or a table of data. We'd like to be able to use Dagster's type system to type the output of read_csv, so that we can do type checking when we construct the pipeline, ensuring that any solid consuming the output of read_csv expects to receive a data frame.

To do this, we'll use the DagsterType class:

custom_types.py
SimpleDataFrame = DagsterType(
    name="SimpleDataFrame",
    type_check_fn=lambda _, value: isinstance(value, list),
    description="A naive representation of a data frame, e.g., as returned by csv.DictReader.",
)

Now we can annotate the rest of our pipeline with our new type:

custom_types.py
@solid
def read_csv(context, csv_path: str) -> SimpleDataFrame:
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid
def sort_by_calories(context, cereals: SimpleDataFrame):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )

The type metadata now appears in Dagit, and the system will ensure that the output of read_csv and the input to sort_by_calories are indeed instances of SimpleDataFrame. As usual, run:

dagit -f custom_types.py

custom_types_figure_one.png

You can see that the output of read_csv (which by default has the name result) is marked to be of type SimpleDataFrame.
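For reference, here's a minimal sketch of how these two solids might be wired together into the custom_type_pipeline used throughout this section (the pipeline body is an assumption based on the solids above):

from dagster import pipeline


# Hypothetical wiring for illustration: read_csv's output (typed as
# SimpleDataFrame) feeds sort_by_calories' cereals input, so Dagster can
# type check the connection when the pipeline is constructed.
@pipeline
def custom_type_pipeline():
    sort_by_calories(read_csv())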


Complex Type Checks

The Dagster framework will fail type checks when a value isn't an instance of the type we're expecting, e.g., if read_csv were to return a str rather than a SimpleDataFrame.

Sometimes we know more about the types of our values, and we'd like to do deeper type checks. For example, in the case of the SimpleDataFrame, we expect to see a list of OrderedDicts, and for each of these OrderedDicts to have the same fields, in the same order.

The type check function allows us to do this:

custom_types_2.py
def less_simple_data_frame_type_check(_, value):
    if not isinstance(value, list):
        return False

    fields = [field for field in value[0].keys()]

    for i in range(len(value)):
        row = value[i]
        if not isinstance(row, dict):
            return False
        row_fields = [field for field in row.keys()]
        if fields != row_fields:
            return False

    return True


LessSimpleDataFrame = DagsterType(
    name="LessSimpleDataFrame",
    description="A more sophisticated data frame that type checks its structure.",
    type_check_fn=less_simple_data_frame_type_check,
)

Now, if our solid logic fails to return the right type, we'll see a type check failure. Let's replace our read_csv solid with the following bad logic:

custom_types_2.py
@solid
def bad_read_csv(context, csv_path: str) -> LessSimpleDataFrame:
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return ["not_a_dict"]

When we run the pipeline with this solid, we'll see an error like:

2020-06-15 14:08:00 - dagster - ERROR - custom_type_pipeline - 7fd9dce5-10e8-44ed-ba0c-0d48fdf9a5fc - STEP_FAILURE - Execution of step "bad_read_csv.compute" failed.
            cls_name = "AttributeError"
       error_message = "AttributeError: 'str' object has no attribute 'keys'\n"
               solid = "bad_read_csv"
    solid_definition = "bad_read_csv"
            step_key = "bad_read_csv.compute"

Providing Input Values for Custom Types in Config

We saw earlier how, when a solid doesn't receive all of its inputs from other solids further upstream in the pipeline, we can specify its input values in config:

inputs_env.yaml
solids:
  read_csv:
    inputs:
      csv_path:
        value: "cereal.csv"

The Dagster framework knows how to interpret values provided via config as scalar inputs. In this case, read_csv just takes the string representation of the filepath from which it'll read a CSV. But for more complex, custom types, we need to tell Dagster how to interpret config values.

Consider our LessSimpleDataFrame. It might be convenient if Dagster knew automatically how to read a data frame in from a CSV file, without us needing to separate that logic into the read_csv solid, especially if we knew the provenance and format of that CSV file (e.g., if we were using standard CSVs as an internal interchange format) and didn't need the full configuration surface of a general purpose read_csv solid.

What we want to be able to do is write:

custom_type_input.yaml
solids:
  sort_by_calories:
    inputs:
      cereals:
        csv: "cereal.csv"

In order for the Dagster machinery to be able to decode the config value {'csv': 'cereal.csv'} into an input of the correct LessSimpleDataFrame value, we need to write what we call a type loader.

custom_types_3.py
@dagster_type_loader(Selector({"csv": Field(String)}))
def less_simple_data_frame_loader(context, selector):
    csv_path = os.path.join(os.path.dirname(__file__), selector["csv"])
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

A function decorated with @dagster_type_loader should take the context object, as usual, and a parameter representing the parsed config field. The schema for this field is defined by the argument to the @dagster_type_loader decorator.

Here, we introduce the Selector type, which lets you specify mutually exclusive options in config schemas. In this case, there's only one option, csv, but you can imagine a more sophisticated data frame type that might also know how to load its inputs from other formats and sources, and might have a selector with fields like parquet, xlsx, sql, etc.
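For illustration, here's a hedged sketch of what a loader with more than one selector option might look like; the parquet branch and the pandas dependency are assumptions, not part of this tutorial's code:

import csv
import os

from dagster import Field, Selector, String, dagster_type_loader


@dagster_type_loader(
    Selector({"csv": Field(String), "parquet": Field(String)})
)
def multi_format_data_frame_loader(context, selector):
    # Exactly one of the selector's keys will be present in the parsed config.
    if "csv" in selector:
        csv_path = os.path.join(os.path.dirname(__file__), selector["csv"])
        with open(csv_path, "r") as fd:
            return [row for row in csv.DictReader(fd)]
    else:
        # Assumes pandas (and a parquet engine such as pyarrow) is installed.
        import pandas as pd

        parquet_path = os.path.join(
            os.path.dirname(__file__), selector["parquet"]
        )
        return pd.read_parquet(parquet_path).to_dict(orient="records")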

Then attach less_simple_data_frame_loader to the original type declaration:

custom_types_3.py
LessSimpleDataFrame = DagsterType(
    name="LessSimpleDataFrame",
    description="A more sophisticated data frame that type checks its structure.",
    type_check_fn=less_simple_data_frame_type_check,
    loader=less_simple_data_frame_loader,
)

Now, whether you run the pipeline from Dagit or programmatically, you can provide a source for this input via config:

custom_types_3.py
if __name__ == "__main__":
    execute_pipeline(
        custom_type_pipeline,
        {
            "solids": {
                "sort_by_calories": {
                    "inputs": {"cereals": {"csv": "cereal.csv"}}
                }
            }
        },
    )

Testing Custom Types

As you write your own custom types, you'll also want to set up unit tests that ensure your types are doing what you expect them to. Dagster includes a utility function, check_dagster_type, that lets you type check any Dagster type against any value.

custom_types_test.py
def test_less_simple_data_frame():
    assert check_dagster_type(
        LessSimpleDataFrame, [{"foo": 1}, {"foo": 2}]
    ).success

    type_check = check_dagster_type(
        LessSimpleDataFrame, [{"foo": 1}, {"bar": 2}]
    )
    assert not type_check.success
    assert type_check.description == (
        "Rows in LessSimpleDataFrame should have the same fields, "
        "got ['bar'] for row 2, expected ['foo']"
    )
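If you have pytest installed, you can run this like any other unit test, for example:

pytest custom_types_test.py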

Well-tested library types can be reused across solids and pipelines to provide standardized type checking within your organization's data applications.


MyPy Compliance

Note: this section requires Python 3.

When you create DagsterTypes that have no corresponding usable Python type and want to remain mypy-compliant, there are two options.

One option is to use InputDefinition and OutputDefinition exclusively for Dagster types, and reserve type annotations for naked Python types only. This is verbose, but explicit and clear.

custom_types_mypy_verbose.py
@solid(
    input_defs=[InputDefinition("csv_path", String)],
    output_defs=[OutputDefinition(SimpleDataFrame)],
)
def read_csv(context, csv_path: str) -> list:
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines


@solid(input_defs=[InputDefinition("cereals", SimpleDataFrame)])
def sort_by_calories(context, cereals: list):
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )

If you wish to use type annotations exclusively but still use Dagster types without a 1:1 Python type counterpart, the type-checking behavior must be modified. For this we recommend using the typing.TYPE_CHECKING constant from Python's typing module.

While inelegant, this centralizes the boilerplate at the type's instantiation, rather than spreading it across every place where the type is referenced.

custom_types_mypy_typing_trick.py
if typing.TYPE_CHECKING:
    SimpleDataFrame = list
else:
    SimpleDataFrame = DagsterType(
        name="SimpleDataFrame",
        type_check_fn=lambda _, value: isinstance(value, list),
        description="A naive representation of a data frame, e.g., as returned by csv.DictReader.",
    )


@solid
def read_csv(context, csv_path: str) -> SimpleDataFrame:
    csv_path = os.path.join(os.path.dirname(__file__), csv_path)
    with open(csv_path, "r") as fd:
        lines = [row for row in csv.DictReader(fd)]

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

Metadata and Custom Type Checks

Custom types can also yield metadata about the type check. For example, in the case of our data frame, we might want to record the number of rows and columns in the dataset when our type checks succeed, and provide more information about why type checks failed when they fail. User-defined type check functions can optionally return a TypeCheck object that contains metadata about the success or failure of the type check. Let's see how to use this to emit some summary statistics about our DataFrame type:

custom_types_4.py
def less_simple_data_frame_type_check(_, value):
    if not isinstance(value, list):
        return TypeCheck(
            success=False,
            description=(
                "LessSimpleDataFrame should be a list of dicts, got "
                "{type_}"
            ).format(type_=type(value)),
        )

    fields = [field for field in value[0].keys()]

    for i in range(len(value)):
        row = value[i]
        if not isinstance(row, dict):
            return TypeCheck(
                success=False,
                description=(
                    "LessSimpleDataFrame should be a list of dicts, "
                    "got {type_} for row {idx}"
                ).format(type_=type(row), idx=(i + 1)),
            )
        row_fields = [field for field in row.keys()]
        if fields != row_fields:
            return TypeCheck(
                success=False,
                description=(
                    "Rows in LessSimpleDataFrame should have the same fields, "
                    "got {actual} for row {idx}, expected {expected}"
                ).format(actual=row_fields, idx=(i + 1), expected=fields),
            )

    return TypeCheck(
        success=True,
        description="LessSimpleDataFrame summary statistics",
        metadata_entries=[
            EventMetadataEntry.text(
                str(len(value)),
                "n_rows",
                "Number of rows seen in the data frame",
            ),
            EventMetadataEntry.text(
                str(len(value[0].keys()) if len(value) > 0 else 0),
                "n_cols",
                "Number of columns seen in the data frame",
            ),
            EventMetadataEntry.text(
                str(list(value[0].keys()) if len(value) > 0 else []),
                "column_names",
                "Keys of columns seen in the data frame",
            ),
        ],
    )

A TypeCheck must include a success argument describing whether the check passed or failed, and may include a description and/or a list of EventMetadataEntry objects. You should use the static constructors on EventMetadataEntry to construct these objects, which are flexible enough to support arbitrary metadata in JSON or Markdown format.
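As a minimal sketch (not part of the tutorial code), here's how the .json and .md constructors might be used in a type check; the type name and metadata labels are illustrative:

from dagster import DagsterType, EventMetadataEntry, TypeCheck


def annotated_list_type_check(_, value):
    if not isinstance(value, list):
        return TypeCheck(success=False, description="Expected a list")
    return TypeCheck(
        success=True,
        metadata_entries=[
            # Arbitrary JSON-serializable data, rendered as structured metadata.
            EventMetadataEntry.json({"n_rows": len(value)}, "summary"),
            # Markdown-formatted notes, rendered as rich text in Dagit.
            EventMetadataEntry.md("Rows parsed from an **in-memory** list", "notes"),
        ],
    )


AnnotatedList = DagsterType(
    name="AnnotatedList", type_check_fn=annotated_list_type_check
)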

Dagit knows how to display and archive structured metadata of this kind for future review:

custom_types_figure_two.png


Expectations

Custom type checks and metadata are appropriate for checking that a value will behave as we expect, and for collecting summary information about values.

But sometimes we want to make more specific, data- and business logic-dependent assertions about the semantics of values. It typically isn't appropriate to embed assertions like these into data types directly.

For one, they will usually vary substantially between instantiations; for example, we don't expect all data frames to have the same number of columns, and over-specifying data types (e.g., SixColumnedDataFrame) makes it difficult to write logic that works generically (e.g., over all data frames).

What's more, these additional, deeper semantic assertions are often non-stationary. Typically, you'll start running a pipeline with certain expectations about the data that you'll see; but over time, you'll learn more about your data (making your expectations more precise), and the process that generates your data will shift (making some of your expectations invalid).

We've already encountered the TypeCheck event, which is typically yielded by the type machinery (but can also be yielded manually from the body of a solid's compute function); ExpectationResult is another kind of structured side-channel result that a solid can yield. These extra events don't get passed to downstream solids and they aren't used to define the data dependencies of a pipeline DAG.

custom_types_bad_5.py
def expect_column_to_be_integers(
    data_frame: LessSimpleDataFrame, column_name: str
) -> ExpectationResult:
    bad_values = []
    for idx in range(len(data_frame)):
        line = data_frame[idx]
        if not isinstance(line[column_name], int):
            bad_values.append((idx, str(line[column_name])))
    return ExpectationResult(
        success=(not bad_values),
        label="col_{column_name}_is_int".format(column_name=column_name),
        description=(
            "Check whether type of column {column_name} in "
            "LessSimpleDataFrame is int"
        ).format(column_name=column_name),
        metadata_entries=[
            EventMetadataEntry.json(
                {"index": idx, "bad_value": value},
                "bad_value",
                "Bad value in column {column_name}".format(
                    column_name=column_name
                ),
            )
            for (idx, value) in bad_values
        ],
    )


@solid
def sort_by_calories(context, cereals: LessSimpleDataFrame):
    yield expect_column_to_be_integers(cereals, "calories")
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )

Until now, every solid we've encountered has returned its result value, or None. But solids can also yield events of various types for side-channel communication about the results of their computations.
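One caveat worth noting: once a solid's compute function yields anything, Dagster treats it as a generator, so any result the solid produces must also be yielded explicitly as an Output rather than returned (sort_by_calories above yields only the expectation and produces no output). A minimal sketch, assuming the LessSimpleDataFrame type and expect_column_to_be_integers helper defined above:

from dagster import Output, solid


@solid
def sort_and_emit(context, cereals: LessSimpleDataFrame):
    # Side-channel event: recorded and surfaced in Dagit, not passed downstream.
    yield expect_column_to_be_integers(cereals, "calories")
    sorted_cereals = sorted(cereals, key=lambda cereal: cereal["calories"])
    # Because this solid yields events, its result must be yielded as an Output.
    yield Output(sorted_cereals)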

Running this pipeline yields an ExpectationResult with success set to False, since we expect the entries in the calories column to be of type int but they are read from the CSV as strings. Because the strings sort lexicographically, we also get the incorrect result that the least caloric cereal is Corn Flakes (100 calories per serving) and the most caloric is Strawberry Fruit Wheats (90 calories per serving).

custom_types_bad_data.png

To fix this, we can cast calories to int during the loading process:

custom_types_5.py
@dagster_type_loader(Selector({"csv": Field(String)}))
def less_simple_data_frame_loader(context, selector):
    lines = []
    csv_path = os.path.join(os.path.dirname(__file__), selector["csv"])
    with open(csv_path, "r") as fd:
        for row in csv.DictReader(fd):
            row["calories"] = int(row["calories"])
            lines.append(row)

    context.log.info("Read {n_lines} lines".format(n_lines=len(lines)))
    return lines

Running this pipeline yields an ExpectationResult with success set to True and the correct result that the least caloric cereal is All-Bran with Extra Fiber (50 calories per serving) and the most caloric cereal is Mueslix Crispy Blend (160 calories per serving).

This part of the system remains relatively immature, but yielding structured expectation results from your solid logic means that, in the future, tools like Dagit will be able to aggregate and track expectation results, as well as implement sophisticated policy engines to drive alerting and exception handling on a deep semantic basis.


Conclusion

🎉 Congratulations! Having made it this far, you now have a working, testable, and maintainable data pipeline. You've also learned the basics of Dagster, and you should now be able to build your own data applications using Dagster!