Observe the Airflow DAG
When migrating an entire DAG at once, you must create assets that map to the entire DAG. To do this, you can use `assets_with_dag_mappings`, which ensures that each mapped asset receives a materialization when the entire DAG completes.

For our `rebuild_customers_list` DAG, let's take a look at what the new observation code looks like:
```python
import os
from pathlib import Path

from dagster import AssetExecutionContext, AssetSpec, Definitions
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    assets_with_dag_mappings,
    build_defs_from_airflow_instance,
)
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


@dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Instead of mapping assets to individual tasks, we map them to the entire DAG.
mapped_assets = assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            AssetSpec(key=["raw_data", "raw_customers"]),
            dbt_project_assets,
            AssetSpec(key="customers_csv", deps=["customers"]),
        ],
    },
)

defs = build_defs_from_airflow_instance(
    airflow_instance=AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=Definitions(
        assets=mapped_assets,
        resources={"dbt": DbtCliResource(project_dir=dbt_project_path())},
    ),
)
```
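Assuming you save this code in a file named `definitions.py` (the filename here is just for illustration) and that `TUTORIAL_DBT_PROJECT_DIR` is set, you can load the definitions locally with `dagster dev -f definitions.py` and confirm in the Dagster UI that the mapped assets now appear under the `rebuild_customers_list` DAG.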
Now, each mapped asset will receive a materialization when the entire DAG completes, rather than when a particular task completes.
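For comparison, the earlier task-level approach used `assets_with_task_mappings` to attach assets to individual tasks, so each asset was materialized as soon as its task finished. Below is a minimal sketch of that style; the task IDs (`load_raw_customers`, `build_dbt_models`, `export_customers`) are assumptions based on the earlier tutorial step, and `dbt_project_assets` refers to the definition above:

```python
from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Task-level mapping: each asset is materialized when its specific Airflow
# task completes, rather than when the whole DAG finishes. The task IDs
# below are assumptions carried over from the earlier tutorial step, and
# dbt_project_assets is the @dbt_assets definition shown above.
mapped_assets = assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [AssetSpec(key=["raw_data", "raw_customers"])],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [AssetSpec(key="customers_csv", deps=["customers"])],
    },
)
```

DAG-level mapping trades that per-task granularity for simplicity: it is useful when you plan to migrate the DAG as a single unit rather than task by task.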
Next steps
In the next step, "Migrate DAG-mapped assets", we will proxy execution for the entire Airflow DAG in Dagster.