
Airlift Federation Tutorial: Federating Execution Across Airflow Instances#

At this point, we should be able to observe our DAGs within Dagster, with cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.

Making customer_metrics executable#

The load_airflow_dag_asset_specs function creates asset representations (AssetSpecs) of Airflow DAGs, but these assets are not executable. We need to define an execution function in Dagster to make them executable.
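
As a reminder, customer_metrics_dag_asset is the AssetSpec we loaded from the metrics instance in the previous section (it appears again in the complete code below):

customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
).replace_attributes(deps=[load_customers_dag_asset])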

In order to federate execution of customer_metrics, we first need to make it executable within Dagster. We can do this with the @multi_asset decorator, which lets us define how the customer_metrics asset should be executed. We'll use the AirflowInstance defined earlier to trigger a run of the customer_metrics DAG, then wait for the run to complete. If it succeeds, we materialize the asset; if it fails, we raise an exception.

@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    # Kick off a run of the customer_metrics DAG on the metrics instance.
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    # Block until the Airflow run reaches a terminal state.
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("DAG run failed.")
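
Note that this is a synchronous pattern: the Dagster run stays open for the duration of the underlying Airflow DAG run. This keeps the mapping between Dagster materializations and Airflow runs simple, at the cost of a long-lived Dagster run for long DAGs.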

Now, we'll replace the customer_metrics_dag_asset in our Definitions object with the run_customer_metrics function:

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)

We should be able to go to the Dagster UI and see that the customer_metrics asset can now be materialized.

Federating execution#

Ultimately, we would like to kick off a run of customer_metrics whenever load_customers completes successfully. The polling sensor already reports a materialization of load_customers when that DAG completes, so we can use Declarative Automation to trigger customer_metrics in response. First, we'll add an AutomationCondition.eager() to our customer_metrics_dag_asset. This tells Dagster to run the run_customer_metrics function whenever the load_customers asset is materialized.

from dagster import AutomationCondition

customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    automation_condition=AutomationCondition.eager(),
)
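
replace_attributes returns a new copy of the spec rather than mutating it in place, which is why we reassign customer_metrics_dag_asset here.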

Now, we can set up Declarative Automation by adding an AutomationConditionSensorDefinition.

from dagster import AutomationConditionSensorDefinition, DefaultSensorStatus

automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
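
Here, target="*" points the sensor at every asset in the code location, default_status=DefaultSensorStatus.RUNNING starts it without a manual toggle in the UI, and minimum_interval_seconds=1 makes condition evaluation near-continuous, which is convenient for a tutorial.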

We'll add this sensor to our Definitions object.

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)

Now the run_customer_metrics function will be executed whenever the load_customers asset is materialized. Let's test this out by triggering a run of the load_customers DAG in Airflow. When the run completes, we should see a materialization of the customer_metrics asset kick off in the Dagster UI, and eventually a run of the customer_metrics DAG in the metrics Airflow instance.
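
If you'd rather trigger the test run programmatically than through the Airflow UI, a minimal sketch reusing the AirflowInstance client from this tutorial looks like this:

# A sketch for testing only: trigger load_customers on the warehouse instance
# and wait for it to finish, using the client defined earlier in this tutorial.
run_id = warehouse_airflow_instance.trigger_dag("load_customers")
warehouse_airflow_instance.wait_for_run_completion("load_customers", run_id)
print(warehouse_airflow_instance.get_run_state("load_customers", run_id))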

Complete code#

When all the above steps are complete, your code should look something like this.

from dagster import (
    AutomationCondition,
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
# Add a dependency on load_customers_dag_asset and an eager automation condition
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
).replace_attributes(
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    # Kick off a run of the customer_metrics DAG on the metrics instance.
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    # Block until the Airflow run reaches a terminal state.
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("DAG run failed.")


warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)

Conclusion#

That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system, set up cross-instance lineage, and can now observe both lineage and execution in the Dagster UI.