Airflow to Dagster concepts#

In this page, we'll explain how concepts map between Airflow and Dagster.

Airflow Tasks#

An Airflow Task is a single computation within a DAG; running a task calls the execute method of its Operator. Here's an example of an Airflow task that uses the PythonOperator to upload a file to S3, along with the TaskFlow-based @task decorator, which achieves the same thing:

from pathlib import Path

import boto3
from airflow.decorators import task
from airflow.operators.python import PythonOperator


def write_file_to_s3(path: Path) -> None:
    boto3.client("s3").upload_file(str(path), "bucket", path.name)


def _write_customers_data():
    write_file_to_s3(Path("path/to/customers.csv"))


# Defining a task using the PythonOperator syntax
PythonOperator(
    python_callable=_write_customers_data, task_id="write_customers_data", dag=...
)


# Defining a task using the task decorator
@task(task_id="write_customers_data")
def write_customers_data():
    write_file_to_s3(Path("path/to/customers.csv"))

Dagster, in comparison, uses software-defined assets to represent what a task produces: you define the data asset you want to exist, then define the computation that creates it. If you're familiar with Airflow's Datasets, you can think of a software-defined asset as defining a task and a dataset at once. Here's an example of a Dagster asset that uploads a file to S3:

from pathlib import Path

import boto3

from dagster import asset


def write_file_to_s3(path: Path) -> None:
    boto3.client("s3").upload_file(str(path), "bucket", path.name)


# We define in terms of the "physical" asset - the uploaded file
@asset
def customers_data():
    write_file_to_s3(Path("path/to/customers.csv"))

Learn more about Dagster's asset-oriented approach in our software-defined assets reference.

Airflow Operators#

An Airflow Operator is a reusable "building block" from which tasks are defined. Here's an example of a PythonOperator subclass that takes a file path as input and uploads the file to S3:

from pathlib import Path

import boto3
from airflow.operators.python import PythonOperator


class UploadToS3Operator(PythonOperator):
    def __init__(self, path: str, *args, **kwargs) -> None:
        super().__init__(
            python_callable=self.upload_to_s3, op_args=[path], *args, **kwargs
        )

    def upload_to_s3(self, path: str) -> None:
        boto3.client("s3").upload_file(
            Filename=path, Bucket="my_bucket", Key=Path(path).name
        )

Then, you would invoke this operator to create a task like so:

task = UploadToS3Operator(task_id="write_customers_data", path="path/to/customers.csv")

The Dagster equivalent would be to create a factory method that defines an asset for a given parameter.

import boto3

from dagster import asset


def build_s3_asset(path: str):
    @asset(key=path)
    def _s3_file():
        boto3.client("s3").upload_file(path, "my-bucket", path)

    return _s3_file

Then, you would invoke this factory method to create an asset like so:

customers_data = build_s3_asset("path/to/customers.csv")

Airflow DAGs & Schedules#

An Airflow DAG is a collection of tasks with dependencies between them, and some scheduling information. Here's an example of an Airflow DAG which runs on a daily schedule:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task

dag = DAG(
    "simple_dag",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
)


@task(task_id="write_customers_data", dag=dag)
def write_customers_data(): ...

You can define a schedule on an arbitrary set of assets in Dagster. Here's an example of a Dagster Asset that runs on a daily schedule:

from dagster import ScheduleDefinition, asset


@asset
def customers_data(): ...


ScheduleDefinition(
    "daily_schedule",
    cron_schedule="0 0 * * *",
    target=customers_data,
)

Airflow DagBags#

An Airflow DagBag is a collection of DAGs parsed from a particular file tree. The closest equivalent in Dagster is a code location, where you can set up collections of assets, schedules, sensors, and other definitions. See our code locations guide for more information.
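
For illustration, here's a minimal sketch of the Definitions object that backs a code location; the asset and schedule names are placeholders:

from dagster import Definitions, ScheduleDefinition, asset


@asset
def customers_data(): ...


# A code location is backed by a Definitions object, which collects the
# assets, schedules, sensors, and resources loaded from a Python module
defs = Definitions(
    assets=[customers_data],
    schedules=[
        ScheduleDefinition(
            name="daily_schedule",
            cron_schedule="0 0 * * *",
            target=customers_data,
        )
    ],
)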

Airflow Data Interval, Logical Date, and Execution Date#

In Airflow, the data interval is the range of data being processed by a DAG run, with the logical date and execution date corresponding to the start of that interval. These values are available from the task context:

import boto3
from airflow.decorators import task


@task(task_id="write_customers_data")
def write_partitioned_customers_data(**context):
    prefix = context["data_interval_start"]
    # or
    prefix = context["logical_date"]
    # or
    prefix = context["execution_date"]

    # write data to S3 with the prefix
    boto3.client("s3").upload_file(
        "path/to/customers.csv", f"bucket/{prefix}/customers.csv"
    )

The equivalent concept in Dagster is partitions: you define a partitioning scheme to represent the slices of data processed by your computations.

import boto3

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(...))
def customers_data(context: AssetExecutionContext):
    prefix = context.partition_key
    boto3.client("s3").upload_file(
        "path/to/customers.csv", f"bucket/{prefix}/customers.csv"
    )

To learn more, see our partitions guide.

Airflow Sensors & Triggers#

Both Airflow and Dagster have the concept of sensors, but they function very differently. In Airflow, sensors are used to wait for a certain condition to be met before proceeding with the execution of a task. Take the example scenario of waiting for new files to exist before proceeding:

from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

dag = DAG("file_sensor_example", start_date=datetime(2024, 1, 1))

wait_for_file = FileSensor(
    task_id="wait_for_new_customer_files",
    filepath="/path/to/customer_files/*.csv",
    poke_interval=60,  # Check every minute
    timeout=3600,  # Timeout after 1 hour
    mode="poke",
    dag=dag,
)

Triggers in Airflow are an event-based way to accomplish a similar goal: waiting for some condition to become true. They run asynchronously in a separate process (the triggerer).
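
For example, here's a rough sketch of a deferrable ("async") sensor, assuming Airflow 2.2+; the DAG and task names are illustrative. The wait is handed off to the triggerer process, so no worker slot is held while waiting:

from datetime import datetime

from airflow import DAG
from airflow.sensors.date_time import DateTimeSensorAsync

dag = DAG("deferrable_sensor_example", start_date=datetime(2024, 1, 1))

# Defers until the end of the data interval without occupying a worker slot
wait_until_interval_end = DateTimeSensorAsync(
    task_id="wait_until_interval_end",
    target_time="{{ data_interval_end }}",
    dag=dag,
)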

Dagster sensors combine the best of both worlds of Airflow sensors and triggers, while also providing additional capabilities. Here's an example of a Dagster sensor that will kick off computation of an asset whenever a new file is added to a directory:

import json
from pathlib import Path

from dagster import RunRequest, SensorEvaluationContext, SkipReason, asset, sensor


@asset
def uploaded_customers_data():
    pass


# Implementing the FileSensor from Airflow directly in Dagster
@sensor(target=uploaded_customers_data)
def wait_for_new_files(context: SensorEvaluationContext):
    seen_files: list = json.loads(context.cursor) if context.cursor else []
    should_trigger = False
    for file in Path("path/to/customer_files").iterdir():
        if file.name not in seen_files:
            seen_files.append(file.name)
            should_trigger = True
    # Persist the seen files in the cursor so the next evaluation skips them
    context.update_cursor(json.dumps(seen_files))
    yield RunRequest() if should_trigger else SkipReason("No new files")

Key features of Dagster sensors:

  • In Dagster, sensors run continuously and independently of task execution, so they don't tie up a worker slot while waiting and can poll much more frequently than Airflow sensors.
  • Sensors in Dagster can track state between evaluations using cursors.
  • Sensors in Dagster can do more than trigger downstream computations: they can also set run tags, call external APIs, and more. They are ultimately arbitrary Python functions that can do anything you want (although we generally recommend keeping them lightweight, since they are designed to poll often and not run on heavy infrastructure). See the sketch after this list for an example of setting run tags.
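
As a minimal sketch of that last point, here's a sensor that attaches tags to the runs it requests; the sensor name, run key, and tag values are hypothetical:

from dagster import RunRequest, sensor


# Targets the uploaded_customers_data asset defined above
@sensor(target=uploaded_customers_data)
def tagged_file_sensor():
    # Tags are attached to the launched run and are searchable in the UI
    yield RunRequest(
        run_key="customers-latest",
        tags={"source": "file_drop", "team": "data-platform"},
    )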

A note on Airflow's ExternalTaskSensor#

Dagster can handle execution of downstream assets automatically using Declarative Automation, so you don't need to manage cross-DAG dependencies the way you do in Airflow. For example, if you previously used Airflow's ExternalTaskSensor to wait for a task in another DAG to complete, you don't need an equivalent in Dagster; instead, you define a dependency between the two assets in the asset graph.
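
Here's a minimal sketch of that pattern, using hypothetical asset names, with an eager automation condition so the downstream asset materializes whenever its upstream updates:

from dagster import AutomationCondition, asset


@asset
def raw_customers_data(): ...


# The cross-DAG wait becomes a plain asset dependency; Declarative Automation
# materializes this asset whenever its upstream is updated
@asset(
    deps=[raw_customers_data],
    automation_condition=AutomationCondition.eager(),
)
def customers_report(): ...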

Airflow Hooks & Connections#

Hooks in Airflow are used to interact with external systems, and Connections are used to authenticate to those systems. For example, you would first set up a connection to AWS using an environment variable:

export AIRFLOW_CONN_AWS_DEFAULT='aws://YOUR_AWS_ACCESS_KEY_ID:YOUR_AWS_SECRET_ACCESS_KEY@/?region_name=us-east-1'

Here's an example of using an Airflow Hook with a set-up Airflow connection to interact with S3:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_customers_data() -> None:
    S3Hook(aws_conn_id="aws_default").load_file(
        filename="path/to/customers.csv",
        key="customers.csv",
        bucket_name="my-cool-bucket",
        replace=True,
    )


s3_task = PythonOperator(
    task_id="s3_operations",
    python_callable=upload_customers_data,
)

In Dagster, you interact with external systems using resources, and configure those resources with specific connection information. Here's an example of a Dagster Resource to interact with S3, set up with S3 credentials:

import os
from pathlib import Path

from dagster_aws.s3 import S3Resource

from dagster import Definitions, asset


@asset
def customers_s3(s3: S3Resource):
    local_file_data = Path("path/to/customers_data.csv").read_bytes()
    s3_client = s3.get_client()

    s3_client.put_object(
        Bucket="my-cool-bucket",
        Key="customers.csv",
        Body=local_file_data,
    )


defs = Definitions(
    assets=[customers_s3],
    resources={
        # Note: the secret access key and its env var name are illustrative;
        # both credentials are needed for boto3 to authenticate
        "s3": S3Resource(
            aws_access_key_id=os.environ["DEFAULT_AWS_ACCESS_KEY_ID"],
            aws_secret_access_key=os.environ["DEFAULT_AWS_SECRET_ACCESS_KEY"],
        )
    },
)

Airflow XComs#

Airflow XComs are used to pass small amounts of data between tasks. Here's an example of a task that pushes a value to XCom for downstream tasks to pull:

import boto3
import pandas as pd
from airflow.operators.python import PythonOperator


def upload_customers_data(**context):
    raw_customers_data = pd.read_csv("path/to/customers.csv")
    avg_revenue = raw_customers_data["revenue"].mean()
    task_instance = context["task_instance"]
    task_instance.xcom_push(key="avg_revenue", value=avg_revenue)
    boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv")


PythonOperator(
    task_id="generate_stats",
    python_callable=upload_customers_data,
)

In Dagster, you can pass small amounts of data between assets using asset metadata. Here's an example of a Dagster asset that records a computed value as materialization metadata:

import boto3
import pandas as pd

from dagster import FloatMetadataValue, MaterializeResult, asset


@asset
def customers_data_s3():
    raw_customers_data = pd.read_csv("path/to/customers.csv")
    avg_revenue = raw_customers_data["revenue"].mean()
    boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv")
    return MaterializeResult(metadata={"avg_revenue": FloatMetadataValue(avg_revenue)})

For larger amounts of data, you can use Dagster's I/O managers to manage the flow of data between assets.
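
For example, here's a minimal sketch (with hypothetical asset names) where the upstream asset's return value is stored by the configured I/O manager and loaded as an input to the downstream asset:

import pandas as pd

from dagster import asset


@asset
def raw_customers() -> pd.DataFrame:
    # The returned DataFrame is handed to the I/O manager, which stores it
    return pd.read_csv("path/to/customers.csv")


@asset
def customer_revenue_stats(raw_customers: pd.DataFrame) -> pd.DataFrame:
    # The I/O manager loads the upstream value and passes it in as an argument
    return raw_customers[["revenue"]].describe()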

Airflow Templates & Macros#

Airflow allows templating variables into your DAGs using Jinja. Dagster doesn't have a direct Jinja templating feature; instead, you're encouraged to use plain Python and Dagster's richly typed API to pass information. The context information available during an asset's execution lives on the AssetExecutionContext object.
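
For example, values that Airflow exposes through Jinja (such as the run ID or the logical/partition date) are available as attributes on the context; the asset below is an illustrative sketch:

from dagster import AssetExecutionContext, asset


@asset
def context_example(context: AssetExecutionContext):
    # Roughly what {{ run_id }} and {{ ds }} would give you in Airflow
    context.log.info(f"Run ID: {context.run.run_id}")
    if context.has_partition_key:
        context.log.info(f"Partition key: {context.partition_key}")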

Airflow Executors#

Airflow Executors are used to determine where your tasks are executed. You can use the equivalent Dagster executors concept for this purpose. Learn more about Dagster executors.
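
As a brief sketch, here's how you might swap the default multiprocess executor for the in-process executor on a job; the job and op names are illustrative:

from dagster import in_process_executor, job, op


@op
def do_work(): ...


# Jobs use the multiprocess executor by default; executor_def overrides it
@job(executor_def=in_process_executor)
def single_process_job():
    do_work()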

Airflow Pools#

Airflow Pools allow users to limit the number of concurrent tasks that can run. Dagster provides concurrency controls at multiple levels of the stack to limit both the number of computations that can run at a given time and the number of runs that are in progress at a given time. View our full concurrency guide here.
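
As one example of these controls, here's a sketch that limits how many assets can execute in parallel within a single run by configuring the default multiprocess executor; the asset and job names are illustrative, and run-level or pool-based limits are configured on the Dagster instance instead:

from dagster import asset, define_asset_job


@asset
def customers_data(): ...


# At most two assets in this job will execute at the same time within a run
limited_parallelism_job = define_asset_job(
    name="limited_parallelism_job",
    selection=[customers_data],
    config={"execution": {"config": {"multiprocess": {"max_concurrent": 2}}}},
)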

Airflow Task Groups#

Airflow task groups allow you to organize tasks into hierarchical groups within the Airflow UI for a particular DAG. Dagster has asset groups, which are global and can be applied to any asset. Learn more about asset groups.
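
For example, assets can be assigned to a group with the group_name argument, and the group appears as a collapsible section in the asset graph:

from dagster import asset


@asset(group_name="customers")
def customers_data(): ...


@asset(group_name="customers")
def customers_report(): ...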

Cheatsheet#

Here's a cheatsheet for Airflow users migrating to Dagster:

| Airflow concept | Dagster concept | Notes |
| --- | --- | --- |
| Directed Acyclic Graphs (DAG) | Jobs | |
| Tasks & Datasets | Assets | Dagster assets combine the concepts of tasks and datasets into a single abstraction that can be used to define both the computation and the data produced by that computation. They include support for things like partitioning. |
| Connections/Hooks | Resources | Dagster's resource system allows you to define and configure connections to external systems. Dagster+ also has a dedicated secrets management system. |
| Variables | Run configuration | Dagster's rich config system allows you to define configuration schemas in code and pass configuration to computations at runtime. |
| DagBags | Code locations | Multiple isolated code locations with different system and Python dependencies can exist within the same Dagster deployment. |
| DAG runs | Asset runs | |
| depends_on_past | Partitioned assets | An asset can depend on earlier partitions of itself. When this is the case, backfills and Declarative Automation will only materialize later partitions after earlier partitions have completed. |
| Executors | Executors | |
| Instances | Instances | |
| Operators | Asset factories | Dagster uses normal Python functions instead of framework-specific operator classes. To define multiple assets from a shared implementation, you can use a factory function. |
| Pools & Task Concurrency | Asset & Run Concurrency | |
| Plugins/Providers | Integrations | |
| Schedulers | Schedules | |
| Sensors & Triggers | Sensors, Declarative Automation | Dagster sensors provide the best of both worlds of Airflow's sensors and triggers, while also providing additional functionality. Declarative Automation removes the need for cross-DAG dependency sensors such as ExternalTaskSensor. |
| SubDAGs/TaskGroups | Asset groups | Dagster provides rich, searchable metadata and tagging support beyond what's offered by Airflow. |
| XComs | Runtime metadata, I/O managers | For small amounts of data, you can use Dagster's rich metadata abstractions to make data available between assets and have it show up in the UI. For larger datasets, I/O managers are more powerful than XComs and allow passing large datasets between jobs. |