A data orchestrator for machine learning, analytics, and ETL

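from dagster import pipeline, solid
# Assumption: DataFrame is the dagster_pandas type, which lets the
# `cereals` input be supplied through run config rather than by an upstream solid.
from dagster_pandas import DataFrame


@solid(description="Calculates the grams of sugar per cup of each kind of cereal.")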
def sugar_by_volume(_, cereals: DataFrame) -> DataFrame:
    df = cereals[["name"]].copy()
    df["sugar_per_cup"] = cereals["sugars"] / cereals["cups"]
    return df


@solid(description="Finds the sugar-per-cup cutoff for the top quartile of cereals.")
def top_quartile_cutoff(_, cereals: DataFrame) -> float:
    return cereals["sugar_per_cup"].quantile(0.75)


@solid(description="Selects cereals whose sugar-per-cup exceeds the given cutoff.")
def sugariest_cereals(_, cereals: DataFrame, cutoff: float) -> DataFrame:
    return cereals[cereals["sugar_per_cup"] > cutoff]


@pipeline
def sugariest_pipeline():
    sugar_by_vol = sugar_by_volume()
    sugariest_cereals(sugar_by_vol, top_quartile_cutoff(sugar_by_vol))

Execute the pipeline from the command line:

> dagster pipeline execute -c local_development.yaml sugariest_pipeline
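
To see what the three solids compute, the same logic can be tried as plain pandas functions outside Dagster. This is only a sketch: the four-row `cereals` table is made-up sample data, and the column meanings (grams of sugar and cups per serving) are assumptions based on the solid descriptions above. In the real pipeline the `cereals` input would come from the run config instead.

```python
import pandas as pd

# Hypothetical sample data (not from the project); the column names match
# the ones the solids read.
cereals = pd.DataFrame(
    {
        "name": ["A", "B", "C", "D"],
        "sugars": [6.0, 12.0, 3.0, 10.0],  # assumed: grams of sugar per serving
        "cups": [1.0, 0.75, 1.0, 0.5],     # assumed: cups per serving
    }
)


def sugar_by_volume(cereals):
    # Grams of sugar per cup for each cereal.
    df = cereals[["name"]].copy()
    df["sugar_per_cup"] = cereals["sugars"] / cereals["cups"]
    return df


def top_quartile_cutoff(cereals):
    # 75th percentile of sugar-per-cup (pandas interpolates linearly by default).
    return cereals["sugar_per_cup"].quantile(0.75)


def sugariest_cereals(cereals, cutoff):
    # Keep only the rows strictly above the cutoff.
    return cereals[cereals["sugar_per_cup"] > cutoff]


# Same wiring as sugariest_pipeline: one upstream result feeds two downstream steps.
sugar_by_vol = sugar_by_volume(cereals)
cutoff = top_quartile_cutoff(sugar_by_vol)
result = sugariest_cereals(sugar_by_vol, cutoff)
print(result)
```

With this sample, the per-cup values are 6, 16, 3, and 20, so the cutoff is 17.0 and only cereal "D" is selected. Wrapping each function in `@solid` adds typed inputs and outputs, descriptions, and logging without changing the computation itself.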

Dagit

Dagster comes with Dagit, a rich web UI for viewing and executing data pipelines.

Visualize pipeline topology, with data types

Execute pipelines and review what happened as structured logs

Manage a history of past runs