In addition to using the k8s_job_executor to run each op in its own Kubernetes job, Dagster also allows you to use Celery to limit the number of ops that can concurrently connect to a resource across all running Dagster jobs.
The Helm chart can be configured to use this architecture by setting the runLauncher.type field in your values.yaml file to CeleryK8sRunLauncher instead of the default K8sRunLauncher. The resulting architecture is similar to the one described in the Helm deployment guide, with the following changes:
Users can configure multiple Celery queues (for example, one Celery queue for each resource the user would like to limit) and multiple Celery workers per queue via the runLauncher.config.celeryK8sRunLauncher.workerQueues section of values.yaml.
The Celery workers poll for new Celery tasks and execute each task in order of receipt or priority. The Celery task largely consists of launching an ephemeral Kubernetes step worker to execute that step.
The run worker is still responsible for traversing the execution plan, but now uses the celery_k8s_job_executor to submit steps that are ready to be executed to the corresponding Celery queue (instead of executing the step itself).
All jobs being executed on an instance that uses the CeleryK8sRunLauncher must have the celery_k8s_job_executor set in the executor_def field.
The step worker is responsible for executing a single step and writing the structured events it produces to the database. The Celery worker polls for step worker completion.
We assume that you've followed the initial steps in the previous walkthrough: building the Docker image for your user code, pushing it to a registry, adding the Dagster Helm chart repository, and configuring your Helm user deployment values. Note that any job you wish to run on an instance using the CeleryK8sRunLauncher must use the celery_k8s_job_executor as its executor. The example user code repository includes an example job, pod_per_op_celery_job, that uses the celery_k8s_job_executor.
We need to configure persistent object storage so that data can be serialized and passed between steps. To run the Dagster user code example, create an S3 bucket named dagster-test (for example, with aws s3 mb s3://dagster-test).
To enable Dagster to connect to S3, provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables via the env, envConfigMaps, or envSecrets fields under dagster-user-deployments in values.yaml or (not recommended) by setting these variables directly in the User Code Deployment image.
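For instance, the credentials can be supplied through a Kubernetes secret referenced via envSecrets. In this hypothetical fragment, dagster-aws-creds is an example secret (holding AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) that must be created in the cluster separately:

```yaml
# Hypothetical values.yaml fragment; "dagster-aws-creds" is an example
# secret name, created beforehand (e.g. with kubectl create secret generic).
dagster-user-deployments:
  deployments:
    - name: "k8s-example-user-code-1"
      # ... image, dagsterApiGrpcArgs, port, etc. as configured previously ...
      envSecrets:
        - name: dagster-aws-creds
```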
Install the Helm chart and create a release with the run launcher configured to use the CeleryK8sRunLauncher. The Helm chart checks for this run launcher type and includes Celery infrastructure in your cluster. Below, we've named our release dagster; helm upgrade --install creates the release if it does not exist and otherwise modifies the existing dagster release:
$ helm upgrade --install dagster dagster/dagster -f values.yaml
Helm will launch several pods. You can check the status of the installation with kubectl. If everything worked correctly, you should see output like the following:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
dagster-celery-workers-74886cfbfb-m9cbc 1/1 Running 1 3m42s
dagster-daemon-68c4b8d68d-vvpls 1/1 Running 1 3m42s
dagster-webserver-69974dd75b-5m8gg 1/1 Running 0 3m42s
dagster-k8s-example-user-code-1-88764b4f4-25mbd 1/1 Running 0 3m42s
dagster-postgresql-0 1/1 Running 0 3m42s
dagster-rabbitmq-0 1/1 Running 0 3m42s
Visit http://127.0.0.1:8080, and navigate to the launchpad. Notice how resources.io_manager.config.s3_bucket is set to dagster-test. You can replace this string with any other accessible S3 bucket. Then, click Launch Run.
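Based on the description above, the launchpad run config includes a fragment along these lines:

```yaml
# Run-config fragment for the example job; any accessible S3 bucket
# can replace dagster-test.
resources:
  io_manager:
    config:
      s3_bucket: dagster-test
```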
You can introspect the jobs that were launched with kubectl:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-step-9f5c92d1216f636e0d33877560818840 1/1 5s 12s
dagster-step-a1063317b9aac91f42ca9eacec551b6f 1/1 12s 34s
dagster-run-fb6822e5-bf43-476f-9e6c-6f9896cf3fb8 1/1 37s 37s
Jobs prefixed with dagster-step- correspond to step workers, and the job prefixed with dagster-run- corresponds to the run worker.
Within the Dagster UI, you can watch the job as it executes.
Users can configure multiple Celery queues (for example, one queue for each resource to be limited) and multiple Celery workers per queue via the runLauncher.config.celeryK8sRunLauncher.workerQueues section of values.yaml.
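An illustrative workerQueues configuration follows; the queue name extra-queue-1 and the replica counts are example values, not defaults:

```yaml
# Sketch of a values.yaml fragment declaring two Celery queues.
runLauncher:
  type: CeleryK8sRunLauncher
  config:
    celeryK8sRunLauncher:
      workerQueues:
        - name: "dagster"        # the default queue
          replicaCount: 2
        - name: "extra-queue-1"  # e.g. one queue per rate-limited resource
          replicaCount: 1
```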
To route an op to one of these queues, set the dagster-celery/queue tag on the op.
By default, all ops will be sent to the default Celery queue named dagster.
Users can set dagster-celery/run_priority on job tags to configure the baseline priority of all ops from that job. To set priority at the op level, users can set dagster-celery/priority on the op tags. When priorities are set on both a job and an op, the sum of both priorities will be used.
In a real deployment, users will likely want to set up an externally managed message broker such as Redis in place of the chart's bundled RabbitMQ, which can be done by configuring the rabbitmq and redis sections of values.yaml.
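A hypothetical fragment showing that switch; the host and port are placeholders for your own Redis endpoint:

```yaml
# Disable the bundled RabbitMQ and point the chart at an external Redis.
rabbitmq:
  enabled: false
redis:
  enabled: true
  internal: false            # use an externally managed Redis
  host: "redis.example.com"  # placeholder endpoint
  port: 6379
```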