Migrate DAG-mapped assets

Previously, we completed the "observe" stage of the Airflow DAG-level migration process by encoding the assets that are produced by each task. We also introduced partitioning to those assets.

In the task-level migration step, we "proxied" execution on a per-task basis through a YAML document. For DAG-mapped assets, execution is instead proxied on a per-DAG basis, which requires that every asset mapped to the DAG be executable within Dagster. Let's take a look at some fully migrated code whose assets are mapped to the DAG instead of to individual tasks:

import os
from pathlib import Path

import dagster as dg
import dagster_airlift.core as dg_airlift_core
import dagster_dbt as dg_dbt

# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


def airflow_dags_path() -> Path:
    return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"


# Factory for an executable asset that loads a CSV file into DuckDB
def load_csv_to_duckdb_asset(spec: dg.AssetSpec, args: LoadCsvToDuckDbArgs) -> dg.AssetsDefinition:
    @dg.multi_asset(name=f"load_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        load_csv_to_duckdb(args)

    return _multi_asset


# Factory for an executable asset that exports a DuckDB table to CSV
def export_duckdb_to_csv_defs(
    spec: dg.AssetSpec, args: ExportDuckDbToCsvArgs
) -> dg.AssetsDefinition:
    @dg.multi_asset(name=f"export_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        export_duckdb_to_csv(args)

    return _multi_asset


@dg_dbt.dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=dg_dbt.DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: dg_dbt.DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


# Map every asset to the rebuild_customers_list DAG rather than to individual tasks
mapped_assets = dg_airlift_core.assets_with_dag_mappings(
    dag_mappings={
        "rebuild_customers_list": [
            load_csv_to_duckdb_asset(
                dg.AssetSpec(key=["raw_data", "raw_customers"]),
                LoadCsvToDuckDbArgs(
                    table_name="raw_customers",
                    csv_path=airflow_dags_path() / "raw_customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    names=["id", "first_name", "last_name"],
                    duckdb_schema="raw_data",
                    duckdb_database_name="jaffle_shop",
                ),
            ),
            dbt_project_assets,
            export_duckdb_to_csv_defs(
                dg.AssetSpec(key="customers_csv", deps=["customers"]),
                ExportDuckDbToCsvArgs(
                    table_name="customers",
                    csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    duckdb_database_name="jaffle_shop",
                ),
            ),
        ],
    },
)


defs = dg_airlift_core.build_defs_from_airflow_instance(
    airflow_instance=dg_airlift_core.AirflowInstance(
        auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=dg.Definitions(
        assets=mapped_assets,
        resources={"dbt": dg_dbt.DbtCliResource(project_dir=dbt_project_path())},
    ),
)
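Before flipping the proxied state, it can be useful to confirm that the mapped assets really are executable in Dagster on their own. A minimal sketch (not part of the tutorial files), using Dagster's in-process materialize helper and assuming the same environment variables are set:

import dagster as dg
import dagster_dbt as dg_dbt

# Sanity check (a sketch): materialize every DAG-mapped asset in-process,
# supplying the same dbt resource used in the Definitions above.
result = dg.materialize(
    assets=mapped_assets,
    resources={"dbt": dg_dbt.DbtCliResource(project_dir=dbt_project_path())},
)
assert result.success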

Now that all of our assets are fully executable, we can create a simple YAML file to proxy execution for the whole DAG. This file lives in the proxied_state directory loaded by the DAG file below and is named after the DAG (here, rebuild_customers_list.yaml):

proxied: True
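Note that, unlike the task-level proxied state files from the previous step, which track a proxied flag for each task id, the DAG-level file needs only this single top-level key.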

We similarly call proxying_to_dagster at the end of our DAG file. The code here is exactly the same as in the per-task migration step:

# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)

...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )

Once proxied is set to True in the YAML file (and PROXYING is set to True in the DAG file), we can visit the Airflow UI and see that our tasks have been replaced with a single task:

[Image: Before DAG proxying]

[Image: After DAG proxying]

When performing DAG-level mapping, we don't preserve task structure in the Airflow DAGs. This single task will materialize all mapped Dagster assets instead of executing the original Airflow task business logic.
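If you want to confirm exactly which assets that single task will materialize, the mapped asset keys can be read off the definitions we built earlier. A small sketch:

# Sketch: list the asset keys the proxied DAG-level task will materialize.
for assets_def in mapped_assets:
    for key in assets_def.keys:
        print(key.to_user_string())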

We can similarly change proxied back to False, and the original task structure and business logic will return unchanged.