Source code for dagster._core.definitions.unresolved_asset_job_definition

import warnings
from datetime import datetime
from itertools import groupby
from typing import TYPE_CHECKING, AbstractSet, Any, Mapping, NamedTuple, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import deprecated, deprecated_param
from dagster._core.definitions import AssetKey
from dagster._core.definitions.asset_job import build_asset_job, get_asset_graph_for_job
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.backfill_policy import resolve_backfill_policy
from dagster._core.definitions.config import ConfigMapping
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.hook_definition import HookDefinition
from dagster._core.definitions.metadata import RawMetadataValue
from dagster._core.definitions.partition import PartitionedConfig, PartitionsDefinition
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.definitions.run_request import RunRequest
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.instance import DynamicPartitionsStore
from dagster._utils.tags import normalize_tags

if TYPE_CHECKING:
    from dagster._core.definitions import JobDefinition
    from dagster._core.definitions.asset_graph import AssetGraph
    from dagster._core.definitions.asset_selection import CoercibleToAssetSelection
    from dagster._core.definitions.run_config import RunConfig


class UnresolvedAssetJobDefinition(
    NamedTuple(
        "_UnresolvedAssetJobDefinition",
        [
            ("name", str),
            ("selection", AssetSelection),
            (
                "config",
                Optional[Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig"]],
            ),
            ("description", Optional[str]),
            ("tags", Optional[Mapping[str, str]]),
            ("run_tags", Optional[Mapping[str, str]]),
            ("metadata", Optional[Mapping[str, RawMetadataValue]]),
            ("partitions_def", Optional[PartitionsDefinition]),
            ("executor_def", Optional[ExecutorDefinition]),
            ("hooks", Optional[AbstractSet[HookDefinition]]),
            ("op_retry_policy", Optional["RetryPolicy"]),
        ],
    )
):
    def __new__(
        cls,
        name: str,
        selection: AssetSelection,
        config: Optional[
            Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig", "RunConfig"]
        ] = None,
        description: Optional[str] = None,
        tags: Optional[Mapping[str, str]] = None,
        run_tags: Optional[Mapping[str, str]] = None,
        metadata: Optional[Mapping[str, RawMetadataValue]] = None,
        partitions_def: Optional[PartitionsDefinition] = None,
        executor_def: Optional[ExecutorDefinition] = None,
        hooks: Optional[AbstractSet[HookDefinition]] = None,
        op_retry_policy: Optional["RetryPolicy"] = None,
    ):
        from dagster._core.definitions import ExecutorDefinition
        from dagster._core.definitions.run_config import convert_config_input

        tags = check.opt_mapping_param(tags, "tags")
        # If `run_tags` is set, then we use it, otherwise `tags` acts as both definition tags and
        # run tags. This is for backcompat with old behavior prior to the introduction of
        # `run_tags`.
        run_tags = check.mapping_param(run_tags, "run_tags") if run_tags else tags
        return super(UnresolvedAssetJobDefinition, cls).__new__(
            cls,
            name=check.str_param(name, "name"),
            selection=check.inst_param(selection, "selection", AssetSelection),
            config=convert_config_input(config),
            description=check.opt_str_param(description, "description"),
            tags=tags,
            run_tags=run_tags,
            metadata=check.opt_mapping_param(metadata, "metadata"),
            partitions_def=check.opt_inst_param(
                partitions_def, "partitions_def", PartitionsDefinition
            ),
            executor_def=check.opt_inst_param(executor_def, "executor_def", ExecutorDefinition),
            hooks=check.opt_nullable_set_param(hooks, "hooks", of_type=HookDefinition),
            op_retry_policy=check.opt_inst_param(op_retry_policy, "op_retry_policy", RetryPolicy),
        )
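
    # Illustrative sketch (not part of the original source): under the backcompat
    # behavior above, `tags` doubles as run tags only when `run_tags` is omitted.
    #
    #     UnresolvedAssetJobDefinition(name="j1", selection=sel, tags={"team": "data"})
    #         -> definition tags and run tags are both {"team": "data"}
    #     UnresolvedAssetJobDefinition(
    #         name="j2", selection=sel, tags={"team": "data"}, run_tags={"env": "prod"}
    #     )
    #         -> definition tags {"team": "data"}, run tags {"env": "prod"}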

    @deprecated(
        breaking_version="2.0.0",
        additional_warn_text="Directly instantiate `RunRequest(partition_key=...)` instead.",
    )
    def run_request_for_partition(
        self,
        partition_key: str,
        run_key: Optional[str] = None,
        tags: Optional[Mapping[str, str]] = None,
        asset_selection: Optional[Sequence[AssetKey]] = None,
        run_config: Optional[Mapping[str, Any]] = None,
        current_time: Optional[datetime] = None,
        dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
    ) -> RunRequest:
        """Creates a RunRequest object for a run that processes the given partition.

        Args:
            partition_key (str): The key of the partition to request a run for.
            run_key (Optional[str]): A string key to identify this launched run. For sensors, ensures that
                only one run is created per run key across all sensor evaluations.  For schedules,
                ensures that one run is created per tick, across failure recoveries. Passing in a `None`
                value means that a run will always be launched per evaluation.
            tags (Optional[Dict[str, str]]): A dictionary of tags (string key-value pairs) to attach
                to the launched run.
            run_config (Optional[Mapping[str, Any]]): Configuration for the run. If the job has
                a :py:class:`PartitionedConfig`, this value will replace the config provided by
                it.
            current_time (Optional[datetime]): Used to determine which time-partitions exist.
                Defaults to now.
            dynamic_partitions_store (Optional[DynamicPartitionsStore]): The DynamicPartitionsStore
                object that is responsible for fetching dynamic partitions. Required when the
                partitions definition is a DynamicPartitionsDefinition with a name defined. Users
                can pass the DagsterInstance fetched via `context.instance` to this argument.

        Returns:
            RunRequest: an object that requests a run to process the given partition.
        """
        from dagster._core.definitions.partition import (
            DynamicPartitionsDefinition,
            PartitionedConfig,
        )

        if not self.partitions_def:
            check.failed("Called run_request_for_partition on a non-partitioned job")

        partitioned_config = PartitionedConfig.from_flexible_config(
            self.config, self.partitions_def
        )

        if (
            isinstance(self.partitions_def, DynamicPartitionsDefinition)
            and self.partitions_def.name
        ):
            # Do not support using run_request_for_partition with dynamic partitions,
            # since this would require querying the instance once per run request for the
            # existing dynamic partitions
            check.failed(
                "run_request_for_partition is not supported for dynamic partitions. Instead, use"
                " RunRequest(partition_key=...)"
            )

        self.partitions_def.validate_partition_key(
            partition_key,
            current_time=current_time,
            dynamic_partitions_store=dynamic_partitions_store,
        )

        run_config = (
            run_config
            if run_config is not None
            else partitioned_config.get_run_config_for_partition_key(partition_key)
        )
        run_request_tags = {
            **(tags or {}),
            **partitioned_config.get_tags_for_partition_key(partition_key),
        }

        return RunRequest(
            job_name=self.name,
            run_key=run_key,
            run_config=run_config,
            tags=run_request_tags,
            asset_selection=asset_selection,
            partition_key=partition_key,
        )
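
    # Illustrative sketch (not part of the original source): per the deprecation
    # note above, prefer constructing the RunRequest directly in a schedule or
    # sensor evaluation function, e.g.:
    #
    #     RunRequest(partition_key="2024-01-01", run_key="2024-01-01")
    #
    # The job name is supplied by the schedule or sensor's target job in that case.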

    def resolve(
        self,
        asset_graph: "AssetGraph",
        default_executor_def: Optional[ExecutorDefinition] = None,
        resource_defs: Optional[Mapping[str, ResourceDefinition]] = None,
    ) -> "JobDefinition":
        """Resolve this UnresolvedAssetJobDefinition into a JobDefinition."""
        try:
            job_asset_graph = get_asset_graph_for_job(asset_graph, self.selection)
        except DagsterInvalidDefinitionError as e:
            raise DagsterInvalidDefinitionError(
                f'Error resolving selection for asset job "{self.name}": {e}'
            ) from e

        # Require that all assets in the job have the same backfill policy
        executable_nodes = {job_asset_graph.get(k) for k in job_asset_graph.executable_asset_keys}
        nodes_by_backfill_policy = dict(
            groupby((n for n in executable_nodes if n.is_partitioned), lambda n: n.backfill_policy)
        )
        backfill_policy = resolve_backfill_policy(nodes_by_backfill_policy.keys())

        if len(nodes_by_backfill_policy) > 1:
            keys_by_backfill_policy = {
                bp: [n.key.to_user_string() for n in nodes]
                for bp, nodes in nodes_by_backfill_policy.items()
            }
            warnings.warn(
                f"Asset job {self.name} materializes assets with varying BackfillPolicies."
                f" Using backfill policy with minimum `max_partitions_per_run`: {backfill_policy}."
                f"\n\nFound multiple backfill policies:\n\n{keys_by_backfill_policy}"
            )

        # Error if a PartitionedConfig is defined and any target asset has a backfill policy that
        # materializes anything other than a single partition per run. This is because
        # PartitionedConfig is a function that maps single partition keys to run config, so its
        # behavior is undefined for multiple-partition runs.
        if backfill_policy.max_partitions_per_run != 1 and isinstance(
            self.config, PartitionedConfig
        ):
            raise DagsterInvalidDefinitionError(
                f"Asset job {self.name} materializes an asset with a BackfillPolicy targeting"
                " multiple partitions per run, but a PartitionedConfig was provided."
                " PartitionedConfigs are not supported for jobs with multi-partition-per-run"
                " backfill policies."
            )

        return build_asset_job(
            self.name,
            asset_graph=job_asset_graph,
            config=self.config,
            description=self.description,
            tags=self.tags,
            run_tags=self.run_tags,
            metadata=self.metadata,
            partitions_def=self.partitions_def,
            executor_def=self.executor_def or default_executor_def,
            hooks=self.hooks,
            op_retry_policy=self.op_retry_policy,
            resource_defs=resource_defs,
            allow_different_partitions_defs=False,
        )
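
    # Illustrative sketch (not part of the original source): `resolve` is not
    # typically called by users; it runs automatically when the unresolved job is
    # placed in a code location, e.g.:
    #
    #     defs = Definitions(assets=[asset1], jobs=[define_asset_job("all_assets")])
    #     job_def = defs.get_job_def("all_assets")  # the resolved JobDefinition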


@deprecated_param(
    param="partitions_def",
    breaking_version="2.0.0",
    additional_warn_text="Partitioning is inferred from the selected assets, so setting this is redundant.",
)
def define_asset_job(
    name: str,
    selection: Optional["CoercibleToAssetSelection"] = None,
    config: Optional[
        Union[ConfigMapping, Mapping[str, Any], "PartitionedConfig", "RunConfig"]
    ] = None,
    description: Optional[str] = None,
    tags: Optional[Mapping[str, object]] = None,
    run_tags: Optional[Mapping[str, object]] = None,
    metadata: Optional[Mapping[str, RawMetadataValue]] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    executor_def: Optional[ExecutorDefinition] = None,
    hooks: Optional[AbstractSet[HookDefinition]] = None,
    op_retry_policy: Optional["RetryPolicy"] = None,
) -> UnresolvedAssetJobDefinition:
    """Creates a definition of a job which will either materialize a selection of assets or
    observe a selection of source assets. This will only be resolved to a JobDefinition once
    placed in a code location.

    Args:
        name (str): The name for the job.
        selection (Union[str, Sequence[str], Sequence[AssetKey], Sequence[Union[AssetsDefinition, SourceAsset]], AssetSelection]):
            The assets that will be materialized or observed when the job is run.

            The selected assets must all be included in the assets that are passed to the assets
            argument of the Definitions object that this job is included on.

            The string "my_asset*" selects my_asset and all downstream assets within the code
            location. A list of strings represents the union of all assets selected by strings
            within the list.

            The selection will be resolved to a set of assets when the location is loaded. If the
            selection resolves to all source assets, the created job will perform source asset
            observations. If the selection resolves to all regular assets, the created job will
            materialize assets. If the selection resolves to a mixed set of source assets and
            regular assets, an error will be thrown.
        config (Optional[Union[ConfigMapping, Mapping[str, Any], PartitionedConfig, RunConfig]]):
            Describes how the Job is parameterized at runtime.

            If no value is provided, then the schema for the job's run config is a standard
            format based on its ops and resources.

            If a dictionary is provided, then it must conform to the standard config schema, and
            it will be used as the job's run config whenever the job is executed. The values
            provided will be viewable and editable in the Dagster UI, so be careful with secrets.

            If a :py:class:`ConfigMapping` object is provided, then the schema for the job's run
            config is determined by the config mapping, which should return configuration in the
            standard format to configure the job.
        tags (Optional[Mapping[str, object]]): A set of key-value tags that annotate the job and
            can be used for searching and filtering in the UI. Values that are not already strings
            will be serialized as JSON. If `run_tags` is not set, then the content of `tags` will
            also be automatically appended to the tags of any runs of this job.
        run_tags (Optional[Mapping[str, object]]): A set of key-value tags that will be
            automatically attached to runs launched by this job. Values that are not already
            strings will be serialized as JSON. These tag values may be overwritten by tag values
            provided at invocation time. If `run_tags` is set, then `tags` are not automatically
            appended to the tags of any runs of this job.
        metadata (Optional[Mapping[str, RawMetadataValue]]): Arbitrary metadata about the job.
            Keys are displayed string labels, and values are one of the following: string, float,
            int, JSON-serializable dict, JSON-serializable list, and one of the data classes
            returned by a MetadataValue static method.
        description (Optional[str]): A description for the Job.
        executor_def (Optional[ExecutorDefinition]): How this Job will be executed. Defaults to
            :py:class:`multi_or_in_process_executor`, which can be switched between multi-process
            and in-process modes of execution. The default mode of execution is multi-process.
        op_retry_policy (Optional[RetryPolicy]): The default retry policy for all ops that compute
            assets in this job. Only used if a retry policy is not defined on the asset
            definition.
        partitions_def (Optional[PartitionsDefinition]): (Deprecated) Defines the set of
            partitions for this job. Deprecated because partitioning is inferred from the
            selected assets, so setting this is redundant.

    Returns:
        UnresolvedAssetJobDefinition: The job, which can be placed inside a code location.

    Examples:
        .. code-block:: python

            # A job that targets all assets in the code location:
            @asset
            def asset1(): ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets")],
            )

            # A job that targets a single asset:
            @asset
            def asset1(): ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets", selection=[asset1])],
            )

            # A job that targets all the assets in a group:
            defs = Definitions(
                assets=assets,
                jobs=[define_asset_job("marketing_job", selection=AssetSelection.groups("marketing"))],
            )

            @observable_source_asset
            def source_asset(): ...

            # A job that observes a source asset:
            defs = Definitions(
                assets=assets,
                jobs=[define_asset_job("observation_job", selection=[source_asset])],
            )

            # Resources are supplied to the assets, not the job:
            @asset(required_resource_keys={"slack_client"})
            def asset1(): ...

            defs = Definitions(
                assets=[asset1],
                jobs=[define_asset_job("all_assets")],
                resources={"slack_client": prod_slack_client},
            )
    """
    from dagster._core.definitions import AssetSelection

    # Convert string-based selections to AssetSelection objects.
    if selection is None:
        resolved_selection = AssetSelection.all()
    else:
        resolved_selection = AssetSelection.from_coercible(selection)

    return UnresolvedAssetJobDefinition(
        name=name,
        selection=resolved_selection,
        config=config,
        description=description,
        tags=normalize_tags(tags),
        # Need to preserve a None value here so that `__new__` can fall back to `tags`.
        run_tags=normalize_tags(run_tags) if run_tags is not None else None,
        metadata=metadata,
        partitions_def=partitions_def,
        executor_def=executor_def,
        hooks=hooks,
        op_retry_policy=op_retry_policy,
    )
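

# Illustrative usage sketch (not part of the original source; assumes an asset
# `daily_metrics` with a daily PartitionsDefinition defined elsewhere):
# partitioning is inferred from the selected assets, so `partitions_def` does
# not need to be passed explicitly.
#
#     daily_job = define_asset_job(
#         "daily_job",
#         selection=[daily_metrics],
#         run_tags={"team": "analytics"},
#     )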