
Learning Dagster from Airflow

In this tutorial, we'll help you make the switch from Airflow to Dagster. Here, we review an Airflow DAG and show how the same functionality can be achieved in Dagster.


Comparing an Airflow DAG to Dagster

In this tutorial, we'll rewrite an Airflow DAG as a Dagster job. Let's start with a basic Airflow DAG:

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    default_args={
        "retries": 1,
    },
    description="A simple tutorial DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
        retries=3,
    )

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

To rewrite this DAG in Dagster, we'll break it down into three parts:

  1. Define the computations: the ops - in Airflow, the operators
  2. Define the graph: the job - in Airflow, the DAG
  3. Define the schedule - in Airflow, the schedule (how simple!)

A Dagster job is made up of a graph of ops. This should feel familiar if you've used the Airflow Task API. With ops, the focus is on writing a graph of Python functions as nodes, with the data dependencies between them as edges.


Step 1: Define the ops

In Dagster, the minimum unit of computation is an op. This directly corresponds to an operator in Airflow. Here, we map the operators of our example Airflow DAG t1, t2, and t3 to their respective Dagster ops.

import time
from datetime import datetime, timedelta

from dagster import In, Nothing, OpExecutionContext, RetryPolicy, op


@op
def print_date(context: OpExecutionContext) -> datetime:
    ds = datetime.now()
    context.log.info(ds)
    return ds


@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context: OpExecutionContext, ds: datetime):
    for _i in range(5):
        context.log.info(ds)
        context.log.info(ds - timedelta(days=7))

This yields the following graph of computations in the Dagster UI. We'll spin up the UI later in the tutorial, but here's a preview. Note that sleep declares ins={"start": In(Nothing)}: like t1 >> t2 in Airflow, this expresses an ordering-only dependency where no data is passed.

[Screenshot: the Dagster UI showing the newly created graph of tutorial ops]

Op-level retries

In the Airflow DAG, the t2 operator was configured with three retries. To configure the same behavior in Dagster, use an op-level retry policy, as we did above with retry_policy=RetryPolicy(max_retries=3) on the sleep op.
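A RetryPolicy can also control the delay and backoff between attempts. Here's a minimal sketch; flaky_op is a hypothetical op, not part of the tutorial:

from dagster import Backoff, RetryPolicy, op


@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=2,  # seconds to wait before the first retry
        backoff=Backoff.EXPONENTIAL,  # double the delay on each subsequent attempt
    )
)
def flaky_op():
    ...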


Step 2: Define the job

In Dagster, the computations defined in ops are composed in jobs, which define the sequence and dependency structure of the computations you want to execute. This directly corresponds to a DAG in Airflow. Here, we compose the ops print_date, sleep, and templated to match the dependency structure defined by the Airflow operators t1, t2, and t3.

@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    sleep(ds)
    templated(ds)
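Before adding a schedule, you can sanity-check the wiring by executing the job in process. A minimal sketch, assuming the ops and job defined above:

# Run the job in the current process, e.g. from a test
result = tutorial_job.execute_in_process()
assert result.success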

Job-level retries

Job-level retries are managed by the run launcher. Once run retries are enabled in your dagster.yaml file, you can define a default retry count for the deployment and override it per job, as we did with the dagster/max_retries tag on tutorial_job above.
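As a sketch, enabling run retries in dagster.yaml can look like this:

run_retries:
  enabled: true  # deployment-wide switch for run retries
  max_retries: 3  # default for all runs; the dagster/max_retries tag overrides it per job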


Step 3: Define the schedule

In Dagster, schedules can be defined for jobs and determine the cadence at which a job is executed. Below, we define a schedule that will run tutorial_job daily:

schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")
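Any cron expression works here as well. As a sketch, this variant pins the job to midnight and starts the schedule enabled, so it doesn't need to be turned on in the UI:

from dagster import DefaultScheduleStatus, ScheduleDefinition

schedule = ScheduleDefinition(
    job=tutorial_job,
    cron_schedule="0 0 * * *",  # equivalent to "@daily"
    default_status=DefaultScheduleStatus.RUNNING,  # schedule starts enabled
)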

Step 4: Run Dagster locally

To run our newly defined Dagster job, we'll need to add it and its schedule to our project's Definitions object.

defs = Definitions(
    jobs=[tutorial_job],
    schedules=[schedule],
)

We can now load this file with dagster dev, which serves the Dagster UI locally:

dagster dev -f <your_dagster_file>.py
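You can also launch a one-off run from the command line instead of the UI. A sketch using the Dagster CLI (the filename is a placeholder):

dagster job execute -f <your_dagster_file>.py -j tutorial_job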

Completed code example

That's it! By now, your code should look like this:

import time
from datetime import datetime, timedelta

from dagster import (
    Definitions,
    In,
    Nothing,
    OpExecutionContext,
    RetryPolicy,
    ScheduleDefinition,
    job,
    op,
)


@op
def print_date(context: OpExecutionContext) -> datetime:
    ds = datetime.now()
    context.log.info(ds)
    return ds


@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context: OpExecutionContext, ds: datetime):
    for _i in range(5):
        context.log.info(ds)
        context.log.info(ds - timedelta(days=7))


@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    sleep(ds)
    templated(ds)


schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")


defs = Definitions(
    jobs=[tutorial_job],
    schedules=[schedule],
)

Mapping Airflow concepts to Dagster

While Airflow and Dagster have some significant differences, there are many concepts that overlap. Use this cheatsheet to understand how Airflow concepts map to Dagster.

| Airflow concept | Dagster concept | Notes |
| --- | --- | --- |
| Directed Acyclic Graphs (DAGs) | Jobs | |
| Tasks | Ops | |
| Datasets | Assets | Dagster assets are more powerful and mature than datasets and include support for things like partitioning. |
| Connections/Variables | | |
| DagBags | Code locations | Multiple isolated code locations with different system and Python dependencies can exist within the same Dagster instance. |
| DAG runs | Job runs | |
| depends_on_past | | An asset can depend on earlier partitions of itself. When this is the case, backfills and auto-materialize will only materialize later partitions after earlier partitions have completed. |
| Executors | Executors | |
| Hooks | Resources | Dagster resources contain a superset of the functionality of hooks and have much stronger composition guarantees. |
| Instances | Instances | |
| Operators | None | Dagster uses normal Python functions instead of framework-specific operator classes. For off-the-shelf functionality with third-party tools, Dagster provides integration libraries. |
| Pools | Run coordinators | |
| Plugins/Providers | Integrations | |
| Schedulers | Schedules | |
| Sensors | Sensors | |
| SubDAGs/TaskGroups | | Dagster provides rich, searchable metadata and tagging support beyond what's offered by Airflow. |
| task_concurrency | Asset/op-level concurrency limits | |
| Trigger | Dagster UI Launchpad | Triggering and configuring ad-hoc runs is easier in Dagster, which allows them to be initiated through the Dagster UI, the GraphQL API, or the CLI. |
| XComs | I/O managers | I/O managers are more powerful than XComs and allow passing large datasets between jobs. |