Ask AI

Source code for dagster._core.definitions.decorators.asset_decorator

from collections import Counter
from inspect import Parameter
from typing import (
    AbstractSet,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
    overload,
)

import dagster._check as check
from dagster._annotations import deprecated_param, experimental_param
from dagster._builtins import Nothing
from dagster._config import UserConfigSchema
from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.config import ConfigMapping
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.resource_annotation import (
    get_resource_args,
)
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterType
from dagster._utils.warnings import (
    disable_dagster_warnings,
)

from ..asset_check_spec import AssetCheckSpec
from ..asset_in import AssetIn
from ..asset_out import AssetOut
from ..asset_spec import AssetSpec
from ..assets import ASSET_SUBSET_INPUT_PREFIX, AssetsDefinition
from ..backfill_policy import BackfillPolicy, BackfillPolicyType
from ..decorators.graph_decorator import graph
from ..decorators.op_decorator import _Op
from ..events import AssetKey, CoercibleToAssetKey, CoercibleToAssetKeyPrefix
from ..input import GraphIn, In
from ..output import GraphOut, Out
from ..partition import PartitionsDefinition
from ..policy import RetryPolicy
from ..resource_definition import ResourceDefinition
from ..utils import (
    DEFAULT_IO_MANAGER_KEY,
    DEFAULT_OUTPUT,
    NoValueSentinel,
    validate_definition_tags,
)


@overload
def asset(
    compute_fn: Callable[..., Any],
) -> AssetsDefinition: ...


@overload
def asset(
    *,
    name: Optional[str] = ...,
    key_prefix: Optional[CoercibleToAssetKeyPrefix] = None,
    ins: Optional[Mapping[str, AssetIn]] = ...,
    deps: Optional[Iterable[CoercibleToAssetDep]] = ...,
    metadata: Optional[Mapping[str, Any]] = ...,
    tags: Optional[Mapping[str, str]] = ...,
    description: Optional[str] = ...,
    config_schema: Optional[UserConfigSchema] = None,
    required_resource_keys: Optional[AbstractSet[str]] = ...,
    resource_defs: Optional[Mapping[str, object]] = ...,
    io_manager_def: Optional[object] = ...,
    io_manager_key: Optional[str] = ...,
    compute_kind: Optional[str] = ...,
    dagster_type: Optional[DagsterType] = ...,
    partitions_def: Optional[PartitionsDefinition] = ...,
    op_tags: Optional[Mapping[str, Any]] = ...,
    group_name: Optional[str] = ...,
    output_required: bool = ...,
    freshness_policy: Optional[FreshnessPolicy] = ...,
    auto_materialize_policy: Optional[AutoMaterializePolicy] = ...,
    backfill_policy: Optional[BackfillPolicy] = ...,
    retry_policy: Optional[RetryPolicy] = ...,
    code_version: Optional[str] = ...,
    key: Optional[CoercibleToAssetKey] = None,
    non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = ...,
    check_specs: Optional[Sequence[AssetCheckSpec]] = ...,
    owners: Optional[List[str]] = ...,
) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...


[docs]@experimental_param(param="resource_defs") @experimental_param(param="io_manager_def") @experimental_param(param="auto_materialize_policy") @experimental_param(param="backfill_policy") @experimental_param(param="owners") @experimental_param(param="tags") @deprecated_param( param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead." ) def asset( compute_fn: Optional[Callable[..., Any]] = None, *, name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ins: Optional[Mapping[str, AssetIn]] = None, deps: Optional[Iterable[CoercibleToAssetDep]] = None, metadata: Optional[ArbitraryMetadataMapping] = None, tags: Optional[Mapping[str, str]] = None, description: Optional[str] = None, config_schema: Optional[UserConfigSchema] = None, required_resource_keys: Optional[AbstractSet[str]] = None, resource_defs: Optional[Mapping[str, object]] = None, io_manager_def: Optional[object] = None, io_manager_key: Optional[str] = None, compute_kind: Optional[str] = None, dagster_type: Optional[DagsterType] = None, partitions_def: Optional[PartitionsDefinition] = None, op_tags: Optional[Mapping[str, Any]] = None, group_name: Optional[str] = None, output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, key: Optional[CoercibleToAssetKey] = None, non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, owners: Optional[List[str]] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Create a definition for how to compute an asset. A software-defined asset is the combination of: 1. An asset key, e.g. the name of a table. 2. A function, which can be run to compute the contents of the asset. 3. A set of upstream assets that are provided as inputs to the function when computing the asset. Unlike an op, whose dependencies are determined by the graph it lives inside, an asset knows about the upstream assets it depends on. The upstream assets are inferred from the arguments to the decorated function. The name of the argument designates the name of the upstream asset. An asset has an op inside it to represent the function that computes it. The name of the op will be the segments of the asset key, separated by double-underscores. Args: name (Optional[str]): The name of the asset. If not provided, defaults to the name of the decorated function. The asset's name must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords. key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the concatenation of the key_prefix and the asset's name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in dagster (ie only contains letters, numbers, and _) and may not contain python reserved keywords. ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information about the input. deps (Optional[Sequence[Union[AssetDep, AssetsDefinition, SourceAsset, AssetKey, str]]]): The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. If the AssetsDefinition for a multi_asset is provided, dependencies on all assets created by the multi_asset will be created. config_schema (Optional[ConfigSchema): The configuration schema for the asset's underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op. metadata (Optional[Dict[str, Any]]): A dict of metadata entries for the asset. tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not attached to runs of the asset. required_resource_keys (Optional[Set[str]]): Set of resource handles required by the op. io_manager_key (Optional[str]): The resource key of the IOManager used for storing the output of the op as an asset, and for loading it in downstream ops (default: "io_manager"). Only one of io_manager_key and io_manager_def can be provided. io_manager_def (Optional[object]): (Experimental) The IOManager used for storing the output of the op as an asset, and for loading it in downstream ops. Only one of io_manager_def and io_manager_key can be provided. compute_kind (Optional[str]): A string to represent the kind of computation that produces the asset, e.g. "dbt" or "spark". It will be displayed in the Dagster UI as a badge on the asset. dagster_type (Optional[DagsterType]): Allows specifying type validation functions that will be executed on the output of the decorated function after it runs. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset. op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that `json.loads(json.dumps(value)) == value`. group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided, the name "default" is used. resource_defs (Optional[Mapping[str, object]]): (Experimental) A mapping of resource keys to resources. These resources will be initialized during execution, and can be accessed from the context within the body of the function. output_required (bool): Whether the decorated function will always materialize an asset. Defaults to True. If False, the function can return None, which will not be materialized to storage and will halt execution of downstream assets. freshness_policy (FreshnessPolicy): (Deprecated) A constraint telling Dagster how often this asset is intended to be updated with respect to its root data. auto_materialize_policy (AutoMaterializePolicy): (Experimental) Configure Dagster to automatically materialize this asset according to its FreshnessPolicy and when upstream dependencies change. backfill_policy (BackfillPolicy): (Experimental) Configure Dagster to backfill this asset according to its BackfillPolicy. retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. code_version (Optional[str]): (Experimental) Version of the code that generates this asset. In general, versions should be set only for code that deterministically produces the same output when given the same inputs. check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that execute in the decorated function after materializing the asset. non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the asset. key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name. owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each string can be a user's email address, or a team name prefixed with `team:`, e.g. `team:finops`. Examples: .. code-block:: python @asset def my_asset(my_upstream_asset: int) -> int: return my_upstream_asset + 1 """ def create_asset(): upstream_asset_deps = _deps_and_non_argument_deps_to_asset_deps( deps=deps, non_argument_deps=non_argument_deps ) return _Asset( name=cast(Optional[str], name), # (mypy bug that it can't infer name is Optional[str]) key_prefix=key_prefix, ins=ins, deps=upstream_asset_deps, metadata=metadata, tags=validate_definition_tags(tags), description=description, config_schema=config_schema, required_resource_keys=required_resource_keys, resource_defs=resource_defs, io_manager_key=io_manager_key, io_manager_def=io_manager_def, compute_kind=check.opt_str_param(compute_kind, "compute_kind"), dagster_type=dagster_type, partitions_def=partitions_def, op_tags=op_tags, group_name=group_name, output_required=output_required, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, retry_policy=retry_policy, code_version=code_version, check_specs=check_specs, key=key, owners=owners, ) if compute_fn is not None: return create_asset()(compute_fn) def inner(fn: Callable[..., Any]) -> AssetsDefinition: check.invariant( not (io_manager_key and io_manager_def), "Both io_manager_key and io_manager_def were provided to `@asset` decorator. Please" " provide one or the other. ", ) return create_asset()(fn) return inner
def resolve_asset_key_and_name_for_decorator( *, key: Optional[CoercibleToAssetKey], key_prefix: Optional[CoercibleToAssetKeyPrefix], name: Optional[str], decorator: str, fn: Callable[..., Any], ) -> Tuple[AssetKey, str]: if (name or key_prefix) and key: raise DagsterInvalidDefinitionError( f"Cannot specify a name or key prefix for {decorator} when the key" " argument is provided." ) key_prefix_list = [key_prefix] if isinstance(key_prefix, str) else key_prefix key = AssetKey.from_coercible(key) if key else None assigned_name = name or fn.__name__ return ( ( # the filter here appears unnecessary per typing, but this exists # historically so keeping it here to be conservative in case users # can get Nones into the key_prefix_list somehow AssetKey(list(filter(None, [*(key_prefix_list or []), assigned_name]))) if not key else key ), assigned_name, ) class _Asset: def __init__( self, name: Optional[str] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, ins: Optional[Mapping[str, AssetIn]] = None, deps: Optional[Iterable[AssetDep]] = None, metadata: Optional[ArbitraryMetadataMapping] = None, tags: Optional[Mapping[str, str]] = None, description: Optional[str] = None, config_schema: Optional[UserConfigSchema] = None, required_resource_keys: Optional[Set[str]] = None, resource_defs: Optional[Mapping[str, object]] = None, io_manager_key: Optional[str] = None, io_manager_def: Optional[object] = None, compute_kind: Optional[str] = None, dagster_type: Optional[DagsterType] = None, partitions_def: Optional[PartitionsDefinition] = None, op_tags: Optional[Mapping[str, Any]] = None, group_name: Optional[str] = None, output_required: bool = True, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, key: Optional[CoercibleToAssetKey] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, owners: Optional[Sequence[str]] = None, ): self.name = name self.key_prefix = key_prefix self.ins = ins or {} self.deps = deps or [] self.metadata = metadata self.tags = tags self.description = description self.required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys" ) self.io_manager_key = io_manager_key self.io_manager_def = io_manager_def self.config_schema = config_schema self.compute_kind = compute_kind self.dagster_type = dagster_type self.partitions_def = partitions_def self.op_tags = op_tags self.resource_defs = dict(check.opt_mapping_param(resource_defs, "resource_defs")) self.group_name = group_name self.output_required = output_required self.freshness_policy = freshness_policy self.retry_policy = retry_policy self.auto_materialize_policy = auto_materialize_policy self.backfill_policy = backfill_policy self.code_version = code_version self.check_specs = check_specs self.key = key self.owners = owners def __call__(self, fn: Callable[..., Any]) -> AssetsDefinition: from dagster._config.pythonic_config import ( validate_resource_annotated_function, ) from dagster._core.execution.build_resources import wrap_resources_for_execution validate_resource_annotated_function(fn) asset_ins = build_asset_ins(fn, self.ins or {}, {dep.asset_key for dep in self.deps}) out_asset_key, asset_name = resolve_asset_key_and_name_for_decorator( key=self.key, key_prefix=self.key_prefix, name=self.name, fn=fn, decorator="@asset", ) with disable_dagster_warnings(): arg_resource_keys = {arg.name for arg in get_resource_args(fn)} bare_required_resource_keys = set(self.required_resource_keys) resource_defs_dict = self.resource_defs resource_defs_keys = set(resource_defs_dict.keys()) decorator_resource_keys = bare_required_resource_keys | resource_defs_keys io_manager_key = self.io_manager_key if self.io_manager_def: if not io_manager_key: io_manager_key = out_asset_key.to_python_identifier("io_manager") if ( io_manager_key in self.resource_defs and self.resource_defs[io_manager_key] != self.io_manager_def ): raise DagsterInvalidDefinitionError( f"Provided conflicting definitions for io manager key '{io_manager_key}'." " Please provide only one definition per key." ) resource_defs_dict[io_manager_key] = self.io_manager_def wrapped_resource_defs = wrap_resources_for_execution(resource_defs_dict) check.param_invariant( len(bare_required_resource_keys) == 0 or len(arg_resource_keys) == 0, "Cannot specify resource requirements in both @asset decorator and as arguments" " to the decorated function", ) io_manager_key = cast(str, io_manager_key) if io_manager_key else DEFAULT_IO_MANAGER_KEY out = Out( metadata=self.metadata or {}, io_manager_key=io_manager_key, dagster_type=self.dagster_type if self.dagster_type else NoValueSentinel, description=self.description, is_required=self.output_required, code_version=self.code_version, ) check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( self.check_specs, [out_asset_key] ) check_outs: Mapping[str, Out] = { output_name: Out(dagster_type=None) for output_name in check_specs_by_output_name.keys() } op_required_resource_keys = decorator_resource_keys - arg_resource_keys op = _Op( name=out_asset_key.to_python_identifier(), description=self.description, ins=dict(asset_ins.values()), out={DEFAULT_OUTPUT: out, **check_outs}, # Any resource requirements specified as arguments will be identified as # part of the Op definition instantiation required_resource_keys=op_required_resource_keys, tags={ **({"kind": self.compute_kind} if self.compute_kind else {}), **(self.op_tags or {}), }, config_schema=self.config_schema, retry_policy=self.retry_policy, code_version=self.code_version, )(fn) # check backfill policy is BackfillPolicyType.SINGLE_RUN for non-partitioned asset if self.partitions_def is None: check.param_invariant( ( self.backfill_policy.policy_type is BackfillPolicyType.SINGLE_RUN if self.backfill_policy else True ), "backfill_policy", "Non partitioned asset can only have single run backfill policy", ) keys_by_input_name = { input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() } partition_mappings = { keys_by_input_name[input_name]: asset_in.partition_mapping for input_name, asset_in in self.ins.items() if asset_in.partition_mapping is not None } partition_mappings = _get_partition_mappings_from_deps( partition_mappings=partition_mappings, deps=self.deps, asset_name=asset_name ) return AssetsDefinition.dagster_internal_init( keys_by_input_name=keys_by_input_name, keys_by_output_name={"result": out_asset_key}, node_def=op, partitions_def=self.partitions_def, partition_mappings=partition_mappings if partition_mappings else None, resource_defs=wrapped_resource_defs, group_names_by_key=( {out_asset_key: self.group_name} if self.group_name is not None else None ), freshness_policies_by_key=( {out_asset_key: self.freshness_policy} if self.freshness_policy else None ), auto_materialize_policies_by_key=( {out_asset_key: self.auto_materialize_policy} if self.auto_materialize_policy else None ), backfill_policy=self.backfill_policy, asset_deps=None, # no asset deps in single-asset decorator selected_asset_keys=None, # no subselection in decorator can_subset=False, metadata_by_key={out_asset_key: self.metadata} if self.metadata else None, tags_by_key={out_asset_key: self.tags} if self.tags else None, # see comment in @multi_asset's call to dagster_internal_init for the gory details # this is best understood as an _override_ which @asset does not support descriptions_by_key=None, check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, # no subselection in decorator is_subset=False, owners_by_key={out_asset_key: self.owners} if self.owners else None, )
[docs]@experimental_param(param="resource_defs") @deprecated_param( param="non_argument_deps", breaking_version="2.0.0", additional_warn_text="use `deps` instead." ) def multi_asset( *, outs: Optional[Mapping[str, AssetOut]] = None, name: Optional[str] = None, ins: Optional[Mapping[str, AssetIn]] = None, deps: Optional[Iterable[CoercibleToAssetDep]] = None, description: Optional[str] = None, config_schema: Optional[UserConfigSchema] = None, required_resource_keys: Optional[Set[str]] = None, compute_kind: Optional[str] = None, internal_asset_deps: Optional[Mapping[str, Set[AssetKey]]] = None, partitions_def: Optional[PartitionsDefinition] = None, backfill_policy: Optional[BackfillPolicy] = None, op_tags: Optional[Mapping[str, Any]] = None, can_subset: bool = False, resource_defs: Optional[Mapping[str, object]] = None, group_name: Optional[str] = None, retry_policy: Optional[RetryPolicy] = None, code_version: Optional[str] = None, specs: Optional[Sequence[AssetSpec]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, # deprecated non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a combined definition of multiple assets that are computed using the same op and same upstream assets. Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset. You can set I/O managers keys, auto-materialize policies, freshness policies, group names, etc. on an individual asset within the multi-asset by attaching them to the :py:class:`AssetOut` corresponding to that asset in the `outs` parameter. Args: name (Optional[str]): The name of the op. outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the assets materialized by this function. AssetOuts detail the output, IO management, and core asset properties. This argument is required except when AssetSpecs are used. ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information about the input. deps (Optional[Sequence[Union[AssetsDefinition, SourceAsset, AssetKey, str]]]): The assets that are upstream dependencies, but do not correspond to a parameter of the decorated function. If the AssetsDefinition for a multi_asset is provided, dependencies on all assets created by the multi_asset will be created. config_schema (Optional[ConfigSchema): The configuration schema for the asset's underlying op. If set, Dagster will check that config provided for the op matches this schema and fail if it does not. If not set, Dagster will accept any config provided for the op. required_resource_keys (Optional[Set[str]]): Set of resource handles required by the underlying op. compute_kind (Optional[str]): A string to represent the kind of computation that produces the asset, e.g. "dbt" or "spark". It will be displayed in the Dagster UI as a badge on the asset. internal_asset_deps (Optional[Mapping[str, Set[AssetKey]]]): By default, it is assumed that all assets produced by a multi_asset depend on all assets that are consumed by that multi asset. If this default is not correct, you pass in a map of output names to a corrected set of AssetKeys that they depend on. Any AssetKeys in this list must be either used as input to the asset or produced within the op. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the assets. backfill_policy (Optional[BackfillPolicy]): The backfill policy for the op that computes the asset. op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the asset. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that `json.loads(json.dumps(value)) == value`. can_subset (bool): If this asset's computation can emit a subset of the asset keys based on the context.selected_asset_keys argument. Defaults to False. resource_defs (Optional[Mapping[str, object]]): (Experimental) A mapping of resource keys to resources. These resources will be initialized during execution, and can be accessed from the context within the body of the function. group_name (Optional[str]): A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset. retry_policy (Optional[RetryPolicy]): The retry policy for the op that computes the asset. code_version (Optional[str]): (Experimental) Version of the code encapsulated by the multi-asset. If set, this is used as a default code version for all defined assets. specs (Optional[Sequence[AssetSpec]]): (Experimental) The specifications for the assets materialized by this function. check_specs (Optional[Sequence[AssetCheckSpec]]): (Experimental) Specs for asset checks that execute in the decorated function after materializing the assets. non_argument_deps (Optional[Union[Set[AssetKey], Set[str]]]): Deprecated, use deps instead. Set of asset keys that are upstream dependencies, but do not pass an input to the multi_asset. Examples: .. code-block:: python @multi_asset( specs=[ AssetSpec("asset1", deps=["asset0"]), AssetSpec("asset2", deps=["asset0"]), ] ) def my_function(): asset0_value = load(path="asset0") asset1_result, asset2_result = do_some_transformation(asset0_value) write(asset1_result, path="asset1") write(asset2_result, path="asset2") # Or use IO managers to handle I/O: @multi_asset( outs={ "asset1": AssetOut(), "asset2": AssetOut(), } ) def my_function(asset0): asset1_value = do_some_transformation(asset0) asset2_value = do_some_other_transformation(asset0) return asset1_value, asset2_value """ from dagster._core.execution.build_resources import wrap_resources_for_execution specs = check.opt_list_param(specs, "specs", of_type=AssetSpec) upstream_asset_deps = _deps_and_non_argument_deps_to_asset_deps( deps=deps, non_argument_deps=non_argument_deps ) asset_deps = check.opt_mapping_param( internal_asset_deps, "internal_asset_deps", key_type=str, value_type=set ) required_resource_keys = check.opt_set_param( required_resource_keys, "required_resource_keys", of_type=str ) resource_defs = wrap_resources_for_execution( check.opt_mapping_param(resource_defs, "resource_defs", key_type=str) ) _config_schema = check.opt_mapping_param( config_schema, # type: ignore "config_schema", additional_message="Only dicts are supported for asset config_schema.", ) bare_required_resource_keys = required_resource_keys.copy() resource_defs_keys = set(resource_defs.keys()) required_resource_keys = bare_required_resource_keys | resource_defs_keys asset_out_map: Mapping[str, AssetOut] = {} if outs is None else outs def inner(fn: Callable[..., Any]) -> AssetsDefinition: op_name = name or fn.__name__ if asset_out_map and specs: raise DagsterInvalidDefinitionError("Must specify only outs or specs but not both.") elif specs: output_tuples_by_asset_key = {} for asset_spec in specs: output_name = asset_spec.key.to_python_identifier() output_tuples_by_asset_key[asset_spec.key] = ( output_name, Out( Nothing, is_required=not (can_subset or asset_spec.skippable), description=asset_spec.description, code_version=asset_spec.code_version, metadata=asset_spec.metadata, ), ) if upstream_asset_deps: raise DagsterInvalidDefinitionError( "Can not pass deps and specs to @multi_asset, specify deps on the AssetSpecs" " directly." ) if internal_asset_deps: raise DagsterInvalidDefinitionError( "Can not pass internal_asset_deps and specs to @multi_asset, specify deps on" " the AssetSpecs directly." ) upstream_keys = set() for spec in specs: for dep in spec.deps: if dep.asset_key not in output_tuples_by_asset_key: upstream_keys.add(dep.asset_key) if ( dep.asset_key in output_tuples_by_asset_key and dep.partition_mapping is not None ): # self-dependent asset also needs to be considered an upstream_key upstream_keys.add(dep.asset_key) explicit_ins = ins or {} # get which asset keys have inputs set loaded_upstreams = build_asset_ins(fn, explicit_ins, deps=set()) unexpected_upstreams = { key for key in loaded_upstreams.keys() if key not in upstream_keys } if unexpected_upstreams: raise DagsterInvalidDefinitionError( f"Asset inputs {unexpected_upstreams} do not have dependencies on the passed" " AssetSpec(s). Set the deps on the appropriate AssetSpec(s)." ) remaining_upstream_keys = {key for key in upstream_keys if key not in loaded_upstreams} asset_ins = build_asset_ins(fn, explicit_ins, deps=remaining_upstream_keys) else: asset_ins = build_asset_ins( fn, ins or {}, deps=( {dep.asset_key for dep in upstream_asset_deps} if upstream_asset_deps else set() ), ) output_tuples_by_asset_key = build_asset_outs(asset_out_map) # validate that the asset_ins are a subset of the upstream asset_deps. upstream_internal_asset_keys = set().union(*asset_deps.values()) asset_in_keys = set(asset_ins.keys()) if asset_deps and not asset_in_keys.issubset(upstream_internal_asset_keys): invalid_asset_in_keys = asset_in_keys - upstream_internal_asset_keys check.failed( f"Invalid asset dependencies: `{invalid_asset_in_keys}` specified as asset" " inputs, but are not specified in `internal_asset_deps`. Asset inputs must" " be associated with an output produced by the asset." ) # validate that the asset_deps make sense valid_asset_deps = asset_in_keys | set(output_tuples_by_asset_key.keys()) for out_name, asset_keys in asset_deps.items(): if asset_out_map and out_name not in asset_out_map: check.failed( f"Invalid out key '{out_name}' supplied to `internal_asset_deps` argument" f" for multi-asset {op_name}. Must be one of the outs for this multi-asset" f" {list(asset_out_map.keys())[:20]}.", ) invalid_asset_deps = asset_keys.difference(valid_asset_deps) check.invariant( not invalid_asset_deps, f"Invalid asset dependencies: {invalid_asset_deps} specified in" f" `internal_asset_deps` argument for multi-asset '{op_name}' on key" f" '{out_name}'. Each specified asset key must be associated with an input to" " the asset or produced by this asset. Valid keys:" f" {list(valid_asset_deps)[:20]}", ) arg_resource_keys = {arg.name for arg in get_resource_args(fn)} check.param_invariant( len(bare_required_resource_keys or []) == 0 or len(arg_resource_keys) == 0, "Cannot specify resource requirements in both @multi_asset decorator and as" " arguments to the decorated function", ) asset_outs_by_output_name: Mapping[str, Out] = dict(output_tuples_by_asset_key.values()) keys_by_output_name = { output_name: asset_key for asset_key, (output_name, _) in output_tuples_by_asset_key.items() } check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( check_specs, list(output_tuples_by_asset_key.keys()) ) check_outs_by_output_name: Mapping[str, Out] = { output_name: Out(dagster_type=None, is_required=not can_subset) for output_name in check_specs_by_output_name.keys() } overlapping_output_names = ( asset_outs_by_output_name.keys() & check_outs_by_output_name.keys() ) check.invariant( len(overlapping_output_names) == 0, f"Check output names overlap with asset output names: {overlapping_output_names}", ) combined_outs_by_output_name: Mapping[str, Out] = { **asset_outs_by_output_name, **check_outs_by_output_name, } if specs: internal_deps = { spec.key: {dep.asset_key for dep in spec.deps} for spec in specs if spec.deps is not None } else: internal_deps = {keys_by_output_name[name]: asset_deps[name] for name in asset_deps} # when a subsettable multi-asset is defined, it is possible that it will need to be # broken into two separate parts, one which depends on the other. in order to represent # this in the op execution graph, inputs need to be added to connect these possible deps if can_subset and internal_deps: asset_ins = { **asset_ins, **build_subsettable_asset_ins( asset_ins, output_tuples_by_asset_key, internal_deps.values() ), } with disable_dagster_warnings(): op_required_resource_keys = required_resource_keys - arg_resource_keys op = _Op( name=op_name, description=description, ins=dict(asset_ins.values()), out=combined_outs_by_output_name, required_resource_keys=op_required_resource_keys, tags={ **({"kind": compute_kind} if compute_kind else {}), **(op_tags or {}), }, config_schema=_config_schema, retry_policy=retry_policy, code_version=code_version, )(fn) keys_by_input_name = { input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() } partition_mappings = { keys_by_input_name[input_name]: asset_in.partition_mapping for input_name, asset_in in (ins or {}).items() if asset_in.partition_mapping is not None } if upstream_asset_deps: partition_mappings = _get_partition_mappings_from_deps( partition_mappings=partition_mappings, deps=upstream_asset_deps, asset_name=op_name ) if specs: props_by_asset_key: Mapping[AssetKey, Union[AssetSpec, AssetOut]] = { spec.key: spec for spec in specs } # Add PartitionMappings specified via AssetSpec.deps to partition_mappings dictionary. Error on duplicates for spec in specs: for dep in spec.deps: if dep.partition_mapping is None: continue if partition_mappings.get(dep.asset_key, None) is None: partition_mappings[dep.asset_key] = dep.partition_mapping continue if partition_mappings[dep.asset_key] == dep.partition_mapping: continue else: raise DagsterInvalidDefinitionError( f"Two different PartitionMappings for {dep.asset_key} provided for" f" multi_asset {op_name}. Please use the same PartitionMapping for" f" {dep.asset_key}." ) else: props_by_asset_key = { keys_by_output_name[output_name]: asset_out for output_name, asset_out in asset_out_map.items() } # handle properties defined ons AssetSpecs or AssetOuts group_names_by_key = { asset_key: props.group_name for asset_key, props in props_by_asset_key.items() if props.group_name is not None } if group_name: check.invariant( not group_names_by_key, "Cannot set group_name parameter on multi_asset if one or more of the" " AssetSpecs/AssetOuts supplied to this multi_asset have a group_name defined.", ) group_names_by_key = {asset_key: group_name for asset_key in props_by_asset_key} freshness_policies_by_key = { asset_key: props.freshness_policy for asset_key, props in props_by_asset_key.items() if props.freshness_policy is not None } auto_materialize_policies_by_key = { asset_key: props.auto_materialize_policy for asset_key, props in props_by_asset_key.items() if props.auto_materialize_policy is not None } metadata_by_key = { asset_key: props.metadata for asset_key, props in props_by_asset_key.items() if props.metadata is not None } tags_by_key = { asset_key: props.tags for asset_key, props in props_by_asset_key.items() if props.tags is not None } owners_by_key = { asset_key: props.owners for asset_key, props in props_by_asset_key.items() if props.owners is not None } return AssetsDefinition.dagster_internal_init( keys_by_input_name=keys_by_input_name, keys_by_output_name=keys_by_output_name, node_def=op, asset_deps=internal_deps, partitions_def=partitions_def, partition_mappings=partition_mappings if partition_mappings else None, can_subset=can_subset, resource_defs=resource_defs, group_names_by_key=group_names_by_key, freshness_policies_by_key=freshness_policies_by_key, auto_materialize_policies_by_key=auto_materialize_policies_by_key, backfill_policy=backfill_policy, selected_asset_keys=None, # no subselection in decorator # descriptions by key is more accurately understood as _overriding_ the descriptions # by key that are in the OutputDefinitions associated with the asset key. # This is a dangerous construction liable for bugs. Instead there should be a # canonical source of asset descriptions in AssetsDefinintion and if we need # to create a memoized cached dictionary of asset keys for perf or something we do # that in the `__init__` or on demand. # # This is actually an override. We do not override descriptions # in OutputDefinitions in @multi_asset descriptions_by_key=None, metadata_by_key=metadata_by_key, tags_by_key=tags_by_key, check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, # no subselection in decorator is_subset=False, owners_by_key=owners_by_key, ) return inner
def get_function_params_without_context_or_config_or_resources( fn: Callable[..., Any], ) -> List[Parameter]: params = get_function_params(fn) is_context_provided = len(params) > 0 and params[0].name in get_valid_name_permutations( "context" ) input_params = params[1:] if is_context_provided else params resource_arg_names = {arg.name for arg in get_resource_args(fn)} new_input_args = [] for input_arg in input_params: if input_arg.name != "config" and input_arg.name not in resource_arg_names: new_input_args.append(input_arg) return new_input_args def stringify_asset_key_to_input_name(asset_key: AssetKey) -> str: return "_".join(asset_key.path).replace("-", "_") def build_asset_ins( fn: Callable[..., Any], asset_ins: Mapping[str, AssetIn], deps: Optional[AbstractSet[AssetKey]], ) -> Mapping[AssetKey, Tuple[str, In]]: """Creates a mapping from AssetKey to (name of input, In object).""" deps = check.opt_set_param(deps, "deps", AssetKey) new_input_args = get_function_params_without_context_or_config_or_resources(fn) non_var_input_param_names = [ param.name for param in new_input_args if param.kind == Parameter.POSITIONAL_OR_KEYWORD ] has_kwargs = any(param.kind == Parameter.VAR_KEYWORD for param in new_input_args) all_input_names = set(non_var_input_param_names) | asset_ins.keys() if not has_kwargs: for in_key, asset_in in asset_ins.items(): if in_key not in non_var_input_param_names and ( not isinstance(asset_in.dagster_type, DagsterType) or not asset_in.dagster_type.is_nothing ): raise DagsterInvalidDefinitionError( f"Key '{in_key}' in provided ins dict does not correspond to any of the names " "of the arguments to the decorated function" ) ins_by_asset_key: Dict[AssetKey, Tuple[str, In]] = {} for input_name in all_input_names: asset_key = None if input_name in asset_ins: asset_key = asset_ins[input_name].key metadata = asset_ins[input_name].metadata or {} key_prefix = asset_ins[input_name].key_prefix input_manager_key = asset_ins[input_name].input_manager_key dagster_type = asset_ins[input_name].dagster_type else: metadata = {} key_prefix = None input_manager_key = None dagster_type = NoValueSentinel asset_key = asset_key or AssetKey(list(filter(None, [*(key_prefix or []), input_name]))) ins_by_asset_key[asset_key] = ( input_name.replace("-", "_"), In(metadata=metadata, input_manager_key=input_manager_key, dagster_type=dagster_type), ) for asset_key in deps: if asset_key in ins_by_asset_key: raise DagsterInvalidDefinitionError( f"deps value {asset_key} also declared as input/AssetIn" ) # mypy doesn't realize that Nothing is a valid type here ins_by_asset_key[asset_key] = ( stringify_asset_key_to_input_name(asset_key), In(cast(type, Nothing)), ) return ins_by_asset_key def build_subsettable_asset_ins( asset_ins: Mapping[AssetKey, Tuple[str, In]], asset_outs: Mapping[AssetKey, Tuple[str, Out]], internal_upstream_deps: Iterable[AbstractSet[AssetKey]], ) -> Mapping[AssetKey, Tuple[str, In]]: """Creates a mapping from AssetKey to (name of input, In object) for any asset key that is not currently an input, but may become one if this asset is subset. For example, if a subsettable multi-asset produces both A and C, where C depends on both A and some other asset B, there are some situations where executing just A and C without B will result in these assets being generated by different steps within the same job. In this case, we need a separate input to represent the fact that C depends on A. """ # set of asset keys which are upstream of another asset, and are not currently inputs potential_deps = set().union(*internal_upstream_deps).difference(set(asset_ins.keys())) return { key: (f"{ASSET_SUBSET_INPUT_PREFIX}{name}", In(Nothing)) for key, (name, _) in asset_outs.items() if key in potential_deps } @overload def graph_asset( compose_fn: Callable[..., Any], ) -> AssetsDefinition: ... @overload def graph_asset( *, name: Optional[str] = None, description: Optional[str] = None, ins: Optional[Mapping[str, AssetIn]] = None, config: Optional[Union[ConfigMapping, Mapping[str, Any]]] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, group_name: Optional[str] = None, partitions_def: Optional[PartitionsDefinition] = None, metadata: Optional[RawMetadataMapping] = ..., freshness_policy: Optional[FreshnessPolicy] = ..., auto_materialize_policy: Optional[AutoMaterializePolicy] = ..., backfill_policy: Optional[BackfillPolicy] = ..., resource_defs: Optional[Mapping[str, ResourceDefinition]] = ..., check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: ...
[docs]def graph_asset( compose_fn: Optional[Callable] = None, *, name: Optional[str] = None, description: Optional[str] = None, ins: Optional[Mapping[str, AssetIn]] = None, config: Optional[Union[ConfigMapping, Mapping[str, Any]]] = None, key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, group_name: Optional[str] = None, partitions_def: Optional[PartitionsDefinition] = None, metadata: Optional[RawMetadataMapping] = None, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, key: Optional[CoercibleToAssetKey] = None, ) -> Union[AssetsDefinition, Callable[[Callable[..., Any]], AssetsDefinition]]: """Creates a software-defined asset that's computed using a graph of ops. This decorator is meant to decorate a function that composes a set of ops or graphs to define the dependencies between them. Args: name (Optional[str]): The name of the asset. If not provided, defaults to the name of the decorated function. The asset's name must be a valid name in Dagster (ie only contains letters, numbers, and underscores) and may not contain Python reserved keywords. description (Optional[str]): A human-readable description of the asset. ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information about the input. config (Optional[Union[ConfigMapping], Mapping[str, Any]): Describes how the graph underlying the asset is configured at runtime. If a :py:class:`ConfigMapping` object is provided, then the graph takes on the config schema of this object. The mapping will be applied at runtime to generate the config for the graph's constituent nodes. If a dictionary is provided, then it will be used as the default run config for the graph. This means it must conform to the config schema of the underlying nodes. Note that the values provided will be viewable and editable in the Dagster UI, so be careful with secrets. If no value is provided, then the config schema for the graph is the default (derived from the underlying nodes). key_prefix (Optional[Union[str, Sequence[str]]]): If provided, the asset's key is the concatenation of the key_prefix and the asset's name, which defaults to the name of the decorated function. Each item in key_prefix must be a valid name in Dagster (ie only contains letters, numbers, and underscores) and may not contain Python reserved keywords. group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided, the name "default" is used. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset. metadata (Optional[RawMetadataMapping]): Dictionary of metadata to be associated with the asset. freshness_policy (Optional[FreshnessPolicy]): A constraint telling Dagster how often this asset is intended to be updated with respect to its root data. auto_materialize_policy (Optional[AutoMaterializePolicy]): The AutoMaterializePolicy to use for this asset. backfill_policy (Optional[BackfillPolicy]): The BackfillPolicy to use for this asset. key (Optional[CoeercibleToAssetKey]): The key for this asset. If provided, cannot specify key_prefix or name. Examples: .. code-block:: python @op def fetch_files_from_slack(context) -> pd.DataFrame: ... @op def store_files(files) -> None: files.to_sql(name="slack_files", con=create_db_connection()) @graph_asset def slack_files_table(): return store_files(fetch_files_from_slack()) """ if compose_fn is None: return lambda fn: graph_asset( fn, name=name, description=description, ins=ins, config=config, key_prefix=key_prefix, group_name=group_name, partitions_def=partitions_def, metadata=metadata, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, key=key, ) else: return graph_asset_no_defaults( compose_fn=compose_fn, name=name, description=description, ins=ins, config=config, key_prefix=key_prefix, group_name=group_name, partitions_def=partitions_def, metadata=metadata, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, resource_defs=resource_defs, check_specs=check_specs, key=key, )
def graph_asset_no_defaults( *, compose_fn: Callable[..., Any], name: Optional[str], description: Optional[str], ins: Optional[Mapping[str, AssetIn]], config: Optional[Union[ConfigMapping, Mapping[str, Any]]], key_prefix: Optional[CoercibleToAssetKeyPrefix], group_name: Optional[str], partitions_def: Optional[PartitionsDefinition], metadata: Optional[RawMetadataMapping], freshness_policy: Optional[FreshnessPolicy], auto_materialize_policy: Optional[AutoMaterializePolicy], backfill_policy: Optional[BackfillPolicy], resource_defs: Optional[Mapping[str, ResourceDefinition]], check_specs: Optional[Sequence[AssetCheckSpec]], key: Optional[CoercibleToAssetKey], ) -> AssetsDefinition: ins = ins or {} asset_ins = build_asset_ins(compose_fn, ins or {}, set()) out_asset_key, _asset_name = resolve_asset_key_and_name_for_decorator( key=key, key_prefix=key_prefix, name=name, decorator="@graph_asset", fn=compose_fn, ) keys_by_input_name = {input_name: asset_key for asset_key, (input_name, _) in asset_ins.items()} partition_mappings = { input_name: asset_in.partition_mapping for input_name, asset_in in ins.items() if asset_in.partition_mapping } check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( check_specs, [out_asset_key] ) check_outs_by_output_name: Mapping[str, GraphOut] = { output_name: GraphOut() for output_name in check_specs_by_output_name.keys() } combined_outs_by_output_name: Mapping = { "result": GraphOut(), **check_outs_by_output_name, } op_graph = graph( name=out_asset_key.to_python_identifier(), description=description, config=config, ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, out=combined_outs_by_output_name, )(compose_fn) return AssetsDefinition.from_graph( op_graph, keys_by_input_name=keys_by_input_name, keys_by_output_name={"result": out_asset_key}, partitions_def=partitions_def, partition_mappings=partition_mappings if partition_mappings else None, group_name=group_name, metadata_by_output_name={"result": metadata} if metadata else None, freshness_policies_by_output_name=( {"result": freshness_policy} if freshness_policy else None ), auto_materialize_policies_by_output_name=( {"result": auto_materialize_policy} if auto_materialize_policy else None ), backfill_policy=backfill_policy, descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, check_specs=check_specs, )
[docs]def graph_multi_asset( *, outs: Mapping[str, AssetOut], name: Optional[str] = None, ins: Optional[Mapping[str, AssetIn]] = None, partitions_def: Optional[PartitionsDefinition] = None, backfill_policy: Optional[BackfillPolicy] = None, group_name: Optional[str] = None, can_subset: bool = False, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, check_specs: Optional[Sequence[AssetCheckSpec]] = None, config: Optional[Union[ConfigMapping, Mapping[str, Any]]] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a combined definition of multiple assets that are computed using the same graph of ops, and the same upstream assets. Each argument to the decorated function references an upstream asset that this asset depends on. The name of the argument designates the name of the upstream asset. Args: name (Optional[str]): The name of the graph. outs: (Optional[Dict[str, AssetOut]]): The AssetOuts representing the produced assets. ins (Optional[Mapping[str, AssetIn]]): A dictionary that maps input names to information about the input. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the assets. backfill_policy (Optional[BackfillPolicy]): The backfill policy for the asset. group_name (Optional[str]): A string name used to organize multiple assets into groups. This group name will be applied to all assets produced by this multi_asset. can_subset (bool): Whether this asset's computation can emit a subset of the asset keys based on the context.selected_assets argument. Defaults to False. config (Optional[Union[ConfigMapping], Mapping[str, Any]): Describes how the graph underlying the asset is configured at runtime. If a :py:class:`ConfigMapping` object is provided, then the graph takes on the config schema of this object. The mapping will be applied at runtime to generate the config for the graph's constituent nodes. If a dictionary is provided, then it will be used as the default run config for the graph. This means it must conform to the config schema of the underlying nodes. Note that the values provided will be viewable and editable in the Dagster UI, so be careful with secrets. If no value is provided, then the config schema for the graph is the default (derived from the underlying nodes). """ def inner(fn: Callable[..., Any]) -> AssetsDefinition: partition_mappings = { input_name: asset_in.partition_mapping for input_name, asset_in in (ins or {}).items() if asset_in.partition_mapping } asset_ins = build_asset_ins(fn, ins or {}, set()) keys_by_input_name = { input_name: asset_key for asset_key, (input_name, _) in asset_ins.items() } asset_outs = build_asset_outs(outs) check_specs_by_output_name = _validate_and_assign_output_names_to_check_specs( check_specs, list(asset_outs.keys()) ) check_outs_by_output_name: Mapping[str, GraphOut] = { output_name: GraphOut() for output_name in check_specs_by_output_name.keys() } combined_outs_by_output_name = { **{output_name: GraphOut() for output_name, _ in asset_outs.values()}, **check_outs_by_output_name, } op_graph = graph( name=name or fn.__name__, out=combined_outs_by_output_name, config=config, ins={input_name: GraphIn() for _, (input_name, _) in asset_ins.items()}, )(fn) # source metadata from the AssetOuts (if any) metadata_by_output_name = { output_name: out.metadata for output_name, out in outs.items() if isinstance(out, AssetOut) and out.metadata is not None } # source freshness policies from the AssetOuts (if any) freshness_policies_by_output_name = { output_name: out.freshness_policy for output_name, out in outs.items() if isinstance(out, AssetOut) and out.freshness_policy is not None } # source auto materialize policies from the AssetOuts (if any) auto_materialize_policies_by_output_name = { output_name: out.auto_materialize_policy for output_name, out in outs.items() if isinstance(out, AssetOut) and out.auto_materialize_policy is not None } # source descriptions from the AssetOuts (if any) descriptions_by_output_name = { output_name: out.description for output_name, out in outs.items() if isinstance(out, AssetOut) and out.description is not None } return AssetsDefinition.from_graph( op_graph, keys_by_input_name=keys_by_input_name, keys_by_output_name={ output_name: asset_key for asset_key, (output_name, _) in asset_outs.items() }, partitions_def=partitions_def, partition_mappings=partition_mappings if partition_mappings else None, group_name=group_name, can_subset=can_subset, metadata_by_output_name=metadata_by_output_name, freshness_policies_by_output_name=freshness_policies_by_output_name, auto_materialize_policies_by_output_name=auto_materialize_policies_by_output_name, backfill_policy=backfill_policy, descriptions_by_output_name=descriptions_by_output_name, resource_defs=resource_defs, check_specs=check_specs, ) return inner
def build_asset_outs(asset_outs: Mapping[str, AssetOut]) -> Mapping[AssetKey, Tuple[str, Out]]: """Creates a mapping from AssetKey to (name of output, Out object).""" outs_by_asset_key: Dict[AssetKey, Tuple[str, Out]] = {} for output_name, asset_out in asset_outs.items(): out = asset_out.to_out() asset_key = asset_out.key or AssetKey( list(filter(None, [*(asset_out.key_prefix or []), output_name])) ) outs_by_asset_key[asset_key] = (output_name.replace("-", "_"), out) return outs_by_asset_key def _deps_and_non_argument_deps_to_asset_deps( deps: Optional[Iterable[CoercibleToAssetDep]], non_argument_deps: Optional[Union[Set[AssetKey], Set[str]]], ) -> Optional[Iterable[AssetDep]]: """Helper function for managing deps and non_argument_deps while non_argument_deps is still an accepted parameter. Ensures only one of deps and non_argument_deps is provided, then converts the deps to AssetDeps. """ if non_argument_deps is not None and deps is not None: raise DagsterInvalidDefinitionError( "Cannot specify both deps and non_argument_deps to @asset. Use only deps instead." ) if deps is not None: return make_asset_deps(deps) if non_argument_deps is not None: check.set_param(non_argument_deps, "non_argument_deps", of_type=(AssetKey, str)) return make_asset_deps(non_argument_deps) def make_asset_deps(deps: Optional[Iterable[CoercibleToAssetDep]]) -> Optional[Iterable[AssetDep]]: if deps is None: return None # expand any multi_assets into a list of keys all_deps = [] for dep in deps: if isinstance(dep, AssetsDefinition) and len(dep.keys) > 1: all_deps.extend(dep.keys) else: all_deps.append(dep) with disable_dagster_warnings(): dep_dict = {} for dep in all_deps: asset_dep = AssetDep.from_coercible(dep) # we cannot do deduplication via a set because MultiPartitionMappings have an internal # dictionary that cannot be hashed. Instead deduplicate by making a dictionary and checking # for existing keys. If an asset is specified as a dependency more than once, only error if the # dependency is different (ie has a different PartitionMapping) if ( asset_dep.asset_key in dep_dict.keys() and asset_dep != dep_dict[asset_dep.asset_key] ): raise DagsterInvariantViolationError( f"Cannot set a dependency on asset {asset_dep.asset_key} more than once per" " asset." ) dep_dict[asset_dep.asset_key] = asset_dep return list(dep_dict.values()) def _assign_output_names_to_check_specs( check_specs: Optional[Sequence[AssetCheckSpec]], ) -> Mapping[str, AssetCheckSpec]: checks_by_output_name = {spec.get_python_identifier(): spec for spec in check_specs or []} if check_specs and len(checks_by_output_name) != len(check_specs): duplicates = { item: count for item, count in Counter( [(spec.asset_key, spec.name) for spec in check_specs] ).items() if count > 1 } raise DagsterInvalidDefinitionError(f"Duplicate check specs: {duplicates}") return checks_by_output_name def _validate_check_specs_target_relevant_asset_keys( check_specs: Optional[Sequence[AssetCheckSpec]], valid_asset_keys: Sequence[AssetKey] ) -> None: for spec in check_specs or []: if spec.asset_key not in valid_asset_keys: raise DagsterInvalidDefinitionError( f"Invalid asset key {spec.asset_key} in check spec {spec.name}. Must be one of" f" {valid_asset_keys}" ) def _validate_and_assign_output_names_to_check_specs( check_specs: Optional[Sequence[AssetCheckSpec]], valid_asset_keys: Sequence[AssetKey] ) -> Mapping[str, AssetCheckSpec]: _validate_check_specs_target_relevant_asset_keys(check_specs, valid_asset_keys) return _assign_output_names_to_check_specs(check_specs) def _get_partition_mappings_from_deps( partition_mappings: Dict[AssetKey, PartitionMapping], deps: Iterable[AssetDep], asset_name: str ): # Add PartitionMappings specified via AssetDeps to partition_mappings dictionary. Error on duplicates for dep in deps: if dep.partition_mapping is None: continue if partition_mappings.get(dep.asset_key, None) is None: partition_mappings[dep.asset_key] = dep.partition_mapping continue if partition_mappings[dep.asset_key] == dep.partition_mapping: continue else: raise DagsterInvalidDefinitionError( f"Two different PartitionMappings for {dep.asset_key} provided for" f" asset {asset_name}. Please use the same PartitionMapping for" f" {dep.asset_key}." ) return partition_mappings