Dagster & Pandas
This feature is considered in a beta stage. It is still being tested and may change. For more information, see the API lifecycle stages documentation.
This page describes the dagster-pandas library, which is used for performing data validation. If you simply want to use pandas with Dagster, start with the Dagster Quickstart.

Dagster makes it easy to use pandas code to manipulate data and then store that data in other systems, such as files on Amazon S3 or tables in Snowflake.
- Creating Dagster DataFrame Types
- Dagster DataFrame Level Validation
- Dagster DataFrame Summary Statistics
The dagster_pandas library provides the ability to perform data validation, emit summary statistics, and enable reliable dataframe serialization/deserialization. On top of this, the Dagster type system generates documentation of your dataframe constraints and makes it accessible in the Dagster UI.
Creating Dagster DataFrame Types
To create a custom dagster_pandas type, use create_dagster_pandas_dataframe_type and provide a list of PandasColumn objects that specify column-level schema and constraints. For example, we can construct a custom dataframe type to represent a set of e-bike trips in the following way:
```python
from dagster_pandas import PandasColumn, create_dagster_pandas_dataframe_type
from pandas import Timestamp

TripDataFrame = create_dagster_pandas_dataframe_type(
    name="TripDataFrame",
    columns=[
        PandasColumn.integer_column("bike_id", min_value=0),
        PandasColumn.categorical_column("color", categories={"red", "green", "blue"}),
        PandasColumn.datetime_column(
            "start_time", min_datetime=Timestamp(year=2020, month=2, day=10)
        ),
        PandasColumn.datetime_column(
            "end_time", min_datetime=Timestamp(year=2020, month=2, day=10)
        ),
        PandasColumn.string_column("station"),
        PandasColumn.exists("amount_paid"),
        PandasColumn.boolean_column("was_member"),
    ],
)
```
Once our custom data type is defined, we can use it as the type declaration for the inputs and outputs of our ops:
```python
from datetime import datetime

from dagster import Out, file_relative_path, op
from pandas import DataFrame, read_csv


@op(out=Out(TripDataFrame))
def load_trip_dataframe() -> DataFrame:
    return read_csv(
        file_relative_path(__file__, "./ebike_trips.csv"),
        parse_dates=["start_time", "end_time"],
        date_parser=lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f"),
        dtype={"color": "category"},
    )
```
By passing in these PandasColumn objects, we express the schema and constraints we expect our dataframes to follow when Dagster performs type checks on our ops. Moreover, if we go to the op viewer, we can see our schema documented in the UI.
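To make the semantics of these column constraints concrete, here is an illustrative, pandas-only sketch of the kinds of checks a type check performs. This is not the dagster_pandas implementation; the function name and error messages are hypothetical, and only a few of the columns above are covered:

```python
import pandas as pd


def check_trip_columns(df: pd.DataFrame) -> list:
    """Hypothetical helper mirroring a few column-level constraints:
    an integer minimum, a categorical membership check, and a
    minimum datetime. dagster_pandas performs equivalent validation
    internally during the type check."""
    errors = []
    if (df["bike_id"] < 0).any():
        errors.append("bike_id must be >= 0")
    if not set(df["color"].unique()) <= {"red", "green", "blue"}:
        errors.append("color must be one of red/green/blue")
    if (df["start_time"] < pd.Timestamp(year=2020, month=2, day=10)).any():
        errors.append("start_time is before the minimum datetime")
    return errors


trips = pd.DataFrame(
    {
        "bike_id": [1, 2],
        "color": ["red", "blue"],
        "start_time": pd.to_datetime(["2020-03-01", "2020-04-01"]),
    }
)
print(check_trip_columns(trips))  # → [] (all constraints satisfied)
```

A dataframe that violated any of these checks would fail the Dagster type check at op boundaries, rather than propagating bad data downstream.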
Dagster DataFrame Level Validation
Now that we have a custom dataframe type that performs schema validation during a run, we can express dataframe-level constraints (e.g., number of rows or columns).
To do this, we provide a list of dataframe constraints to create_dagster_pandas_dataframe_type; for example, using RowCountConstraint. More information on the available constraints can be found in the dagster_pandas API docs.
This looks like:
```python
from dagster_pandas import RowCountConstraint, create_dagster_pandas_dataframe_type

ShapeConstrainedTripDataFrame = create_dagster_pandas_dataframe_type(
    name="ShapeConstrainedTripDataFrame", dataframe_constraints=[RowCountConstraint(4)]
)
```
If we rerun the above example with this dataframe type, nothing should change. However, if we pass 100 to the row count constraint, we can watch our job fail that type check.
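Conceptually, RowCountConstraint(n) fails the type check whenever the dataframe's row count does not match n. A pandas-only sketch of that check (illustrative only; the function name is hypothetical, not the library's code):

```python
import pandas as pd


def validate_row_count(df: pd.DataFrame, expected_rows: int) -> bool:
    # Mirrors the idea behind RowCountConstraint: an exact
    # row-count match between the dataframe and the constraint.
    return len(df) == expected_rows


trips = pd.DataFrame({"bike_id": [0, 1, 2, 3]})
print(validate_row_count(trips, 4))    # → True: the type check passes
print(validate_row_count(trips, 100))  # → False: the job would fail the type check
```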