from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
import pytz
import dagster._check as check
from dagster._annotations import public
from dagster._utils.schedules import is_valid_cron_string
if TYPE_CHECKING:
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeDecisionType,
AutoMaterializeRuleSnapshot,
)
from dagster._core.definitions.auto_materialize_rule_impls import (
AutoMaterializeAssetPartitionsFilter,
MaterializeOnCronRule,
MaterializeOnMissingRule,
MaterializeOnParentUpdatedRule,
MaterializeOnRequiredForFreshnessRule,
SkipOnBackfillInProgressRule,
SkipOnNotAllParentsUpdatedRule,
SkipOnNotAllParentsUpdatedSinceCronRule,
SkipOnParentMissingRule,
SkipOnParentOutdatedRule,
SkipOnRequiredButNonexistentParentsRule,
SkipOnRunInProgressRule,
)
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationResult,
)
from dagster._core.definitions.declarative_automation.automation_context import (
AutomationContext,
)
from dagster._core.definitions.declarative_automation.legacy import RuleCondition
class AutoMaterializeRule(ABC):
    """An AutoMaterializeRule defines a bit of logic which helps determine if a materialization
    should be kicked off for a given asset partition.

    Each rule can have one of two decision types, `MATERIALIZE` (indicating that an asset partition
    should be materialized) or `SKIP` (indicating that the asset partition should not be
    materialized).

    Materialize rules are evaluated first, and skip rules operate over the set of candidates that
    are produced by the materialize rules. Other than that, there is no ordering between rules.
    """

    @property
    @abstractmethod
    def decision_type(self) -> "AutoMaterializeDecisionType":
        """The decision type of the rule (either `MATERIALIZE` or `SKIP`)."""
        ...

    @property
    @abstractmethod
    def description(self) -> str:
        """A human-readable description of this rule. As a basic guideline, this string should
        complete the sentence: 'Indicates an asset should be (materialized/skipped) when ____'.
        """
        ...

    def to_asset_condition(self) -> "RuleCondition":
        """Wraps this AutoMaterializeRule in a `RuleCondition`."""
        # Imported lazily: at module level this name is only imported under TYPE_CHECKING.
        from dagster._core.definitions.declarative_automation.legacy.rule_condition import (
            RuleCondition,
        )

        return RuleCondition(rule=self)

    @abstractmethod
    def evaluate_for_asset(self, context: "AutomationContext") -> "AutomationResult":
        """The core evaluation function for the rule. This function takes in a context object and
        returns the `AutomationResult` produced by evaluating this rule against that context.
        """
        ...

    @staticmethod
    def _check_cron_params(cron_schedule: str, timezone: str) -> None:
        """Validate the cron/timezone arguments shared by the cron-based rule factories.

        Fails the corresponding `check.param_invariant` check if `cron_schedule` is rejected by
        `is_valid_cron_string`, or if `timezone` is not a member of `pytz.all_timezones_set`.
        """
        check.param_invariant(
            is_valid_cron_string(cron_schedule), "cron_schedule", "must be a valid cron string"
        )
        check.param_invariant(
            timezone in pytz.all_timezones_set, "timezone", "must be a valid timezone"
        )

    @public
    @staticmethod
    def materialize_on_required_for_freshness() -> "MaterializeOnRequiredForFreshnessRule":
        """(Deprecated) Materialize an asset partition if it is required to satisfy a freshness
        policy of this asset or one of its downstream assets.

        Note: This rule has no effect on partitioned assets.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import (
            MaterializeOnRequiredForFreshnessRule,
        )

        return MaterializeOnRequiredForFreshnessRule()

    @public
    @staticmethod
    def materialize_on_cron(
        cron_schedule: str, timezone: str = "UTC", all_partitions: bool = False
    ) -> "MaterializeOnCronRule":
        """Materialize an asset partition if it has not been materialized since the latest cron
        schedule tick. For assets with a time component to their partitions_def, this rule will
        request all partitions that have been missed since the previous tick.

        Args:
            cron_schedule (str): A cron schedule string (e.g. "`0 * * * *`") indicating the ticks for
                which this rule should fire.
            timezone (str): The timezone in which this cron schedule should be evaluated. Defaults
                to "UTC".
            all_partitions (bool): If True, this rule fires for all partitions of this asset on each
                cron tick. If False, this rule fires only for the last partition of this asset.
                Defaults to False.

        Raises:
            A `check.param_invariant` failure if `cron_schedule` is not a valid cron string or
            `timezone` is not a valid timezone.
        """
        AutoMaterializeRule._check_cron_params(cron_schedule, timezone)

        from dagster._core.definitions.auto_materialize_rule_impls import MaterializeOnCronRule

        return MaterializeOnCronRule(
            cron_schedule=cron_schedule, timezone=timezone, all_partitions=all_partitions
        )

    @public
    @staticmethod
    def materialize_on_parent_updated(
        updated_parent_filter: Optional["AutoMaterializeAssetPartitionsFilter"] = None,
    ) -> "MaterializeOnParentUpdatedRule":
        """Materialize an asset partition if one of its parents has been updated more recently
        than it has.

        Note: For time-partitioned or dynamic-partitioned assets downstream of an unpartitioned
        asset, this rule will only fire for the most recent partition of the downstream.

        Args:
            updated_parent_filter (Optional[AutoMaterializeAssetPartitionsFilter]): Filter to apply
                to updated parents. If a parent was updated but does not pass the filter criteria,
                then it won't count as updated for the sake of this rule.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import (
            MaterializeOnParentUpdatedRule,
        )

        return MaterializeOnParentUpdatedRule(updated_parent_filter=updated_parent_filter)

    @public
    @staticmethod
    def materialize_on_missing() -> "MaterializeOnMissingRule":
        """Materialize an asset partition if it has never been materialized before. This rule will
        not fire for non-root assets unless that asset's parents have been updated.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import MaterializeOnMissingRule

        return MaterializeOnMissingRule()

    @public
    @staticmethod
    def skip_on_parent_missing() -> "SkipOnParentMissingRule":
        """Skip materializing an asset partition if one of its parent asset partitions has never
        been materialized (for regular assets) or observed (for observable source assets).
        """
        from dagster._core.definitions.auto_materialize_rule_impls import SkipOnParentMissingRule

        return SkipOnParentMissingRule()

    @public
    @staticmethod
    def skip_on_parent_outdated() -> "SkipOnParentOutdatedRule":
        """Skip materializing an asset partition if any of its parents has not incorporated the
        latest data from its ancestors.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import SkipOnParentOutdatedRule

        return SkipOnParentOutdatedRule()

    @public
    @staticmethod
    def skip_on_not_all_parents_updated(
        require_update_for_all_parent_partitions: bool = False,
    ) -> "SkipOnNotAllParentsUpdatedRule":
        """Skip materializing an asset partition if any of its parents have not been updated since
        the asset's last materialization.

        Args:
            require_update_for_all_parent_partitions (bool): Applies only to an unpartitioned
                asset or an asset partition that depends on more than one partition in any upstream
                asset. If true, requires all upstream partitions in each upstream asset to be
                materialized since the downstream asset's last materialization in order to update
                it. If false, requires at least one upstream partition in each upstream asset to be
                materialized since the downstream asset's last materialization in order to update
                it. Defaults to false.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import (
            SkipOnNotAllParentsUpdatedRule,
        )

        return SkipOnNotAllParentsUpdatedRule(require_update_for_all_parent_partitions)

    @staticmethod
    def skip_on_not_all_parents_updated_since_cron(
        cron_schedule: str, timezone: str = "UTC"
    ) -> "SkipOnNotAllParentsUpdatedSinceCronRule":
        """Skip materializing an asset partition if any of its parents have not been updated since
        the latest tick of the given cron schedule.

        Args:
            cron_schedule (str): A cron schedule string (e.g. "`0 * * * *`").
            timezone (str): The timezone in which this cron schedule should be evaluated. Defaults
                to "UTC".

        Raises:
            A `check.param_invariant` failure if `cron_schedule` is not a valid cron string or
            `timezone` is not a valid timezone.
        """
        # Validate eagerly, exactly as `materialize_on_cron` does, so that a malformed schedule or
        # timezone is reported where the rule is constructed.
        AutoMaterializeRule._check_cron_params(cron_schedule, timezone)

        from dagster._core.definitions.auto_materialize_rule_impls import (
            SkipOnNotAllParentsUpdatedSinceCronRule,
        )

        return SkipOnNotAllParentsUpdatedSinceCronRule(
            cron_schedule=cron_schedule, timezone=timezone
        )

    @public
    @staticmethod
    def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentParentsRule":
        """Skip an asset partition if it depends on parent partitions that do not exist.

        For example, imagine a downstream asset is time-partitioned, starting in 2022, but has a
        time-partitioned parent which starts in 2023. This rule will skip attempting to materialize
        downstream partitions from before 2023, since the parent partitions do not exist.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import (
            SkipOnRequiredButNonexistentParentsRule,
        )

        return SkipOnRequiredButNonexistentParentsRule()

    @public
    @staticmethod
    def skip_on_backfill_in_progress(
        all_partitions: bool = False,
    ) -> "SkipOnBackfillInProgressRule":
        """Skip an asset's partitions if targeted by an in-progress backfill.

        Args:
            all_partitions (bool): If True, skips all partitions of the asset being backfilled,
                regardless of whether the specific partition is targeted by a backfill.
                If False, skips only partitions targeted by a backfill. Defaults to False.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import (
            SkipOnBackfillInProgressRule,
        )

        return SkipOnBackfillInProgressRule(all_partitions)

    @staticmethod
    def skip_on_run_in_progress() -> "SkipOnRunInProgressRule":
        """Build a `SkipOnRunInProgressRule`.

        NOTE(review): judging by its name, the returned rule skips an asset while a run is in
        progress -- confirm the exact semantics against the rule implementation.
        """
        from dagster._core.definitions.auto_materialize_rule_impls import SkipOnRunInProgressRule

        return SkipOnRunInProgressRule()

    def to_snapshot(self) -> "AutoMaterializeRuleSnapshot":
        """Returns a serializable snapshot of this rule for historical evaluations."""
        from dagster._core.definitions.auto_materialize_rule_evaluation import (
            AutoMaterializeRuleSnapshot,
        )

        return AutoMaterializeRuleSnapshot(
            class_name=self.__class__.__name__,
            description=self.description,
            decision_type=self.decision_type,
        )

    def __eq__(self, other: object) -> bool:
        # Override the default NamedTuple __eq__ method to factor in types. The NamedTuple base is
        # presumably supplied by the concrete rule subclasses (not visible in this module), which
        # is where `super().__eq__` would resolve to a field-wise comparison.
        return type(self) is type(other) and super().__eq__(other)

    def __hash__(self) -> int:
        # Override the default NamedTuple __hash__ method to factor in types, so that it stays
        # consistent with the type-aware __eq__ above.
        return hash(hash(type(self).__name__) + super().__hash__())