from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import dagster._check as check
from dagster._core.assets import AssetDetails
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.data_version import DATA_VERSION_TAG
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.event_api import (
AssetRecordsFilter,
EventHandlerFn,
EventLogCursor,
EventLogRecord,
EventRecordsFilter,
EventRecordsResult,
RunStatusChangeRecordsFilter,
)
from dagster._core.events import DagsterEventType
from dagster._core.execution.stats import (
RunStepKeyStatsSnapshot,
build_run_stats_from_events,
build_run_step_stats_from_events,
)
from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance
from dagster._core.loader import LoadableBy, LoadingContext
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecord
from dagster._core.storage.dagster_run import DagsterRunStatsSnapshot
from dagster._core.storage.partition_status_cache import get_and_update_asset_status_cache_value
from dagster._core.storage.sql import AlembicVersion
from dagster._core.storage.tags import MULTIDIMENSIONAL_PARTITION_PREFIX
from dagster._utils import PrintFn
from dagster._utils.concurrency import ConcurrencyClaimStatus, ConcurrencyKeyInfo
from dagster._utils.warnings import deprecation_warning
if TYPE_CHECKING:
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
class EventLogConnection(NamedTuple):
records: Sequence[EventLogRecord]
cursor: str
has_more: bool
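# A minimal sketch of paginated reads using EventLogConnection, assuming a
# hypothetical `storage` instance of a concrete EventLogStorage subclass:
#
#     cursor = None
#     while True:
#         conn = storage.get_records_for_run("my_run_id", cursor=cursor, limit=100)
#         for record in conn.records:
#             ...  # process each EventLogRecord
#         if not conn.has_more:
#             break
#         cursor = conn.cursor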
class AssetEntry(
NamedTuple(
"_AssetEntry",
[
("asset_key", AssetKey),
("last_materialization_record", Optional[EventLogRecord]),
("last_run_id", Optional[str]),
("asset_details", Optional[AssetDetails]),
("cached_status", Optional["AssetStatusCacheValue"]),
# This is an optional field which can be used for more performant last observation
# queries if the underlying storage supports it
("last_observation_record", Optional[EventLogRecord]),
("last_planned_materialization_storage_id", Optional[int]),
("last_planned_materialization_run_id", Optional[str]),
],
)
):
def __new__(
cls,
asset_key: AssetKey,
last_materialization_record: Optional[EventLogRecord] = None,
last_run_id: Optional[str] = None,
asset_details: Optional[AssetDetails] = None,
cached_status: Optional["AssetStatusCacheValue"] = None,
last_observation_record: Optional[EventLogRecord] = None,
last_planned_materialization_storage_id: Optional[int] = None,
last_planned_materialization_run_id: Optional[str] = None,
):
from dagster._core.storage.partition_status_cache import AssetStatusCacheValue
return super(AssetEntry, cls).__new__(
cls,
asset_key=check.inst_param(asset_key, "asset_key", AssetKey),
last_materialization_record=check.opt_inst_param(
last_materialization_record, "last_materialization_record", EventLogRecord
),
last_run_id=check.opt_str_param(last_run_id, "last_run_id"),
asset_details=check.opt_inst_param(asset_details, "asset_details", AssetDetails),
cached_status=check.opt_inst_param(
cached_status, "cached_status", AssetStatusCacheValue
),
last_observation_record=check.opt_inst_param(
last_observation_record, "last_observation_record", EventLogRecord
),
last_planned_materialization_storage_id=check.opt_int_param(
last_planned_materialization_storage_id,
"last_planned_materialization_storage_id",
),
last_planned_materialization_run_id=check.opt_str_param(
last_planned_materialization_run_id,
"last_planned_materialization_run_id",
),
)
@property
def last_materialization(self) -> Optional["EventLogEntry"]:
if self.last_materialization_record is None:
return None
return self.last_materialization_record.event_log_entry
@property
def last_observation(self) -> Optional["EventLogEntry"]:
if self.last_observation_record is None:
return None
return self.last_observation_record.event_log_entry
@property
def last_materialization_storage_id(self) -> Optional[int]:
if self.last_materialization_record is None:
return None
return self.last_materialization_record.storage_id
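# For illustration, the convenience properties above unwrap the underlying
# records; a minimal sketch (the entry values are hypothetical):
#
#     entry = AssetEntry(asset_key=AssetKey(["my_asset"]))
#     entry.last_materialization  # None until a materialization record exists
#     entry.last_observation      # likewise derived from last_observation_record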
class AssetRecord(
NamedTuple("_NamedTuple", [("storage_id", int), ("asset_entry", AssetEntry)]),
LoadableBy[AssetKey],
):
"""Internal representation of an asset record, as stored in a :py:class:`~dagster._core.storage.event_log.EventLogStorage`.
Users should not invoke this class directly.
"""
@classmethod
def _blocking_batch_load(
cls, keys: Iterable[AssetKey], context: LoadingContext
) -> Iterable[Optional["AssetRecord"]]:
records_by_key = {
record.asset_entry.asset_key: record
for record in context.instance.get_asset_records(list(keys))
}
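        # Keys with no stored record map to None here, preserving the
        # one-result-per-key contract that LoadableBy batch loading expects.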
return [records_by_key.get(key) for key in keys]
class AssetCheckSummaryRecord(
NamedTuple(
"_AssetCheckSummaryRecord",
[
("asset_check_key", AssetCheckKey),
("last_check_execution_record", Optional[AssetCheckExecutionRecord]),
("last_run_id", Optional[str]),
],
),
LoadableBy[AssetCheckKey],
):
@classmethod
def _blocking_batch_load(
cls, keys: Iterable[AssetCheckKey], context: LoadingContext
) -> Iterable[Optional["AssetCheckSummaryRecord"]]:
records_by_key = context.instance.event_log_storage.get_asset_check_summary_records(
list(keys)
)
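        # Unlike AssetRecord above, direct indexing is used here: the storage is
        # expected to return an entry for every requested key (with empty fields
        # if the check has never been executed).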
return [records_by_key[key] for key in keys]
class PlannedMaterializationInfo(NamedTuple):
"""Internal representation of an planned materialization event, containing storage_id / run_id.
Users should not invoke this class directly.
"""
storage_id: int
run_id: str
class EventLogStorage(ABC, MayHaveInstanceWeakref[T_DagsterInstance]):
"""Abstract base class for storing structured event logs from pipeline runs.
Note that event log storages using SQL databases as backing stores should implement
:py:class:`~dagster._core.storage.event_log.SqlEventLogStorage`.
Users should not directly instantiate concrete subclasses of this class; they are instantiated
by internal machinery when ``dagster-webserver`` and ``dagster-graphql`` load, based on the values in the
``dagster.yaml`` file in ``$DAGSTER_HOME``. Configuration of concrete subclasses of this class
should be done by setting values in that file.
"""
def get_logs_for_run(
self,
run_id: str,
cursor: Optional[Union[str, int]] = None,
of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
limit: Optional[int] = None,
ascending: bool = True,
) -> Sequence["EventLogEntry"]:
"""Get all of the logs corresponding to a run.
        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[Union[str, int]]): Cursor value to track paginated queries. Legacy
                support for integer offset cursors.
            of_type (Optional[Union[DagsterEventType, Set[DagsterEventType]]]): The event type(s)
                to filter the logs by.
            limit (Optional[int]): Max number of records to return.
            ascending (bool): Whether to return the logs in ascending order by storage id.
        """
if isinstance(cursor, int):
deprecation_warning(
"Integer cursor values in `get_logs_for_run`",
"1.8.0",
"You can instead construct a storage_id-based string cursor e.g. `cursor = EventLogCursor.from_storage_id(storage_id).to_string()`",
)
cursor = EventLogCursor.from_offset(cursor + 1).to_string()
records = self.get_records_for_run(
run_id, cursor, of_type, limit, ascending=ascending
).records
return [record.event_log_entry for record in records]
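    # A sketch of the non-deprecated cursor style, assuming a hypothetical
    # `storage` instance and a known `storage_id`:
    #
    #     cursor = EventLogCursor.from_storage_id(storage_id).to_string()
    #     entries = storage.get_logs_for_run("my_run_id", cursor=cursor)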
@abstractmethod
def get_records_for_run(
self,
run_id: str,
cursor: Optional[str] = None,
of_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
limit: Optional[int] = None,
ascending: bool = True,
) -> EventLogConnection:
"""Get all of the event log records corresponding to a run.
        Args:
            run_id (str): The id of the run for which to fetch logs.
            cursor (Optional[str]): Cursor value to track paginated queries.
            of_type (Optional[Union[DagsterEventType, Set[DagsterEventType]]]): The event type(s)
                to filter the logs by.
            limit (Optional[int]): Max number of records to return.
            ascending (bool): Whether to return the records in ascending order by storage id.
        """
def get_stats_for_run(self, run_id: str) -> DagsterRunStatsSnapshot:
"""Get a summary of events that have ocurred in a run."""
return build_run_stats_from_events(run_id, self.get_logs_for_run(run_id))
def get_step_stats_for_run(
self, run_id: str, step_keys: Optional[Sequence[str]] = None
) -> Sequence[RunStepKeyStatsSnapshot]:
"""Get per-step stats for a pipeline run."""
logs = self.get_logs_for_run(run_id)
if step_keys:
logs = [
event
for event in logs
if event.is_dagster_event and event.get_dagster_event().step_key in step_keys
]
return build_run_step_stats_from_events(run_id, logs)
@abstractmethod
def store_event(self, event: "EventLogEntry") -> None:
"""Store an event corresponding to a pipeline run.
Args:
event (EventLogEntry): The event to store.
"""
    def store_event_batch(self, events: Sequence["EventLogEntry"]) -> None:
        """Store a batch of events. The default implementation stores each event sequentially."""
        for event in events:
            self.store_event(event)
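    # Storages with native bulk-write support may override store_event_batch to
    # issue a single write instead of the sequential default above; a sketch,
    # assuming a hypothetical `_insert_many` helper on the subclass:
    #
    #     def store_event_batch(self, events: Sequence["EventLogEntry"]) -> None:
    #         self._insert_many(events)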
@abstractmethod
def delete_events(self, run_id: str) -> None:
"""Remove events for a given run id."""
@abstractmethod
def upgrade(self) -> None:
"""This method should perform any schema migrations necessary to bring an
out-of-date instance of the storage up to date.
"""
@abstractmethod
def reindex_events(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None:
"""Call this method to run any data migrations across the event_log tables."""
@abstractmethod
def reindex_assets(self, print_fn: Optional[PrintFn] = None, force: bool = False) -> None:
"""Call this method to run any data migrations across the asset tables."""
@abstractmethod
def wipe(self) -> None:
"""Clear the log storage."""
@abstractmethod
def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> None:
"""Call this method to start watching."""
@abstractmethod
def end_watch(self, run_id: str, handler: EventHandlerFn) -> None:
"""Call this method to stop watching."""
@property
@abstractmethod
def is_persistent(self) -> bool:
"""bool: Whether the storage is persistent."""
def dispose(self) -> None:
"""Explicit lifecycle management."""
def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
"""Allows for optimizing database connection / use in the context of a long lived webserver process."""
@abstractmethod
def get_event_records(
self,
event_records_filter: EventRecordsFilter,
limit: Optional[int] = None,
ascending: bool = False,
    ) -> Sequence[EventLogRecord]:
        """Get event records matching the given filter, in descending storage id order by default."""
def supports_event_consumer_queries(self) -> bool:
return False
def get_logs_for_all_runs_by_log_id(
self,
after_cursor: int = -1,
dagster_event_type: Optional[Union[DagsterEventType, Set[DagsterEventType]]] = None,
limit: Optional[int] = None,
) -> Mapping[int, "EventLogEntry"]:
"""Get event records across all runs. Only supported for non sharded sql storage."""
raise NotImplementedError()
def get_maximum_record_id(self) -> Optional[int]:
"""Get the current greatest record id in the event log. Only supported for non sharded sql storage."""
raise NotImplementedError()
@abstractmethod
def can_read_asset_status_cache(self) -> bool:
"""Whether the storage can access cached status information for each asset."""
pass
@abstractmethod
def can_write_asset_status_cache(self) -> bool:
"""Whether the storage is able to write to that cache."""
@abstractmethod
def wipe_asset_cached_status(self, asset_key: AssetKey) -> None:
pass
@abstractmethod
def get_asset_records(
self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Sequence[AssetRecord]:
pass
@abstractmethod
def get_asset_check_summary_records(
self, asset_check_keys: Sequence[AssetCheckKey]
) -> Mapping[AssetCheckKey, AssetCheckSummaryRecord]:
pass
@property
def asset_records_have_last_planned_materialization_storage_id(self) -> bool:
return False
@abstractmethod
def has_asset_key(self, asset_key: AssetKey) -> bool:
pass
@abstractmethod
def all_asset_keys(self) -> Sequence[AssetKey]:
pass
@abstractmethod
def update_asset_cached_status_data(
self, asset_key: AssetKey, cache_values: "AssetStatusCacheValue"
) -> None:
pass
def get_asset_keys(
self,
prefix: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
cursor: Optional[str] = None,
) -> Sequence[AssetKey]:
# base implementation of get_asset_keys, using the existing `all_asset_keys` and doing the
# filtering in-memory
asset_keys = sorted(self.all_asset_keys(), key=lambda k: k.to_string())
if prefix:
asset_keys = [
asset_key for asset_key in asset_keys if asset_key.path[: len(prefix)] == prefix
]
if cursor:
cursor_asset = AssetKey.from_db_string(cursor)
if not cursor_asset:
raise Exception(f"Invalid cursor string {cursor}")
cursor_asset_str = cursor_asset.to_string()
idx = len(asset_keys)
for i in range(len(asset_keys)):
if asset_keys[i].to_string() > cursor_asset_str:
idx = i
break
asset_keys = asset_keys[idx:]
if limit:
asset_keys = asset_keys[:limit]
return asset_keys
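    # Worked example of the cursor semantics above (illustrative keys): with
    # sorted keys ["a", "b", "c"] and a cursor identifying "b", only keys whose
    # to_string() sorts strictly after the cursor key are returned, so the next
    # page is ["c"]; with no cursor and limit=1, the page is ["a"].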
@abstractmethod
def get_latest_materialization_events(
self, asset_keys: Iterable[AssetKey]
) -> Mapping[AssetKey, Optional["EventLogEntry"]]:
pass
def supports_add_asset_event_tags(self) -> bool:
return False
def add_asset_event_tags(
self,
event_id: int,
event_timestamp: float,
asset_key: AssetKey,
new_tags: Mapping[str, str],
) -> None:
raise NotImplementedError()
@abstractmethod
def get_event_tags_for_asset(
self,
asset_key: AssetKey,
filter_tags: Optional[Mapping[str, str]] = None,
filter_event_id: Optional[int] = None,
) -> Sequence[Mapping[str, str]]:
pass
def get_asset_tags_to_index(self, tag_keys: Set[str]) -> Set[str]:
# make sure we update the list of tested tags in test_asset_tags_to_insert to match
return {
key
for key in tag_keys
if key == DATA_VERSION_TAG or key.startswith(MULTIDIMENSIONAL_PARTITION_PREFIX)
}
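    # For example: assuming the usual values of these constants
    # ("dagster/data_version" and the "dagster/partition/" prefix), an input of
    # {"dagster/data_version", "dagster/partition/region", "team"} keeps the
    # first two keys and drops "team".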
@abstractmethod
def wipe_asset(self, asset_key: AssetKey) -> None:
"""Remove asset index history from event log for given asset_key."""
@abstractmethod
def wipe_asset_partitions(self, asset_key: AssetKey, partition_keys: Sequence[str]) -> None:
"""Remove asset index history from event log for given asset partitions."""
@abstractmethod
def get_materialized_partitions(
self,
asset_key: AssetKey,
before_cursor: Optional[int] = None,
after_cursor: Optional[int] = None,
) -> Set[str]:
pass
@abstractmethod
def get_latest_storage_id_by_partition(
self,
asset_key: AssetKey,
event_type: DagsterEventType,
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
pass
@abstractmethod
def get_latest_tags_by_partition(
self,
asset_key: AssetKey,
event_type: DagsterEventType,
tag_keys: Sequence[str],
asset_partitions: Optional[Sequence[str]] = None,
before_cursor: Optional[int] = None,
after_cursor: Optional[int] = None,
) -> Mapping[str, Mapping[str, str]]:
pass
@abstractmethod
def get_latest_asset_partition_materialization_attempts_without_materializations(
self, asset_key: AssetKey, after_storage_id: Optional[int] = None
    ) -> Mapping[str, Tuple[str, int]]:
        """For each partition of the asset, return the (run_id, storage_id) of the latest planned
        materialization that does not have a corresponding completed materialization.
        """
@abstractmethod
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
"""Get the list of partition keys for a dynamic partitions definition."""
raise NotImplementedError()
@abstractmethod
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
"""Check if a dynamic partition exists."""
raise NotImplementedError()
@abstractmethod
def add_dynamic_partitions(
self, partitions_def_name: str, partition_keys: Sequence[str]
) -> None:
"""Add a partition for the specified dynamic partitions definition."""
raise NotImplementedError()
@abstractmethod
def delete_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> None:
"""Delete a partition for the specified dynamic partitions definition."""
raise NotImplementedError()
def alembic_version(self) -> Optional[AlembicVersion]:
return None
@property
def is_run_sharded(self) -> bool:
"""Indicates that the EventLogStoarge is sharded."""
return False
@property
def supports_global_concurrency_limits(self) -> bool:
"""Indicates that the EventLogStorage supports global concurrency limits."""
return False
@property
def asset_records_have_last_observation(self) -> bool:
return False
@abstractmethod
def initialize_concurrency_limit_to_default(self, concurrency_key: str) -> bool:
"""Initialize a concurrency limit to the instance default value. Is a no-op for concurrency
keys that are already configured with a limit.
"""
raise NotImplementedError()
@abstractmethod
def set_concurrency_slots(self, concurrency_key: str, num: int) -> None:
"""Allocate concurrency slots for the given concurrency key."""
raise NotImplementedError()
@abstractmethod
def delete_concurrency_limit(self, concurrency_key: str) -> None:
"""Delete concurrency limits and slots for the given concurrency key."""
raise NotImplementedError()
@abstractmethod
def get_concurrency_keys(self) -> Set[str]:
"""Get the set of concurrency limited keys."""
raise NotImplementedError()
@abstractmethod
def get_concurrency_info(self, concurrency_key: str) -> ConcurrencyKeyInfo:
"""Get concurrency info for key."""
raise NotImplementedError()
@abstractmethod
def claim_concurrency_slot(
self, concurrency_key: str, run_id: str, step_key: str, priority: Optional[int] = None
) -> ConcurrencyClaimStatus:
"""Claim concurrency slots for step."""
raise NotImplementedError()
@abstractmethod
def check_concurrency_claim(
self, concurrency_key: str, run_id: str, step_key: str
) -> ConcurrencyClaimStatus:
"""Claim concurrency slots for step."""
raise NotImplementedError()
@abstractmethod
def get_concurrency_run_ids(self) -> Set[str]:
"""Get a list of run_ids that are occupying or waiting for a concurrency key slot."""
raise NotImplementedError()
@abstractmethod
def free_concurrency_slots_for_run(self, run_id: str) -> None:
"""Frees concurrency slots for a given run."""
raise NotImplementedError()
@abstractmethod
def free_concurrency_slot_for_step(self, run_id: str, step_key: str) -> None:
"""Frees concurrency slots for a given run/step."""
raise NotImplementedError()
@property
    def supports_asset_checks(self) -> bool:
return True
@abstractmethod
def get_asset_check_execution_history(
self,
check_key: AssetCheckKey,
limit: int,
cursor: Optional[int] = None,
) -> Sequence[AssetCheckExecutionRecord]:
"""Get executions for one asset check, sorted by recency."""
pass
@abstractmethod
def get_latest_asset_check_execution_by_key(
self, check_keys: Sequence[AssetCheckKey]
) -> Mapping[AssetCheckKey, AssetCheckExecutionRecord]:
"""Get the latest executions for a list of asset checks."""
pass
@abstractmethod
def fetch_materializations(
self,
records_filter: Union[AssetKey, AssetRecordsFilter],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> EventRecordsResult:
raise NotImplementedError()
@abstractmethod
def fetch_observations(
self,
records_filter: Union[AssetKey, AssetRecordsFilter],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> EventRecordsResult:
raise NotImplementedError()
@abstractmethod
def fetch_run_status_changes(
self,
records_filter: Union[DagsterEventType, RunStatusChangeRecordsFilter],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> EventRecordsResult:
raise NotImplementedError()
@abstractmethod
def get_latest_planned_materialization_info(
self,
asset_key: AssetKey,
partition: Optional[str] = None,
) -> Optional[PlannedMaterializationInfo]:
raise NotImplementedError()
@abstractmethod
def get_updated_data_version_partitions(
self, asset_key: AssetKey, partitions: Iterable[str], since_storage_id: int
) -> Set[str]:
raise NotImplementedError()
@property
def handles_run_events_in_store_event(self) -> bool:
return False
def default_run_scoped_event_tailer_offset(self) -> int:
return 0
def get_asset_status_cache_values(
self,
partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]],
context: LoadingContext,
) -> Sequence[Optional["AssetStatusCacheValue"]]:
"""Get the cached status information for each asset."""
values = []
for asset_key, partitions_def in partitions_defs_by_key.items():
values.append(
get_and_update_asset_status_cache_value(
self._instance, asset_key, partitions_def, loading_context=context
)
)
return values
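    # A usage sketch for the method above, assuming hypothetical `instance` and
    # `context` objects and a static partitions definition:
    #
    #     values = instance.event_log_storage.get_asset_status_cache_values(
    #         {AssetKey(["my_asset"]): StaticPartitionsDefinition(["a", "b"])},
    #         context,
    #     )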