CeleryK8sRunLauncher(instance_config_map, dagster_home, postgres_password_secret, load_incluster_config=True, kubeconfig_file=None, broker=None, backend=None, include=None, config_source=None, retries=None, inst_data=None, k8s_client_batch_api=None)
In contrast to the K8sRunLauncher, which launches pipeline runs as single K8s Jobs,
this run launcher is intended for use in concert with the celery_k8s_job_executor.
With this run launcher, execution is delegated to:
A run worker Kubernetes Job, which traverses the pipeline run execution plan and submits steps to Celery queues for execution;
The step executions which are submitted to Celery queues are picked up by Celery workers, and each step execution spawns a step execution Kubernetes Job. See the implementation defined in
You may configure a Dagster instance to use this RunLauncher by adding a section to your
dagster.yaml like the following:
    run_launcher:
      module: dagster_celery_k8s.launcher
      class: CeleryK8sRunLauncher
      config:
        instance_config_map: "dagster-k8s-instance-config-map"
        dagster_home: "/some/path"
        postgres_password_secret: "dagster-k8s-pg-password"
        broker: "some_celery_broker_url"
        backend: "some_celery_backend_url"
As always when using a ConfigurableClass, the values under the
config key of this YAML block will be passed to the constructor. The full list
of acceptable values is given below by the constructor args.
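The hand-off from the YAML block to the constructor can be pictured roughly as follows. This is a simplified sketch, not Dagster's actual ConfigurableClass machinery, which does more than this (for example, it also supplies inst_data). The helper name instantiate_from_block is hypothetical, and datetime.timedelta stands in for the launcher class so the snippet runs without Dagster installed.

```python
import importlib


def instantiate_from_block(block):
    """Import block['module'], look up block['class'], and call it
    with the entries of block['config'] as keyword arguments."""
    module = importlib.import_module(block["module"])
    cls = getattr(module, block["class"])
    return cls(**block.get("config", {}))


# Stand-in for a parsed `run_launcher:` block. A real block would name
# the launcher's module and class; a stdlib class is used here instead.
example_block = {
    "module": "datetime",
    "class": "timedelta",
    "config": {"hours": 1, "minutes": 30},
}
example_instance = instantiate_from_block(example_block)
```

The point of the pattern is that every key under config must be a valid constructor argument; an unknown key would fail at construction time in this sketch.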
instance_config_map (str) – The
name of an existing Volume to mount into the pod in
order to provide a ConfigMap for the Dagster instance. This Volume should contain a
dagster.yaml with appropriate values for run storage, event log storage, etc.
dagster_home (str) – The location of DAGSTER_HOME in the Job container; this is where the
dagster.yaml file will be mounted from the instance ConfigMap specified above.
postgres_password_secret (str) – The name of the Kubernetes Secret where the postgres password can be retrieved. Will be mounted and supplied as an environment variable to the Job Pod.
load_incluster_config (Optional[bool]) – Set this value if you are running the launcher
within a k8s cluster. If True, we assume the launcher is running within the target
cluster and load config using kubernetes.config.load_incluster_config. Otherwise,
we will use the k8s config specified in kubeconfig_file (using
kubernetes.config.load_kube_config) or fall back to the default kubeconfig. Default: True.
kubeconfig_file (Optional[str]) – The kubeconfig file from which to load config. Defaults to None (using the default kubeconfig).
broker (Optional[str]) – The URL of the Celery broker.
backend (Optional[str]) – The URL of the Celery backend.
include (Optional[List[str]]) – List of modules every Celery worker should import.
config_source (Optional[dict]) – Additional settings for the Celery app.
retries (Optional[dict]) – Default retry configuration for Celery tasks.
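The interaction between load_incluster_config and kubeconfig_file described above can be sketched as a small selection function. This is an illustration of the documented behavior, not the launcher's actual code; choose_kube_loader and apply_kube_config are hypothetical names. The two kubernetes.config functions are the ones named in the parameter descriptions.

```python
def choose_kube_loader(load_incluster_config=True, kubeconfig_file=None):
    """Return (function_name, kwargs) for the kubernetes.config loader to call."""
    if load_incluster_config:
        # Running inside the target cluster: use the in-cluster config.
        return "load_incluster_config", {}
    # Otherwise use the given kubeconfig file; config_file=None makes
    # load_kube_config fall back to the default kubeconfig.
    return "load_kube_config", {"config_file": kubeconfig_file}


def apply_kube_config(load_incluster_config=True, kubeconfig_file=None):
    """Call the selected loader. Requires the `kubernetes` package."""
    # Imported lazily so choose_kube_loader works without kubernetes installed.
    import kubernetes.config

    name, kwargs = choose_kube_loader(load_incluster_config, kubeconfig_file)
    getattr(kubernetes.config, name)(**kwargs)
```

Keeping the decision separate from the call makes the branch logic easy to check without a cluster or a kubeconfig on disk.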
Celery-based executor which launches tasks as Kubernetes Jobs.
The Celery executor exposes config settings for the underlying Celery app under
the config_source key. This config corresponds to the “new lowercase settings” introduced
in Celery version 4.0, and the object constructed from config will be passed to the
celery.Celery constructor as its config_source argument.
(See https://docs.celeryproject.org/en/latest/userguide/configuration.html for details.)
The executor also exposes the broker, backend, and include arguments to the
celery.Celery constructor.
In the most common case, you may want to modify the broker and backend (e.g., to use
Redis instead of RabbitMQ). We expect that
config_source will be less frequently
modified, but that when solid executions are especially fast or slow, or when there are
different requirements around idempotence or retry, it may make sense to execute pipelines
with variations on these settings.
To make the executor available, include it in the executor_defs of a pipeline's mode definition:

    from dagster import ModeDefinition, default_executors, pipeline
    from dagster_celery_k8s.executor import celery_k8s_job_executor

    @pipeline(mode_defs=[ModeDefinition(executor_defs=default_executors + [celery_k8s_job_executor])])
    def celery_enabled_pipeline():
        pass

Then you can configure the executor as follows:
    execution:
      celery-k8s:
        config:
          job_image: 'my_repo.com/image_name:latest'
          job_namespace: 'some-namespace'
          broker: 'pyamqp://guest@localhost//'  # Optional[str]: The URL of the Celery broker
          backend: 'rpc://'  # Optional[str]: The URL of the Celery results backend
          include: ['my_module']  # Optional[List[str]]: Modules every worker should import
          config_source:  # Dict[str, Any]: Any additional parameters to pass to the
              #...        # Celery workers. This dict will be passed as the `config_source`
              #...        # argument of celery.Celery().

Note that the YAML you provide here must align with the configuration with which the Celery workers on which you hope to run were started. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never be able to pick up tasks for execution.
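As a hedged illustration, the same executor config can be written as a Python run-config dict with config_source filled in. task_acks_late and worker_prefetch_multiplier are real Celery lowercase settings, but the values shown are illustrative only, not recommendations. broker_matches is a hypothetical helper, and worker_broker_url is assumed to be the broker URL your workers were started with.

```python
run_config = {
    "execution": {
        "celery-k8s": {
            "config": {
                "job_image": "my_repo.com/image_name:latest",
                "job_namespace": "some-namespace",
                "broker": "pyamqp://guest@localhost//",
                "backend": "rpc://",
                "include": ["my_module"],
                # Lowercase Celery settings; values are examples only.
                "config_source": {
                    "task_acks_late": True,
                    "worker_prefetch_multiplier": 1,
                },
            }
        }
    }
}


def broker_matches(run_config, worker_broker_url):
    """Check that the executor is pointed at the broker the workers use.

    An omitted broker compares as None here; this sketch does not model
    any defaults applied by Celery or the executor.
    """
    executor_config = run_config["execution"]["celery-k8s"]["config"]
    return executor_config.get("broker") == worker_broker_url
```

A check like this, run before launching, catches the mismatch described above, where workers listening on a different broker would never receive the submitted steps.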
In deployments where the celery_k8s_job_executor is used, all appropriate celery and dagster_celery commands must be invoked with the -A dagster_celery_k8s.app argument.