
Airflow Migration Tutorial: Observing Assets

Previously, we completed the "Peering" stage of the Airflow migration process by peering the Airflow instance with a Dagster code location.

The next step is to represent our Airflow workflows more richly by observing the data assets that are produced by our tasks. Similar to the peering step, this stage does not require any changes to Airflow code.

In order to do this, we must define the relevant assets in the Dagster code location.

In our example, we have three sequential tasks:

  1. load_raw_customers loads a CSV file of raw customer data into DuckDB.
  2. build_dbt_models builds a series of dbt models (from the jaffle shop example) combining customer, order, and payment data.
  3. export_customers exports a CSV representation of the final customers table from DuckDB to disk.
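
For orientation, the DAG being observed looks roughly like the sketch below. Only the DAG ID, task IDs, ordering, and @daily schedule come from this tutorial; the operators and placeholder callable are assumptions for illustration, and the real DAG's task logic differs.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _placeholder() -> None:
    # Stand-in for the real task logic (loading CSVs, running dbt, exporting data).
    pass


with DAG(
    dag_id="rebuild_customers_list",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
) as dag:
    load_raw_customers = PythonOperator(
        task_id="load_raw_customers", python_callable=_placeholder
    )
    build_dbt_models = PythonOperator(
        task_id="build_dbt_models", python_callable=_placeholder
    )
    export_customers = PythonOperator(
        task_id="export_customers", python_callable=_placeholder
    )
    load_raw_customers >> build_dbt_models >> export_customers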

We will first create a set of asset specs that correspond to the assets produced by these tasks. We will then annotate these asset specs so that Dagster can associate them with the Airflow tasks that produce them.

The first and third tasks each produce a single table, so we can construct their specs by hand. Dagster provides the assets_with_task_mappings utility to annotate asset specs with the tasks that produce them. Assets that are properly annotated will be materialized by the Airlift sensor once the corresponding task completes. The annotated specs are then provided to the defs argument of build_defs_from_airflow_instance.
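
Concretely, the annotation for the two single-table tasks boils down to the following excerpt of the full listing below (the dbt assets join the same mapping there; mapped_specs is just an illustrative name):

from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Each Airflow task ID maps to the list of asset specs it produces. Airlift uses
# this mapping to emit materializations for the specs when the task completes.
mapped_specs = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)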

We will also create a set of dbt asset definitions for the build_dbt_models task, using the @dbt_assets decorator from Dagster's dbt integration (dagster-dbt) to generate them.

First, install the dagster-airlift extra that provides the dbt factory:

uv pip install 'dagster-airlift[dbt]'

Then, we will construct our assets:

import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    asset_check,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)

Viewing observed assets

Once your assets are set up, reload your Dagster definitions. You should see a full representation of the dbt project alongside the other data assets defined in your code.
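
If you are running the code location locally with the Dagster CLI, you can either reload the code location from the Dagster UI or restart dagster dev; the file path below is a placeholder for wherever your definitions live:

dagster dev -f path/to/your/definitions.py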

Observed asset graph in Dagster

If you kick off a run of the DAG in Airflow, you should see the newly created assets materialize in Dagster as each task completes.

Note: There will be some delay between task completion and assets materializing in Dagster, because materializations are emitted by a sensor. This sensor runs every 30 seconds by default; you can reduce the interval to as little as one second via the sensor's minimum_interval_seconds setting.
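
For example, a faster polling cadence can be set when building the definitions. The sensor_minimum_interval_seconds parameter name below is our assumption about the dagster-airlift API, so check the API docs for the version you have installed:

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
    # Assumed parameter: lower the sensor's polling interval from the default
    # 30 seconds to 1 second.
    sensor_minimum_interval_seconds=1,
)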

Moving the asset check

Now that we have an asset that explicitly represents the customers.csv file produced by the DAG, we should move the asset check constructed during the peering step onto the customers_csv asset. Change the asset targeted by the @asset_check decorator to AssetKey(["customers_csv"]). This ensures that the asset check lives on even after we delete the DAG.
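
Concretely, the only change is the asset targeted by the decorator; the body of the check stays the same:

from dagster import AssetCheckResult, AssetKey, asset_check

# Before (peering step), the check targeted the DAG-level asset:
#   @asset_check(asset=AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
# After the move, it targets the customers_csv asset directly:
@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    ...  # body unchanged from the peering step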

When done, our code will look like this.

import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    Definitions,
    asset_check,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)

Adding partitions

If your Airflow tasks produce time-partitioned assets, Airlift can automatically associate your materializations with the relevant partitions. In the case of rebuild_customers_list, the data in each table it creates is partitioned daily, and the Airflow DAG runs on a @daily cron schedule. We can therefore add a DailyPartitionsDefinition to each of our assets.

import os
from pathlib import Path

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    AssetExecutionContext,
    AssetKey,
    AssetSpec,
    DailyPartitionsDefinition,
    Definitions,
    asset_check,
)
from dagster._time import get_current_datetime_midnight
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_task_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@asset_check(asset=AssetKey(["customers_csv"]))
def validate_exported_csv() -> AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return AssetCheckResult(passed=False, description=f"Export CSV {csv_path} does not exist")

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=AssetCheckSeverity.WARN,
        )

    return AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF)
        ],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [
            AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF)
        ],
    },
)


defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)

Now, every materialization the sensor synthesizes will automatically have the relevant partition associated with it.

Let's try this out by kicking off an Airflow backfill for today:

airflow dags backfill rebuild_customers_list --start-date $(date +"%Y-%m-%d")

After this DAG run completes, you should see a partitioned materialization appear in Dagster.

Partitioned Materialization in Dagster

Let's clear our Airflow runs so that we can kick off this backfill again for testing in the future.

airflow db clean

In order for partitioned assets to work out of the box with dagster-airlift, the following things need to be true:

  • The asset must be time-window partitioned; static, dynamic, and multi-partitioned definitions require custom functionality.
  • The partitioning scheme must match up with the logical_date / execution_date of the corresponding Airflow runs. That is, each logical_date should correspond exactly to one partition in Dagster, as illustrated in the sketch below.
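
As a rough illustration of the second requirement, a daily partitions definition lines up with an @daily DAG because each run's logical_date falls exactly on one partition key. The dates here are arbitrary, and the snippet is only a sanity check of the alignment, not part of the tutorial code:

from datetime import datetime

from dagster import DailyPartitionsDefinition

# A daily cadence that mirrors the DAG's @daily schedule.
partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

# An Airflow run with logical_date 2024-07-01 should correspond to exactly one
# Dagster partition key: "2024-07-01".
logical_date = datetime(2024, 7, 1)
keys = partitions_def.get_partition_keys(current_time=datetime(2024, 7, 2))
assert logical_date.strftime("%Y-%m-%d") in keys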

Next steps

Next, it's time to begin migrating our Airflow DAG code to Dagster. Follow along with the Migrate step here.