Source code for dagster._core.definitions.external_asset

from typing import List, Sequence

from dagster import _check as check
from dagster._annotations import deprecated, experimental
from dagster._core.definitions.asset_spec import (
    SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
    SYSTEM_METADATA_KEY_IO_MANAGER_KEY,
    AssetExecutionType,
    AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.definitions.output import Out
from dagster._core.definitions.source_asset import (
    SourceAsset,
    wrap_source_asset_observe_fn_in_op_compute_fn,
)
from dagster._utils.warnings import disable_dagster_warnings


def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
    return external_assets_from_specs([spec])[0]


@deprecated(
    breaking_version="1.9.0", additional_warn_text="Directly use the AssetSpecs instead."
)
@experimental
def external_assets_from_specs(specs: Sequence[AssetSpec]) -> List[AssetsDefinition]:
    """Create external assets definitions from a sequence of asset specs.

    An external asset is an asset that is not materialized by Dagster, but is tracked in the
    asset graph and asset catalog.

    A common use case for external assets is modeling data produced by a process not under
    Dagster's control, for example a daily drop of a file from a third party into S3.

    In most systems these are described as sources. This includes Dagster, whose
    :py:class:`SourceAsset` will be supplanted by external assets in the near future, as
    external assets are a superset of the functionality of source assets.

    External assets can act as sources, but that is not their only use. In particular,
    external assets have their own lineage, specified through the ``deps`` argument of
    :py:class:`AssetSpec`, and can depend on other external assets. External assets are not
    allowed to depend on non-external assets.

    The user can emit ``AssetMaterialization``, ``AssetObservation``, and
    ``AssetCheckEvaluation`` events attached to external assets. Dagster also supports
    "runless" events, which enable many use cases that were previously not possible. Runless
    events are events generated outside the context of a particular run (for example, in a
    sensor or by a script), allowing for greater flexibility in event generation. This can
    be done in a few ways:

    Note to reviewers: this is an in-progress doc block, and the items below will have links
    and examples.

    1) :py:class:`DagsterInstance` exposes ``report_runless_asset_event``, which can be used
       to generate events for external assets directly on an instance. See docs.
    2) Sensors can build these events and return them using :py:class:`SensorResult`. A use
       case for this is using a sensor to continuously monitor the metadata exhaust from an
       external system and insert events that reflect that exhaust. See docs.
    3) Dagster Cloud exposes a REST API for ingesting runless events. Users can copy and
       paste a curl command into their external computations (such as an Airflow operator)
       to register metadata associated with those computations. See docs.
    4) Dagster ops can generate these events directly, either by yielding them or by calling
       ``log_event`` on :py:class:`OpExecutionContext`. Use cases for this include querying
       metadata from an external system when doing so in a sensor would be too expensive, or
       adapting pure op-based Dagster code to take advantage of asset-oriented lineage,
       observability, and data quality features without having to port it wholesale to
       ``@asset``- and ``@multi_asset``-based code.

    This feature set allows users to use Dagster as an observability, lineage, and data
    quality tool for assets that are not materialized by Dagster. In addition to traditional
    use cases like sources, it can model entire lineage graphs of assets that are scheduled
    and materialized by other tools and workflow engines, letting users adopt Dagster as a
    cross-cutting observability tool without migrating their entire data platform to a
    single orchestration engine.

    External assets do not have all the features of normal assets: they cannot be
    materialized ad hoc by Dagster (this is disabled in the UI), cannot be backfilled,
    cannot be scheduled using auto-materialize policies, and opt out of other features
    around direct materialization, both now and in the future.

    External assets also provide fewer guarantees around the correctness of the information
    in the asset catalog. In other words, in exchange for the flexibility, Dagster provides
    fewer guardrails for external assets than for assets it materializes, and there is an
    increased chance that they will insert nonsensical information into the asset catalog,
    potentially eroding trust.

    Args:
        specs (Sequence[AssetSpec]): The specs for the assets.
    """
    assets_defs = []
    for spec in specs:
        check.invariant(
            spec.auto_materialize_policy is None,
            "auto_materialize_policy must be None since it is ignored",
        )
        check.invariant(spec.code_version is None, "code_version must be None since it is ignored")
        check.invariant(
            spec.skippable is False,
            "skippable must be False since it is ignored and False is the default",
        )
        with disable_dagster_warnings():
            assets_defs.append(AssetsDefinition(specs=[spec]))
    return assets_defs
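As a quick illustration of the docstring above, here is a minimal sketch of how external asset specs and runless events fit together. The asset keys (``raw_logs``, ``processed_logs``) are hypothetical, and the sketch assumes ``external_assets_from_specs`` is importable from the top-level ``dagster`` package, as it was when this module shipped; ``AssetSpec``, ``Definitions``, ``AssetMaterialization``, and ``DagsterInstance.report_runless_asset_event`` are standard Dagster APIs.

    from dagster import (
        AssetMaterialization,
        AssetSpec,
        DagsterInstance,
        Definitions,
        external_assets_from_specs,
    )

    # Two hypothetical external assets; `processed_logs` declares lineage on
    # `raw_logs` via `deps`, even though neither is materialized by Dagster.
    raw_logs = AssetSpec("raw_logs")
    processed_logs = AssetSpec("processed_logs", deps=["raw_logs"])

    defs = Definitions(assets=external_assets_from_specs([raw_logs, processed_logs]))

    # Report a runless materialization event directly on the instance, e.g. after
    # an external process (such as a third-party S3 drop) has produced the data.
    instance = DagsterInstance.get()
    instance.report_runless_asset_event(AssetMaterialization(asset_key="raw_logs"))

In a sensor, the same events can instead be returned via ``SensorResult(asset_events=[...])`` rather than reported directly on the instance.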
def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
    if source_asset.observe_fn is not None:
        keys_by_output_name = {"result": source_asset.key}
        node_def = OpDefinition(
            name=source_asset.key.to_python_identifier(),
            compute_fn=wrap_source_asset_observe_fn_in_op_compute_fn(source_asset),
            # We need to access the raw attribute because the property will return a computed
            # value that includes requirements for the io manager. Those requirements will be
            # inferred again when we create an AssetsDefinition.
            required_resource_keys=source_asset._required_resource_keys,  # noqa: SLF001
            outs={"result": Out(io_manager_key=source_asset.io_manager_key)},
        )
        extra_metadata_entries = {}
        execution_type = AssetExecutionType.OBSERVATION
    else:
        keys_by_output_name = {}
        node_def = None
        extra_metadata_entries = (
            {SYSTEM_METADATA_KEY_IO_MANAGER_KEY: source_asset.io_manager_key}
            if source_asset.io_manager_key is not None
            else {}
        )
        execution_type = None

    observe_interval = source_asset.auto_observe_interval_minutes
    metadata = {
        **source_asset.raw_metadata,
        **extra_metadata_entries,
        **(
            {SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES: observe_interval}
            if observe_interval
            else {}
        ),
    }

    with disable_dagster_warnings():
        spec = AssetSpec(
            key=source_asset.key,
            metadata=metadata,
            group_name=source_asset.group_name,
            description=source_asset.description,
            tags=source_asset.tags,
            freshness_policy=source_asset.freshness_policy,
            deps=[],
            owners=[],
        )
        return AssetsDefinition(
            specs=[spec],
            keys_by_output_name=keys_by_output_name,
            node_def=node_def,
            partitions_def=source_asset.partitions_def,
            # We don't pass the `io_manager_def` because it will already be present in
            # `resource_defs` (it is added during `SourceAsset` initialization).
            resource_defs=source_asset.resource_defs,
            execution_type=execution_type,
        )


# Create an unexecutable assets def from an existing executable assets def. This is used to
# make a materializable assets def available only for loading in a job.
def create_unexecutable_external_asset_from_assets_def(
    assets_def: AssetsDefinition,
) -> AssetsDefinition:
    if not assets_def.is_executable:
        return assets_def
    else:
        with disable_dagster_warnings():
            specs: List[AssetSpec] = []
            # Important to iterate over assets_def.keys here instead of assets_def.specs. This
            # is because assets_def.specs on an AssetsDefinition that is a subset will contain
            # all the specs of its parent.
            for key in assets_def.keys:
                orig_spec = assets_def.get_asset_spec(key)
                specs.append(
                    orig_spec._replace(
                        metadata={
                            **(orig_spec.metadata or {}),
                            **(
                                {
                                    SYSTEM_METADATA_KEY_IO_MANAGER_KEY: assets_def.get_io_manager_key_for_asset_key(
                                        key
                                    )
                                }
                                if assets_def.has_output_for_asset_key(key)
                                else {}
                            ),
                        },
                        automation_condition=None,
                        freshness_policy=None,
                    )
                )

            return AssetsDefinition(
                specs=specs,
                resource_defs=assets_def.resource_defs,
            )
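For the two helpers above, here is a rough sketch of converting an observable source asset. These helpers live in a private module (``dagster._core``), so the import below assumes direct access to internals rather than a public API; the asset name and data version are hypothetical, while ``observable_source_asset`` and ``DataVersion`` are standard Dagster APIs.

    from dagster import DataVersion, observable_source_asset
    from dagster._core.definitions.external_asset import (
        create_external_asset_from_source_asset,
    )

    @observable_source_asset
    def third_party_file():
        # Hypothetical observation: report the current data version of the
        # externally managed file.
        return DataVersion("2024-01-01")

    # Produces an AssetsDefinition whose op observes (rather than materializes)
    # the asset, per the AssetExecutionType.OBSERVATION branch above.
    assets_def = create_external_asset_from_source_asset(third_party_file)

    # An executable assets definition can likewise be reduced to a spec-only,
    # unexecutable definition for loading alongside a job:
    # create_unexecutable_external_asset_from_assets_def(assets_def)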