Execute on Celery#

Celery is an open-source Python distributed task queue system, with support for a variety of queues (brokers) and result persistence strategies (backends).

The dagster-celery executor uses Celery to satisfy three common requirements when running jobs in production:

  1. Parallel execution capacity that scales horizontally across multiple compute nodes.
  2. Separate queues to isolate execution and control external resource usage at the op level.
  3. Priority-based execution at the op level.

The dagster-celery executor compiles a job and its associated configuration into a concrete execution plan, and then submits each execution step to the broker as a separate Celery task. The dagster-celery workers then pick up tasks from the queues to which they are subscribed, according to the priorities assigned to each task, and execute the steps to which the tasks correspond.
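To make the queue and priority behavior described above concrete, here is a minimal standard-library model of it. This is a conceptual sketch, not dagster-celery's implementation. The tag keys, the default queue name `dagster`, and the default priority of 5 reflect dagster-celery's conventions as I understand them, so treat them as assumptions and check your installed version. The step names and the helpers `submit_steps` and `drain` are hypothetical. The model also assumes that a larger priority number runs first.

```python
import heapq
from collections import defaultdict

# Assumed tag keys and defaults; verify against your dagster-celery version.
QUEUE_TAG = "dagster-celery/queue"
PRIORITY_TAG = "dagster-celery/priority"
DEFAULT_QUEUE = "dagster"
DEFAULT_PRIORITY = 5


def submit_steps(steps):
    """Place each (step_key, tags) pair on the queue named by its tags."""
    queues = defaultdict(list)
    for order, (step_key, tags) in enumerate(steps):
        queue = tags.get(QUEUE_TAG, DEFAULT_QUEUE)
        priority = int(tags.get(PRIORITY_TAG, DEFAULT_PRIORITY))
        # heapq is a min-heap, so negate the priority; `order` keeps ties FIFO.
        heapq.heappush(queues[queue], (-priority, order, step_key))
    return queues


def drain(queues, queue_name):
    """Yield step keys in the order a worker subscribed to one queue sees them."""
    heap = queues.get(queue_name, [])
    while heap:
        _, _, step_key = heapq.heappop(heap)
        yield step_key


# Hypothetical steps: two on the default queue, two isolated on "warehouse".
steps = [
    ("load_small", {}),
    ("query_warehouse", {QUEUE_TAG: "warehouse", PRIORITY_TAG: 3}),
    ("load_urgent", {PRIORITY_TAG: 9}),
    ("query_warehouse_urgent", {QUEUE_TAG: "warehouse", PRIORITY_TAG: 8}),
]
queues = submit_steps(steps)
default_worker_order = list(drain(queues, DEFAULT_QUEUE))
warehouse_worker_order = list(drain(queues, "warehouse"))
```

A worker subscribed only to the `warehouse` queue never sees the default-queue steps, which is how a separate queue can cap the load placed on an external resource.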

Quick start#

Let's construct a very parallel toy job that uses the Celery executor.

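from dagster_celery import celery_executor

from dagster import job, op


# A trivial op; the job below fans it out 50 times under distinct aliases.
@op
def not_much():
    return


# executor_def routes every step of this job through the Celery executor.
@job(executor_def=celery_executor)
def parallel_job():
    for i in range(50):
        not_much.alias("not_much_" + str(i))()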

Save this job as celery_job.py.

To run the Celery executor, you'll need a running broker like RabbitMQ. With Docker, this is as simple as:

docker run -p 5672:5672 rabbitmq:3.8.2

You will also need to run a Celery worker to execute tasks. From the same directory in which you saved the job file (we'll explain why in a moment), run:

dagster-celery worker start -A dagster_celery.app

Next, start the Dagster webserver. Again, from the same directory in which you saved the job file, run:

dagster dev -f celery_job.py

Now you can execute the parallel job from the Dagster UI.

Ensuring workers are in sync#

In the quick start, we cheated in a couple of ways:

  • By running a single Celery worker on the same node as the Dagster webserver, so that local ephemeral run storage and event log storage could be shared between them, and so that the filesystem IO manager would be sufficient to exchange values between the worker's task executions.
  • By running the Celery worker in the same directory to which we saved celery_job.py, so that our Dagster code would be available both to the webserver and to the worker: i.e., the worker and the webserver can both find the job definition in the same file (-f celery_job.py).

In production, more configuration is required.

1. Persistent run and event log storage#

First, ensure that appropriate persistent run and event log storage, e.g., PostgresRunStorage and PostgresEventLogStorage, are configured on your Dagster instance, so that the webserver and the workers can communicate information about the run and events. You can do this by adding a block such as the following to your dagster.yaml (by default, Dagster will look for this file at $DAGSTER_HOME/dagster.yaml when the DAGSTER_HOME environment variable is set):

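run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: my_username
      password: my_password
      hostname: my_hostname
      db_name: my_database
      port: 5432

Configure event_log_storage with an analogous block, using the module dagster_postgres.event_log and the class PostgresEventLogStorage.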

The same instance config must be present in the webserver's environment and in the workers' environments. If you haven't already, please read about the Dagster instance.
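One way to confirm that the webserver and the workers really see the same instance config is to fingerprint the file on each node and compare the results. The sketch below uses only the standard library. The helper names `instance_config_path` and `config_fingerprint` are hypothetical, and the approach assumes the config is a single dagster.yaml under DAGSTER_HOME, as described above.

```python
import hashlib
import os
from pathlib import Path


def instance_config_path(env=None):
    """Resolve $DAGSTER_HOME/dagster.yaml from the given environment mapping."""
    env = os.environ if env is None else env
    home = env.get("DAGSTER_HOME")
    if not home:
        raise RuntimeError("DAGSTER_HOME is not set")
    return Path(home) / "dagster.yaml"


def config_fingerprint(path):
    """Return a SHA-256 hex digest of the file's bytes."""
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()
```

Running `config_fingerprint(instance_config_path())` on the webserver node and on each worker node should print the same digest. A byte-level comparison flags even whitespace differences, which is stricter than necessary but simple to reason about.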

2. Persistent IO Manager#

Second, because data is passed between ops that are potentially running in different worker processes, which may be on different nodes, you must use storage that is accessible from all nodes running Celery workers for the job runs you execute with the Celery executor -- for example, an S3 or GCS bucket, or an NFS mount. An appropriate IO manager, such as s3_pickle_io_manager, adls2_pickle_io_manager, or gcs_pickle_io_manager, must be included in the job's resources.
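The reason shared storage is required can be illustrated with a toy pickle store. This is not Dagster's IO manager API: `SharedPickleStore` is a hypothetical stand-in whose method names merely echo the idea of writing an output and loading it as an input, and a temporary directory plays the role of the bucket or NFS mount.

```python
import pickle
import tempfile
from pathlib import Path


class SharedPickleStore:
    """Hypothetical stand-in for a pickle-based IO manager; not Dagster's API."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def _path(self, run_id, step_key, output_name):
        return self.base_dir / run_id / step_key / output_name

    def handle_output(self, run_id, step_key, value, output_name="result"):
        # Serialize the op's output to a location derived from the run and step.
        path = self._path(run_id, step_key, output_name)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load_input(self, run_id, step_key, output_name="result"):
        # A downstream step, possibly in another process, reads the same location.
        return pickle.loads(self._path(run_id, step_key, output_name).read_bytes())


# Two store objects stand in for workers on different nodes. They interoperate
# only because both point at the same storage location.
shared_dir = tempfile.mkdtemp()
upstream_worker = SharedPickleStore(shared_dir)
downstream_worker = SharedPickleStore(shared_dir)
upstream_worker.handle_output("run_1", "extract", {"rows": 3})
loaded = downstream_worker.load_input("run_1", "extract")
```

If the downstream store pointed at a node-local directory instead, `load_input` would raise `FileNotFoundError`, which is the failure mode the shared bucket or mount avoids.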

3. Executor & worker config#

Third, if you are using custom config for your runs -- for instance, using a different Celery broker URL or backend -- you must ensure that your workers start up with this config. Make sure your executor config is present in a YAML file accessible to the workers, and start them with the -y parameter as follows:

dagster-celery worker start -y /path/to/celery_config.yaml

4. Dagster code#

Finally, you will need to make sure that the Dagster code you want the workers to execute is present in their environment, and that it is in sync with the code present on the node running the webserver. The easiest way to do this is typically to package your code into a Python module, and for the workspace.yaml you use to run the webserver to load from that module.

In the quick start, we accomplished this by starting the webserver with the -f parameter, which told it which file to find our job in, and then made sure that we started our Celery worker from the same point in the filesystem -- so that the job was available at the same place.
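A quick pre-flight check on each worker node is to confirm that the modules holding your Dagster code can be imported there at all. The sketch below uses the standard library's `importlib.util.find_spec`. The helper name `missing_modules` and the module name in the usage note are hypothetical. Note that this only checks presence; keeping versions in sync still needs something like a pinned package version or a shared image.

```python
import importlib.util


def missing_modules(module_names):
    """Return the names from `module_names` that cannot be found for import."""
    missing = []
    for name in module_names:
        try:
            spec = importlib.util.find_spec(name)
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is absent or broken.
            spec = None
        if spec is None:
            missing.append(name)
    return missing
```

For example, `missing_modules(["my_dagster_project"])` returning an empty list on a worker node suggests the worker can at least locate the package that the webserver loads.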


Using the dagster-celery CLI#

In the quick start above, we started our workers using the dagster-celery CLI, rather than by invoking Celery directly. This CLI is intended as a convenience wrapper for the common case that shields users from the full complexity of Celery configuration. Of course, it's still possible to start Celery workers directly -- but please let us know if you find yourself going down this path.

Run dagster-celery worker start to start new workers, dagster-celery worker list to view running workers, and dagster-celery worker terminate to terminate workers. For all of these commands, it's essential that you have your broker running.

If you are running your Celery cluster with custom config (e.g., the broker URL or backend), you should also pass a path to a YAML file containing that config to the -y parameter of all these invocations. Otherwise, workers will not start up with the config you are expecting, and the CLI may not be able to find running workers to list or terminate them.
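Because the same -y path has to accompany every invocation, it can help to build the commands in one place. The following sketch only assembles argument lists from the subcommands and flag named above. The helper `worker_command` is hypothetical, and any further arguments a subcommand accepts (such as a worker name) are deliberately left out.

```python
import shlex

WORKER_ACTIONS = ("start", "list", "terminate")


def worker_command(action, config_path=None):
    """Build the argv for a dagster-celery worker subcommand."""
    if action not in WORKER_ACTIONS:
        raise ValueError(f"unknown worker action: {action!r}")
    argv = ["dagster-celery", "worker", action]
    if config_path is not None:
        # Pass the same config file to every invocation so the CLI and the
        # workers agree on the broker and backend.
        argv += ["-y", str(config_path)]
    return argv


# Render each command as a shell-safe string using one shared config path.
commands = {
    action: shlex.join(worker_command(action, "/path/to/celery_config.yaml"))
    for action in WORKER_ACTIONS
}
```

Each list returned by `worker_command` can be handed to `subprocess.run` as-is, which avoids shell quoting issues when the config path contains spaces.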

Although this system is deliberately designed to make the full range of Celery config available as needed, keep in mind that Celery exposes many knobs, and many combinations of them are not compatible with each other, so change the defaults with care. That said, workloads differ widely. If you are running many jobs with very long-running or very short-running tasks, for instance, and you are already comfortable tuning Celery, you might find that changing some of the settings works better for your case.

Monitoring and debugging#

There are several available tools for monitoring and debugging your queues and workers. First, of course, is the Dagster UI, which will display event logs and the stdout/stderr from your runs. You can also observe the logs generated by your broker and by the worker processes.

To debug broker- or queue-level issues, use the monitoring tools provided by the broker you're running. RabbitMQ includes a monitoring API and has first-class support for Prometheus & Grafana integration in production.

Celery also includes the excellent Flower tool for monitoring your Celery workers and queues. This can be very useful in understanding how workers interact with the queue.

Broker and backend#

At the moment, dagster-celery is tested using the RabbitMQ broker and the default RPC backend. At least one team is running dagster-celery using SQS as the broker, and we intend to expand the scope of broker/backend matrix testing, probably starting with the Postgres backend.