In Airflow, the PythonOperator runs arbitrary Python functions. For example, you might have a task that runs the function write_to_db, which combs a directory for files and writes each one to a database table.
from airflow.operators.python import PythonOperator

def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)

PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
The Dagster equivalent is instead to construct an asset- or multi_asset-decorated function, which materializes assets corresponding to what your Python function is doing.
from dagster import asset

@asset(key=TABLE_URI)
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)
We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination.
First, we recommend factoring your Python function out into a shared package that is available to both the Airflow and Dagster runtimes. The process is as follows:
1. Scaffold out a new Python project which will contain your shared infrastructure.
2. Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to the setup.py or pyproject.toml file in your Airflow/Dagster package.
3. Include the Python dependencies relevant to your particular function in your new package. Write your Python function in the shared package, and change your Airflow code to import the function from the shared library. (See the sketch after this list for an example.)
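To make this concrete, here is a minimal, hypothetical setup.py for the shared package; the package name and dependency list are placeholders for whatever your function actually needs:

# shared-package/setup.py -- a minimal sketch of the shared package's build metadata.
from setuptools import find_packages, setup

setup(
    name="shared-package",
    packages=find_packages(),  # picks up the shared_package directory
    install_requires=[
        "pandas",  # placeholder: include whatever write_to_db actually depends on
    ],
)

Both the Airflow and Dagster packages can then install it as an editable requirement, for example with pip install -e ../shared-package, so changes to the shared code are picked up without republishing.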
To illustrate what this might look like in a bit more detail, let's say you originally have this project structure in Airflow:
airflow_repo/
├── airflow_package/
│   └── dags/
│       └── my_dag.py # Contains your Python function
With DAG code that looks like this:
from airflow.operators.python import PythonOperator

def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)

PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
You might create a new top-level package to contain the shared code:
airflow_repo/
├── airflow_package/
│   └── dags/
│       └── my_dag.py # Imports the Python function from the shared module.
├── shared-package/
│   └── shared_package/
│       └── shared_module.py # Contains your Python function
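Here, shared_package/shared_module.py would simply hold the original function, unchanged from the Airflow version; a sketch, reusing the helpers from the original example:

# shared-package/shared_package/shared_module.py
# RAW_DATA_DIR, contents_as_df, and upload_to_db are the same helpers used in the original DAG.
def write_to_db() -> None:
    for raw_file in RAW_DATA_DIR.iterdir():
        df = contents_as_df(raw_file)
        upload_to_db(df)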
And then import the function from the shared package in Airflow:
from airflow.operators.python import PythonOperator
from shared_package.shared_module import write_to_db
PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
The reason we recommend using a separate shared package is to help ensure that there aren't dependency conflicts between Airflow and Dagster as you migrate. Airflow has very complex dependency management, and migrating to Dagster gives you an opportunity to clean up and isolate your dependencies. You can do this with a series of shared packages in the monorepo, which will eventually be isolated code locations in Dagster.
Next, you can write a Dagster asset- or multi_asset-decorated function that runs your Python function. For a PythonOperator migration this is generally straightforward, as the asset function can simply invoke the shared function.
# This would be the Python code living in a shared module.
from shared_module import my_shared_python_callable

from dagster import asset


@asset
def my_shared_asset():
    return my_shared_python_callable()
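As a quick local sanity check (not part of the migration itself), you can materialize the new asset in-process with Dagster's materialize helper; a minimal sketch:

from dagster import materialize

# Executes the asset in-process against an ephemeral instance; useful as a smoke test.
result = materialize([my_shared_asset])
assert result.success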