Great Expectations integration

You can find the code for this example on GitHub.

This example demonstrates how to use the Great Expectations (GE) solid factory to validate incoming data against a set of expectations built with Great Expectations' tooling.

For this example, we'll be using two versions of a dataset of baseball team payroll and wins, with one version modified to hold incorrect data.

The basic call to the GE solid factory is below. Besides a name for the new solid, it takes two arguments: the datasource name (datasource_name) and the expectation suite name (suite_name).

The optional validation operator argument isn't used here, since we just want the default behavior: validate the data and output the results.

ge_demo.py
payroll_expectations = ge_validation_solid_factory(
    name="ge_validation_solid", datasource_name="getest", suite_name="basic.warning"
)
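Conceptually, the factory captures the datasource and suite names and returns a new solid bound to them. As a rough, stdlib-only illustration of that closure pattern (not dagster_ge's actual implementation), the sketch below uses a hypothetical make_validation_step function and a hypothetical in-memory SUITES registry of plain Python checks standing in for a real Great Expectations suite.

```python
from typing import Callable, Dict, List

Row = Dict[str, str]
Check = Callable[[List[Row]], bool]

# Hypothetical in-memory stand-in for a Great Expectations suite registry:
# each suite name maps to named checks over a list of CSV-style rows.
SUITES: Dict[str, Dict[str, Check]] = {
    "basic.warning": {
        "table_not_empty": lambda rows: len(rows) > 0,
    },
}


def make_validation_step(name: str, datasource_name: str, suite_name: str) -> Callable[[List[Row]], dict]:
    """Return a validation function bound to one datasource and suite (hypothetical)."""
    checks = SUITES[suite_name]

    def validate(rows: List[Row]) -> dict:
        # Run every check in the bound suite and record which ones passed.
        results = {check_name: check(rows) for check_name, check in checks.items()}
        return {
            "step": name,
            "datasource": datasource_name,
            "suite": suite_name,
            "success": all(results.values()),
            "results": results,
        }

    # Name the returned function after the step, much as the factory names its solid.
    validate.__name__ = name
    return validate


payroll_check = make_validation_step("ge_validation_solid", "getest", "basic.warning")
```

Calling payroll_check([]) returns a result whose success flag is False, because the single stand-in check requires at least one row; the rows passed in are made-up placeholders, not the example's payroll data.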

Here's how the validation solid's output can be combined with other solids:

ge_demo.py
@solid(input_defs=[InputDefinition(name="numrows"), InputDefinition(name="expectation")])
def postprocess_payroll(_, numrows, expectation):
    if expectation["success"]:
        return numrows
    else:
        raise ValueError

(Alternatively, you could use hooks to respond to the expectation result.)
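postprocess_payroll only inspects the top-level success flag. If you want the failure to say which expectations broke, a sketch like the one below walks the per-expectation entries. It assumes a simplified validation-result shape (a top-level "success" flag plus a "results" list whose entries carry "success" and an "expectation_config" holding an "expectation_type"); check those keys against the output your GE version actually produces. The helper names failed_expectation_types and postprocess_with_details are hypothetical.

```python
from typing import Any, Dict, List


def failed_expectation_types(result: Dict[str, Any]) -> List[str]:
    """List the expectation types that failed in a GE-style validation result.

    Assumes a simplified shape: {"success": bool, "results": [{"success": bool,
    "expectation_config": {"expectation_type": str}}, ...]}.
    """
    return [
        r["expectation_config"]["expectation_type"]
        for r in result.get("results", [])
        if not r["success"]
    ]


def postprocess_with_details(numrows: int, result: Dict[str, Any]) -> int:
    """Pass numrows through on success; otherwise raise, naming the failed expectations."""
    if result["success"]:
        return numrows
    failed = failed_expectation_types(result)
    raise ValueError(f"Validation failed for: {', '.join(failed) or 'unknown expectations'}")
```

Inside a solid, the same check would replace the bare raise in postprocess_payroll, so a failed run reports which expectations were violated rather than just that validation failed.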

Let's take a brief look at some of the config here.

ge_demo.py
        PresetDefinition(
            "sample_preset_success",
            mode="basic",
            run_config={
                "resources": {
                    "ge_data_context": {
                        "config": {
                            "ge_root_dir": file_relative_path(__file__, "./great_expectations")
                        }
                    }
                },
                "solids": {
                    "read_in_datafile": {
                        "inputs": {
                            "csv_path": {
                                "value": file_relative_path(__file__, "./data/succeed.csv")
                            }
                        }
                    }
                },
            },
        ),
        PresetDefinition(
            "sample_preset_fail",
            mode="basic",
            run_config={
                "resources": {
                    "ge_data_context": {
                        "config": {
                            "ge_root_dir": file_relative_path(__file__, "./great_expectations")
                        }
                    }
                },
                "solids": {
                    "read_in_datafile": {
                        "inputs": {
                            "csv_path": {"value": file_relative_path(__file__, "./data/fail.csv")}
                        }
                    }
                },
            },
        ),

We've got two presets that are nearly identical: the only difference is whether a good or a bad file is passed to the initial solid, read_in_datafile. The relevant new piece of config is the ge_data_context entry under resources, where the GE data context is configured. All we need to do to expose GE to Dagster is provide the root of the GE directory, that is, the path to the great_expectations directory on your machine.
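Because the two presets differ only in the CSV path, one option is to build their run config with a small helper. The sketch below is plain Python, and the function name payroll_run_config is hypothetical; it produces the same nested dictionary shape as the presets in ge_demo.py.

```python
from typing import Any, Dict


def payroll_run_config(csv_path: str, ge_root_dir: str) -> Dict[str, Any]:
    """Build the run config shared by both presets; only csv_path varies (hypothetical helper)."""
    return {
        "resources": {
            # Point the ge_data_context resource at the great_expectations directory.
            "ge_data_context": {"config": {"ge_root_dir": ge_root_dir}},
        },
        "solids": {
            # Feed the CSV path to the first solid as an input value.
            "read_in_datafile": {"inputs": {"csv_path": {"value": csv_path}}},
        },
    }
```

In ge_demo.py, each preset could then pass run_config=payroll_run_config(file_relative_path(__file__, "./data/succeed.csv"), file_relative_path(__file__, "./great_expectations")), swapping in ./data/fail.csv for the failing preset. Keeping the GE resource config in one place means a change to ge_root_dir cannot drift between the two presets.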

Finally, here's the full pipeline using the GE solid, with presets to use both the correct and incorrect data:

ge_demo.py
from dagster_ge.factory import ge_data_context, ge_validation_solid_factory
from pandas import read_csv

from dagster import InputDefinition, ModeDefinition, PresetDefinition, pipeline, solid
from dagster.utils import file_relative_path


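@solid
def read_in_datafile(_, csv_path):
    # Load the CSV file supplied through run config into a pandas DataFrame.
    return read_csv(csv_path)


@solid(input_defs=[InputDefinition(name="df")])
def process_payroll(_, df):
    # Count the rows in the payroll DataFrame.
    return len(df)


@solid(input_defs=[InputDefinition(name="numrows"), InputDefinition(name="expectation")])
def postprocess_payroll(_, numrows, expectation):
    # Only pass the row count downstream if the GE validation succeeded.
    if expectation["success"]:
        return numrows
    else:
        raise ValueError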


payroll_expectations = ge_validation_solid_factory(
    name="ge_validation_solid", datasource_name="getest", suite_name="basic.warning"
)


@pipeline(
    mode_defs=[ModeDefinition("basic", resource_defs={"ge_data_context": ge_data_context})],
    preset_defs=[
        PresetDefinition(
            "sample_preset_success",
            mode="basic",
            run_config={
                "resources": {
                    "ge_data_context": {
                        "config": {
                            "ge_root_dir": file_relative_path(__file__, "./great_expectations")
                        }
                    }
                },
                "solids": {
                    "read_in_datafile": {
                        "inputs": {
                            "csv_path": {
                                "value": file_relative_path(__file__, "./data/succeed.csv")
                            }
                        }
                    }
                },
            },
        ),
        PresetDefinition(
            "sample_preset_fail",
            mode="basic",
            run_config={
                "resources": {
                    "ge_data_context": {
                        "config": {
                            "ge_root_dir": file_relative_path(__file__, "./great_expectations")
                        }
                    }
                },
                "solids": {
                    "read_in_datafile": {
                        "inputs": {
                            "csv_path": {"value": file_relative_path(__file__, "./data/fail.csv")}
                        }
                    }
                },
            },
        ),
    ],
)
def payroll_data_pipeline():
    output_df = read_in_datafile()

    # The same DataFrame feeds both the row count and the GE validation solid.
    return postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))


To download the example, run:

curl https://codeload.github.com/dagster-io/dagster/tar.gz/master | tar -xz --strip=2 dagster-master/examples/ge_example
cd ge_example