
Observe Airflow tasks

In the previous step, "Peer the Airflow instance with a Dagster code location", we connected the example Airflow instance to a Dagster code location.

The next step is to represent the Airflow workflows more richly by observing the data assets that are produced by the Airflow tasks. Similar to the peering step, this step does not require any changes to Airflow code.

Create asset specs for Airflow tasks

In order to observe the assets produced by the Airflow tasks in this tutorial, you will need to define the relevant assets in the Dagster code location.

In this example, there are three sequential tasks:

  1. load_raw_customers loads a CSV file of raw customer data into DuckDB.
  2. build_dbt_models builds a series of dbt models (from the jaffle shop project) that combine customer, order, and payment data.
  3. export_customers exports a CSV representation of the final customers table from DuckDB to disk.

First, you will need to create a set of AssetSpecs that correspond to the assets produced by these tasks. Next, you will annotate these asset specs so Dagster can associate them with the Airflow tasks that produce them.

The first and third tasks each produce a single table, so we will manually construct asset specs for them. We will use the assets_with_task_mappings function from the dagster-airlift package to annotate these asset specs with the tasks that produce them. Assets that are properly annotated will be materialized by the Airlift sensor once the corresponding task completes, and the annotated specs are then provided to the defs argument of build_defs_from_airflow_instance.
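
For reference, here is a minimal sketch of the mapping mechanism in isolation, using a hypothetical my_dag DAG and my_task task; the full tutorial code appears below:

import dagster as dg
import dagster_airlift.core as dg_airlift_core

# Annotate an asset spec with the Airflow task that produces it. Once my_task
# completes in a run of my_dag, the Airlift sensor records a materialization
# of my_table.
mapped_specs = dg_airlift_core.assets_with_task_mappings(
    dag_id="my_dag",
    task_mappings={
        "my_task": [dg.AssetSpec(key="my_table")],
    },
)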

The second task, build_dbt_models, will require building a set of dbt asset definitions. We will use the @dagster_dbt.dbt_assets decorator from the dagster-dbt package to generate these definitions using Dagster's dbt integration.

First, install the dbt extra of dagster-airlift:

uv pip install 'dagster-airlift[dbt]'

Next, construct the assets:

import os
from pathlib import Path

import dagster as dg
import dagster_airlift.core as dg_airlift_core
import dagster_dbt as dg_dbt


@dg.asset_check(asset=dg.AssetKey(["airflow_instance_one", "dag", "rebuild_customers_list"]))
def validate_exported_csv() -> dg.AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return dg.AssetCheckResult(
passed=False, description=f"Export CSV {csv_path} does not exist"
)

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return dg.AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=dg.AssetCheckSeverity.WARN,
)

return dg.AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)


# Define the dbt_project_path function to return the path to the dbt project directory
def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


# Use the dbt_assets decorator to define assets for models within the dbt project automatically.
@dg_dbt.dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=dg_dbt.DbtProject(dbt_project_path()),
)
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: dg_dbt.DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = dg_airlift_core.assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        # Define an AssetSpec for the raw_customers table created by the load_raw_customers task.
        "load_raw_customers": [dg.AssetSpec(key=["raw_data", "raw_customers"])],
        # Map the dbt assets to the build_dbt_models task, which creates them Airflow-side.
        "build_dbt_models": [dbt_project_assets],
        # Define an AssetSpec for the CSV file created by the export_customers task.
        "export_customers": [dg.AssetSpec(key="customers_csv", deps=["customers"])],
    },
)


defs = dg_airlift_core.build_defs_from_airflow_instance(
    airflow_instance=dg_airlift_core.AirflowInstance(
        auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=dg.Definitions(
        assets=mapped_assets,
        # We need to pass the dbt resource so that it can be utilized by dbt_project_assets.
        resources={"dbt": dg_dbt.DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)

View observed assets

Once you have defined the assets above, you should be able to navigate to the Dagster UI, reload your Dagster definitions, and see a full representation of the dbt project and the other data assets in your code:

Observed asset graph in Dagster

After you initiate a run of the DAG in Airflow, you should see the newly created assets materialize in Dagster as each task completes.
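
You can initiate a run from the Airflow UI or, assuming the Airflow CLI is available in your tutorial environment, from the command line:

airflow dags trigger rebuild_customers_list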

info

There will be a delay between when a task completes in Airflow and when the corresponding asset materializes in Dagster, because materializations are recorded by a Dagster sensor. This sensor runs every 30 seconds by default, but you can shorten the interval, down to a minimum of one second, using the minimum_interval_seconds argument to the sensor.
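
As a rough sketch of how you might shorten the polling interval, the example below assumes that your installed version of dagster-airlift accepts a sensor_minimum_interval_seconds parameter on build_defs_from_airflow_instance; check the API reference for your version before relying on it:

import dagster as dg
import dagster_airlift.core as dg_airlift_core

# Assumption: sensor_minimum_interval_seconds is the parameter that controls the
# polling interval of the Airlift sensor in your dagster-airlift version.
defs = dg_airlift_core.build_defs_from_airflow_instance(
    airflow_instance=dg_airlift_core.AirflowInstance(
        auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=dg.Definitions(assets=mapped_assets),  # mapped_assets as defined above
    sensor_minimum_interval_seconds=5,  # poll Airflow every 5 seconds instead of every 30
)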

Update the asset check to target the customers_csv asset

Now that we've introduced an asset that explicitly represents the customers.csv file output by the DAG, we should update the asset check constructed during the peering step to point to the customers_csv asset. To do this, change the asset targeted by the @asset_check decorator to AssetKey(["customers_csv"]). Updating the check in this way ensures that it will live on even after the DAG is deleted:

import os
from pathlib import Path

import dagster as dg
import dagster_airlift.core as dg_airlift_core
import dagster_dbt as dg_dbt


# The asset check is now directly associated with the customers_csv asset
# rather than checking it through the Airflow DAG asset
@dg.asset_check(asset=dg.AssetKey(["customers_csv"]))
def validate_exported_csv() -> dg.AssetCheckResult:
csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

if not csv_path.exists():
return dg.AssetCheckResult(
passed=False, description=f"Export CSV {csv_path} does not exist"
)

rows = len(csv_path.read_text().split("\n"))
if rows < 2:
return dg.AssetCheckResult(
passed=False,
description=f"Export CSV {csv_path} is empty",
severity=dg.AssetCheckSeverity.WARN,
)

return dg.AssetCheckResult(
passed=True,
description=f"Export CSV {csv_path} exists",
metadata={"rows": rows},
)

To see what the full code should look like after the asset check, see the example code in GitHub.

Add partitions

If your Airflow tasks produce time-partitioned assets, Airlift can automatically associate your materializations with the relevant partitions. In this example, the data in each table created by the rebuild_customers_list DAG is partitioned daily, and the DAG runs on a @daily cron schedule. We can therefore add a DailyPartitionsDefinition to each of our assets:

import os
from pathlib import Path

import dagster as dg
import dagster_airlift.core as dg_airlift_core
import dagster_dbt as dg_dbt
from dagster._time import get_current_datetime_midnight

# Define a daily partitioning strategy starting from the current date at midnight
# This will be used to partition our assets into daily chunks
PARTITIONS_DEF = dg.DailyPartitionsDefinition(start_date=get_current_datetime_midnight())


@dg.asset_check(asset=dg.AssetKey(["customers_csv"]))
def validate_exported_csv() -> dg.AssetCheckResult:
    csv_path = Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv"

    if not csv_path.exists():
        return dg.AssetCheckResult(
            passed=False, description=f"Export CSV {csv_path} does not exist"
        )

    rows = len(csv_path.read_text().split("\n"))
    if rows < 2:
        return dg.AssetCheckResult(
            passed=False,
            description=f"Export CSV {csv_path} is empty",
            severity=dg.AssetCheckSeverity.WARN,
        )

    return dg.AssetCheckResult(
        passed=True,
        description=f"Export CSV {csv_path} exists",
        metadata={"rows": rows},
    )


def dbt_project_path() -> Path:
    env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR")
    assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set"
    return Path(env_val)


# Add daily partitioning to the dbt assets
@dg_dbt.dbt_assets(
    manifest=dbt_project_path() / "target" / "manifest.json",
    project=dg_dbt.DbtProject(dbt_project_path()),
    partitions_def=PARTITIONS_DEF,  # Enable daily partitioning for dbt assets
)
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: dg_dbt.DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()


mapped_assets = dg_airlift_core.assets_with_task_mappings(
    dag_id="rebuild_customers_list",
    task_mappings={
        "load_raw_customers": [
            dg.AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF)
        ],
        "build_dbt_models": [dbt_project_assets],
        "export_customers": [
            dg.AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF)
        ],
    },
)


defs = dg_airlift_core.build_defs_from_airflow_instance(
    airflow_instance=dg_airlift_core.AirflowInstance(
        auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080",
            username="admin",
            password="admin",
        ),
        name="airflow_instance_one",
    ),
    defs=dg.Definitions(
        assets=mapped_assets,
        resources={"dbt": dg_dbt.DbtCliResource(project_dir=dbt_project_path())},
        asset_checks=[validate_exported_csv],
    ),
)

Now, every time the sensor triggers a materialization for an asset, it will automatically have a partition associated with it.

You can try this out by kicking off an Airflow backfill for today:

airflow dags backfill rebuild_customers_list --start-date $(date +"%Y-%m-%d")

After this DAG run completes, you should see a partitioned materialization appear in Dagster:

Partitioned materialization in Dagster

Finally, run airflow db clean to delete Airflow runs so you can initiate this backfill again for testing in the future:

airflow db clean

note

In order for partitioned assets to work with dagster-airlift, the following things need to be true:

  • The asset can only be time-window partitioned. This means that static, dynamic, and multi-partitioned definitions will require custom functionality.
  • The partitioning scheme must match up with the logical_date/execution_date of the corresponding Airflow runs. That is, each logical_date should correspond to exactly one partition in Dagster (see the sketch below).
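
As a rough illustration of the second point (a sketch with placeholder dates, not tutorial code), the daily partitioning used here means that each Airflow logical_date should line up with exactly one Dagster partition key:

from datetime import datetime, timezone

import dagster as dg

# Placeholder start date; the tutorial uses get_current_datetime_midnight() instead.
partitions_def = dg.DailyPartitionsDefinition(start_date="2024-01-01")

# An Airflow run with this logical_date...
logical_date = datetime(2024, 2, 3, tzinfo=timezone.utc)

# ...should correspond to exactly one daily partition key in Dagster,
# which for DailyPartitionsDefinition is formatted as YYYY-MM-DD.
expected_partition_key = logical_date.strftime("%Y-%m-%d")  # "2024-02-03"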

Next steps

In the next step, "Migrate Airflow tasks", we will migrate Airflow DAG code to Dagster.