Ask AI

Source code for dagster._core.definitions.asset_checks

from typing import (
    Any,
    Mapping,
    Optional,
    Sequence,
)

from dagster import _check as check
from dagster._annotations import deprecated
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.resource_definition import ResourceDefinition
from dagster._core.errors import DagsterAssetCheckFailedError
from dagster._core.types.dagster_type import Nothing


[docs]class AssetChecksDefinition(AssetsDefinition): """Defines a set of checks that are produced by the same op or op graph. AssetChecksDefinition should not be instantiated directly, but rather produced using the `@asset_check` decorator or `AssetChecksDefinition.create` method. """ @staticmethod def create( *, keys_by_input_name: Mapping[str, AssetKey], node_def: OpDefinition, check_specs_by_output_name: Mapping[str, AssetCheckSpec], can_subset: bool, resource_defs: Optional[Mapping[str, ResourceDefinition]] = None, ): """Create an AssetChecksDefinition.""" return AssetChecksDefinition( keys_by_input_name=keys_by_input_name, keys_by_output_name={}, node_def=node_def, partitions_def=None, partition_mappings=None, asset_deps=None, selected_asset_keys=None, can_subset=can_subset, resource_defs=resource_defs, group_names_by_key=None, metadata_by_key=None, tags_by_key=None, freshness_policies_by_key=None, auto_materialize_policies_by_key=None, backfill_policy=None, descriptions_by_key=None, check_specs_by_output_name=check_specs_by_output_name, selected_asset_check_keys=None, is_subset=False, owners_by_key=None, )
# This is still needed in a few places where we need to handle normal AssetsDefinition and # AssetChecksDefinition differently, but eventually those areas should be refactored and this should # be removed. def has_only_asset_checks(assets_def: AssetsDefinition) -> bool: return len(assets_def.keys) == 0 and len(assets_def.check_keys) > 0 @deprecated(breaking_version="1.8", additional_warn_text="Use @asset_check(blocking=True) instead") def build_asset_with_blocking_check( asset_def: "AssetsDefinition", checks: Sequence["AssetsDefinition"], ) -> "AssetsDefinition": from dagster import AssetIn, In, OpExecutionContext, Output, op from dagster._core.definitions.decorators.asset_decorator import graph_asset_no_defaults from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus check_specs = [] for c in checks: check_specs.extend(c.check_specs) check_output_names = [c.get_python_identifier() for c in check_specs] check.invariant(len(asset_def.op.output_defs) == 1) asset_out_type = asset_def.op.output_defs[0].dagster_type @op( name=f"{asset_def.op.name}_asset_and_checks", ins={"asset_return_value": In(asset_out_type), "check_evaluations": In(Nothing)}, ) def fan_in_checks_and_asset_return_value(context: OpExecutionContext, asset_return_value: Any): # we pass the asset_return_value through and store it again so that downstream assets can load it. # This is a little silly- we only do this because this op has the asset key in its StepOutputProperties # so the output is written to the right path. We could probably get the asset_def.op to write to the # asset path (and make sure we don't override it here) to avoid the double write. yield Output(asset_return_value) for check_spec in check_specs: executions = context.instance.event_log_storage.get_asset_check_execution_history( check_key=check_spec.key, limit=1 ) check.invariant( len(executions) == 1, "Expected asset check {check_spec.name} to execute" ) execution = executions[0] check.invariant( execution.run_id == context.run_id, "Expected asset check {check_spec.name} to execute in the current run", ) if execution.status != AssetCheckExecutionRecordStatus.SUCCEEDED: raise DagsterAssetCheckFailedError() # kwargs are the inputs to the asset_def.op that we are wrapping def blocking_asset(**kwargs): asset_return_value = asset_def.op.with_replaced_properties( name=f"{asset_def.op.name}_graph_asset_op" )(**kwargs) check_evaluations = [check.node_def(asset_return_value) for check in checks] return { "result": fan_in_checks_and_asset_return_value(asset_return_value, check_evaluations), **{ check_output_name: check_result for check_output_name, check_result in zip(check_output_names, check_evaluations) }, } return graph_asset_no_defaults( compose_fn=blocking_asset, name=None, key_prefix=None, key=asset_def.key, group_name=asset_def.group_names_by_key.get(asset_def.key), partitions_def=asset_def.partitions_def, check_specs=check_specs, description=asset_def.descriptions_by_key.get(asset_def.key), ins={name: AssetIn(key) for name, key in asset_def.keys_by_input_name.items()}, resource_defs=asset_def.resource_defs, metadata=asset_def.metadata_by_key.get(asset_def.key), tags=None, freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), backfill_policy=asset_def.backfill_policy, config=None, # gets config from asset_def.op )