import inspect
from typing import Any, Callable, Mapping, NamedTuple, Optional, Sequence, Set
import dagster._check as check
from dagster._annotations import public
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.resource_annotation import get_resource_args
from dagster._core.definitions.run_request import RunRequest, SkipReason
from dagster._core.definitions.sensor_definition import (
DefaultSensorStatus,
RawSensorEvaluationFunctionReturn,
SensorDefinition,
SensorType,
validate_and_get_resource_dict,
)
from dagster._core.definitions.target import ExecutableDefinition
from dagster._core.definitions.utils import check_valid_name
class AssetSensorParamNames(NamedTuple):
context_param_name: Optional[str]
event_log_entry_param_name: Optional[str]
def get_asset_sensor_param_names(fn: Callable[..., Any]) -> AssetSensorParamNames:
"""Determines the names of the context and event log entry parameters for an asset sensor function.
These are assumed to be the first two non-resource params, in order (context param before event log entry).
"""
resource_params = {param.name for param in get_resource_args(fn)}
non_resource_params = [
param.name for param in get_function_params(fn) if param.name not in resource_params
]
context_param_name = non_resource_params[0] if len(non_resource_params) > 0 else None
event_log_entry_param_name = non_resource_params[1] if len(non_resource_params) > 1 else None
return AssetSensorParamNames(
context_param_name=context_param_name, event_log_entry_param_name=event_log_entry_param_name
)
[docs]
class AssetSensorDefinition(SensorDefinition):
"""Define an asset sensor that initiates a set of runs based on the materialization of a given
asset.
If the asset has been materialized multiple times between since the last sensor tick, the
evaluation function will only be invoked once, with the latest materialization.
Args:
name (str): The name of the sensor to create.
asset_key (AssetKey): The asset_key this sensor monitors.
asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]): The core
evaluation function for the sensor, which is run at an interval to determine whether a
run should be launched or not. Takes a :py:class:`~dagster.SensorEvaluationContext` and
an EventLogEntry corresponding to an AssetMaterialization event.
This function must return a generator, which must yield either a single SkipReason
or one or more RunRequest objects.
minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
between sensor evaluations.
description (Optional[str]): A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job
object to target with this sensor.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]):
(experimental) A list of jobs to be executed when the sensor fires.
tags (Optional[Mapping[str, str]]): A set of key-value tags that annotate the sensor and can
be used for searching and filtering in the UI.
metadata (Optional[Mapping[str, object]]): A set of metadata entries that annotate the
sensor. Values will be normalized to typed `MetadataValue` objects.
default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
status can be overridden from the Dagster UI or via the GraphQL API.
"""
def __init__(
self,
name: str,
asset_key: AssetKey,
job_name: Optional[str],
asset_materialization_fn: Callable[
...,
RawSensorEvaluationFunctionReturn,
],
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
job: Optional[ExecutableDefinition] = None,
jobs: Optional[Sequence[ExecutableDefinition]] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
required_resource_keys: Optional[Set[str]] = None,
tags: Optional[Mapping[str, str]] = None,
metadata: Optional[Mapping[str, object]] = None,
):
self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)
from dagster._core.event_api import AssetRecordsFilter
resource_arg_names: Set[str] = {
arg.name for arg in get_resource_args(asset_materialization_fn)
}
combined_required_resource_keys = (
check.opt_set_param(required_resource_keys, "required_resource_keys", of_type=str)
| resource_arg_names
)
def _wrap_asset_fn(materialization_fn) -> Any:
def _fn(context) -> Any:
after_cursor = None
if context.cursor:
try:
after_cursor = int(context.cursor)
except ValueError:
after_cursor = None
event_records = context.instance.fetch_materializations(
AssetRecordsFilter(
asset_key=self._asset_key,
after_storage_id=after_cursor,
),
ascending=False,
limit=1,
).records
if not event_records:
yield SkipReason(
f"No new materialization events found for asset key {self._asset_key}"
)
return
event_record = event_records[0]
(
context_param_name,
event_log_entry_param_name,
) = get_asset_sensor_param_names(materialization_fn)
resource_args_populated = validate_and_get_resource_dict(
context.resources, name, resource_arg_names
)
# Build asset sensor function args, which can include any subset of
# context arg, event log entry arg, and any resource args
args = resource_args_populated
if context_param_name:
args[context_param_name] = context
if event_log_entry_param_name:
args[event_log_entry_param_name] = event_record.event_log_entry
result = materialization_fn(**args)
if inspect.isgenerator(result) or isinstance(result, list):
for item in result:
yield item
elif isinstance(result, (SkipReason, RunRequest)):
yield result
context.update_cursor(str(event_record.storage_id))
return _fn
super(AssetSensorDefinition, self).__init__(
name=check_valid_name(name),
job_name=job_name,
evaluation_fn=_wrap_asset_fn(
check.callable_param(asset_materialization_fn, "asset_materialization_fn"),
),
minimum_interval_seconds=minimum_interval_seconds,
description=description,
job=job,
jobs=jobs,
default_status=default_status,
required_resource_keys=combined_required_resource_keys,
tags=tags,
metadata=metadata,
)
@public
@property
def asset_key(self) -> AssetKey:
"""AssetKey: The key of the asset targeted by this sensor."""
return self._asset_key
@property
def sensor_type(self) -> SensorType:
return SensorType.ASSET