import os
from dagster_airflow import(
make_dagster_definitions_from_airflow_dags_path,)
migrated_airflow_definitions = make_dagster_definitions_from_airflow_dags_path(
os.path.abspath("./dags/"),)
You can orchestrate Dagster job runs from Airflow by using the DagsterCloudOperator or DagsterOperator operators in your existing Airflow DAGs. For example, here's an Airflow DAG:
from datetime import datetime
from airflow import DAG
from dagster_airflow import DagsterCloudOperator
with DAG(
dag_id="dagster_cloud",
start_date=datetime(2022,5,28),
schedule_interval="*/5 * * * *",
catchup=False,)as dag:
DagsterCloudOperator(
task_id="new_dagster_assets",
repostitory_location_name="example_location",
repository_name="my_dagster_project",
job_name="all_assets_job",)
In Airflow 2.0+, you can create a Dagster connection type to store configuration related to your Dagster Cloud organization. If you're using Airflow 1.0, you can also pass this directly to the operator.
This example demonstrates how to use make_dagster_job_from_airflow_dag to compile an Airflow DAG into a Dagster job that works the same way as a Dagster-native job.
There are three jobs in the repo:
airflow_simple_dag demonstrates the use of Airflow templates.
airflow_complex_dag shows the translation of a more complex dependency structure.
airflow_kubernetes_dag shows the translation of a DAG using Kubernetes pod operators.
from dagster_airflow import(
make_dagster_job_from_airflow_dag,
make_schedules_and_jobs_from_airflow_dag_bag,)from with_airflow.airflow_complex_dag import complex_dag
from with_airflow.airflow_kubernetes_dag import kubernetes_dag
from with_airflow.airflow_simple_dag import simple_dag
airflow_simple_dag = make_dagster_job_from_airflow_dag(simple_dag)
airflow_complex_dag = make_dagster_job_from_airflow_dag(complex_dag)
airflow_kubernetes_dag = make_dagster_job_from_airflow_dag(kubernetes_dag)@repositorydefwith_airflow():return[airflow_complex_dag, airflow_simple_dag, airflow_kubernetes_dag]
Note that the execution_date for the Airflow DAG is specified through the job tags. To specify tags, call to: