
Observe the Airflow DAG

When migrating an entire DAG at once, you map assets to the DAG as a whole rather than to its individual tasks. To do this, use assets_with_dag_mappings, which ensures that each mapped asset receives a materialization when the DAG completes.
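In outline, assets_with_dag_mappings takes a dag_mappings dictionary keyed by DAG ID, where each value lists the asset specs or definitions that the DAG produces. A minimal sketch, using a hypothetical DAG ID and asset key for shape only:

from dagster import AssetSpec
from dagster_airlift.core import assets_with_dag_mappings

# Hypothetical DAG ID and asset key: each listed asset is materialized
# when an Airflow run of "some_dag" completes successfully.
observed_assets = assets_with_dag_mappings(
    dag_mappings={
        "some_dag": [AssetSpec(key="some_asset")],
    },
)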

For our rebuild_customers_list DAG, let's take a look at the new observation code:

import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    # Location of the tutorial's dbt project, supplied via environment variable.
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Instead of mapping assets to individual tasks, we map them to the entire DAG.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            AssetSpec(key=["raw_data", "raw_customers"]),
            dbt_project_assets,
            AssetSpec(key="customers_csv", deps=["customers"]),
        ],
    },
)


# Build Definitions that observe the Airflow instance and attach the mapped assets.
defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
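With these definitions loaded, a monitoring sensor created by build_defs_from_airflow_instance polls the Airflow instance for completed runs and emits the mapped materializations. To see this locally, point dagster dev at the file containing defs (the filename here is an assumption):

dagster dev -f observe_dag.py

Then trigger rebuild_customers_list in Airflow and watch all three mapped assets materialize together once the run finishes.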

Now, instead of getting a materialization when a particular task completes, each mapped asset will receive a materialization when the entire DAG completes.
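For comparison, the task-level approach from the earlier observation step uses assets_with_task_mappings, which keys on task IDs within a single DAG, so each asset materializes as its own task completes. A sketch of that shape, reusing dbt_project_assets from the listing above (the task IDs are assumed to match the tutorial's DAG):

from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Task-level mapping: each asset materializes when its mapped task
# completes, rather than once at the end of the DAG run.
task_mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)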

Next steps

In the next step, "Migrate DAG-mapped assets", we will proxy execution for the entire Airflow DAG in Dagster.