Federating execution across Airflow instances
At this point, we can observe our DAGs within Dagster and have cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.
Making `customer_metrics` executable
The `load_airflow_dag_asset_specs` function creates asset representations (called `AssetSpec`) of Airflow DAGs, but these assets are not executable. We need to define an execution function in Dagster in order to make them executable.
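As a minimal sketch of what that non-executable state looks like (assuming the `metrics_airflow_instance` defined earlier in this tutorial), we can load the DAG-backed specs and print their asset keys; everything that comes back is metadata only:

```python
from dagster_airlift.core import load_airflow_dag_asset_specs

# Load AssetSpecs for every DAG in the metrics Airflow instance.
specs = load_airflow_dag_asset_specs(airflow_instance=metrics_airflow_instance)
for spec in specs:
    # Each entry is a plain AssetSpec: a key plus metadata, with no
    # execution function attached, so it can't be materialized yet.
    print(spec.key)
```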
To federate execution of `customer_metrics`, we first need to make it executable within Dagster. We can do this by using the `@multi_asset` decorator to define how the `customer_metrics` asset should be executed. We'll use the `AirflowInstance` defined earlier to trigger a run of the `customer_metrics` DAG, then wait for the run to complete. If the run succeeds, we'll materialize the asset; if it fails, we'll raise an exception.
```python
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")
```
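This trigger-and-wait pattern generalizes if you have more DAGs to federate. As a hedged sketch (not part of the tutorial code), a small factory like the hypothetical `build_dag_executor` below could stamp out an executable asset for any federated DAG, using only the calls shown above:

```python
from dagster import AssetSpec, MaterializeResult, multi_asset
from dagster_airlift.core import AirflowInstance


def build_dag_executor(instance: AirflowInstance, dag_id: str, spec: AssetSpec):
    # Returns a multi-asset that triggers the given DAG and materializes
    # its spec on success, mirroring run_customer_metrics above.
    @multi_asset(name=f"run_{dag_id}", specs=[spec])
    def _run() -> MaterializeResult:
        run_id = instance.trigger_dag(dag_id)
        instance.wait_for_run_completion(dag_id, run_id)
        if instance.get_run_state(dag_id, run_id) == "success":
            return MaterializeResult(asset_key=spec.key)
        raise Exception(f"Run of {dag_id} failed.")

    return _run
```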
Now, we'll replace the `customer_metrics_dag_asset` in our `Definitions` object with the `run_customer_metrics` function:
```python
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
```
We should now be able to go to the Dagster UI and see that the `customer_metrics` asset can be materialized.
Federating execution
Ultimately, we would like to kick off a run of `customer_metrics` whenever `load_customers` completes successfully. The polling sensor we set up earlier already records a materialization each time `load_customers` completes, so we can use that materialization event to trigger a run of `customer_metrics` with Declarative Automation. First, we'll add an `AutomationCondition.eager()` condition to our `customer_metrics_dag_asset`. This tells Dagster to run the `run_customer_metrics` function whenever the `load_customers` asset is materialized.
```python
from dagster import AutomationCondition

customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    automation_condition=AutomationCondition.eager(),
)
```
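`eager()` is just one of Dagster's built-in automation conditions. Purely as an illustration (not something this tutorial needs), the same attribute could instead hold a cron-based condition if the metrics DAG should run on a schedule regardless of upstream activity:

```python
# Hypothetical alternative: materialize on a schedule instead of eagerly
# reacting to upstream materializations.
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
    automation_condition=AutomationCondition.on_cron("0 * * * *"),
)
```

For this tutorial, we'll stick with `eager()`.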
Now, we can set up Declarative Automation by adding an `AutomationConditionSensorDefinition`:
```python
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
```
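Here, `target="*"` points the sensor at every asset in the code location. If you wanted to narrow its scope, an asset selection should also work; the sketch below is a hypothetical variant, not required for this tutorial:

```python
from dagster import AssetSelection

# Hypothetical variant: evaluate automation conditions only for the
# customer_metrics DAG asset instead of every asset in the location.
scoped_automation_sensor = AutomationConditionSensorDefinition(
    name="scoped_automation_sensor",
    target=AssetSelection.keys(customer_metrics_dag_asset.key),
    default_status=DefaultSensorStatus.RUNNING,
)
```

We'll keep the `"*"` target here.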
We'll add this sensor to our `Definitions` object:
```python
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```
Now the `run_customer_metrics` function will be executed whenever the `load_customers` asset is materialized. Let's test this out by triggering a run of the `load_customers` DAG in Airflow. When the run completes, we should see a materialization of the `customer_metrics` asset kick off in the Dagster UI, and eventually a run of the `customer_metrics` DAG in the metrics Airflow instance.
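If you'd rather drive the test from Python than from the Airflow UI, the Airlift client we already built can do it. This small sketch uses only calls that appear earlier in the tutorial:

```python
# Trigger the upstream DAG on the warehouse instance and block until it finishes.
run_id = warehouse_airflow_instance.trigger_dag("load_customers")
warehouse_airflow_instance.wait_for_run_completion("load_customers", run_id)

# Should print "success" if the run completed cleanly.
print(warehouse_airflow_instance.get_run_state("load_customers", run_id))
```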
Complete code
When all the above steps are complete, your code should look something like this.
```python
from dagster import (
    AutomationCondition,
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)

customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
    # Add a dependency on the load_customers_dag_asset
).replace_attributes(
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```
Conclusion
That concludes the tutorial! We've federated DAG execution across two Airflow instances using Dagster's Declarative Automation system, set up cross-instance lineage, and can now observe both the lineage and the execution of our DAGs in the Dagster UI.