For this example, we'll be using two versions of a dataset of baseball team payroll and wins, one version of which has been modified to contain incorrect data.
You can use ge_validation_op_factory to generate Dagster ops that integrate with Great Expectations. For example, here is a basic call to this GE op factory with its two required arguments, datasource_name and suite_name:
```python
payroll_expectations = ge_validation_op_factory(
    name="ge_validation_op",
    datasource_name="getest",
    suite_name="basic.warning",
)
```
The GE validations will happen inside the ops created above. Each op yields an ExpectationResult with a structured dict of metadata from the GE suite. The structured metadata contains both summary stats from the suite and expectation-by-expectation results. The op also outputs the full validation result in case you want to process it differently. Here's how another op could use the full result, where expectation is the result:
```python
@op
def postprocess_payroll(numrows, expectation):
    if expectation["success"]:
        return numrows
    else:
        raise ValueError
```
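To make the metadata structure concrete, here is a minimal plain-Python sketch of what the validation result dict roughly looks like. The key names follow Great Expectations' validation result format, but the values below are invented for illustration:

```python
# A simplified, hand-written stand-in for a GE validation result dict.
# Real results from Great Expectations contain many more fields; the
# expectation types and numbers below are invented for illustration.
expectation = {
    "success": False,
    "statistics": {
        "evaluated_expectations": 2,
        "successful_expectations": 1,
        "unsuccessful_expectations": 1,
        "success_percent": 50.0,
    },
    "results": [
        {
            "expectation_config": {"expectation_type": "expect_column_to_exist"},
            "success": True,
        },
        {
            "expectation_config": {"expectation_type": "expect_column_values_to_not_be_null"},
            "success": False,
        },
    ],
}

# Summary stats from the suite...
print(expectation["statistics"]["success_percent"])

# ...and expectation-by-expectation results, e.g. which expectations failed.
failed = [
    r["expectation_config"]["expectation_type"]
    for r in expectation["results"]
    if not r["success"]
]
print(failed)
```

A downstream op like postprocess_payroll only needs the top-level "success" key, but the nested "results" list is available when you want finer-grained handling.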
You can configure the GE Data Context via the ge_data_context resource from the dagster-ge integration package. All we need to do to expose GE to Dagster is provide the root of the GE directory (the path to the folder containing the great_expectations.yml file on your machine).
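As a sketch, the resource could be wired up through run config like this; the ge_root_dir key follows the dagster-ge resource's config schema, and the local path is a placeholder:

```yaml
resources:
  ge_data_context:
    config:
      # Root of the GE project, i.e. the directory that contains
      # great_expectations.yml. This path is a placeholder.
      ge_root_dir: /path/to/great_expectations
```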
Finally, here's the full job definition using the GE op, with a default run configuration to use the correct set of data:
```python
@job
def payroll_data():
    output_df = read_in_datafile()
    postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))
```
From here, we can easily swap in the path to fail.csv to exercise our job with incorrect data.
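As a sketch, that swap could be expressed as a run-config override on the op that reads the data. The op name read_in_datafile comes from the job above, but the csv_path config key and the file location are assumptions for illustration:

```yaml
ops:
  read_in_datafile:
    config:
      # Point the job at the bad data instead of the correct file;
      # both the csv_path key and this path are hypothetical.
      csv_path: data/fail.csv
```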