import datetime
from abc import ABC, abstractmethod
from functools import cached_property
from typing import TYPE_CHECKING, Mapping, Optional, Sequence
from typing_extensions import Self
import dagster._check as check
from dagster._annotations import experimental, public
from dagster._core.asset_graph_view.asset_graph_view import AssetSlice
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_subset import AssetSubset
from dagster._core.definitions.declarative_automation.serialized_objects import (
AssetSubsetWithMetadata,
AutomationConditionCursor,
AutomationConditionEvaluation,
AutomationConditionNodeCursor,
AutomationConditionNodeSnapshot,
AutomationConditionSnapshot,
get_serializable_candidate_subset,
)
from dagster._core.definitions.partition import AllPartitionsSubset
from dagster._core.definitions.time_window_partitions import BaseTimeWindowPartitionsSubset
from dagster._record import copy
from dagster._serdes.serdes import is_whitelisted_for_serdes_object
from dagster._time import get_current_timestamp
from dagster._utils.security import non_secure_md5_hash_str
from dagster._utils.warnings import disable_dagster_warnings
if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_context import (
AutomationContext,
)
from dagster._core.definitions.declarative_automation.operands import (
CodeVersionChangedCondition,
CronTickPassedCondition,
FailedAutomationCondition,
InLatestTimeWindowCondition,
InProgressAutomationCondition,
MissingAutomationCondition,
NewlyRequestedCondition,
NewlyUpdatedCondition,
WillBeRequestedCondition,
)
from dagster._core.definitions.declarative_automation.operators import (
AllDepsCondition,
AndAutomationCondition,
AnyDepsCondition,
AnyDownstreamConditionsCondition,
NewlyTrueCondition,
NotAutomationCondition,
OrAutomationCondition,
SinceCondition,
)
class AutomationCondition(ABC):
    """An AutomationCondition represents a condition of an asset that impacts whether it should be
    automatically executed. For example, you can have a condition which becomes true whenever the
    code version of the asset is changed, or whenever an upstream dependency is updated.

    .. code-block:: python

        from dagster import AutomationCondition, asset

        @asset(automation_condition=AutomationCondition.on_cron("0 0 * * *"))
        def my_asset(): ...

    AutomationConditions may be combined together into expressions using a variety of operators.

    .. code-block:: python

        from dagster import AssetSelection, AutomationCondition, asset

        # any dependencies from the "important" group are missing
        any_important_deps_missing = AutomationCondition.any_deps_match(
            AutomationCondition.missing(),
        ).allow(AssetSelection.groups("important"))

        # there is a new code version for this asset since the last time it was requested
        new_code_version = AutomationCondition.code_version_changed().since(
            AutomationCondition.newly_requested()
        )

        # there is a new code version and no important dependencies are missing
        my_condition = new_code_version & ~any_important_deps_missing

        @asset(automation_condition=my_condition)
        def my_asset(): ...
    """

    @property
    def requires_cursor(self) -> bool:
        """Whether a cursor should be kept for this condition. Defaults to True."""
        return True

    @property
    def children(self) -> Sequence["AutomationCondition"]:
        """The direct child conditions of this condition. Defaults to no children."""
        return []

    @property
    def description(self) -> str:
        """Human-readable description of when this condition is true."""
        return ""

    @property
    def label(self) -> Optional[str]:
        """User-provided label subjectively describing the purpose of this condition in the broader evaluation tree."""
        return None

    @property
    def name(self) -> str:
        """Formal name of this specific condition, generally aligning with its static constructor."""
        return self.__class__.__name__

    def get_node_snapshot(self, unique_id: str) -> AutomationConditionNodeSnapshot:
        """Returns a snapshot of this condition that can be used for serialization.

        Args:
            unique_id (str): The unique id to record on the snapshot for this node.
        """
        return AutomationConditionNodeSnapshot(
            class_name=self.__class__.__name__,
            description=self.description,
            unique_id=unique_id,
            label=self.label,
            name=self.name,
        )

    def get_snapshot(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> AutomationConditionSnapshot:
        """Returns a serializable snapshot of the entire AutomationCondition tree.

        Args:
            parent_unique_id (Optional[str]): The unique id of the parent condition, if any.
            index (Optional[int]): The position of this condition among its parent's children,
                if any.
        """
        unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        node_snapshot = self.get_node_snapshot(unique_id)
        # each child's id is derived from this node's id and the child's position
        children = [
            child.get_snapshot(parent_unique_id=unique_id, index=i)
            for (i, child) in enumerate(self.children)
        ]
        return AutomationConditionSnapshot(node_snapshot=node_snapshot, children=children)

    def get_unique_id(self, *, parent_unique_id: Optional[str], index: Optional[int]) -> str:
        """Returns a unique identifier for this condition within the broader condition tree."""
        # NOTE(review): the parts are concatenated with no separator. This is kept as-is because
        # any change to the format would change every id generated by this method.
        parts = [str(parent_unique_id), str(index), self.__class__.__name__, self.description]
        return non_secure_md5_hash_str("".join(parts).encode())

    def get_hash(
        self, *, parent_unique_id: Optional[str] = None, index: Optional[int] = None
    ) -> int:
        """Generates a hash based off of the unique ids of this condition and all of its children."""
        unique_id = self.get_unique_id(parent_unique_id=parent_unique_id, index=index)
        child_hashes = (
            child.get_hash(parent_unique_id=unique_id, index=i)
            for i, child in enumerate(self.children)
        )
        return hash((hash(unique_id), *child_hashes))

    def __hash__(self) -> int:
        return self.get_hash()

    @property
    def has_rule_condition(self) -> bool:
        """True if this condition, or any condition beneath it, is a legacy RuleCondition."""
        from dagster._core.definitions.declarative_automation.legacy import RuleCondition

        if isinstance(self, RuleCondition):
            return True
        return any(child.has_rule_condition for child in self.children)

    @property
    def is_serializable(self) -> bool:
        """True if this condition and every condition beneath it is whitelisted for serdes."""
        if not is_whitelisted_for_serdes_object(self):
            return False
        return all(child.is_serializable for child in self.children)

    def as_auto_materialize_policy(self) -> "AutoMaterializePolicy":
        """Returns an AutoMaterializePolicy which contains this condition."""
        from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy

        return AutoMaterializePolicy.from_automation_condition(self)

    @abstractmethod
    def evaluate(self, context: "AutomationContext") -> "AutomationResult":
        """Evaluates this condition for the given context. Must be implemented by subclasses."""
        raise NotImplementedError()

    def __and__(self, other: "AutomationCondition") -> "AndAutomationCondition":
        """Combines this condition and ``other`` into an AndAutomationCondition (``a & b``)."""
        from dagster._core.definitions.declarative_automation.operators import (
            AndAutomationCondition,
        )

        # group AndAutomationConditions together
        # NOTE(review): the grouped result is built from `self.operands` only, so a label set on
        # `self` is not carried over -- confirm this is intended.
        if isinstance(self, AndAutomationCondition):
            return AndAutomationCondition(operands=[*self.operands, other])
        return AndAutomationCondition(operands=[self, other])

    def __or__(self, other: "AutomationCondition") -> "OrAutomationCondition":
        """Combines this condition and ``other`` into an OrAutomationCondition (``a | b``)."""
        from dagster._core.definitions.declarative_automation.operators import OrAutomationCondition

        # group OrAutomationConditions together
        # NOTE(review): the grouped result is built from `self.operands` only, so a label set on
        # `self` is not carried over -- confirm this is intended.
        if isinstance(self, OrAutomationCondition):
            return OrAutomationCondition(operands=[*self.operands, other])
        return OrAutomationCondition(operands=[self, other])

    def __invert__(self) -> "NotAutomationCondition":
        """Wraps this condition in a NotAutomationCondition (``~a``)."""
        from dagster._core.definitions.declarative_automation.operators import (
            NotAutomationCondition,
        )

        return NotAutomationCondition(operand=self)

    @public
    def with_label(self, label: Optional[str]) -> Self:
        """Returns a copy of this AutomationCondition with a human-readable label."""
        return copy(self, label=label)

    def since(self, reset_condition: "AutomationCondition") -> "SinceCondition":
        """Returns an AutomationCondition that is true if this condition has become true since the
        last time the reset condition became true.

        Args:
            reset_condition (AutomationCondition): The condition passed to the SinceCondition as
                its ``reset_condition``; this condition is passed as its ``trigger_condition``.
        """
        from dagster._core.definitions.declarative_automation.operators import SinceCondition

        return SinceCondition(trigger_condition=self, reset_condition=reset_condition)

    def since_last_handled(self) -> "SinceCondition":
        """Returns an AutomationCondition that is true if this condition has become true since the
        last time this asset partition was requested or updated.
        """
        # the constructors used below are marked @experimental; warnings are disabled while
        # composing them
        with disable_dagster_warnings():
            return self.since(
                (
                    AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
                ).with_label("handled")
            )

    def newly_true(self) -> "NewlyTrueCondition":
        """Returns an AutomationCondition that is true only on the tick that this condition goes
        from false to true for a given asset partition.
        """
        from dagster._core.definitions.declarative_automation.operators import NewlyTrueCondition

        return NewlyTrueCondition(operand=self)

    @public
    @experimental
    @staticmethod
    def any_deps_match(condition: "AutomationCondition") -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if at least one partition
        of any of its dependencies evaluate to True for the given condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated against
                this asset's dependencies.
        """
        from dagster._core.definitions.declarative_automation.operators import AnyDepsCondition

        return AnyDepsCondition(operand=condition)

    @public
    @experimental
    @staticmethod
    def all_deps_match(condition: "AutomationCondition") -> "AllDepsCondition":
        """Returns an AutomationCondition that is true for an asset partition if at least one partition
        of all of its dependencies evaluate to True for the given condition.

        Args:
            condition (AutomationCondition): The AutomationCondition that will be evaluated against
                this asset's dependencies.
        """
        from dagster._core.definitions.declarative_automation.operators import AllDepsCondition

        return AllDepsCondition(operand=condition)

    @public
    @experimental
    @staticmethod
    def missing() -> "MissingAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has never been
        materialized or observed.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            MissingAutomationCondition,
        )

        return MissingAutomationCondition()

    @public
    @experimental
    @staticmethod
    def in_progress() -> "InProgressAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if it is part of an in-progress run."""
        from dagster._core.definitions.declarative_automation.operands import (
            InProgressAutomationCondition,
        )

        return InProgressAutomationCondition()

    @public
    @experimental
    @staticmethod
    def failed() -> "FailedAutomationCondition":
        """Returns an AutomationCondition that is true for an asset partition if its latest run failed."""
        from dagster._core.definitions.declarative_automation.operands import (
            FailedAutomationCondition,
        )

        return FailedAutomationCondition()

    @public
    @experimental
    @staticmethod
    def in_latest_time_window(
        lookback_delta: Optional[datetime.timedelta] = None,
    ) -> "InLatestTimeWindowCondition":
        """Returns an AutomationCondition that is true for an asset partition when it is within the latest
        time window.

        Args:
            lookback_delta (Optional, datetime.timedelta): If provided, the condition will
                return all partitions within the provided delta of the end of the latest time window.
                For example, if this is used on a daily-partitioned asset with a lookback_delta of
                48 hours, this will return the latest two partitions.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            InLatestTimeWindowCondition,
        )

        return InLatestTimeWindowCondition.from_lookback_delta(lookback_delta)

    @public
    @experimental
    @staticmethod
    def will_be_requested() -> "WillBeRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it will be requested this tick."""
        from dagster._core.definitions.declarative_automation.operands import (
            WillBeRequestedCondition,
        )

        return WillBeRequestedCondition()

    @public
    @experimental
    @staticmethod
    def newly_updated() -> "NewlyUpdatedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it has been updated since the previous tick."""
        from dagster._core.definitions.declarative_automation.operands import NewlyUpdatedCondition

        return NewlyUpdatedCondition()

    @public
    @experimental
    @staticmethod
    def newly_requested() -> "NewlyRequestedCondition":
        """Returns an AutomationCondition that is true for an asset partition if it was requested on the previous tick."""
        from dagster._core.definitions.declarative_automation.operands import (
            NewlyRequestedCondition,
        )

        return NewlyRequestedCondition()

    @public
    @experimental
    @staticmethod
    def code_version_changed() -> "CodeVersionChangedCondition":
        """Returns an AutomationCondition that is true for an asset partition if its asset's code
        version has been changed since the previous tick.
        """
        from dagster._core.definitions.declarative_automation.operands import (
            CodeVersionChangedCondition,
        )

        return CodeVersionChangedCondition()

    @public
    @experimental
    @staticmethod
    def cron_tick_passed(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> "CronTickPassedCondition":
        """Returns an AutomationCondition that is true for all asset partitions whenever a cron tick of the provided schedule is passed.

        Args:
            cron_schedule (str): The cron schedule passed to the CronTickPassedCondition.
            cron_timezone (str): The timezone passed to the CronTickPassedCondition. Defaults to
                "UTC".
        """
        from dagster._core.definitions.declarative_automation.operands import (
            CronTickPassedCondition,
        )

        return CronTickPassedCondition(
            cron_schedule=cron_schedule, cron_timezone=cron_timezone
        ).with_label(f"cron_tick_passed({cron_schedule}, {cron_timezone})")

    @experimental
    @staticmethod
    def newly_missing() -> "AutomationCondition":
        """Returns an AutomationCondition that is true on the tick that an asset partition becomes missing."""
        with disable_dagster_warnings():
            return AutomationCondition.missing().newly_true().with_label("newly_missing")

    @experimental
    @staticmethod
    def any_deps_updated() -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for any asset partition with at least one
        dependency that has updated since the previous tick, or will be requested on this tick.
        """
        with disable_dagster_warnings():
            return AutomationCondition.any_deps_match(
                AutomationCondition.newly_updated() | AutomationCondition.will_be_requested()
            ).with_label("any_deps_updated")

    @experimental
    @staticmethod
    def any_deps_missing() -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for any asset partition with at least one
        dependency that is missing, and will not be requested on this tick.
        """
        with disable_dagster_warnings():
            return AutomationCondition.any_deps_match(
                AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
            ).with_label("any_deps_missing")

    @experimental
    @staticmethod
    def any_deps_in_progress() -> "AnyDepsCondition":
        """Returns an AutomationCondition that is true for any asset partition with at least one
        dependency that is in progress.
        """
        with disable_dagster_warnings():
            return AutomationCondition.any_deps_match(AutomationCondition.in_progress()).with_label(
                "any_deps_in_progress"
            )

    @experimental
    @staticmethod
    def all_deps_updated_since_cron(
        cron_schedule: str, cron_timezone: str = "UTC"
    ) -> "AllDepsCondition":
        """Returns an AutomationCondition that is true for any asset partition where all of its
        dependencies have updated since the latest tick of the provided cron schedule.
        """
        with disable_dagster_warnings():
            return AutomationCondition.all_deps_match(
                AutomationCondition.newly_updated().since(
                    AutomationCondition.cron_tick_passed(cron_schedule, cron_timezone)
                )
                | AutomationCondition.will_be_requested()
            ).with_label(f"all_deps_updated_since_cron({cron_schedule}, {cron_timezone})")

    @public
    @experimental
    @staticmethod
    def eager() -> "AutomationCondition":
        """Returns an AutomationCondition which will cause missing asset partitions to be
        materialized, and will materialize asset partitions whenever their parents are updated.

        For time partitioned assets, only the latest time partition will be considered.

        This will never evaluate to true if the asset has any upstream partitions which are missing
        or part of an in progress run, and will never evaluate to true if the provided asset partition
        is already part of an in progress run.
        """
        with disable_dagster_warnings():
            return (
                AutomationCondition.in_latest_time_window()
                & (
                    AutomationCondition.newly_missing() | AutomationCondition.any_deps_updated()
                ).since_last_handled()
                & ~AutomationCondition.any_deps_missing()
                & ~AutomationCondition.any_deps_in_progress()
                & ~AutomationCondition.in_progress()
            ).with_label("eager")

    @public
    @experimental
    @staticmethod
    def on_cron(cron_schedule: str, cron_timezone: str = "UTC") -> "AutomationCondition":
        """Returns an AutomationCondition which will cause asset partitions to be materialized
        on a given cron schedule, after all of their dependencies have been updated since the latest
        tick of that cron schedule.

        For time partitioned assets, only the latest time partition will be considered.
        """
        with disable_dagster_warnings():
            return (
                AutomationCondition.in_latest_time_window()
                & AutomationCondition.cron_tick_passed(
                    cron_schedule, cron_timezone
                ).since_last_handled()
                & AutomationCondition.all_deps_updated_since_cron(cron_schedule, cron_timezone)
            ).with_label(f"on_cron({cron_schedule}, {cron_timezone})")

    @public
    @experimental
    @staticmethod
    def any_downstream_conditions() -> "AnyDownstreamConditionsCondition":
        """Returns an AutomationCondition which represents the union of all distinct downstream conditions."""
        from dagster._core.definitions.declarative_automation.operators import (
            AnyDownstreamConditionsCondition,
        )

        return AnyDownstreamConditionsCondition()
class AutomationResult:
    """The result of evaluating an AutomationCondition."""

    def __init__(
        self,
        context: "AutomationContext",
        true_slice: AssetSlice,
        cursor: Optional[str] = None,
        child_results: Optional[Sequence["AutomationResult"]] = None,
        **kwargs,
    ):
        """Validates and stores the outcome of a single condition evaluation.

        Args:
            context (AutomationContext): The context the condition was evaluated with.
            true_slice (AssetSlice): The slice for which the condition evaluated to true.
            cursor (Optional[str]): Optional string state to store for this node.
            child_results (Optional[Sequence[AutomationResult]]): Results of child conditions.
            **kwargs: Only the hidden params ``subsets_with_metadata`` and ``structured_cursor``
                are accepted; any other key fails the parameter check.
        """
        from dagster._core.definitions.declarative_automation.automation_context import (
            AutomationContext,
        )

        self._context = check.inst_param(context, "context", AutomationContext)
        self._true_slice = check.inst_param(true_slice, "true_slice", AssetSlice)
        self._child_results = check.opt_sequence_param(
            child_results, "child_results", of_type=AutomationResult
        )

        # the evaluation starts when the context was created and ends when this result is built
        self._start_timestamp = context.create_time.timestamp()
        self._end_timestamp = get_current_timestamp()

        # hidden_param which should only be set by legacy RuleConditions
        raw_subsets_with_metadata = kwargs.get("subsets_with_metadata")
        self._subsets_with_metadata = check.opt_sequence_param(
            raw_subsets_with_metadata, "subsets_with_metadata", AssetSubsetWithMetadata
        )

        # hidden_param which should only be set by builtin conditions which require high performance
        # in their serdes layer
        structured_cursor = kwargs.get("structured_cursor")

        allowed_hidden_params = {"subsets_with_metadata", "structured_cursor"}
        invalid_hidden_params = set(kwargs) - allowed_hidden_params
        check.param_invariant(
            not invalid_hidden_params, "kwargs", f"Invalid hidden params: {invalid_hidden_params}"
        )
        check.param_invariant(
            not cursor or not structured_cursor,
            "structured_cursor",
            "Cannot provide both cursor and structured_cursor.",
        )

        # a (non-empty) string cursor wins, otherwise fall back to the structured cursor
        validated_cursor = check.opt_str_param(cursor, "cursor")
        self._extra_state = validated_cursor if validated_cursor else structured_cursor

        # used to enable the evaluator class to modify the evaluation in some edge cases
        self._serializable_evaluation_override: Optional[AutomationConditionEvaluation] = None

    def _candidate_subset(self) -> AssetSubset:
        """Converts the context's candidate slice into an asset subset."""
        return self._context.candidate_slice.convert_to_valid_asset_subset()

    @property
    def asset_key(self) -> AssetKey:
        """The asset key of the true slice."""
        return self._true_slice.asset_key

    @property
    def true_slice(self) -> AssetSlice:
        """The slice for which the condition evaluated to true."""
        return self._true_slice

    @property
    def true_subset(self) -> AssetSubset:
        """The true slice, converted into an asset subset."""
        return self.true_slice.convert_to_valid_asset_subset()

    @property
    def start_timestamp(self) -> float:
        """Timestamp taken from the creation time of the evaluation context."""
        return self._start_timestamp

    @property
    def end_timestamp(self) -> float:
        """Timestamp captured when this result was constructed."""
        return self._end_timestamp

    @property
    def child_results(self) -> Sequence["AutomationResult"]:
        """Results of the child conditions, as provided at construction."""
        return self._child_results

    @property
    def condition(self) -> AutomationCondition:
        """The condition held by the evaluation context."""
        return self._context.condition

    @property
    def condition_unique_id(self) -> str:
        """The unique id of the condition, as reported by the evaluation context."""
        return self._context.condition_unique_id

    @cached_property
    def value_hash(self) -> str:
        """An identifier for the contents of this AutomationResult. This will be identical for
        results with identical values, allowing us to avoid storing redundant information.
        """
        metadata_parts = (
            _compute_subset_with_metadata_value_str(swm) for swm in self._subsets_with_metadata
        )
        child_parts = (child_result.value_hash for child_result in self._child_results)
        components: Sequence[str] = [
            self.condition_unique_id,
            self.condition.description,
            _compute_subset_value_str(self.true_subset),
            _compute_subset_value_str(self._candidate_subset()),
            *metadata_parts,
            *child_parts,
        ]
        joined = "".join(components)
        return non_secure_md5_hash_str(joined.encode("utf-8"))

    @cached_property
    def node_cursor(self) -> Optional[AutomationConditionNodeCursor]:
        """Cursor value storing information about this specific evaluation node, if required."""
        if self.condition.requires_cursor:
            return AutomationConditionNodeCursor(
                true_subset=self.true_subset,
                candidate_subset=get_serializable_candidate_subset(self._candidate_subset()),
                subsets_with_metadata=self._subsets_with_metadata,
                extra_state=self._extra_state,
            )
        return None

    @cached_property
    def _serializable_evaluation(self) -> AutomationConditionEvaluation:
        """Builds the serializable evaluation for this node and, recursively, its children."""
        condition_snapshot = self.condition.get_node_snapshot(self.condition_unique_id)
        return AutomationConditionEvaluation(
            condition_snapshot=condition_snapshot,
            true_subset=self.true_subset,
            candidate_subset=get_serializable_candidate_subset(self._candidate_subset()),
            subsets_with_metadata=self._subsets_with_metadata,
            start_timestamp=self._start_timestamp,
            end_timestamp=self._end_timestamp,
            child_evaluations=[
                child_result.serializable_evaluation for child_result in self._child_results
            ],
        )

    @property
    def serializable_evaluation(self) -> AutomationConditionEvaluation:
        """Serializable representation of the evaluation of this condition."""
        override = self._serializable_evaluation_override
        if override:
            return override
        return self._serializable_evaluation

    def set_internal_serializable_evaluation_override(
        self, override: AutomationConditionEvaluation
    ) -> None:
        """Internal method for handling edge cases in which the serializable evaluation must be
        updated after evaluation completes.
        """
        self._serializable_evaluation_override = override

    def get_child_node_cursors(self) -> Mapping[str, AutomationConditionNodeCursor]:
        """Collects the node cursors of this result and all of its descendants, keyed by the
        unique id of the condition that produced them.
        """
        collected = {}
        own_cursor = self.node_cursor
        if own_cursor:
            collected[self.condition_unique_id] = own_cursor
        for child_result in self._child_results:
            collected.update(child_result.get_child_node_cursors())
        return collected

    def get_new_cursor(self) -> AutomationConditionCursor:
        """Builds the AutomationConditionCursor capturing the state of this evaluation."""
        requested_subset = self.serializable_evaluation.true_subset
        return AutomationConditionCursor(
            previous_requested_subset=requested_subset,
            effective_timestamp=self._context.evaluation_time.timestamp(),
            last_event_id=self._context.max_storage_id,
            node_cursors_by_unique_id=self.get_child_node_cursors(),
            result_value_hash=self.value_hash,
        )
def _compute_subset_value_str(subset: "AssetSubset") -> str:
    """Computes a string representing the contents of a given AssetSubset. This string will be
    equal for equivalent AssetSubsets.

    Args:
        subset (AssetSubset): The subset to compute a value string for.

    Returns:
        str: A string derived from ``subset.value``; collections are sorted before being rendered
        so that the result does not depend on iteration order.
    """
    value = subset.value
    if isinstance(value, bool):
        return str(value)
    elif isinstance(value, AllPartitionsSubset):
        # only the class name is used for this case
        return AllPartitionsSubset.__name__
    elif isinstance(value, BaseTimeWindowPartitionsSubset):
        return str(
            [
                (tw.start.timestamp(), tw.end.timestamp())
                for tw in sorted(value.included_time_windows)
            ]
        )
    else:
        # `sorted` already returns a list, no extra `list()` wrapper is needed
        return str(sorted(subset.asset_partitions))
def _compute_subset_with_metadata_value_str(
    subset_with_metadata: AssetSubsetWithMetadata,
) -> str:
    """Computes a string representing the contents of a given AssetSubsetWithMetadata.

    Args:
        subset_with_metadata (AssetSubsetWithMetadata): The object to compute a value string for.

    Returns:
        str: The value string of the contained subset, followed by the string form of the sorted
        ``frozen_metadata``.
    """
    subset_str = _compute_subset_value_str(subset_with_metadata.subset)
    # sorted so that the result does not depend on the iteration order of the metadata
    metadata_str = str(sorted(subset_with_metadata.frozen_metadata))
    return subset_str + metadata_str