Ask AI

Airflow integration reference#

This reference provides a high-level look at working with Airflow using the dagster-airflow integration library.

Airflow DAG directory as a Dagster repository#

To load all Airflow DAGS in a file path into a Dagster repository, use make_dagster_repo_from_airflow_dags_path:

import os

from dagster_airflow import make_dagster_definitions_from_airflow_dags_path

migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(

Orchestrating Dagster jobs from Airflow#

You can orchestrate Dagster job runs from Airflow by using the DagsterCloudOperator or DagsterOperator operators in your existing Airflow DAGs. For example, here's an Airflow DAG:

from datetime import datetime

from airflow import DAG
from dagster_airflow import DagsterCloudOperator

with DAG(
    start_date=datetime(2022, 5, 28),
    schedule_interval="*/5 * * * *",
) as dag:

In Airflow 2.0+, you can create a Dagster connection type to store configuration related to your Dagster+ organization. If you're using Airflow 1.0, you can also pass this directly to the operator.

Ingesting DAGs from Airflow#

This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that works the same way as a Dagster-native job.

There are three jobs in the repo:

  • airflow_simple_dag demonstrates the use of Airflow templates.
  • airflow_complex_dag shows the translation of a more complex dependency structure.
  • airflow_kubernetes_dag shows the translation of a DAG using Kubernetes pod operators.
from dagster_airflow import (

from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_kubernetes_dag import kubernetes_dag
from with_airflow.airflow_simple_dag import simple_dag

airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
airflow_kubernetes_dag = make_dagster_job_from_airflow_dag(kubernetes_dag)

def with_airflow():
    return [airflow_complex_dag, airflow_simple_dag, airflow_kubernetes_dag]

Note that the execution_date for the Airflow DAG is specified through the job tags. To specify tags, call to:

airflow_simple_dag_with_execution_date = make_dagster_job_from_airflow_dag(
    dag=simple_dag, tags={"airflow_execution_date":}