Ask AI

Source code for dagster._core.storage.schedules.base

import abc
from typing import Mapping, Optional, Sequence, Set

from dagster._core.definitions.asset_key import EntityKey, T_EntityKey
from dagster._core.definitions.declarative_automation.serialized_objects import (
    AutomationConditionEvaluationWithRunIds,
)
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.scheduler.instigation import (
    AutoMaterializeAssetEvaluationRecord,
    InstigatorState,
    InstigatorStatus,
    InstigatorTick,
    TickData,
    TickStatus,
)
from dagster._core.storage.sql import AlembicVersion
from dagster._utils import PrintFn


class ScheduleStorage(abc.ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
    """Abstract class for managing persistence of scheduler artifacts.

    Concrete storages must implement every method decorated with
    ``@abc.abstractmethod``. The remaining methods and properties are optional
    hooks that ship with a default implementation in this base class.
    """

    @abc.abstractmethod
    def wipe(self) -> None:
        """Delete all schedules from storage."""

    @abc.abstractmethod
    def all_instigator_state(
        self,
        repository_origin_id: Optional[str] = None,
        repository_selector_id: Optional[str] = None,
        instigator_type: Optional[InstigatorType] = None,
        instigator_statuses: Optional[Set[InstigatorStatus]] = None,
    ) -> Sequence[InstigatorState]:
        """Return all InstigatorStates present in storage.

        Args:
            repository_origin_id (Optional[str]): The ExternalRepository target id to scope
                results to
            repository_selector_id (Optional[str]): The repository selector id to scope
                results to
            instigator_type (Optional[InstigatorType]): The InstigatorType to scope results to
            instigator_statuses (Optional[Set[InstigatorStatus]]): The InstigatorStatuses to
                scope results to

        Returns:
            Sequence[InstigatorState]: The instigator states matching the given scopes.
        """

    @abc.abstractmethod
    def get_instigator_state(self, origin_id: str, selector_id: str) -> Optional[InstigatorState]:
        """Return the instigator state for the given id.

        Args:
            origin_id (str): The unique instigator identifier
            selector_id (str): The logical instigator identifier

        Returns:
            Optional[InstigatorState]: The stored state. The signature allows ``None``,
                presumably when no matching state exists (confirm against implementations).
        """

    @abc.abstractmethod
    def add_instigator_state(self, state: InstigatorState) -> InstigatorState:
        """Add an instigator state to storage.

        Args:
            state (InstigatorState): The state to add

        Returns:
            InstigatorState: An instigator state, as declared by the signature.
        """

    @abc.abstractmethod
    def update_instigator_state(self, state: InstigatorState) -> InstigatorState:
        """Update an instigator state in storage.

        Args:
            state (InstigatorState): The state to update

        Returns:
            InstigatorState: An instigator state, as declared by the signature.
        """

    @abc.abstractmethod
    def delete_instigator_state(self, origin_id: str, selector_id: str) -> None:
        """Delete a state in storage.

        Args:
            origin_id (str): The id of the instigator target to delete
            selector_id (str): The logical instigator identifier
        """

    @property
    def supports_batch_queries(self) -> bool:
        """Whether this storage supports batch queries.

        The base implementation returns ``False``.
        """
        return False

    def get_batch_ticks(
        self,
        selector_ids: Sequence[str],
        limit: Optional[int] = None,
        statuses: Optional[Sequence[TickStatus]] = None,
    ) -> Mapping[str, Sequence[InstigatorTick]]:
        """Get ticks for several instigators in one call.

        Not abstract: the base implementation always raises. Presumably overridden by
        storages whose ``supports_batch_queries`` is ``True`` (confirm against
        implementations).

        Args:
            selector_ids (Sequence[str]): The logical instigator identifiers to query
            limit (Optional[int]): Optional cap on the number of ticks returned; whether it
                applies per selector or overall is not specified by this interface
            statuses (Optional[Sequence[TickStatus]]): The tick statuses to scope results to

        Returns:
            Mapping[str, Sequence[InstigatorTick]]: Ticks grouped under string keys,
                presumably the selector ids (confirm against implementations).

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_tick(self, tick_id: int) -> InstigatorTick:
        """Get the tick for a given evaluation tick id.

        Args:
            tick_id (int): The id of the tick to query.

        Returns:
            InstigatorTick: The tick for the given id.
        """

    @abc.abstractmethod
    def get_ticks(
        self,
        origin_id: str,
        selector_id: str,
        before: Optional[float] = None,
        after: Optional[float] = None,
        limit: Optional[int] = None,
        statuses: Optional[Sequence[TickStatus]] = None,
    ) -> Sequence[InstigatorTick]:
        """Get the ticks for a given instigator.

        The semantics of the optional filters below are inferred from their names and
        types; confirm against a concrete storage before relying on boundary behavior.

        Args:
            origin_id (str): The id of the instigator target
            selector_id (str): The logical instigator identifier
            before (Optional[float]): If given, presumably an upper timestamp bound on the
                returned ticks
            after (Optional[float]): If given, presumably a lower timestamp bound on the
                returned ticks
            limit (Optional[int]): If given, presumably the maximum number of ticks to return
            statuses (Optional[Sequence[TickStatus]]): The tick statuses to scope results to

        Returns:
            Sequence[InstigatorTick]: The matching ticks.
        """

    @abc.abstractmethod
    def create_tick(self, tick_data: TickData) -> InstigatorTick:
        """Add a tick to storage.

        Args:
            tick_data (TickData): The tick to add

        Returns:
            InstigatorTick: A tick, as declared by the signature.
        """

    @abc.abstractmethod
    def update_tick(self, tick: InstigatorTick) -> InstigatorTick:
        """Update a tick already in storage.

        Args:
            tick (InstigatorTick): The tick to update

        Returns:
            InstigatorTick: A tick, as declared by the signature.
        """

    @abc.abstractmethod
    def purge_ticks(
        self,
        origin_id: str,
        selector_id: str,
        before: float,
        tick_statuses: Optional[Sequence[TickStatus]] = None,
    ) -> None:
        """Wipe ticks for an instigator for a certain status and timestamp.

        Args:
            origin_id (str): The id of the instigator target to delete
            selector_id (str): The logical instigator identifier
            before (float): All ticks before this timestamp will get purged
            tick_statuses (Optional[Sequence[TickStatus]]): The tick statuses to wipe
        """

    @property
    def supports_auto_materialize_asset_evaluations(self) -> bool:
        """Whether this storage supports auto-materialize asset evaluations.

        The base implementation returns ``True``.
        """
        return True

    @abc.abstractmethod
    def add_auto_materialize_asset_evaluations(
        self,
        evaluation_id: int,
        asset_evaluations: Sequence[AutomationConditionEvaluationWithRunIds[EntityKey]],
    ) -> None:
        """Add asset policy evaluations to storage.

        Args:
            evaluation_id (int): The evaluation ID the given evaluations belong to
            asset_evaluations (Sequence[AutomationConditionEvaluationWithRunIds[EntityKey]]):
                The evaluations to add
        """

    @abc.abstractmethod
    def get_auto_materialize_asset_evaluations(
        self, key: T_EntityKey, limit: int, cursor: Optional[int] = None
    ) -> Sequence[AutoMaterializeAssetEvaluationRecord[T_EntityKey]]:
        """Get the policy evaluations for a given asset.

        Args:
            key (T_EntityKey): The entity key to query
            limit (int): The maximum number of evaluations to return
            cursor (Optional[int]): The cursor to paginate from

        Returns:
            Sequence[AutoMaterializeAssetEvaluationRecord[T_EntityKey]]: The evaluation
                records for the given key.
        """

    @abc.abstractmethod
    def get_auto_materialize_evaluations_for_evaluation_id(
        self, evaluation_id: int
    ) -> Sequence[AutoMaterializeAssetEvaluationRecord]:
        """Get all policy evaluations for a given evaluation ID.

        Args:
            evaluation_id (int): The evaluation ID to query.

        Returns:
            Sequence[AutoMaterializeAssetEvaluationRecord]: The evaluation records for the
                given evaluation ID.
        """

    @abc.abstractmethod
    def purge_asset_evaluations(self, before: float) -> None:
        """Wipe evaluations before a certain timestamp.

        Args:
            before (float): All evaluations before this timestamp will get purged
        """

    @abc.abstractmethod
    def upgrade(self) -> None:
        """Perform any needed migrations."""

    def migrate(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
        """Call this method to run any required data migrations.

        The base implementation does nothing.
        """

    def optimize(self, print_fn: Optional[PrintFn] = None, force_rebuild_all: bool = False) -> None:
        """Call this method to run any optional data migrations for optimized reads.

        The base implementation does nothing.
        """

    def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
        """Allows for optimizing database connection / use in the context of a long lived webserver process.

        The base implementation does nothing.
        """

    def alembic_version(self) -> Optional[AlembicVersion]:
        """Return the Alembic version of this storage; the base implementation returns ``None``."""
        return None

    def dispose(self) -> None:
        """Explicit lifecycle management.

        The base implementation does nothing.
        """