from collections import defaultdict
from datetime import datetime
from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Dict,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Union,
)
import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
from dagster._core.definitions.asset_key import EntityKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionEvaluation,
)
from dagster._core.definitions.dynamic_partitions_request import (
AddDynamicPartitionsRequest,
DeleteDynamicPartitionsRequest,
)
from dagster._core.definitions.events import AssetKey, AssetMaterialization, AssetObservation
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.instance import DynamicPartitionsStore
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
PARTITION_NAME_TAG,
)
from dagster._record import IHaveNew, LegacyNamedTupleMixin, record, record_custom
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._utils.error import SerializableErrorInfo
from dagster._utils.tags import normalize_tags
if TYPE_CHECKING:
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.run_config import RunConfig
@whitelist_for_serdes(old_storage_names={"JobType"})
class InstigatorType(Enum):
SCHEDULE = "SCHEDULE"
SENSOR = "SENSOR"
AUTO_MATERIALIZE = "AUTO_MATERIALIZE"
@whitelist_for_serdes
class SkipReason(NamedTuple("_SkipReason", [("skip_message", PublicAttr[Optional[str]])])):
"""Represents a skipped evaluation, where no runs are requested. May contain a message to indicate
why no runs were requested.
Attributes:
skip_message (Optional[str]): A message displayed in the Dagster UI for why this evaluation resulted
in no requested runs.
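
    Example:
        A minimal sketch of a sensor that returns a ``SkipReason`` when there is no work
        to do (``my_job`` and ``new_files_exist`` are hypothetical names):

        .. code-block:: python

            from dagster import RunRequest, SkipReason, sensor

            @sensor(job_name="my_job")
            def my_sensor(context):
                if not new_files_exist():  # hypothetical helper
                    return SkipReason("No new files found since the last evaluation.")
                return RunRequest(run_key="latest-files")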
"""
def __new__(cls, skip_message: Optional[str] = None):
return super(SkipReason, cls).__new__(
cls,
skip_message=check.opt_str_param(skip_message, "skip_message"),
)
@whitelist_for_serdes(kwargs_fields={"asset_graph_subset"})
@record_custom
class RunRequest(IHaveNew, LegacyNamedTupleMixin):
run_key: Optional[str]
run_config: Mapping[str, Any]
tags: Mapping[str, str]
job_name: Optional[str]
asset_selection: Optional[Sequence[AssetKey]]
stale_assets_only: bool
partition_key: Optional[str]
asset_check_keys: Optional[Sequence[AssetCheckKey]]
asset_graph_subset: Optional[AssetGraphSubset]
"""Represents all the information required to launch a single run. Must be returned by a
SensorDefinition or ScheduleDefinition's evaluation function for a run to be launched.
Attributes:
run_key (Optional[str]): A string key to identify this launched run. For sensors, ensures that
only one run is created per run key across all sensor evaluations. For schedules,
ensures that one run is created per tick, across failure recoveries. Passing in a `None`
value means that a run will always be launched per evaluation.
        run_config (Optional[Union[RunConfig, Mapping[str, Any]]]): Configuration for the run. If the job has
            a :py:class:`PartitionedConfig`, this value will replace the config
            provided by it.
tags (Optional[Dict[str, Any]]): A dictionary of tags (string key-value pairs) to attach
to the launched run.
job_name (Optional[str]): (Experimental) The name of the job this run request will launch.
Required for sensors that target multiple jobs.
asset_selection (Optional[Sequence[AssetKey]]): A subselection of assets that should be
launched with this run. If the sensor or schedule targets a job, then by default a
RunRequest returned from it will launch all of the assets in the job. If the sensor
targets an asset selection, then by default a RunRequest returned from it will launch
all the assets in the selection. This argument is used to specify that only a subset of
these assets should be launched, instead of all of them.
asset_check_keys (Optional[Sequence[AssetCheckKey]]): (Experimental) A subselection of asset checks that
should be launched with this run. This is currently only supported on sensors. If the
sensor targets a job, then by default a RunRequest returned from it will launch all of
the asset checks in the job. If the sensor targets an asset selection, then by default a
RunRequest returned from it will launch all the asset checks in the selection. This
argument is used to specify that only a subset of these asset checks should be launched,
instead of all of them.
stale_assets_only (bool): Set to true to further narrow the asset
selection to stale assets. If passed without an asset selection, all stale assets in the
job will be materialized. If the job does not materialize assets, this flag is ignored.
partition_key (Optional[str]): The partition key for this run request.
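
    Example:
        A minimal sketch of a sensor yielding ``RunRequest`` objects (the job, op, and
        helper names here are hypothetical):

        .. code-block:: python

            from dagster import RunRequest, sensor

            @sensor(job_name="process_file_job")
            def file_sensor(context):
                for filename in get_new_files():  # hypothetical helper
                    yield RunRequest(
                        run_key=filename,  # de-duplicates runs across sensor evaluations
                        run_config={
                            "ops": {"process_file": {"config": {"path": filename}}}
                        },
                        tags={"source": "file_sensor"},
                    )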
"""
def __new__(
cls,
run_key: Optional[str] = None,
run_config: Optional[Union["RunConfig", Mapping[str, Any]]] = None,
tags: Optional[Mapping[str, Any]] = None,
job_name: Optional[str] = None,
asset_selection: Optional[Sequence[AssetKey]] = None,
stale_assets_only: bool = False,
partition_key: Optional[str] = None,
asset_check_keys: Optional[Sequence[AssetCheckKey]] = None,
**kwargs,
):
from dagster._core.definitions.run_config import convert_config_input
if kwargs.get("asset_graph_subset") is not None:
            # asset_graph_subset is only passed if you use the RunRequest.for_asset_graph_subset
            # helper constructor, so we assume that no parameters other than tags were passed.
return super().__new__(
cls,
run_key=None,
run_config={},
tags=normalize_tags(tags),
job_name=None,
asset_selection=None,
stale_assets_only=False,
partition_key=None,
asset_check_keys=None,
asset_graph_subset=check.inst_param(
kwargs["asset_graph_subset"], "asset_graph_subset", AssetGraphSubset
),
)
return super().__new__(
cls,
run_key=run_key,
run_config=convert_config_input(run_config) or {},
tags=normalize_tags(tags),
job_name=job_name,
asset_selection=asset_selection,
stale_assets_only=stale_assets_only,
partition_key=partition_key,
asset_check_keys=asset_check_keys,
asset_graph_subset=None,
)
@classmethod
def for_asset_graph_subset(
cls,
asset_graph_subset: AssetGraphSubset,
tags: Optional[Mapping[str, str]],
) -> "RunRequest":
"""Constructs a RunRequest from an AssetGraphSubset. When processed by the sensor
daemon, this will launch a backfill instead of a run.
Note: This constructor is intentionally left private since AssetGraphSubset is not part of the
public API. Other constructor methods will be public.
"""
return RunRequest(tags=tags, asset_graph_subset=asset_graph_subset)
def with_replaced_attrs(self, **kwargs: Any) -> "RunRequest":
fields = self._asdict()
for k in fields.keys():
if k in kwargs:
fields[k] = kwargs[k]
return RunRequest(**fields)
def with_resolved_tags_and_config(
self,
target_definition: "JobDefinition",
dynamic_partitions_requests: Sequence[
Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]
],
current_time: Optional[datetime] = None,
dynamic_partitions_store: Optional[DynamicPartitionsStore] = None,
) -> "RunRequest":
if self.partition_key is None:
check.failed(
"Cannot resolve partition for run request without partition key",
)
dynamic_partitions_store_after_requests = (
DynamicPartitionsStoreAfterRequests.from_requests(
dynamic_partitions_store, dynamic_partitions_requests
)
if dynamic_partitions_store
else None
)
target_definition.validate_partition_key(
self.partition_key,
dynamic_partitions_store=dynamic_partitions_store_after_requests,
selected_asset_keys=self.asset_selection,
)
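        # Merge partition-derived tags last so they take precedence over any tags already
        # set on the run request.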
tags = {
**(self.tags or {}),
**target_definition.get_tags_for_partition_key(
self.partition_key, selected_asset_keys=self.asset_selection
),
}
return self.with_replaced_attrs(
run_config=(
self.run_config
if self.run_config
else target_definition.get_run_config_for_partition_key(self.partition_key)
),
tags=tags,
)
def has_resolved_partition(self) -> bool:
# Backcompat run requests yielded via `run_request_for_partition` already have resolved
# partitioning
return self.tags.get(PARTITION_NAME_TAG) is not None if self.partition_key else True
@property
def partition_key_range(self) -> Optional[PartitionKeyRange]:
if (
ASSET_PARTITION_RANGE_START_TAG in self.tags
and ASSET_PARTITION_RANGE_END_TAG in self.tags
):
return PartitionKeyRange(
self.tags[ASSET_PARTITION_RANGE_START_TAG], self.tags[ASSET_PARTITION_RANGE_END_TAG]
)
else:
return None
@property
def entity_keys(self) -> Sequence[EntityKey]:
return [*(self.asset_selection or []), *(self.asset_check_keys or [])]
def requires_backfill_daemon(self) -> bool:
"""For now we always send RunRequests with an asset_graph_subset to the backfill daemon, but
        eventually we will want to introspect on the asset_graph_subset to determine if we can
execute it as a single run instead.
"""
return self.asset_graph_subset is not None
@record
class DynamicPartitionsStoreAfterRequests(DynamicPartitionsStore):
"""Represents the dynamic partitions that will be in the contained DynamicPartitionsStore
after the contained requests are satisfied.
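
    Example:
        A minimal sketch, assuming ``instance`` is a ``DagsterInstance`` (which implements
        ``DynamicPartitionsStore``):

        .. code-block:: python

            store = DynamicPartitionsStoreAfterRequests.from_requests(
                instance,
                [
                    AddDynamicPartitionsRequest(
                        partitions_def_name="regions", partition_keys=["emea"]
                    )
                ],
            )
            # Reflects "emea" even though it has not yet been added to the instance.
            keys = store.get_dynamic_partitions("regions")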
"""
wrapped_dynamic_partitions_store: DynamicPartitionsStore
added_partition_keys_by_partitions_def_name: Mapping[str, AbstractSet[str]]
deleted_partition_keys_by_partitions_def_name: Mapping[str, AbstractSet[str]]
@staticmethod
def from_requests(
wrapped_dynamic_partitions_store: DynamicPartitionsStore,
dynamic_partitions_requests: Sequence[
Union[AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest]
],
) -> "DynamicPartitionsStoreAfterRequests":
added_partition_keys_by_partitions_def_name: Dict[str, Set[str]] = defaultdict(set)
deleted_partition_keys_by_partitions_def_name: Dict[str, Set[str]] = defaultdict(set)
for req in dynamic_partitions_requests:
name = req.partitions_def_name
if isinstance(req, AddDynamicPartitionsRequest):
added_partition_keys_by_partitions_def_name[name].update(set(req.partition_keys))
elif isinstance(req, DeleteDynamicPartitionsRequest):
deleted_partition_keys_by_partitions_def_name[name].update(set(req.partition_keys))
else:
check.failed(f"Unexpected request type: {req}")
return DynamicPartitionsStoreAfterRequests(
wrapped_dynamic_partitions_store=wrapped_dynamic_partitions_store,
added_partition_keys_by_partitions_def_name=added_partition_keys_by_partitions_def_name,
deleted_partition_keys_by_partitions_def_name=deleted_partition_keys_by_partitions_def_name,
)
@cached_method
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
partition_keys = set(
self.wrapped_dynamic_partitions_store.get_dynamic_partitions(partitions_def_name)
)
added_partition_keys = self.added_partition_keys_by_partitions_def_name.get(
partitions_def_name, set()
)
deleted_partition_keys = self.deleted_partition_keys_by_partitions_def_name.get(
partitions_def_name, set()
)
return list((partition_keys | added_partition_keys) - deleted_partition_keys)
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
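        # The partition exists after the requests are applied if it has not been deleted
        # and was either newly added or already present in the wrapped store.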
return partition_key not in self.deleted_partition_keys_by_partitions_def_name.get(
partitions_def_name, set()
) and (
partition_key
in self.added_partition_keys_by_partitions_def_name.get(partitions_def_name, set())
or self.wrapped_dynamic_partitions_store.has_dynamic_partition(
partitions_def_name, partition_key
)
)
@whitelist_for_serdes(
storage_name="PipelineRunReaction",
storage_field_names={
"dagster_run": "pipeline_run",
},
)
class DagsterRunReaction(
NamedTuple(
"_DagsterRunReaction",
[
("dagster_run", Optional[DagsterRun]),
("error", Optional[SerializableErrorInfo]),
("run_status", Optional[DagsterRunStatus]),
],
)
):
"""Represents a request that reacts to an existing dagster run. If success, it will report logs
back to the run.
Attributes:
dagster_run (Optional[DagsterRun]): The dagster run that originates this reaction.
error (Optional[SerializableErrorInfo]): user code execution error.
run_status: (Optional[DagsterRunStatus]): The run status that triggered the reaction.
"""
def __new__(
cls,
dagster_run: Optional[DagsterRun],
error: Optional[SerializableErrorInfo] = None,
run_status: Optional[DagsterRunStatus] = None,
):
return super(DagsterRunReaction, cls).__new__(
cls,
dagster_run=check.opt_inst_param(dagster_run, "dagster_run", DagsterRun),
error=check.opt_inst_param(error, "error", SerializableErrorInfo),
run_status=check.opt_inst_param(run_status, "run_status", DagsterRunStatus),
)
@experimental_param(
param="asset_events", additional_warn_text="Runless asset events are experimental"
)
class SensorResult(
NamedTuple(
"_SensorResult",
[
("run_requests", Optional[Sequence[RunRequest]]),
("skip_reason", Optional[SkipReason]),
("cursor", Optional[str]),
(
"dynamic_partitions_requests",
Optional[
Sequence[Union[DeleteDynamicPartitionsRequest, AddDynamicPartitionsRequest]]
],
),
(
"asset_events",
List[Union[AssetObservation, AssetMaterialization, AssetCheckEvaluation]],
),
(
"automation_condition_evaluations",
Optional[Sequence[AutomationConditionEvaluation[EntityKey]]],
),
],
)
):
"""The result of a sensor evaluation.
Attributes:
run_requests (Optional[Sequence[RunRequest]]): A list
of run requests to be executed.
skip_reason (Optional[Union[str, SkipReason]]): A skip message indicating why sensor
evaluation was skipped.
cursor (Optional[str]): The cursor value for this sensor, which will be provided on the
context for the next sensor evaluation.
dynamic_partitions_requests (Optional[Sequence[Union[DeleteDynamicPartitionsRequest,
            AddDynamicPartitionsRequest]]]): A list of requests to add or delete dynamic
            partitions. Run requests will be evaluated using the state of the
partitions with these changes applied. We recommend limiting partition additions
and deletions to a maximum of 25K partitions per sensor evaluation, as this is the maximum
recommended partition limit per asset.
asset_events (Optional[Sequence[Union[AssetObservation, AssetMaterialization, AssetCheckEvaluation]]]): (Experimental) A
list of materializations, observations, and asset check evaluations that the system
            will persist on your behalf at the end of sensor evaluation. These events will not
            be associated with any particular run, but will be queryable and viewable in the asset catalog.
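
    Example:
        A minimal sketch of a sensor returning a ``SensorResult`` that requests a run and
        advances the cursor (the job name and cursor logic are illustrative):

        .. code-block:: python

            from dagster import RunRequest, SensorResult, sensor

            @sensor(job_name="my_job")
            def my_sensor(context):
                last_seen = int(context.cursor) if context.cursor else 0
                new_cursor = last_seen + 1  # placeholder cursor logic
                return SensorResult(
                    run_requests=[RunRequest(run_key=str(new_cursor))],
                    cursor=str(new_cursor),
                )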
"""
def __new__(
cls,
run_requests: Optional[Sequence[RunRequest]] = None,
skip_reason: Optional[Union[str, SkipReason]] = None,
cursor: Optional[str] = None,
dynamic_partitions_requests: Optional[
Sequence[Union[DeleteDynamicPartitionsRequest, AddDynamicPartitionsRequest]]
] = None,
asset_events: Optional[
Sequence[Union[AssetObservation, AssetMaterialization, AssetCheckEvaluation]]
] = None,
**kwargs,
):
if skip_reason and len(run_requests if run_requests else []) > 0:
check.failed(
"Expected a single skip reason or one or more run requests: received values for "
"both run_requests and skip_reason"
)
skip_reason = check.opt_inst_param(skip_reason, "skip_reason", (SkipReason, str))
if isinstance(skip_reason, str):
skip_reason = SkipReason(skip_reason)
return super(SensorResult, cls).__new__(
cls,
run_requests=check.opt_sequence_param(run_requests, "run_requests", RunRequest),
skip_reason=skip_reason,
cursor=check.opt_str_param(cursor, "cursor"),
dynamic_partitions_requests=check.opt_sequence_param(
dynamic_partitions_requests,
"dynamic_partitions_requests",
(AddDynamicPartitionsRequest, DeleteDynamicPartitionsRequest),
),
asset_events=list(
check.opt_sequence_param(
asset_events,
"asset_check_evaluations",
(AssetObservation, AssetMaterialization, AssetCheckEvaluation),
)
),
automation_condition_evaluations=check.opt_sequence_param(
kwargs.get("automation_condition_evaluations"),
"automation_condition_evaluations",
AutomationConditionEvaluation,
),
)