Migrate Airflow tasks
Previously, we completed the "observe" stage of the Airflow migration process by encoding the assets that are produced by each task. We also introduced partitioning to those assets.
So far, we have left the Airflow code alone, but in this step, we will begin the actual migration process, which will require modifying Airflow code.
Once you have created corresponding Dagster assets for your Airflow tasks, you can proxy execution to Dagster on a per-task basis while Airflow is still controlling scheduling and orchestration. Once a task has been proxied, Airflow will kick off materializations of corresponding Dagster assets in place of executing the business logic of that task.
Create a file to track proxying state
To begin proxying tasks in an Airflow DAG, you will first need to create a file to track proxying state.
For this tutorial, in the example Airflow DAG directory, create a proxied_state folder. In that folder, create a YAML file with the same name as the example DAG. The included example at airflow_dags/proxied_state is used by make airflow_run and can be used as a template for your own proxied state files.
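Assuming the layout of the example project (paths other than the proxied-state YAML come from the example), the result looks roughly like this:

tutorial_example/airflow_dags/
    dags.py
    proxied_state/
        rebuild_customers_list.yaml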
Given our example rebuild_customers_list DAG with its three tasks, load_raw_customers, build_dbt_models, and export_customers, your proxied_state/rebuild_customers_list.yaml file should look like the following:
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: False
  - id: export_customers
    proxied: False
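If you want to sanity-check the file before wiring it into Airflow, a quick throwaway script like the one below confirms that every task entry carries a boolean proxied flag. This is not part of the tutorial; it assumes PyYAML is available and that you run it from the directory containing airflow_dags:

# Hypothetical sanity check, not part of the tutorial.
from pathlib import Path

import yaml

state = yaml.safe_load(
    (Path("airflow_dags") / "proxied_state" / "rebuild_customers_list.yaml").read_text()
)
for task in state["tasks"]:
    # YAML parses True/False as booleans; anything else is a typo in the file.
    assert isinstance(task["proxied"], bool), f"task {task['id']} needs a boolean proxied flag"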
Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG:
# Dags file can be found at tutorial_example/airflow_dags/dags.py
from pathlib import Path

from airflow import DAG
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml

dag = DAG("rebuild_customers_list", ...)
...

# Set this to True to begin the proxying process
PROXYING = False

if PROXYING:
    proxying_to_dagster(
        global_vars=globals(),
        proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
    )
Set PROXYING to True or eliminate the if statement.
The DAG will now display its proxied state in the Airflow UI. (There is some latency as Airflow evaluates the Python file periodically.)
Migrate individual tasks
In order to proxy a task, you must do two things:
- Ensure all associated assets are executable in Dagster by providing asset definitions in place of bare asset specs.
- Set the proxied: False status for the task in the proxied_state YAML file to proxied: True.
Any task marked as proxied will use the DefaultProxyTaskToDagsterOperator
when executed as part of the DAG. This operator will use the Dagster GraphQL API to initiate a Dagster run of the assets corresponding to the task.
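To get a feel for what "initiate a Dagster run" means here, the snippet below uses Dagster's Python GraphQL client to launch a run by hand. This is only an illustration of the kind of call involved, not what DefaultProxyTaskToDagsterOperator literally executes; it assumes the dagster-graphql package is installed, a Dagster webserver on localhost:3000, and my_asset_job is a hypothetical job name:

# Illustration only: launch a Dagster run over GraphQL, roughly analogous to
# what the proxying operator does on your behalf.
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("localhost", port_number=3000)
run_id = client.submit_job_execution("my_asset_job")  # hypothetical job name
print(f"launched run {run_id}")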
The proxied file acts as the source of truth for proxied state. The information is attached to the DAG and then accessed by Dagster via the REST API.
A task that has been proxied can easily be toggled back to run in Airflow (for example, if a bug in the implementation is encountered) simply by editing the file back to proxied: False.
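For example, rolling back a previously proxied build_dbt_models task is a one-line change to its entry in the YAML file:

  - id: build_dbt_models
    proxied: False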
Migrate common operators
For some common operator patterns, like our dbt
operator, Dagster supplies factories to build software-defined assets for our tasks. In fact, the @dagster_dbt.dbt_assets
decorator used earlier already backs its assets with definitions, so we can change the proxied state of the build_dbt_models
task to proxied: True
in the proxied state file:
tasks:
  - id: load_raw_customers
    proxied: False
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: False
It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting dagster dev.
You can now run the rebuild_customers_list DAG in Airflow, and the build_dbt_models task will be executed in a Dagster run.
You'll note that we proxied a task in the middle of the Airflow DAG. The Airflow DAG structure and execution history remain unchanged in the Airflow UI, but execution of build_dbt_models has moved to Dagster.
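If you want to confirm the run on the Dagster side outside the UI, you can query the instance directly. This is a minimal sketch, not part of the tutorial, and it assumes DAGSTER_HOME points at the same instance that dagster dev is using:

# Hypothetical spot check: print the most recent runs recorded by the local instance.
import dagster as dg

instance = dg.DagsterInstance.get()  # requires DAGSTER_HOME to be set
for run in instance.get_runs(limit=5):
    print(run.run_id, run.status)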
Migrate the remaining custom operators
For all other operator types, we will need to build our own asset definitions. We recommend creating a factory function whose arguments match the inputs to your Airflow operator. Then, you can use this factory to build definitions for each Airflow task.
For example, our load_raw_customers task uses a custom LoadCSVToDuckDB operator. We'll define a factory function load_csv_to_duckdb_asset to build the corresponding software-defined assets. Similarly, for export_customers we'll define a factory function export_duckdb_to_csv_defs to build its software-defined assets:
import os
from pathlib import Path

import dagster as dg
import dagster_airlift.core as dg_airlift_core
import dagster_dbt as dg_dbt
from dagster._time import get_current_datetime_midnight

# Code also invoked from Airflow
from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv
from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb

PARTITIONS_DEF = dg.DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@dg.asset_check(asset=dg.AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> dg.AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"
    if not csv_path.exists():
        return dg.AssetCheckResult(
            passed=False, description=f"Export CSV {csv_path} does not exist"
        )

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return dg.AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=dg.AssetCheckSeverity.WARN,
        )

    return dg.AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


def airflow_dags_path() -> Path:
    return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags"


# create an executable asset to load the csv file to duckdb
def load_csv_to_duckdb_asset(spec: dg.AssetSpec, args: LoadCsvToDuckDbArgs) -> dg.AssetsDefinition:
    @dg.multi_asset(name=f"load_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        load_csv_to_duckdb(args)

    return _multi_asset


# create an executable asset to export back to csv
def export_duckdb_to_csv_defs(
    spec: dg.AssetSpec, args: ExportDuckDbToCsvArgs
) -> dg.AssetsDefinition:
    @dg.multi_asset(name=f"export_{args.table_name}", specs=[spec])
    def _multi_asset() -> None:
        export_duckdb_to_csv(args)

    return _multi_asset


@dg_dbt.dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=dg_dbt.DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,
)
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: dg_dbt.DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = dg_airlift_core.assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        # instead of just loading the asset specs, we're mapping to fully executable assets now.
        "load_raw_customers": [
            load_csv_to_duckdb_asset(
                dg.AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF),
                LoadCsvToDuckDbArgs(
                    table_name="raw_customers",
                    csv_path=airflow_dags_path() / "raw_customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    names=["id", "first_name", "last_name"],
                    duckdb_schema="raw_data",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
        "build_dbt_models": [dbt_project_assets],
        # instead of just loading the asset specs, we're mapping to fully executable assets now.
        "export_customers": [
            export_duckdb_to_csv_defs(
                dg.AssetSpec(
                    key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF
                ),
                ExportDuckDbToCsvArgs(
                    table_name="customers",
                    csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv",
                    duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb",
                    duckdb_database_name="jaffle_shop",
                ),
            )
        ],
    },
)

defs = dg_airlift_core.build_defs_from_airflow_instance(
    airflow_instance=dg_airlift_core.AirflowInstance(
        auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=dg.Definitions(
        assets=mapped_assets,
        resources={"dbt": dg_dbt.DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)
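As a quick check that everything still loads, you can import the definitions and assert that they construct. The sketch below assumes the code above lives at tutorial_example/dagster_defs/definitions.py (adjust the import to your layout) and that the tutorial's local Airflow instance is running, since definitions are built against its REST API:

# Minimal smoke test; the module path is an assumption about your project layout.
import dagster as dg

def test_definitions_load() -> None:
    from tutorial_example.dagster_defs.definitions import defs  # hypothetical path

    assert isinstance(defs, dg.Definitions)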
We can then toggle the proxied state of the remaining tasks in the proxied_state file:
tasks:
  - id: load_raw_customers
    proxied: True
  - id: build_dbt_models
    proxied: True
  - id: export_customers
    proxied: True
Next steps
In the next step, "Decommission the Airflow DAG", we will remove the DAG from the Airflow directory and update the Dagster code to remove task associations and attach the assets to a schedule.