Celery executor

Celery is a longstanding open-source Python distributed task queue system, with support for a variety of queues (brokers) and result persistence strategies (backends).

The dagster-celery executor uses Celery to satisfy three typical requirements when running pipelines in production:

  1. Parallel execution capacity that scales horizontally across multiple compute nodes.
  2. Separate queues to isolate execution and control external resource usage at the solid level.
  3. Priority-based execution at the solid level.

The dagster-celery executor compiles a pipeline definition and its associated configuration into a concrete execution plan, and then submits each execution step to the broker as a separate Celery task. The dagster-celery workers then pick up tasks from the queues to which they are subscribed, according to the priorities assigned to each task, and execute the steps to which the tasks correspond.

Quick start

Let's construct a very parallel toy pipeline and then execute it using the Celery executor.

celery_pipeline.py
from dagster_celery import celery_executor

from dagster import ModeDefinition, default_executors, pipeline, solid

celery_mode_defs = [ModeDefinition(executor_defs=default_executors + [celery_executor])]


@solid
def not_much(_):
    return


@pipeline(mode_defs=celery_mode_defs)
def parallel_pipeline():
    for i in range(50):
        not_much.alias("not_much_" + str(i))()

Save this pipeline as celery_pipeline.py.

To run the Celery executor, you'll need a running broker like RabbitMQ. With Docker, this is as simple as:

docker run -p 5672:5672 rabbitmq:3.8.2

You will also need to run a Celery worker to execute tasks. From the same directory in which you saved the pipeline file (we'll explain why in a moment), run:

dagster-celery worker start -A dagster_celery.app

Now you can execute this pipeline with Celery. Again, from the same directory in which you saved the pipeline file, run:

dagit -f celery_pipeline.py

Now you can execute the parallel pipeline from Dagit with the following config:

celery.yaml
storage:
  filesystem:
execution:
  celery:

Ensuring workers are in sync

In the quick start, we cheated in a couple of ways:

  • By running a single Celery worker on the same node as Dagit, so that local ephemeral run storage and event log storage could be shared between them, and so that filesystem intermediates storage would be sufficient to exchange values between the worker's task executions.
  • By running the Celery worker in the same directory to which we saved celery_pipeline.py, so that our Dagster code would be available both to Dagit and to the worker: i.e., the worker and Dagit can both find the pipeline definition in the same file (-f celery_pipeline.py).

In production, more configuration is required.

1. Persistent run and event log storage

First, ensure that appropriate persistent run and event log storage, e.g., PostgresRunStorage and PostgresEventLogStorage, are configured on your Dagster instance, so that Dagit and the workers can communicate information about the run and events. You can do this by adding a block such as the following to your dagster.yaml (by default, Dagster will look for this file at $DAGSTER_HOME/dagster.yaml when the DAGSTER_HOME environment variable is set):

dagster-pg.yaml
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { database }
      port: { port }

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

The same instance config must be present in Dagit's environment and in the workers' environments. If you haven't already, please read about the Dagster instance.

2. Persistent system storage

Second, because data is passed between solids potentially running in different worker processes, which may be on different nodes, you must use storage that is accessible from all nodes running Celery workers for the pipeline runs you execute with the Celery executor -- for example, an S3 or GCS bucket, or an NFS mount. An appropriate system storage -- such as s3_system_storage or gcs_system_storage -- must be available on a ModeDefinition attached to the pipeline.

3. Executor & worker config

Third, if you are using custom config for your pipeline runs -- for instance, using a different Celery broker url or backend -- you must ensure that your workers start up with this config. Make sure your engine config is present in a YAML file accessible to the workers and start them with the -y parameter as follows:

dagster-celery worker start -y /path/to/celery_config.yaml

4. Dagster code

Finally, you will need to make sure that the Dagster code you want the workers to execute is present in their environment, and that it is in sync with the code present on the node running Dagit. The easiest way to do this is typically to package your code into a Python module, and for the workspace.yaml you use to run Dagit to load from that module.

In the quick start, we accomplished this by starting Dagit with the -f and -n parameters, which told it which file to find our pipeline in and under what name, and then made sure that we started our Celery worker from the same point in the filesystem -- so that the pipeline was available at the same place.

CLI

In the quick start above, we started our workers using the dagster-celery CLI, rather than by invoking Celery directly. This CLI is intended as a convenience wrapper for the common case that shields users from the full complexity of Celery configuration. Of course, it's still possible to start Celery workers directly -- but please let us know if you find yourself going down this path.

Run dagster-celery worker start to start new workers, dagster-celery worker list to view running workers, and dagster-celery worker terminate to terminate workers. For all of these commands, it's essential that you have your broker running.

If you are running your celery cluster with custom config (e.g., the broker URL or backend), you should also pass a path to a YAML file containing that config to the -y parameter of all these invocations. Otherwise, workers will not start up with the config you are expecting, and the CLI may not be able to find running workers to list or terminate them.

Although this system is deliberately designed to make the full range of Celery config available as needed, keep in mind that Celery exposes many knobs, many combinations of which are not compatible with each other. However, workloads also differ widely. If you are running many pipelines with very long-running or very short-running tasks, for instance, and you are already comfortable tuning Celery, you might find that changing some of the settings works better for your case.

Monitoring and debugging

There are several available tools for monitoring and debugging your queues and workers. First, of course, is Dagit, which will display event logs and the stdout/stderr from your pipeline executions. You can also observe the logs generated by your broker and by the worker processes.

To debug broker/queue level issues, you should use the monitoring tools provided by the broker you're running. RabbitMQ includes a monitoring API and has first class support for Prometheus & Grafana integration in production.

Celery also includes the excellent Flower tool for monitoring your Celery workers and queues. This can be very useful in understanding how workers interact with the queue.

Broker and backend

At the moment, dagster-celery is tested using the RabbitMQ broker and the default RPC backend. At least one team is running dagster-celery using SQS as the broker, and we intend to expand the scope of broker/backend matrix testing, probably starting with the Postgres backend.