
Observe multiple Airflow instances from Dagster

In the previous step, we installed the tutorial example code and started two Airflow instances running locally. In this step, we'll create Dagster asset representations of Airflow DAGs in order to observe the Airflow instances from Dagster.

Install the dagster-airlift package in your Dagster environment

First, open a new shell and navigate to the root of the tutorial directory. Then activate the virtual environment and install the dagster-airlift, dagster-webserver, and dagster packages in your Dagster environment:

source .venv/bin/activate
uv pip install 'dagster-airlift[core]' dagster-webserver dagster
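To confirm the installation before moving on, you can check that the packages are importable from the activated environment. This is a minimal sketch that uses only the standard library. The helper name check_packages is hypothetical, and the sketch assumes dagster, dagster_airlift, and dagster_webserver are the import names of the three packages.

```python
import importlib.util

# Import names for the packages installed above (assumed to match the
# distribution names with dashes replaced by underscores).
REQUIRED_MODULES = ("dagster", "dagster_airlift", "dagster_webserver")


def check_packages(modules=REQUIRED_MODULES):
    """Return a mapping of module name -> True if it can be imported.

    find_spec only locates a top-level module; it does not import it,
    so this check is cheap and has no side effects.
    """
    return {name: importlib.util.find_spec(name) is not None for name in modules}


if __name__ == "__main__":
    for name, found in check_packages().items():
        print(f"{name}: {'ok' if found else 'MISSING'}")
```

Run it with the python interpreter from the activated .venv. Any MISSING entry means the uv pip install step did not complete in that environment.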
dagster-airlift API

For a full list of dagster-airlift classes and methods, see the API docs.

Observe the warehouse Airflow instance

Next, declare a reference to the warehouse Airflow instance, which is running at http://localhost:8081:

import dagster as dg
import dagster_airlift.core as dg_airlift_core

warehouse_airflow_instance = dg_airlift_core.AirflowInstance(
    auth_backend=dg_airlift_core.AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)
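The example hard-codes the admin/admin credentials of the local tutorial instances. Outside a local demo you would typically read them from the environment instead. The sketch below shows one way to do that. The environment variable names (WAREHOUSE_AIRFLOW_URL and so on) and the helper warehouse_auth_kwargs are hypothetical; the returned keys mirror the webserver_url, username, and password arguments passed to AirflowBasicAuthBackend above.

```python
import os
from typing import Mapping


def warehouse_auth_kwargs(env: Mapping[str, str] = os.environ) -> dict:
    """Build keyword arguments for AirflowBasicAuthBackend from env vars.

    The variable names are hypothetical; the defaults match the local
    tutorial instance at http://localhost:8081 with admin/admin credentials.
    """
    return {
        "webserver_url": env.get("WAREHOUSE_AIRFLOW_URL", "http://localhost:8081"),
        "username": env.get("WAREHOUSE_AIRFLOW_USERNAME", "admin"),
        "password": env.get("WAREHOUSE_AIRFLOW_PASSWORD", "admin"),
    }


# Usage inside definitions.py (requires dagster-airlift):
# warehouse_airflow_instance = dg_airlift_core.AirflowInstance(
#     auth_backend=dg_airlift_core.AirflowBasicAuthBackend(**warehouse_auth_kwargs()),
#     name="warehouse",
# )
```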

Now you can use the load_airflow_dag_asset_specs function to create asset representations (AssetSpecs) of the DAGs in the warehouse Airflow instance:

assets = dg_airlift_core.load_airflow_dag_asset_specs(
    airflow_instance=warehouse_airflow_instance,
)

Add these assets to a Definitions object:

defs = dg.Definitions(assets=assets)

Next, with this code saved in airlift_federation_tutorial/dagster_defs/definitions.py, set up some environment variables and point dagster dev at that file:

# Set up environment variables to point to the airlift-federation-tutorial directory on your machine
export TUTORIAL_EXAMPLE_DIR=$(pwd)
export DAGSTER_HOME="$TUTORIAL_EXAMPLE_DIR/.dagster_home"
dagster dev -f airlift_federation_tutorial/dagster_defs/definitions.py

If you navigate to the Dagster UI (running at http://localhost:3000), you should see the assets created from the warehouse Airflow instance:

Assets from the warehouse Airflow instance in the Dagster UI

This instance contains many DAGs, but we only want to focus on the load_customers DAG. Pass a dag_selector_fn to load_airflow_dag_asset_specs so that only the load_customers DAG is loaded:

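# next(iter(...)) takes the single AssetSpec returned by the filtered call
load_customers = next(
    iter(
        dg_airlift_core.load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            # Keep only the DAG whose ID is "load_customers"
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)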
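dag_selector_fn is a predicate: it receives information about each DAG (the example reads its dag_id attribute) and returns True for the DAGs to keep. Two points are worth making explicit: a predicate can select several DAGs by ID, and next(iter(...)) raises a bare StopIteration if nothing matches, for example when the DAG ID is misspelled. The sketch below illustrates both using a hypothetical stand-in class, FakeDagInfo, in place of the objects dagster-airlift passes to the predicate. dag_ids_selector, exactly_one, and the extra DAG IDs (load_orders, cleanup) are likewise made up for illustration and are not part of dagster-airlift.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class FakeDagInfo:
    """Hypothetical stand-in exposing only the dag_id attribute used above."""

    dag_id: str


def dag_ids_selector(*dag_ids: str) -> Callable[[FakeDagInfo], bool]:
    """Return a predicate that keeps only DAGs whose dag_id is in dag_ids."""
    wanted = frozenset(dag_ids)
    return lambda dag: dag.dag_id in wanted


def exactly_one(items: Iterable[T]) -> T:
    """Return the single item, raising a descriptive error instead of StopIteration."""
    found = list(items)
    if len(found) != 1:
        raise ValueError(f"expected exactly one match, got {len(found)}")
    return found[0]


dags = [FakeDagInfo("load_customers"), FakeDagInfo("load_orders"), FakeDagInfo("cleanup")]
selector = dag_ids_selector("load_customers")
print([dag.dag_id for dag in dags if selector(dag)])  # ['load_customers']
```

Because the predicate only reads dag.dag_id, the same dag_ids_selector("load_customers") should also work as the dag_selector_fn argument, and wrapping the load_airflow_dag_asset_specs call in exactly_one(...) instead of next(iter(...)) turns a silent mismatch into a clear error.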

Add this asset to the Definitions object:

defs = dg.Definitions(assets=[load_customers])

Now, your Dagster environment only includes the load_customers DAG from the warehouse Airflow instance:

The load_customers asset from the warehouse Airflow instance in the Dagster UI

Finally, create a sensor to poll the warehouse Airflow instance for new runs. This sensor ensures that whenever there is a successful run of the load_customers DAG, there will be a materialization in the Dagster UI:

warehouse_sensor = dg_airlift_core.build_airflow_polling_sensor(
    mapped_assets=[load_customers],
    airflow_instance=warehouse_airflow_instance,
)

Next, add this sensor to our Definitions object:

defs = dg.Definitions(assets=[load_customers], sensors=[warehouse_sensor])

You can test this by opening the Airflow UI at http://localhost:8081 and triggering a run of the load_customers DAG. When the run completes, you should see a materialization in the Dagster UI:
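If you prefer to trigger the run from a script instead of the Airflow UI, Airflow 2's stable REST API accepts POST /api/v1/dags/{dag_id}/dagRuns. The sketch below only builds the request with the standard library, so it can be inspected without a running server. It assumes an Airflow 2 instance that accepts basic auth with the admin/admin credentials used earlier, and that the load_customers DAG is unpaused (runs of a paused DAG stay queued). build_trigger_request is a hypothetical helper name.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_trigger_request(
    webserver_url: str, dag_id: str, username: str, password: str
) -> urllib.request.Request:
    """Build a POST request asking Airflow's REST API to start a DAG run."""
    url = f"{webserver_url.rstrip('/')}/api/v1/dags/{urllib.parse.quote(dag_id)}/dagRuns"
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url,
        data=json.dumps({"conf": {}}).encode(),  # empty run configuration
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",
    )


req = build_trigger_request("http://localhost:8081", "load_customers", "admin", "admin")
# To actually send it (requires the warehouse Airflow instance to be running):
# with urllib.request.urlopen(req) as resp:
#     print(json.load(resp)["dag_run_id"])
```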

Materialization of the load_customers DAG in the Dagster UI

Observe the metrics Airflow instance

You can repeat the same process for the customer_metrics DAG in the metrics Airflow instance, which runs at http://localhost:8082. We'll leave this as an exercise to test your understanding.

Complete code

To see what the code should look like after you have completed all the steps above, check out the example in GitHub.

Observe the cross-DAG lineage between load_customers and customer_metrics

Now that you have both DAGs loaded into Dagster, you can observe the cross-DAG lineage between them. To do this, use the AssetSpec replace_attributes method to add load_customers as an upstream dependency of the customer_metrics asset:

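# customer_metrics_dag_asset is the AssetSpec for the customer_metrics DAG,
# loaded from the metrics Airflow instance in the exercise above
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    deps=[load_customers],
)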

Now, after adding the updated customer_metrics_dag_asset to our Definitions object, you should see the lineage between the two DAGs in the Dagster UI:
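To see why the result of replace_attributes is assigned back to customer_metrics_dag_asset, and what the resulting lineage means, here is a plain-Python analogy. Asset specs behave as immutable values: replace_attributes returns a new spec rather than modifying the original. The sketch mimics that with a frozen dataclass and dataclasses.replace, then uses graphlib.TopologicalSorter (Python 3.9+) to order the assets so that upstream assets come first. SpecStandIn and the plain string keys are hypothetical stand-ins, not dagster's AssetSpec or its asset keys.

```python
import dataclasses
from graphlib import TopologicalSorter
from typing import Tuple


@dataclasses.dataclass(frozen=True)
class SpecStandIn:
    """Hypothetical immutable stand-in for an asset spec: a key plus upstream deps."""

    key: str
    deps: Tuple[str, ...] = ()


load_customers = SpecStandIn("load_customers")
customer_metrics = SpecStandIn("customer_metrics")

# Like replace_attributes, dataclasses.replace returns a new object and leaves
# the original untouched, so the result must be assigned back.
customer_metrics = dataclasses.replace(customer_metrics, deps=(load_customers.key,))

# Map each asset key to the set of keys it depends on, then order them.
graph = {spec.key: set(spec.deps) for spec in (load_customers, customer_metrics)}
print(list(TopologicalSorter(graph).static_order()))
# ['load_customers', 'customer_metrics']
```

The edge from load_customers to customer_metrics in this toy graph corresponds to the lineage arrow the Dagster UI draws between the two DAG assets.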

Lineage between load_customers and customer_metrics in the Dagster UI

Next steps

In the next step, "Federate execution across Airflow instances", we'll federate the execution of our DAGs across both Airflow instances.