Execute on Celery

Celery is an open-source Python distributed task queue system, with support for a variety of queues (brokers) and result persistence strategies (backends).

The dagster-celery executor uses Celery to satisfy three common requirements when running jobs in production:

  1. Parallel execution capacity that scales horizontally across multiple compute nodes.
  2. Separate queues to isolate execution and control external resource usage at the op level.
  3. Priority-based execution at the op level.

The dagster-celery executor compiles a job and its associated configuration into a concrete execution plan, and then submits each execution step to the broker as a separate Celery task. The dagster-celery workers then pick up tasks from the queues to which they are subscribed, according to the priorities assigned to each task, and execute the steps to which the tasks correspond.
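
To make the dispatch model concrete, here is a toy, standard-library-only simulation of how steps could be routed to queues and ordered by priority. It is a sketch under stated assumptions, not dagster-celery's implementation: the `Step` class and `dispatch` function are hypothetical, and the sketch assumes that ops select a queue and priority through the `dagster-celery/queue` and `dagster-celery/priority` tags, that untagged steps go to a default queue named `dagster` with priority 5, and that larger numbers are picked up first. It also ignores dependencies between steps.

```python
import heapq
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import count

QUEUE_TAG = "dagster-celery/queue"
PRIORITY_TAG = "dagster-celery/priority"
DEFAULT_QUEUE = "dagster"  # assumption: default queue name
DEFAULT_PRIORITY = 5       # assumption: default task priority


@dataclass
class Step:
    """Hypothetical stand-in for one step of an execution plan."""
    key: str
    tags: dict = field(default_factory=dict)


def dispatch(steps):
    """Group steps by queue and return, per queue, the order in which a
    subscribed worker would pick them up: highest priority first, then
    submission order."""
    heaps = defaultdict(list)
    order = count()  # tie-breaker that preserves submission order
    for step in steps:
        queue = step.tags.get(QUEUE_TAG, DEFAULT_QUEUE)
        priority = int(step.tags.get(PRIORITY_TAG, DEFAULT_PRIORITY))
        # heapq is a min-heap, so negate the priority to pop the largest first.
        heapq.heappush(heaps[queue], (-priority, next(order), step.key))
    return {
        queue: [heapq.heappop(heap)[2] for _ in range(len(heap))]
        for queue, heap in heaps.items()
    }


plan = [
    Step("extract"),
    Step("load_warehouse", {QUEUE_TAG: "warehouse", PRIORITY_TAG: "3"}),
    Step("urgent_report", {PRIORITY_TAG: "9"}),
    Step("refresh_warehouse", {QUEUE_TAG: "warehouse", PRIORITY_TAG: "8"}),
]
schedule = dispatch(plan)
for queue, step_keys in schedule.items():
    print(queue, step_keys)
```

In this toy model, a worker subscribed only to the `warehouse` queue would never see `extract` or `urgent_report`, which is how separate queues isolate access to an external resource.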

Quick start

Let's construct a very parallel toy job that uses the Celery executor.

from dagster import job, op
from dagster_celery import celery_executor


@op
def not_much():
    return


@job(executor_def=celery_executor)
def parallel_job():
    for i in range(50):
        not_much.alias("not_much_" + str(i))()

Save this job as celery_job.py.

To run the Celery executor, you'll need a running broker like RabbitMQ. With Docker, this is as simple as:

docker run -p 5672:5672 rabbitmq:3.8.2

You will also need to run a Celery worker to execute tasks. From the same directory in which you saved the job file (we'll explain why in a moment), run:

dagster-celery worker start -A dagster_celery.app

Next, start Dagit. Again, from the same directory in which you saved the job file, run:

dagit -f celery_job.py

Now you can execute the parallel job from Dagit.

Ensuring workers are in sync

In the quick start, we cheated in a couple of ways:

  • By running a single Celery worker on the same node as Dagit, so that local ephemeral run storage and event log storage could be shared between them, and so that the filesystem IO manager would be sufficient to exchange values between the worker's task executions.
  • By running the Celery worker in the same directory to which we saved celery_job.py, so that our Dagster code would be available both to Dagit and to the worker: i.e., the worker and Dagit can both find the job definition in the same file (-f celery_job.py).

In production, more configuration is required.

1. Persistent run and event log storage

First, ensure that appropriate persistent run and event log storage, e.g., PostgresRunStorage and PostgresEventLogStorage, are configured on your Dagster instance, so that Dagit and the workers can communicate information about the run and events. You can do this by adding a block such as the following to your dagster.yaml (by default, Dagster will look for this file at $DAGSTER_HOME/dagster.yaml when the DAGSTER_HOME environment variable is set):

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username: { username }
      password: { password }
      hostname: { hostname }
      db_name: { db_name }
      port: { port }

The same instance config must be present in Dagit's environment and in the workers' environments. If you haven't already, please read about the Dagster instance.
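
One rough way to check that Dagit and the workers really see the same instance config is to compare a fingerprint of dagster.yaml on each node. The helper below is a minimal sketch, not part of Dagster: `instance_config_fingerprint` is a hypothetical name, and it assumes the config lives at $DAGSTER_HOME/dagster.yaml as described above. The temporary directories only simulate two nodes for the sake of a runnable example.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def instance_config_fingerprint(dagster_home=None):
    """Return the SHA-256 hex digest of dagster.yaml under DAGSTER_HOME."""
    home = dagster_home or os.environ.get("DAGSTER_HOME")
    if not home:
        raise RuntimeError("DAGSTER_HOME is not set")
    config_path = Path(home) / "dagster.yaml"
    return hashlib.sha256(config_path.read_bytes()).hexdigest()


# Simulate a Dagit node and a worker node that hold identical config files.
sample_config = "run_storage:\n  module: dagster_postgres.run_storage\n"
with tempfile.TemporaryDirectory() as dagit_home, \
        tempfile.TemporaryDirectory() as worker_home:
    for home in (dagit_home, worker_home):
        Path(home, "dagster.yaml").write_text(sample_config)
    dagit_digest = instance_config_fingerprint(dagit_home)
    worker_digest = instance_config_fingerprint(worker_home)

in_sync = dagit_digest == worker_digest
print("instance config in sync:", in_sync)
```

The comparison is byte-for-byte, so it also flags differences in whitespace or comments. In practice you would call `instance_config_fingerprint()` with no arguments on each node and compare the digests.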

2. Persistent IO Manager

Second, because data is passed between ops that are potentially running in different worker processes, which may be on different nodes, you must use storage that is accessible from all nodes running Celery workers for the job runs you execute with the Celery executor -- for example, an S3 or GCS bucket, or an NFS mount. An appropriate IO manager -- such as s3_pickle_io_manager, adls2_pickle_io_manager, or gcs_pickle_io_manager -- must be included in the job's resources.
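
The toy example below illustrates why node-local storage is not enough. It is a sketch only: `SharedPickleStore` is a hypothetical class that loosely mimics what a pickle-based IO manager does (it is not Dagster's IOManager API), and the two temporary directories stand in for a shared mount and a node-local disk.

```python
import pickle
import tempfile
from pathlib import Path


class SharedPickleStore:
    """Toy stand-in for a pickle-based IO manager (not Dagster's API)."""

    def __init__(self, base_dir):
        self.base_dir = Path(base_dir)

    def _path(self, run_id, step_key, output_name):
        # Every worker derives the same relative path for a given output.
        return self.base_dir / run_id / step_key / output_name

    def handle_output(self, run_id, step_key, output_name, value):
        path = self._path(run_id, step_key, output_name)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load_input(self, run_id, step_key, output_name):
        return pickle.loads(self._path(run_id, step_key, output_name).read_bytes())


with tempfile.TemporaryDirectory() as shared, tempfile.TemporaryDirectory() as local:
    worker_a = SharedPickleStore(shared)  # node A, shared storage
    worker_b = SharedPickleStore(shared)  # node B, same shared storage
    worker_c = SharedPickleStore(local)   # node C, node-local disk only

    # The upstream step runs on node A and writes its output.
    worker_a.handle_output("run_1", "extract", "result", [1, 2, 3])

    # A downstream step on node B can read it back.
    loaded = worker_b.load_input("run_1", "extract", "result")

    # A downstream step on node C cannot, because the file is not on its disk.
    try:
        worker_c.load_input("run_1", "extract", "result")
        local_read_failed = False
    except FileNotFoundError:
        local_read_failed = True

print(loaded, local_read_failed)
```

An object-store-backed IO manager plays the role of the shared directory here: every worker resolves the same key to the same stored value, regardless of which node it runs on.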

3. Executor & worker config

Third, if you are using custom config for your runs -- for instance, a different Celery broker URL or backend -- you must ensure that your workers start up with this config. Make sure your executor config is present in a YAML file accessible to the workers, and start them with the -y parameter as follows:

dagster-celery worker start -y /path/to/celery_config.yaml

4. Dagster code

Finally, you will need to make sure that the Dagster code you want the workers to execute is present in their environment, and that it is in sync with the code present on the node running Dagit. The easiest way to do this is typically to package your code into a Python module and have the workspace.yaml you use to run Dagit load from that module.

In the quick start, we accomplished this by starting Dagit with the -f parameter, which told it which file to find our job in, and then made sure that we started our Celery worker from the same point in the filesystem -- so that the job was available at the same place.
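
If you do package your code as a module, a quick sanity check is to confirm, on each worker node, that the module can be found in the worker's Python environment. The helper below is a minimal sketch using only the standard library; `missing_modules` is a hypothetical name, and `my_dagster_project_that_is_not_installed` is a placeholder used to show the failure case.

```python
import importlib.util


def is_importable(name):
    """Return True if the named module can be located in this environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name is missing.
        return False


def missing_modules(module_names):
    """Return the module names that cannot be found in this environment."""
    return [name for name in module_names if not is_importable(name)]


# "json" stands in for a module that is present; the second name is a
# placeholder for a project package that has not been installed.
missing = missing_modules(["json", "my_dagster_project_that_is_not_installed"])
print("missing modules:", missing)
```

This only shows that the code is present, not that it matches the version Dagit is running. To check that the two are in sync, you would also need to compare something like a package version or a commit hash across nodes.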

CLI

In the quick start above, we started our workers using the dagster-celery CLI, rather than by invoking Celery directly. This CLI is intended as a convenience wrapper for the common case that shields users from the full complexity of Celery configuration. Of course, it's still possible to start Celery workers directly -- but please let us know if you find yourself going down this path.

Run dagster-celery worker start to start new workers, dagster-celery worker list to view running workers, and dagster-celery worker terminate to terminate workers. For all of these commands, it's essential that you have your broker running.

If you are running your Celery cluster with custom config (e.g., the broker URL or backend), you should also pass a path to a YAML file containing that config to the -y parameter of all these invocations. Otherwise, workers will not start up with the config you are expecting, and the CLI may not be able to find running workers to list or terminate them.
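
One way to avoid forgetting the -y parameter on some invocations is to build all three commands from a single place. The snippet below is a small sketch, not part of dagster-celery: `dagster_celery_commands` is a hypothetical helper, and it uses only the subcommands and the -y parameter described above.

```python
import shlex


def dagster_celery_commands(config_yaml=None):
    """Build the worker start/list/terminate commands with a shared -y argument."""
    commands = {}
    for action in ("start", "list", "terminate"):
        argv = ["dagster-celery", "worker", action]
        if config_yaml:
            # Pass the same config file to every invocation.
            argv += ["-y", config_yaml]
        commands[action] = argv
    return commands


commands = dagster_celery_commands("/path/to/celery_config.yaml")
for argv in commands.values():
    print(shlex.join(argv))
```

Each value is an argument list, so it can be passed directly to `subprocess.run(argv, check=True)` on a node where the dagster-celery CLI is installed and the broker is reachable.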

This system is deliberately designed to make the full range of Celery config available as needed. Keep in mind, however, that Celery exposes many knobs, and many combinations of them are not compatible with each other. Workloads also differ widely: if you are running many jobs with very long-running or very short-running tasks, for instance, and you are already comfortable tuning Celery, you might find that changing some of the settings works better for your case.

Monitoring and debugging

There are several available tools for monitoring and debugging your queues and workers. First, of course, is Dagit, which will display event logs and the stdout/stderr from your runs. You can also observe the logs generated by your broker and by the worker processes.

To debug broker- or queue-level issues, you should use the monitoring tools provided by the broker you're running. RabbitMQ includes a monitoring API and has first-class support for Prometheus & Grafana integration in production.

Celery also includes the excellent Flower tool for monitoring your Celery workers and queues. This can be very useful in understanding how workers interact with the queue.

Broker and backend

At the moment, dagster-celery is tested using the RabbitMQ broker and the default RPC backend. At least one team is running dagster-celery using SQS as the broker, and we intend to expand the scope of broker/backend matrix testing, probably starting with the Postgres backend.