Ask AI

Source code for dagster_celery.executor

from dagster import (
    Executor,
    Field,
    Noneable,
    Permissive,
    StringSource,
    _check as check,
    executor,
    multiple_process_executor_requirements,
)
from dagster._core.execution.retries import RetryMode, get_retries_config
from dagster._grpc.types import ExecuteStepArgs
from dagster._serdes import pack_value

from .config import DEFAULT_CONFIG, dict_wrapper
from .defaults import broker_url, result_backend

CELERY_CONFIG = {
    "broker": Field(
        Noneable(StringSource),
        is_required=False,
        description=(
            "The URL of the Celery broker. Default: "
            "'pyamqp://guest@{os.getenv('DAGSTER_CELERY_BROKER_HOST',"
            "'localhost')}//'."
        ),
    ),
    "backend": Field(
        Noneable(StringSource),
        is_required=False,
        default_value="rpc://",
        description="The URL of the Celery results backend. Default: 'rpc://'.",
    ),
    "include": Field(
        [str], is_required=False, description="List of modules every worker should import"
    ),
    "config_source": Field(
        Noneable(Permissive()),
        is_required=False,
        description="Additional settings for the Celery app.",
    ),
    "retries": get_retries_config(),
}


[docs]@executor( name="celery", config_schema=CELERY_CONFIG, requirements=multiple_process_executor_requirements(), ) def celery_executor(init_context): """Celery-based executor. The Celery executor exposes config settings for the underlying Celery app under the ``config_source`` key. This config corresponds to the "new lowercase settings" introduced in Celery version 4.0 and the object constructed from config will be passed to the :py:class:`celery.Celery` constructor as its ``config_source`` argument. (See https://docs.celeryq.dev/en/stable/userguide/configuration.html for details.) The executor also exposes the ``broker``, `backend`, and ``include`` arguments to the :py:class:`celery.Celery` constructor. In the most common case, you may want to modify the ``broker`` and ``backend`` (e.g., to use Redis instead of RabbitMQ). We expect that ``config_source`` will be less frequently modified, but that when solid executions are especially fast or slow, or when there are different requirements around idempotence or retry, it may make sense to execute jobs with variations on these settings. To use the `celery_executor`, set it as the `executor_def` when defining a job: .. code-block:: python from dagster import job from dagster_celery import celery_executor @job(executor_def=celery_executor) def celery_enabled_job(): pass Then you can configure the executor as follows: .. code-block:: YAML execution: config: broker: 'pyamqp://guest@localhost//' # Optional[str]: The URL of the Celery broker backend: 'rpc://' # Optional[str]: The URL of the Celery results backend include: ['my_module'] # Optional[List[str]]: Modules every worker should import config_source: # Dict[str, Any]: Any additional parameters to pass to the #... # Celery workers. This dict will be passed as the `config_source` #... # argument of celery.Celery(). Note that the YAML you provide here must align with the configuration with which the Celery workers on which you hope to run were started. If, for example, you point the executor at a different broker than the one your workers are listening to, the workers will never be able to pick up tasks for execution. """ return CeleryExecutor( broker=init_context.executor_config.get("broker"), backend=init_context.executor_config.get("backend"), config_source=init_context.executor_config.get("config_source"), include=init_context.executor_config.get("include"), retries=RetryMode.from_config(init_context.executor_config["retries"]), )
def _submit_task(app, plan_context, step, queue, priority, known_state): from .tasks import create_task execute_step_args = ExecuteStepArgs( job_origin=plan_context.reconstructable_job.get_python_origin(), run_id=plan_context.dagster_run.run_id, step_keys_to_execute=[step.key], instance_ref=plan_context.instance.get_ref(), retry_mode=plan_context.executor.retries.for_inner_plan(), known_state=known_state, print_serialized_events=True, # Not actually checked by the celery task ) task = create_task(app) task_signature = task.si( execute_step_args_packed=pack_value(execute_step_args), executable_dict=plan_context.reconstructable_job.to_dict(), ) return task_signature.apply_async( priority=priority, queue=queue, routing_key=f"{queue}.execute_plan", ) class CeleryExecutor(Executor): def __init__( self, retries, broker=None, backend=None, include=None, config_source=None, ): self.broker = check.opt_str_param(broker, "broker", default=broker_url) self.backend = check.opt_str_param(backend, "backend", default=result_backend) self.include = check.opt_list_param(include, "include", of_type=str) self.config_source = dict_wrapper( dict(DEFAULT_CONFIG, **check.opt_dict_param(config_source, "config_source")) ) self._retries = check.inst_param(retries, "retries", RetryMode) @property def retries(self): return self._retries def execute(self, plan_context, execution_plan): from .core_execution_loop import core_celery_execution_loop return core_celery_execution_loop( plan_context, execution_plan, step_execution_fn=_submit_task ) @staticmethod def for_cli(broker=None, backend=None, include=None, config_source=None): return CeleryExecutor( retries=RetryMode(RetryMode.DISABLED), broker=broker, backend=backend, include=include, config_source=config_source, ) def app_args(self): return { "broker": self.broker, "backend": self.backend, "include": self.include, "config_source": self.config_source, "retries": self.retries, }