import logging
import logging.config
import os
import sys
import warnings
import weakref
from abc import abstractmethod
from collections import defaultdict
from enum import Enum
from tempfile import TemporaryDirectory
from types import TracebackType
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
Callable,
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
cast,
)
import yaml
from typing_extensions import Protocol, Self, TypeAlias, TypeVar, runtime_checkable
import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_evaluation import (
AssetCheckEvaluation,
AssetCheckEvaluationPlanned,
)
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.errors import (
DagsterHomeNotSetError,
DagsterInvalidInvocationError,
DagsterInvariantViolationError,
DagsterRunAlreadyExists,
DagsterRunConflict,
)
from dagster._core.instance.config import (
DAGSTER_CONFIG_YAML_FILENAME,
DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT,
get_default_tick_retention_settings,
get_tick_retention_settings,
)
from dagster._core.instance.ref import InstanceRef
from dagster._core.log_manager import get_log_record_metadata
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.dagster_run import (
IN_PROGRESS_RUN_STATUSES,
DagsterRun,
DagsterRunStatsSnapshot,
DagsterRunStatus,
JobBucket,
RunPartitionData,
RunRecord,
RunsFilter,
TagBucket,
)
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
PARENT_RUN_ID_TAG,
PARTITION_NAME_TAG,
RESUME_RETRY_TAG,
ROOT_RUN_ID_TAG,
RUN_FAILURE_REASON_TAG,
)
from dagster._serdes import ConfigurableClass
from dagster._time import get_current_datetime, get_current_timestamp
from dagster._utils import PrintFn, is_uuid, traced
from dagster._utils.error import serializable_error_info_from_exc_info
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings, experimental_warning
# 'airflow_execution_date' and 'is_airflow_ingest_pipeline' are hardcoded tags used in the
# airflow ingestion logic (see: dagster_pipeline_factory.py). 'airflow_execution_date' stores the
# 'execution_date' used in Airflow operator execution and 'is_airflow_ingest_pipeline' determines
# whether 'airflow_execution_date' is needed.
# https://github.com/dagster-io/dagster/issues/2403
AIRFLOW_EXECUTION_DATE_STR = "airflow_execution_date"
IS_AIRFLOW_INGEST_PIPELINE_STR = "is_airflow_ingest_pipeline"
# Our internal guts can handle empty strings for job name and run id.
# However, we make these named constants for documentation, to encode where we are
# making that assumption, and to allow us to change it more easily in the future,
# provided we are disciplined about actually using these constants.
RUNLESS_RUN_ID = ""
RUNLESS_JOB_NAME = ""
if TYPE_CHECKING:
from dagster._core.debug import DebugRunPayload
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.repository_definition.repository_definition import (
RepositoryLoadData,
)
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.event_api import (
AssetRecordsFilter,
EventHandlerFn,
RunStatusChangeRecordsFilter,
)
from dagster._core.events import (
AssetMaterialization,
DagsterEvent,
DagsterEventBatchMetadata,
DagsterEventType,
EngineEventData,
JobFailureData,
)
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.backfill import (
BulkActionsFilter,
BulkActionStatus,
PartitionBackfill,
)
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.stats import RunStepKeyStatsSnapshot
from dagster._core.launcher import RunLauncher
from dagster._core.remote_representation import (
CodeLocation,
HistoricalJob,
RemoteJob,
RemoteJobOrigin,
RemoteSensor,
)
from dagster._core.remote_representation.external import RemoteSchedule
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.scheduler import Scheduler, SchedulerDebugInfo
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
InstigatorTick,
TickData,
TickStatus,
)
from dagster._core.secrets import SecretsLoader
from dagster._core.snap import (
ExecutionPlanSnapshot,
ExecutionStepOutputSnap,
ExecutionStepSnap,
JobSnap,
)
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionRecord,
AssetCheckInstanceSupport,
)
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.daemon_cursor import DaemonCursorStorage
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.event_log.base import (
AssetRecord,
EventLogConnection,
EventLogRecord,
EventRecordsFilter,
EventRecordsResult,
PlannedMaterializationInfo,
)
from dagster._core.storage.partition_status_cache import (
AssetPartitionStatus,
AssetStatusCacheValue,
)
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
from dagster._core.storage.schedules import ScheduleStorage
from dagster._core.storage.sql import AlembicVersion
from dagster._core.workspace.context import BaseWorkspaceRequestContext
from dagster._daemon.types import DaemonHeartbeat, DaemonStatus
DagsterInstanceOverrides: TypeAlias = Mapping[str, Any]
# Sets the number of events that will be buffered before being written to the event log. Only
# applies to explicitly batched events. Currently this defaults to 0, which turns off batching
# entirely (multiple store_event calls are made instead of store_event_batch). This makes batching
# opt-in.
#
# Note that we don't store the value in a constant so that it can be changed without a process
# restart.
def _get_event_batch_size() -> int:
return int(os.getenv("DAGSTER_EVENT_BATCH_SIZE", "0"))
def _is_batch_writing_enabled() -> bool:
return _get_event_batch_size() > 0
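# Illustrative usage (deployment-side, an assumption rather than part of this module):
# exporting the variable before the process starts opts in to batched writes, e.g.
#
#   export DAGSTER_EVENT_BATCH_SIZE=25
#
# With that set, _is_batch_writing_enabled() returns True and explicitly batched
# events are buffered (up to 25 at a time) before a single store_event_batch call,
# instead of one store_event call per event.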
def _check_run_equality(
pipeline_run: DagsterRun, candidate_run: DagsterRun
) -> Mapping[str, Tuple[Any, Any]]:
field_diff: Dict[str, Tuple[Any, Any]] = {}
for field in pipeline_run._fields:
expected_value = getattr(pipeline_run, field)
candidate_value = getattr(candidate_run, field)
if expected_value != candidate_value:
field_diff[field] = (expected_value, candidate_value)
return field_diff
def _format_field_diff(field_diff: Mapping[str, Tuple[Any, Any]]) -> str:
return "\n".join(
[
(
" {field_name}:\n"
+ " Expected: {expected_value}\n"
+ " Received: {candidate_value}"
).format(
field_name=field_name,
expected_value=expected_value,
candidate_value=candidate_value,
)
for field_name, (
expected_value,
candidate_value,
) in field_diff.items()
]
)
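# Illustrative output of the two helpers above (values are made up): for two runs
# that differ only in status, _check_run_equality returns
# {"status": (QUEUED, STARTED)} and _format_field_diff renders it roughly as:
#
#     status:
#       Expected: QUEUED
#       Received: STARTED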
class _EventListenerLogHandler(logging.Handler):
def __init__(self, instance: "DagsterInstance"):
self._instance = instance
super(_EventListenerLogHandler, self).__init__()
def emit(self, record: logging.LogRecord) -> None:
from dagster._core.events import EngineEventData
from dagster._core.events.log import StructuredLoggerMessage, construct_event_record
record_metadata = get_log_record_metadata(record)
event = construct_event_record(
StructuredLoggerMessage(
name=record.name,
message=record.msg,
level=record.levelno,
meta=record_metadata,
record=record,
)
)
try:
self._instance.handle_new_event(
event, batch_metadata=record_metadata["dagster_event_batch_metadata"]
)
except Exception as e:
sys.stderr.write(f"Exception while writing logger call to event log: {e}\n")
if event.dagster_event:
# Swallow user-generated log failures so that the entire step/run doesn't fail, but
# raise failures writing system-generated log events since they are the source of
# truth for the state of the run
raise
elif event.run_id:
self._instance.report_engine_event(
"Exception while writing logger call to event log",
job_name=event.job_name,
run_id=event.run_id,
step_key=event.step_key,
engine_event_data=EngineEventData(
error=serializable_error_info_from_exc_info(sys.exc_info()),
),
)
class InstanceType(Enum):
PERSISTENT = "PERSISTENT"
EPHEMERAL = "EPHEMERAL"
T_DagsterInstance = TypeVar("T_DagsterInstance", bound="DagsterInstance", default="DagsterInstance")
class MayHaveInstanceWeakref(Generic[T_DagsterInstance]):
"""Mixin for classes that can have a weakref back to a Dagster instance."""
_instance_weakref: "Optional[weakref.ReferenceType[T_DagsterInstance]]"
def __init__(self):
self._instance_weakref = None
@property
def has_instance(self) -> bool:
return hasattr(self, "_instance_weakref") and (self._instance_weakref is not None)
@property
def _instance(self) -> T_DagsterInstance:
instance = (
self._instance_weakref()
# Backcompat with custom subclasses that don't call super().__init__()
# in their own __init__ implementations
if (hasattr(self, "_instance_weakref") and self._instance_weakref is not None)
else None
)
if instance is None:
raise DagsterInvariantViolationError(
"Attempted to resolve undefined DagsterInstance weakref."
)
else:
return instance
def register_instance(self, instance: T_DagsterInstance) -> None:
check.invariant(
(
# Backcompat with custom subclasses that don't call super().__init__()
# in their own __init__ implementations
not hasattr(self, "_instance_weakref") or self._instance_weakref is None
),
"Must only call initialize once",
)
# Store a weakref to avoid a circular reference / enable GC
self._instance_weakref = weakref.ref(instance)
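# Illustrative sketch of the mixin (MyComponent is hypothetical, not part of this
# module): register_instance stores only a weakref, so the component never keeps
# the instance alive or forms a reference cycle.
#
#   class MyComponent(MayHaveInstanceWeakref["DagsterInstance"]):
#       def storage_root(self) -> str:
#           return self._instance.root_directory
#
#   component = MyComponent()
#   component.register_instance(instance)  # normally done by DagsterInstance itself
#   component.storage_root()  # resolves the weakref; raises if the instance is gone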
@runtime_checkable
class DynamicPartitionsStore(Protocol):
@abstractmethod
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]: ...
@abstractmethod
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool: ...
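# Illustrative sketch: because the protocol is runtime-checkable, any object with
# these two methods passes isinstance checks. FakeDynamicPartitionsStore below is
# hypothetical, e.g. for tests:
#
#   class FakeDynamicPartitionsStore:
#       def __init__(self, partitions: Mapping[str, Sequence[str]]):
#           self._partitions = partitions
#
#       def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
#           return list(self._partitions.get(partitions_def_name, []))
#
#       def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
#           return partition_key in self._partitions.get(partitions_def_name, ())
#
#   assert isinstance(FakeDynamicPartitionsStore({}), DynamicPartitionsStore)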
class DagsterInstance(DynamicPartitionsStore):
"""Core abstraction for managing Dagster's access to storage and other resources.
Use DagsterInstance.get() to grab the current DagsterInstance which will load based on
the values in the ``dagster.yaml`` file in ``$DAGSTER_HOME``.
    Alternatively, DagsterInstance.ephemeral() can be used, which provides a set of
transient in-memory components.
Configuration of this class should be done by setting values in ``$DAGSTER_HOME/dagster.yaml``.
For example, to use Postgres for dagster storage, you can write a ``dagster.yaml`` such as the
following:
.. literalinclude:: ../../../../../examples/docs_snippets/docs_snippets/deploying/dagster-pg.yaml
:caption: dagster.yaml
:language: YAML
Args:
instance_type (InstanceType): Indicates whether the instance is ephemeral or persistent.
Users should not attempt to set this value directly or in their ``dagster.yaml`` files.
local_artifact_storage (LocalArtifactStorage): The local artifact storage is used to
configure storage for any artifacts that require a local disk, such as schedules, or
            when using filesystem storage to manage files and intermediates. By default,
this will be a :py:class:`dagster._core.storage.root.LocalArtifactStorage`. Configurable
in ``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass`
machinery.
run_storage (RunStorage): The run storage is used to store metadata about ongoing and past
pipeline runs. By default, this will be a
:py:class:`dagster._core.storage.runs.SqliteRunStorage`. Configurable in ``dagster.yaml``
using the :py:class:`~dagster.serdes.ConfigurableClass` machinery.
event_storage (EventLogStorage): Used to store the structured event logs generated by
pipeline runs. By default, this will be a
:py:class:`dagster._core.storage.event_log.SqliteEventLogStorage`. Configurable in
``dagster.yaml`` using the :py:class:`~dagster.serdes.ConfigurableClass` machinery.
compute_log_manager (Optional[ComputeLogManager]): The compute log manager handles stdout
and stderr logging for op compute functions. By default, this will be a
:py:class:`dagster._core.storage.local_compute_log_manager.LocalComputeLogManager`.
Configurable in ``dagster.yaml`` using the
:py:class:`~dagster.serdes.ConfigurableClass` machinery.
        run_coordinator (Optional[RunCoordinator]): A run coordinator may be used to manage the execution
of pipeline runs.
run_launcher (Optional[RunLauncher]): Optionally, a run launcher may be used to enable
a Dagster instance to launch pipeline runs, e.g. on a remote Kubernetes cluster, in
addition to running them locally.
settings (Optional[Dict]): Specifies certain per-instance settings,
such as feature flags. These are set in the ``dagster.yaml`` under a set of whitelisted
keys.
ref (Optional[InstanceRef]): Used by internal machinery to pass instances across process
boundaries.
"""
# Stores TemporaryDirectory instances that were created for DagsterInstance.local_temp() calls
# to be removed once the instance is garbage collected.
_TEMP_DIRS: "weakref.WeakKeyDictionary[DagsterInstance, TemporaryDirectory]" = (
weakref.WeakKeyDictionary()
)
def __init__(
self,
instance_type: InstanceType,
local_artifact_storage: "LocalArtifactStorage",
run_storage: "RunStorage",
event_storage: "EventLogStorage",
run_coordinator: Optional["RunCoordinator"],
compute_log_manager: Optional["ComputeLogManager"],
run_launcher: Optional["RunLauncher"],
scheduler: Optional["Scheduler"] = None,
schedule_storage: Optional["ScheduleStorage"] = None,
settings: Optional[Mapping[str, Any]] = None,
secrets_loader: Optional["SecretsLoader"] = None,
ref: Optional[InstanceRef] = None,
**_kwargs: Any, # we accept kwargs for forward-compat of custom instances
):
from dagster._core.launcher import RunLauncher
from dagster._core.run_coordinator import RunCoordinator
from dagster._core.scheduler import Scheduler
from dagster._core.secrets import SecretsLoader
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log import EventLogStorage
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs import RunStorage
from dagster._core.storage.schedules import ScheduleStorage
self._instance_type = check.inst_param(instance_type, "instance_type", InstanceType)
self._local_artifact_storage = check.inst_param(
local_artifact_storage, "local_artifact_storage", LocalArtifactStorage
)
self._event_storage = check.inst_param(event_storage, "event_storage", EventLogStorage)
self._event_storage.register_instance(self)
self._run_storage = check.inst_param(run_storage, "run_storage", RunStorage)
self._run_storage.register_instance(self)
if compute_log_manager:
self._compute_log_manager = check.inst_param(
compute_log_manager, "compute_log_manager", ComputeLogManager
)
self._compute_log_manager.register_instance(self)
else:
check.invariant(
ref, "Compute log manager must be provided if instance is not from a ref"
)
self._compute_log_manager = None
self._scheduler = check.opt_inst_param(scheduler, "scheduler", Scheduler)
self._schedule_storage = check.opt_inst_param(
schedule_storage, "schedule_storage", ScheduleStorage
)
if self._schedule_storage:
self._schedule_storage.register_instance(self)
if run_coordinator:
self._run_coordinator = check.inst_param(
run_coordinator, "run_coordinator", RunCoordinator
)
self._run_coordinator.register_instance(self)
else:
check.invariant(ref, "Run coordinator must be provided if instance is not from a ref")
self._run_coordinator = None
if run_launcher:
self._run_launcher: Optional[RunLauncher] = check.inst_param(
run_launcher, "run_launcher", RunLauncher
)
run_launcher.register_instance(self)
else:
check.invariant(ref, "Run launcher must be provided if instance is not from a ref")
self._run_launcher = None
self._settings = check.opt_mapping_param(settings, "settings")
self._secrets_loader = check.opt_inst_param(secrets_loader, "secrets_loader", SecretsLoader)
if self._secrets_loader:
self._secrets_loader.register_instance(self)
self._ref = check.opt_inst_param(ref, "ref", InstanceRef)
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
run_monitoring_enabled = self.run_monitoring_settings.get("enabled", False)
self._run_monitoring_enabled = run_monitoring_enabled
if self.run_monitoring_enabled and self.run_monitoring_max_resume_run_attempts:
check.invariant(
self.run_launcher.supports_resume_run,
"The configured run launcher does not support resuming runs. Set"
" max_resume_run_attempts to 0 to use run monitoring. Any runs with a failed"
" run worker will be marked as failed, but will not be resumed.",
)
if self.run_retries_enabled:
check.invariant(
self.event_log_storage.supports_event_consumer_queries(),
"Run retries are enabled, but the configured event log storage does not support"
" them. Consider switching to Postgres or Mysql.",
)
# Used for batched event handling
self._event_buffer: Dict[str, List[EventLogEntry]] = defaultdict(list)
# ctors
@public
@staticmethod
def ephemeral(
tempdir: Optional[str] = None,
preload: Optional[Sequence["DebugRunPayload"]] = None,
settings: Optional[Dict] = None,
) -> "DagsterInstance":
"""Create a `DagsterInstance` suitable for ephemeral execution, useful in test contexts. An
ephemeral instance uses mostly in-memory components. Use `local_temp` to create a test
instance that is fully persistent.
Args:
tempdir (Optional[str]): The path of a directory to be used for local artifact storage.
preload (Optional[Sequence[DebugRunPayload]]): A sequence of payloads to load into the
instance's run storage. Useful for debugging.
settings (Optional[Dict]): Settings for the instance.
Returns:
DagsterInstance: An ephemeral DagsterInstance.
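
        Example:
            A minimal sketch of running a job against a throwaway instance;
            ``my_job`` stands in for a job defined elsewhere:

            .. code-block:: python

                with DagsterInstance.ephemeral() as instance:
                    result = my_job.execute_in_process(instance=instance)
                    assert result.success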
"""
from dagster._core.launcher.sync_in_memory_run_launcher import SyncInMemoryRunLauncher
from dagster._core.run_coordinator import DefaultRunCoordinator
from dagster._core.storage.event_log import InMemoryEventLogStorage
from dagster._core.storage.noop_compute_log_manager import NoOpComputeLogManager
from dagster._core.storage.root import LocalArtifactStorage, TemporaryLocalArtifactStorage
from dagster._core.storage.runs import InMemoryRunStorage
if tempdir is not None:
local_storage = LocalArtifactStorage(tempdir)
else:
local_storage = TemporaryLocalArtifactStorage()
return DagsterInstance(
instance_type=InstanceType.EPHEMERAL,
local_artifact_storage=local_storage,
run_storage=InMemoryRunStorage(preload=preload),
event_storage=InMemoryEventLogStorage(preload=preload),
compute_log_manager=NoOpComputeLogManager(),
run_coordinator=DefaultRunCoordinator(),
run_launcher=SyncInMemoryRunLauncher(),
settings=settings,
)
@public
@staticmethod
def get() -> "DagsterInstance":
"""Get the current `DagsterInstance` as specified by the ``DAGSTER_HOME`` environment variable.
Returns:
DagsterInstance: The current DagsterInstance.
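
        Example:
            An illustrative session; assumes the directory already exists:

            .. code-block:: python

                import os

                os.environ["DAGSTER_HOME"] = os.path.expanduser("~/dagster_home")
                instance = DagsterInstance.get()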
"""
dagster_home_path = os.getenv("DAGSTER_HOME")
if not dagster_home_path:
raise DagsterHomeNotSetError(
"The environment variable $DAGSTER_HOME is not set. \nDagster requires this"
" environment variable to be set to an existing directory in your filesystem. This"
" directory is used to store metadata across sessions, or load the dagster.yaml"
" file which can configure storing metadata in an external database.\nYou can"
" resolve this error by exporting the environment variable. For example, you can"
" run the following command in your shell or include it in your shell configuration"
' file:\n\texport DAGSTER_HOME=~"/dagster_home"\nor PowerShell\n$env:DAGSTER_HOME'
" = ($home + '\\dagster_home')or batchset"
" DAGSTER_HOME=%UserProfile%/dagster_homeAlternatively, DagsterInstance.ephemeral()"
" can be used for a transient instance.\n"
)
dagster_home_path = os.path.expanduser(dagster_home_path)
if not os.path.isabs(dagster_home_path):
raise DagsterInvariantViolationError(
(
f'$DAGSTER_HOME "{dagster_home_path}" must be an absolute path. Dagster requires this '
"environment variable to be set to an existing directory in your filesystem."
)
)
if not (os.path.exists(dagster_home_path) and os.path.isdir(dagster_home_path)):
raise DagsterInvariantViolationError(
(
f'$DAGSTER_HOME "{dagster_home_path}" is not a directory or does not exist. Dagster requires this'
" environment variable to be set to an existing directory in your filesystem"
)
)
return DagsterInstance.from_config(dagster_home_path)
@public
@staticmethod
def local_temp(
tempdir: Optional[str] = None,
overrides: Optional[DagsterInstanceOverrides] = None,
) -> "DagsterInstance":
"""Create a DagsterInstance that uses a temporary directory for local storage. This is a
regular, fully persistent instance. Use `ephemeral` to get an ephemeral instance with
in-memory components.
Args:
tempdir (Optional[str]): The path of a directory to be used for local artifact storage.
overrides (Optional[DagsterInstanceOverrides]): Override settings for the instance.
Returns:
DagsterInstance
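
        Example:
            A minimal sketch: a persistent instance rooted in a temporary directory,
            with a settings override (``telemetry`` is a standard ``dagster.yaml`` key):

            .. code-block:: python

                instance = DagsterInstance.local_temp(
                    overrides={"telemetry": {"enabled": False}}
                )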
"""
if tempdir is None:
created_dir = TemporaryDirectory()
i = DagsterInstance.from_ref(
InstanceRef.from_dir(created_dir.name, overrides=overrides)
)
DagsterInstance._TEMP_DIRS[i] = created_dir
return i
return DagsterInstance.from_ref(InstanceRef.from_dir(tempdir, overrides=overrides))
@staticmethod
def from_config(
config_dir: str,
config_filename: str = DAGSTER_CONFIG_YAML_FILENAME,
) -> "DagsterInstance":
instance_ref = InstanceRef.from_dir(config_dir, config_filename=config_filename)
return DagsterInstance.from_ref(instance_ref)
@staticmethod
def from_ref(instance_ref: InstanceRef) -> "DagsterInstance":
check.inst_param(instance_ref, "instance_ref", InstanceRef)
# DagsterInstance doesn't implement ConfigurableClass, but we may still sometimes want to
# have custom subclasses of DagsterInstance. This machinery allows for those custom
# subclasses to receive additional keyword arguments passed through the config YAML.
klass = instance_ref.custom_instance_class or DagsterInstance
kwargs = instance_ref.custom_instance_class_config
unified_storage = instance_ref.storage
run_storage = unified_storage.run_storage if unified_storage else instance_ref.run_storage
event_storage = (
unified_storage.event_log_storage if unified_storage else instance_ref.event_storage
)
schedule_storage = (
unified_storage.schedule_storage if unified_storage else instance_ref.schedule_storage
)
return klass(
instance_type=InstanceType.PERSISTENT,
local_artifact_storage=instance_ref.local_artifact_storage,
run_storage=run_storage, # type: ignore # (possible none)
event_storage=event_storage, # type: ignore # (possible none)
schedule_storage=schedule_storage,
compute_log_manager=None, # lazy load
scheduler=instance_ref.scheduler,
run_coordinator=None, # lazy load
run_launcher=None, # lazy load
settings=instance_ref.settings,
secrets_loader=instance_ref.secrets_loader,
ref=instance_ref,
**kwargs,
)
# flags
@property
def is_persistent(self) -> bool:
return self._instance_type == InstanceType.PERSISTENT
@property
def is_ephemeral(self) -> bool:
return self._instance_type == InstanceType.EPHEMERAL
def get_ref(self) -> InstanceRef:
if self._ref:
return self._ref
check.failed(
"Attempted to prepare an ineligible DagsterInstance ({inst_type}) for cross "
"process communication.{dagster_home_msg}".format(
inst_type=self._instance_type,
dagster_home_msg=(
"\nDAGSTER_HOME environment variable is not set, set it to "
"a directory on the filesystem for dagster to use for storage and cross "
"process coordination."
if os.getenv("DAGSTER_HOME") is None
else ""
),
)
)
@property
def root_directory(self) -> str:
return self._local_artifact_storage.base_dir
def _info(self, component: object) -> Union[str, Mapping[Any, Any]]:
# ConfigurableClass may not have inst_data if it's a direct instantiation
# which happens for ephemeral instances
if isinstance(component, ConfigurableClass) and component.inst_data:
return component.inst_data.info_dict()
if type(component) is dict:
return component
return component.__class__.__name__
def _info_str_for_component(self, component_name: str, component: object) -> str:
return yaml.dump(
{component_name: self._info(component)}, default_flow_style=False, sort_keys=False
)
def info_dict(self) -> Mapping[str, object]:
settings: Mapping[str, object] = self._settings if self._settings else {}
ret = {
"local_artifact_storage": self._info(self._local_artifact_storage),
"run_storage": self._info(self._run_storage),
"event_log_storage": self._info(self._event_storage),
"compute_logs": self._info(self._compute_log_manager),
"schedule_storage": self._info(self._schedule_storage),
"scheduler": self._info(self._scheduler),
"run_coordinator": self._info(self._run_coordinator),
"run_launcher": self._info(self.run_launcher),
}
ret.update(
{
settings_key: self._info(settings_value)
for settings_key, settings_value in settings.items()
}
)
return ret
def info_str(self) -> str:
return yaml.dump(self.info_dict(), default_flow_style=False, sort_keys=False)
def schema_str(self) -> str:
def _schema_dict(alembic_version: "AlembicVersion") -> Optional[Mapping[str, object]]:
if not alembic_version:
return None
db_revision, head_revision = alembic_version
return {
"current": db_revision,
"latest": head_revision,
}
return yaml.dump(
{
"schema": {
"event_log_storage": _schema_dict(self._event_storage.alembic_version()), # type: ignore # (possible none)
"run_storage": _schema_dict(self._event_storage.alembic_version()), # type: ignore # (possible none)
"schedule_storage": _schema_dict(self._event_storage.alembic_version()), # type: ignore # (possible none)
}
},
default_flow_style=False,
sort_keys=False,
)
@property
def run_storage(self) -> "RunStorage":
return self._run_storage
@property
def event_log_storage(self) -> "EventLogStorage":
return self._event_storage
@property
def daemon_cursor_storage(self) -> "DaemonCursorStorage":
return self._run_storage
# schedule storage
@property
def schedule_storage(self) -> Optional["ScheduleStorage"]:
return self._schedule_storage
@property
def scheduler(self) -> Optional["Scheduler"]:
return self._scheduler
@property
def scheduler_class(self) -> Optional[str]:
return self.scheduler.__class__.__name__ if self.scheduler else None
# run coordinator
@property
def run_coordinator(self) -> "RunCoordinator":
# Lazily load in case the run coordinator requires dependencies that are not available
# everywhere that loads the instance
if not self._run_coordinator:
check.invariant(
self._ref, "Run coordinator not provided, and no instance ref available"
)
run_coordinator = cast(InstanceRef, self._ref).run_coordinator
check.invariant(run_coordinator, "Run coordinator not configured in instance ref")
self._run_coordinator = cast("RunCoordinator", run_coordinator)
self._run_coordinator.register_instance(self)
return self._run_coordinator
# run launcher
@property
def run_launcher(self) -> "RunLauncher":
# Lazily load in case the launcher requires dependencies that are not available everywhere
# that loads the instance (e.g. The EcsRunLauncher requires boto3)
if not self._run_launcher:
check.invariant(self._ref, "Run launcher not provided, and no instance ref available")
launcher = cast(InstanceRef, self._ref).run_launcher
check.invariant(launcher, "Run launcher not configured in instance ref")
self._run_launcher = cast("RunLauncher", launcher)
self._run_launcher.register_instance(self)
return self._run_launcher
# compute logs
@property
def compute_log_manager(self) -> "ComputeLogManager":
if not self._compute_log_manager:
check.invariant(
self._ref, "Compute log manager not provided, and no instance ref available"
)
compute_log_manager = cast(InstanceRef, self._ref).compute_log_manager
check.invariant(
compute_log_manager, "Compute log manager not configured in instance ref"
)
self._compute_log_manager = cast("ComputeLogManager", compute_log_manager)
self._compute_log_manager.register_instance(self)
return self._compute_log_manager
def get_settings(self, settings_key: str) -> Any:
check.str_param(settings_key, "settings_key")
if self._settings and settings_key in self._settings:
return self._settings.get(settings_key)
return {}
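    # Illustrative: given a dagster.yaml containing
    #
    #   run_retries:
    #     enabled: true
    #     max_retries: 2
    #
    # get_settings("run_retries") returns {"enabled": True, "max_retries": 2}, while
    # an unset key returns {} so downstream .get(...) calls are safe.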
def get_scheduler_settings(self) -> Mapping[str, Any]:
return self.get_settings("schedules")
def get_sensor_settings(self) -> Mapping[str, Any]:
return self.get_settings("sensors")
def get_auto_materialize_settings(self) -> Mapping[str, Any]:
return self.get_settings("auto_materialize")
@property
def telemetry_enabled(self) -> bool:
if self.is_ephemeral:
return False
dagster_telemetry_enabled_default = True
telemetry_settings = self.get_settings("telemetry")
if not telemetry_settings:
return dagster_telemetry_enabled_default
if "enabled" in telemetry_settings:
return telemetry_settings["enabled"]
else:
return dagster_telemetry_enabled_default
@property
def nux_enabled(self) -> bool:
if self.is_ephemeral:
return False
nux_enabled_by_default = True
nux_settings = self.get_settings("nux")
if not nux_settings:
return nux_enabled_by_default
if "enabled" in nux_settings:
return nux_settings["enabled"]
else:
return nux_enabled_by_default
# run monitoring
@property
def run_monitoring_enabled(self) -> bool:
return self._run_monitoring_enabled
@property
def run_monitoring_settings(self) -> Any:
return self.get_settings("run_monitoring")
@property
def run_monitoring_start_timeout_seconds(self) -> int:
return self.run_monitoring_settings.get("start_timeout_seconds", 180)
@property
def run_monitoring_cancel_timeout_seconds(self) -> int:
return self.run_monitoring_settings.get("cancel_timeout_seconds", 180)
@property
def run_monitoring_max_runtime_seconds(self) -> int:
return self.run_monitoring_settings.get("max_runtime_seconds", 0)
@property
def code_server_settings(self) -> Any:
return self.get_settings("code_servers")
@property
def code_server_process_startup_timeout(self) -> int:
return self.code_server_settings.get(
"local_startup_timeout", DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT
)
@property
def code_server_reload_timeout(self) -> int:
return self.code_server_settings.get(
"reload_timeout", DEFAULT_LOCAL_CODE_SERVER_STARTUP_TIMEOUT
)
@property
def wait_for_local_code_server_processes_on_shutdown(self) -> bool:
return self.code_server_settings.get("wait_for_local_processes_on_shutdown", False)
@property
def run_monitoring_max_resume_run_attempts(self) -> int:
return self.run_monitoring_settings.get("max_resume_run_attempts", 0)
@property
def run_monitoring_poll_interval_seconds(self) -> int:
return self.run_monitoring_settings.get("poll_interval_seconds", 120)
@property
def cancellation_thread_poll_interval_seconds(self) -> int:
return self.get_settings("run_monitoring").get(
"cancellation_thread_poll_interval_seconds", 10
)
@property
def run_retries_enabled(self) -> bool:
return self.get_settings("run_retries").get("enabled", False)
@property
def run_retries_max_retries(self) -> int:
return self.get_settings("run_retries").get("max_retries", 0)
@property
def auto_materialize_enabled(self) -> bool:
return self.get_settings("auto_materialize").get("enabled", True)
@property
    def auto_materialize_minimum_interval_seconds(self) -> Optional[int]:
        return self.get_settings("auto_materialize").get("minimum_interval_seconds")
@property
def auto_materialize_run_tags(self) -> Dict[str, str]:
return self.get_settings("auto_materialize").get("run_tags", {})
@property
def auto_materialize_respect_materialization_data_versions(self) -> bool:
return self.get_settings("auto_materialize").get(
"respect_materialization_data_versions", False
)
@property
def auto_materialize_max_tick_retries(self) -> int:
return self.get_settings("auto_materialize").get("max_tick_retries", 3)
@property
    def auto_materialize_use_sensors(self) -> bool:
return self.get_settings("auto_materialize").get("use_sensors", True)
@property
def global_op_concurrency_default_limit(self) -> Optional[int]:
return self.get_settings("concurrency").get("default_op_concurrency_limit")
# python logs
@property
def managed_python_loggers(self) -> Sequence[str]:
python_log_settings = self.get_settings("python_logs") or {}
loggers: Sequence[str] = python_log_settings.get("managed_python_loggers", [])
return loggers
@property
def python_log_level(self) -> Optional[str]:
python_log_settings = self.get_settings("python_logs") or {}
return python_log_settings.get("python_log_level")
def upgrade(self, print_fn: Optional[PrintFn] = None) -> None:
from dagster._core.storage.migration.utils import upgrading_instance
with upgrading_instance(self):
if print_fn:
print_fn("Updating run storage...")
self._run_storage.upgrade() # type: ignore # (unknown method on run storage)
self._run_storage.migrate(print_fn)
if print_fn:
print_fn("Updating event storage...")
self._event_storage.upgrade()
self._event_storage.reindex_assets(print_fn=print_fn)
if print_fn:
print_fn("Updating schedule storage...")
self._schedule_storage.upgrade() # type: ignore # (possible none)
self._schedule_storage.migrate(print_fn) # type: ignore # (possible none)
def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
if self._schedule_storage:
self._schedule_storage.optimize_for_webserver(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
self._run_storage.optimize_for_webserver(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
self._event_storage.optimize_for_webserver(
statement_timeout=statement_timeout, pool_recycle=pool_recycle
)
def reindex(self, print_fn: PrintFn = lambda _: None) -> None:
print_fn("Checking for reindexing...")
self._event_storage.reindex_events(print_fn)
self._event_storage.reindex_assets(print_fn)
self._run_storage.optimize(print_fn)
self._schedule_storage.optimize(print_fn) # type: ignore # (possible none)
print_fn("Done.")
def dispose(self) -> None:
self._local_artifact_storage.dispose()
self._run_storage.dispose()
if self._run_coordinator:
self._run_coordinator.dispose()
if self._run_launcher:
self._run_launcher.dispose()
self._event_storage.dispose()
if self._compute_log_manager:
self._compute_log_manager.dispose()
if self._secrets_loader:
self._secrets_loader.dispose()
if self in DagsterInstance._TEMP_DIRS:
DagsterInstance._TEMP_DIRS[self].cleanup()
del DagsterInstance._TEMP_DIRS[self]
# run storage
@public
def get_run_by_id(self, run_id: str) -> Optional[DagsterRun]:
"""Get a :py:class:`DagsterRun` matching the provided `run_id`.
Args:
run_id (str): The id of the run to retrieve.
Returns:
Optional[DagsterRun]: The run corresponding to the given id. If no run matching the id
is found, return `None`.
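
        Example:
            .. code-block:: python

                # Illustrative; the id is a placeholder UUID.
                run = instance.get_run_by_id("6e2cef4b-63a3-4aef-9a72-6282cdcf5f3a")
                if run is not None:
                    print(run.status)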
"""
record = self.get_run_record_by_id(run_id)
if record is None:
return None
return record.dagster_run
@public
@traced
def get_run_record_by_id(self, run_id: str) -> Optional[RunRecord]:
"""Get a :py:class:`RunRecord` matching the provided `run_id`.
Args:
run_id (str): The id of the run record to retrieve.
Returns:
Optional[RunRecord]: The run record corresponding to the given id. If no run matching
the id is found, return `None`.
"""
if not run_id:
return None
records = self._run_storage.get_run_records(RunsFilter(run_ids=[run_id]), limit=1)
if not records:
return None
return records[0]
@traced
def get_job_snapshot(self, snapshot_id: str) -> "JobSnap":
return self._run_storage.get_job_snapshot(snapshot_id)
@traced
def has_job_snapshot(self, snapshot_id: str) -> bool:
return self._run_storage.has_job_snapshot(snapshot_id)
@traced
def has_snapshot(self, snapshot_id: str) -> bool:
return self._run_storage.has_snapshot(snapshot_id)
@traced
def get_historical_job(self, snapshot_id: str) -> "HistoricalJob":
from dagster._core.remote_representation import HistoricalJob
snapshot = self._run_storage.get_job_snapshot(snapshot_id)
parent_snapshot = (
self._run_storage.get_job_snapshot(snapshot.lineage_snapshot.parent_snapshot_id)
if snapshot.lineage_snapshot
else None
)
return HistoricalJob(snapshot, snapshot_id, parent_snapshot)
@traced
def has_historical_job(self, snapshot_id: str) -> bool:
return self._run_storage.has_job_snapshot(snapshot_id)
@traced
def get_execution_plan_snapshot(self, snapshot_id: str) -> "ExecutionPlanSnapshot":
return self._run_storage.get_execution_plan_snapshot(snapshot_id)
@traced
def get_run_stats(self, run_id: str) -> DagsterRunStatsSnapshot:
return self._event_storage.get_stats_for_run(run_id)
@traced
def get_run_step_stats(
self, run_id: str, step_keys: Optional[Sequence[str]] = None
) -> Sequence["RunStepKeyStatsSnapshot"]:
return self._event_storage.get_step_stats_for_run(run_id, step_keys)
@traced
def get_run_tags(
self,
tag_keys: Sequence[str],
value_prefix: Optional[str] = None,
limit: Optional[int] = None,
) -> Sequence[Tuple[str, Set[str]]]:
return self._run_storage.get_run_tags(
tag_keys=tag_keys, value_prefix=value_prefix, limit=limit
)
@traced
def get_run_tag_keys(self) -> Sequence[str]:
return self._run_storage.get_run_tag_keys()
@traced
def get_run_group(self, run_id: str) -> Optional[Tuple[str, Sequence[DagsterRun]]]:
return self._run_storage.get_run_group(run_id)
def create_run_for_job(
self,
job_def: "JobDefinition",
execution_plan: Optional["ExecutionPlan"] = None,
run_id: Optional[str] = None,
run_config: Optional[Mapping[str, object]] = None,
resolved_op_selection: Optional[AbstractSet[str]] = None,
status: Optional[Union[DagsterRunStatus, str]] = None,
tags: Optional[Mapping[str, str]] = None,
root_run_id: Optional[str] = None,
parent_run_id: Optional[str] = None,
op_selection: Optional[Sequence[str]] = None,
asset_selection: Optional[AbstractSet[AssetKey]] = None,
remote_job_origin: Optional["RemoteJobOrigin"] = None,
job_code_origin: Optional[JobPythonOrigin] = None,
repository_load_data: Optional["RepositoryLoadData"] = None,
) -> DagsterRun:
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.snap import snapshot_from_execution_plan
check.inst_param(job_def, "pipeline_def", JobDefinition)
check.opt_inst_param(execution_plan, "execution_plan", ExecutionPlan)
        # resolved_op_selection is the frozenset of op names that will actually be
        # executed. op_selection holds the user's raw selection queries; it is not
        # resolved here (this function doesn't handle selection queries) and is only
        # passed further down so it can be recorded on the run.
check.opt_set_param(resolved_op_selection, "resolved_op_selection", of_type=str)
check.opt_list_param(op_selection, "op_selection", of_type=str)
check.opt_set_param(asset_selection, "asset_selection", of_type=AssetKey)
        if asset_selection or op_selection:
            # subset the job for cases when `create_run_for_job` is called directly
            # with a selection, outside the normal remote-job resolution path
job_def = job_def.get_subset(
asset_selection=asset_selection,
op_selection=op_selection,
)
if not execution_plan:
execution_plan = create_execution_plan(
job=job_def,
run_config=run_config,
instance_ref=self.get_ref() if self.is_persistent else None,
tags=tags,
repository_load_data=repository_load_data,
)
return self.create_run(
job_name=job_def.name,
run_id=run_id,
run_config=run_config,
op_selection=op_selection,
asset_selection=asset_selection,
asset_check_selection=None,
resolved_op_selection=resolved_op_selection,
step_keys_to_execute=execution_plan.step_keys_to_execute,
status=DagsterRunStatus(status) if status else None,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
job_snapshot=job_def.get_job_snapshot(),
execution_plan_snapshot=snapshot_from_execution_plan(
execution_plan,
job_def.get_job_snapshot_id(),
),
parent_job_snapshot=job_def.get_parent_job_snapshot(),
remote_job_origin=remote_job_origin,
job_code_origin=job_code_origin,
asset_graph=job_def.asset_layer.asset_graph,
)
def _construct_run_with_snapshots(
self,
job_name: str,
run_id: str,
run_config: Optional[Mapping[str, object]],
resolved_op_selection: Optional[AbstractSet[str]],
step_keys_to_execute: Optional[Sequence[str]],
status: Optional[DagsterRunStatus],
tags: Mapping[str, str],
root_run_id: Optional[str],
parent_run_id: Optional[str],
job_snapshot: Optional["JobSnap"],
execution_plan_snapshot: Optional["ExecutionPlanSnapshot"],
parent_job_snapshot: Optional["JobSnap"],
asset_selection: Optional[AbstractSet[AssetKey]] = None,
asset_check_selection: Optional[AbstractSet["AssetCheckKey"]] = None,
op_selection: Optional[Sequence[str]] = None,
remote_job_origin: Optional["RemoteJobOrigin"] = None,
job_code_origin: Optional[JobPythonOrigin] = None,
) -> DagsterRun:
# https://github.com/dagster-io/dagster/issues/2403
if tags and IS_AIRFLOW_INGEST_PIPELINE_STR in tags:
if AIRFLOW_EXECUTION_DATE_STR not in tags:
tags = {
**tags,
AIRFLOW_EXECUTION_DATE_STR: get_current_datetime().isoformat(),
}
check.invariant(
not (not job_snapshot and execution_plan_snapshot),
"It is illegal to have an execution plan snapshot and not have a pipeline snapshot."
" It is possible to have no execution plan snapshot since we persist runs that do"
" not successfully compile execution plans in the scheduled case.",
)
job_snapshot_id = (
self._ensure_persisted_job_snapshot(job_snapshot, parent_job_snapshot)
if job_snapshot
else None
)
execution_plan_snapshot_id = (
self._ensure_persisted_execution_plan_snapshot(
execution_plan_snapshot, job_snapshot_id, step_keys_to_execute
)
if execution_plan_snapshot and job_snapshot_id
else None
)
if execution_plan_snapshot:
from dagster._core.op_concurrency_limits_counter import (
compute_run_op_concurrency_info_for_snapshot,
)
run_op_concurrency = compute_run_op_concurrency_info_for_snapshot(
execution_plan_snapshot
)
else:
run_op_concurrency = None
return DagsterRun(
job_name=job_name,
run_id=run_id,
run_config=run_config,
asset_selection=asset_selection,
asset_check_selection=asset_check_selection,
op_selection=op_selection,
resolved_op_selection=resolved_op_selection,
step_keys_to_execute=step_keys_to_execute,
status=status,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
job_snapshot_id=job_snapshot_id,
execution_plan_snapshot_id=execution_plan_snapshot_id,
remote_job_origin=remote_job_origin,
job_code_origin=job_code_origin,
has_repository_load_data=execution_plan_snapshot is not None
and execution_plan_snapshot.repository_load_data is not None,
run_op_concurrency=run_op_concurrency,
)
def _ensure_persisted_job_snapshot(
self,
job_snapshot: "JobSnap",
parent_job_snapshot: "Optional[JobSnap]",
) -> str:
from dagster._core.snap import JobSnap, create_job_snapshot_id
check.inst_param(job_snapshot, "job_snapshot", JobSnap)
check.opt_inst_param(parent_job_snapshot, "parent_job_snapshot", JobSnap)
if job_snapshot.lineage_snapshot:
parent_snapshot_id = create_job_snapshot_id(check.not_none(parent_job_snapshot))
if job_snapshot.lineage_snapshot.parent_snapshot_id != parent_snapshot_id:
warnings.warn(
f"Stored parent snapshot ID {parent_snapshot_id} did not match the parent snapshot ID {job_snapshot.lineage_snapshot.parent_snapshot_id} on the subsetted job"
)
if not self._run_storage.has_job_snapshot(parent_snapshot_id):
self._run_storage.add_job_snapshot(
check.not_none(parent_job_snapshot), parent_snapshot_id
)
job_snapshot_id = create_job_snapshot_id(job_snapshot)
if not self._run_storage.has_job_snapshot(job_snapshot_id):
returned_job_snapshot_id = self._run_storage.add_job_snapshot(job_snapshot)
check.invariant(job_snapshot_id == returned_job_snapshot_id)
return job_snapshot_id
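    # Note: snapshot storage is content-addressed. create_job_snapshot_id hashes the
    # serialized snapshot, so repeated runs of an unchanged job share one stored
    # JobSnap and has_job_snapshot short-circuits the duplicate write.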
def _ensure_persisted_execution_plan_snapshot(
self,
execution_plan_snapshot: "ExecutionPlanSnapshot",
job_snapshot_id: str,
step_keys_to_execute: Optional[Sequence[str]],
) -> str:
from dagster._core.snap.execution_plan_snapshot import (
ExecutionPlanSnapshot,
create_execution_plan_snapshot_id,
)
check.inst_param(execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot)
check.str_param(job_snapshot_id, "job_snapshot_id")
check.opt_nullable_sequence_param(step_keys_to_execute, "step_keys_to_execute", of_type=str)
check.invariant(
execution_plan_snapshot.job_snapshot_id == job_snapshot_id,
"Snapshot mismatch: Snapshot ID in execution plan snapshot is "
f'"{execution_plan_snapshot.job_snapshot_id}" and snapshot_id created in memory is '
f'"{job_snapshot_id}"',
)
execution_plan_snapshot_id = create_execution_plan_snapshot_id(execution_plan_snapshot)
if not self._run_storage.has_execution_plan_snapshot(execution_plan_snapshot_id):
returned_execution_plan_snapshot_id = self._run_storage.add_execution_plan_snapshot(
execution_plan_snapshot
)
check.invariant(execution_plan_snapshot_id == returned_execution_plan_snapshot_id)
return execution_plan_snapshot_id
def _log_materialization_planned_event_for_asset(
self,
dagster_run: DagsterRun,
asset_key: AssetKey,
job_name: str,
step: "ExecutionStepSnap",
output: "ExecutionStepOutputSnap",
asset_graph: Optional["BaseAssetGraph"],
) -> None:
from dagster._core.definitions.partition import DynamicPartitionsDefinition
from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent
partition_tag = dagster_run.tags.get(PARTITION_NAME_TAG)
partition_range_start, partition_range_end = (
dagster_run.tags.get(ASSET_PARTITION_RANGE_START_TAG),
dagster_run.tags.get(ASSET_PARTITION_RANGE_END_TAG),
)
if partition_tag and (partition_range_start or partition_range_end):
raise DagsterInvariantViolationError(
f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
f" {ASSET_PARTITION_RANGE_END_TAG} set along with"
f" {PARTITION_NAME_TAG}"
)
partitions_subset = None
if partition_range_start or partition_range_end:
if not partition_range_start or not partition_range_end:
raise DagsterInvariantViolationError(
f"Cannot have {ASSET_PARTITION_RANGE_START_TAG} or"
f" {ASSET_PARTITION_RANGE_END_TAG} set without the other"
)
if asset_graph is None:
raise DagsterInvariantViolationError(
"Must provide asset_graph to create_run when creating "
"a run with a partition range."
)
partitions_def = asset_graph.get(asset_key).partitions_def
if (
isinstance(partitions_def, DynamicPartitionsDefinition)
and partitions_def.name is None
):
raise DagsterInvariantViolationError(
"Creating a run targeting a partition range is not supported for assets partitioned with function-based dynamic partitions"
)
if partitions_def is not None:
partitions_subset = partitions_def.subset_with_partition_keys(
partitions_def.get_partition_keys_in_range(
PartitionKeyRange(partition_range_start, partition_range_end),
dynamic_partitions_store=self,
)
).to_serializable_subset()
partition = (
partition_tag if check.not_none(output.properties).is_asset_partitioned else None
)
materialization_planned = DagsterEvent.build_asset_materialization_planned_event(
job_name,
step.key,
AssetMaterializationPlannedData(
asset_key, partition=partition, partitions_subset=partitions_subset
),
)
self.report_dagster_event(materialization_planned, dagster_run.run_id, logging.DEBUG)
def _log_asset_planned_events(
self,
dagster_run: DagsterRun,
execution_plan_snapshot: "ExecutionPlanSnapshot",
asset_graph: Optional["BaseAssetGraph"],
) -> None:
from dagster._core.events import DagsterEvent, DagsterEventType
job_name = dagster_run.job_name
for step in execution_plan_snapshot.steps:
if step.key in execution_plan_snapshot.step_keys_to_execute:
for output in step.outputs:
asset_key = check.not_none(output.properties).asset_key
if asset_key:
self._log_materialization_planned_event_for_asset(
dagster_run, asset_key, job_name, step, output, asset_graph
)
if check.not_none(output.properties).asset_check_key:
asset_check_key = check.not_none(
check.not_none(output.properties).asset_check_key
)
target_asset_key = asset_check_key.asset_key
check_name = asset_check_key.name
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
job_name=job_name,
message=(
f"{job_name} intends to execute asset check {check_name} on"
f" asset {target_asset_key.to_string()}"
),
event_specific_data=AssetCheckEvaluationPlanned(
target_asset_key,
check_name=check_name,
),
step_key=step.key,
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)
def create_run(
self,
*,
job_name: str,
run_id: Optional[str],
run_config: Optional[Mapping[str, object]],
status: Optional[DagsterRunStatus],
tags: Optional[Mapping[str, Any]],
root_run_id: Optional[str],
parent_run_id: Optional[str],
step_keys_to_execute: Optional[Sequence[str]],
execution_plan_snapshot: Optional["ExecutionPlanSnapshot"],
job_snapshot: Optional["JobSnap"],
parent_job_snapshot: Optional["JobSnap"],
asset_selection: Optional[AbstractSet[AssetKey]],
asset_check_selection: Optional[AbstractSet["AssetCheckKey"]],
resolved_op_selection: Optional[AbstractSet[str]],
op_selection: Optional[Sequence[str]],
remote_job_origin: Optional["RemoteJobOrigin"],
job_code_origin: Optional[JobPythonOrigin],
asset_graph: Optional["BaseAssetGraph"],
) -> DagsterRun:
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.remote_representation.origin import RemoteJobOrigin
from dagster._core.snap import ExecutionPlanSnapshot, JobSnap
from dagster._utils.tags import normalize_tags
check.str_param(job_name, "job_name")
check.opt_str_param(
run_id, "run_id"
        )  # if None, one will be assigned by make_new_run_id() lower in the callstack
check.opt_mapping_param(run_config, "run_config", key_type=str)
check.opt_inst_param(status, "status", DagsterRunStatus)
check.opt_mapping_param(tags, "tags", key_type=str)
with disable_dagster_warnings():
validated_tags = normalize_tags(tags)
check.opt_str_param(root_run_id, "root_run_id")
check.opt_str_param(parent_run_id, "parent_run_id")
# If step_keys_to_execute is None, then everything is executed. In some cases callers
# are still exploding and sending the full list of step keys even though that is
# unnecessary.
check.opt_sequence_param(step_keys_to_execute, "step_keys_to_execute")
check.opt_inst_param(
execution_plan_snapshot, "execution_plan_snapshot", ExecutionPlanSnapshot
)
if run_id and not is_uuid(run_id):
check.failed(f"run_id must be a valid UUID. Got {run_id}")
if root_run_id or parent_run_id:
check.invariant(
root_run_id and parent_run_id,
"If root_run_id or parent_run_id is passed, this is a re-execution scenario and"
" root_run_id and parent_run_id must both be passed.",
)
# The job_snapshot should always be set in production scenarios. In tests
        # we have sometimes omitted it for convenience.
check.opt_inst_param(job_snapshot, "job_snapshot", JobSnap)
check.opt_inst_param(parent_job_snapshot, "parent_job_snapshot", JobSnap)
if parent_job_snapshot:
check.invariant(
job_snapshot,
"If parent_job_snapshot is set, job_snapshot should also be.",
)
# op_selection is a sequence of selection queries assigned by the user.
# *Most* callers expand the op_selection into an explicit set of
# resolved_op_selection via accessing remote_job.resolved_op_selection
# but not all do. Some (launch execution mutation in graphql and backfill run
# creation, for example) actually pass the solid *selection* into the
# resolved_op_selection parameter, but just as a frozen set, rather than
# fully resolving the selection, as the daemon launchers do. Given the
# state of callers we just check to ensure that the arguments are well-formed.
#
# asset_selection adds another dimension to this lovely dance. op_selection
# and asset_selection are mutually exclusive and should never both be set.
        # This invariant is checked in a sporadic fashion around
# the codebase, but is never enforced in a typed fashion.
#
        # Additionally, given how callsites currently behave: *if* asset selection
# is set (i.e., not None) then *neither* op_selection *nor*
# resolved_op_selection is passed. In the asset selection case resolving
# the set of assets into the canonical resolved_op_selection is done in
# the user process, and the exact resolution is never persisted in the run.
# We are asserting that invariant here to maintain that behavior.
#
# Finally, asset_check_selection can be passed along with asset_selection. It
# is mutually exclusive with op_selection and resolved_op_selection. A `None`
# value will include any asset checks that target selected assets. An empty set
# will include no asset checks.
check.opt_set_param(resolved_op_selection, "resolved_op_selection", of_type=str)
check.opt_sequence_param(op_selection, "op_selection", of_type=str)
check.opt_set_param(asset_selection, "asset_selection", of_type=AssetKey)
check.opt_set_param(asset_check_selection, "asset_check_selection", of_type=AssetCheckKey)
# asset_selection will always be None on an op job, but asset_check_selection may be
# None or []. This is because [] and None are different for asset checks: None means
# include all asset checks on selected assets, while [] means include no asset checks.
# In an op job (which has no asset checks), these two are equivalent.
if asset_selection is not None or asset_check_selection:
check.invariant(
op_selection is None,
"Cannot pass op_selection with either of asset_selection or asset_check_selection",
)
check.invariant(
resolved_op_selection is None,
"Cannot pass resolved_op_selection with either of asset_selection or"
" asset_check_selection",
)
# The "python origin" arguments exist so a job can be reconstructed in memory
# after a DagsterRun has been fetched from the database.
#
        # There are cases (notably in _logged_execute_job with Reconstructable jobs)
        # where job_code_origin is set and remote_job_origin is not. In some cloud test
        # cases only remote_job_origin is passed, but they are almost always passed
        # together. If these are not set, the created run will never be able to be
        # relaunched from the information in the run alone or from another process.
check.opt_inst_param(remote_job_origin, "remote_job_origin", RemoteJobOrigin)
check.opt_inst_param(job_code_origin, "job_code_origin", JobPythonOrigin)
dagster_run = self._construct_run_with_snapshots(
job_name=job_name,
run_id=run_id, # type: ignore # (possible none)
run_config=run_config,
asset_selection=asset_selection,
asset_check_selection=asset_check_selection,
op_selection=op_selection,
resolved_op_selection=resolved_op_selection,
step_keys_to_execute=step_keys_to_execute,
status=status,
tags=validated_tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
job_snapshot=job_snapshot,
execution_plan_snapshot=execution_plan_snapshot,
parent_job_snapshot=parent_job_snapshot,
remote_job_origin=remote_job_origin,
job_code_origin=job_code_origin,
)
dagster_run = self._run_storage.add_run(dagster_run)
if execution_plan_snapshot:
self._log_asset_planned_events(dagster_run, execution_plan_snapshot, asset_graph)
return dagster_run
def create_reexecuted_run(
self,
*,
parent_run: DagsterRun,
code_location: "CodeLocation",
remote_job: "RemoteJob",
strategy: "ReexecutionStrategy",
extra_tags: Optional[Mapping[str, Any]] = None,
run_config: Optional[Mapping[str, Any]] = None,
use_parent_run_tags: bool = False,
) -> DagsterRun:
from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
from dagster._core.execution.plan.state import KnownExecutionState
from dagster._core.remote_representation import CodeLocation, RemoteJob
check.inst_param(parent_run, "parent_run", DagsterRun)
check.inst_param(code_location, "code_location", CodeLocation)
check.inst_param(remote_job, "remote_job", RemoteJob)
check.inst_param(strategy, "strategy", ReexecutionStrategy)
check.opt_mapping_param(extra_tags, "extra_tags", key_type=str)
check.opt_mapping_param(run_config, "run_config", key_type=str)
check.bool_param(use_parent_run_tags, "use_parent_run_tags")
root_run_id = parent_run.root_run_id or parent_run.run_id
parent_run_id = parent_run.run_id
# these can differ from remote_job.tags if tags were added at launch time
parent_run_tags = (
{key: val for key, val in parent_run.tags.items() if key != RUN_FAILURE_REASON_TAG}
if use_parent_run_tags
else {}
)
tags = merge_dicts(
remote_job.tags,
parent_run_tags,
extra_tags or {},
{
PARENT_RUN_ID_TAG: parent_run_id,
ROOT_RUN_ID_TAG: root_run_id,
},
)
run_config = run_config if run_config is not None else parent_run.run_config
if strategy == ReexecutionStrategy.FROM_FAILURE:
(
step_keys_to_execute,
known_state,
) = KnownExecutionState.build_resume_retry_reexecution(
self,
parent_run=parent_run,
)
tags[RESUME_RETRY_TAG] = "true"
elif strategy == ReexecutionStrategy.ALL_STEPS:
step_keys_to_execute = None
known_state = None
else:
raise DagsterInvariantViolationError(f"Unknown reexecution strategy: {strategy}")
remote_execution_plan = code_location.get_execution_plan(
remote_job,
run_config,
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
instance=self,
)
return self.create_run(
job_name=parent_run.job_name,
run_id=None,
run_config=run_config,
resolved_op_selection=parent_run.resolved_op_selection,
step_keys_to_execute=step_keys_to_execute,
status=DagsterRunStatus.NOT_STARTED,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
job_snapshot=remote_job.job_snapshot,
execution_plan_snapshot=remote_execution_plan.execution_plan_snapshot,
parent_job_snapshot=remote_job.parent_job_snapshot,
op_selection=parent_run.op_selection,
asset_selection=parent_run.asset_selection,
asset_check_selection=parent_run.asset_check_selection,
remote_job_origin=remote_job.get_remote_origin(),
job_code_origin=remote_job.get_python_origin(),
asset_graph=code_location.get_repository(
remote_job.repository_handle.repository_name
).asset_graph,
)
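# Usage sketch (illustrative, not part of this module): re-executing a failed
# run from the point of failure. Assumes `instance`, `code_location`,
# `remote_job`, `failed_run`, and `workspace` were obtained from the caller's
# workspace context; those names are hypothetical.
#
#     from dagster._core.execution.plan.resume_retry import ReexecutionStrategy
#
#     new_run = instance.create_reexecuted_run(
#         parent_run=failed_run,
#         code_location=code_location,
#         remote_job=remote_job,
#         strategy=ReexecutionStrategy.FROM_FAILURE,
#         use_parent_run_tags=True,  # carry over launch-time tags
#     )
#     instance.submit_run(new_run.run_id, workspace)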
def register_managed_run(
self,
job_name: str,
run_id: str,
run_config: Optional[Mapping[str, object]],
resolved_op_selection: Optional[AbstractSet[str]],
step_keys_to_execute: Optional[Sequence[str]],
tags: Mapping[str, str],
root_run_id: Optional[str],
parent_run_id: Optional[str],
job_snapshot: Optional["JobSnap"],
execution_plan_snapshot: Optional["ExecutionPlanSnapshot"],
parent_job_snapshot: Optional["JobSnap"],
op_selection: Optional[Sequence[str]] = None,
job_code_origin: Optional[JobPythonOrigin] = None,
) -> DagsterRun:
# The usage of this method is limited to dagster-airflow, specifically in Dagster
# Operators that are executed in Airflow. Because a common workflow in Airflow is to
# retry DAGs from arbitrary tasks, we need any node to be capable of creating a
# DagsterRun.
#
# The try-except DagsterRunAlreadyExists block handles the race when multiple "root" tasks
# simultaneously execute self._run_storage.add_run(dagster_run). When this happens, only
# one task succeeds in creating the run, while the others get DagsterRunAlreadyExists
# error; at this point, the failed tasks try again to fetch the existing run.
# https://github.com/dagster-io/dagster/issues/2412
dagster_run = self._construct_run_with_snapshots(
job_name=job_name,
run_id=run_id,
run_config=run_config,
op_selection=op_selection,
resolved_op_selection=resolved_op_selection,
step_keys_to_execute=step_keys_to_execute,
status=DagsterRunStatus.MANAGED,
tags=tags,
root_run_id=root_run_id,
parent_run_id=parent_run_id,
job_snapshot=job_snapshot,
execution_plan_snapshot=execution_plan_snapshot,
parent_job_snapshot=parent_job_snapshot,
job_code_origin=job_code_origin,
)
def get_run() -> DagsterRun:
candidate_run = self.get_run_by_id(dagster_run.run_id)
field_diff = _check_run_equality(dagster_run, candidate_run) # type: ignore # (possible none)
if field_diff:
raise DagsterRunConflict(
f"Found conflicting existing run with same id {dagster_run.run_id}. Runs differ in:"
f"\n{_format_field_diff(field_diff)}",
)
return candidate_run # type: ignore # (possible none)
if self.has_run(dagster_run.run_id):
return get_run()
try:
return self._run_storage.add_run(dagster_run)
except DagsterRunAlreadyExists:
return get_run()
@traced
def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
return self._run_storage.add_run(dagster_run)
@traced
def add_snapshot(
self,
snapshot: Union["JobSnap", "ExecutionPlanSnapshot"],
snapshot_id: Optional[str] = None,
) -> None:
return self._run_storage.add_snapshot(snapshot, snapshot_id)
@traced
def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
return self._run_storage.handle_run_event(run_id, event)
@traced
def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]) -> None:
return self._run_storage.add_run_tags(run_id, new_tags)
@traced
def has_run(self, run_id: str) -> bool:
return self._run_storage.has_run(run_id)
@traced
def get_runs(
self,
filters: Optional[RunsFilter] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
bucket_by: Optional[Union[JobBucket, TagBucket]] = None,
ascending: bool = False,
) -> Sequence[DagsterRun]:
return self._run_storage.get_runs(filters, cursor, limit, bucket_by, ascending)
@traced
def get_run_ids(
self,
filters: Optional[RunsFilter] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
) -> Sequence[str]:
return self._run_storage.get_run_ids(filters, cursor=cursor, limit=limit)
@traced
def get_runs_count(self, filters: Optional[RunsFilter] = None) -> int:
return self._run_storage.get_runs_count(filters)
@public
@traced
def get_run_records(
self,
filters: Optional[RunsFilter] = None,
limit: Optional[int] = None,
order_by: Optional[str] = None,
ascending: bool = False,
cursor: Optional[str] = None,
bucket_by: Optional[Union[JobBucket, TagBucket]] = None,
) -> Sequence[RunRecord]:
"""Return a list of run records stored in the run storage, sorted by the given column in given order.
Args:
filters (Optional[RunsFilter]): the filter by which to filter runs.
limit (Optional[int]): Number of results to get. Defaults to infinite.
order_by (Optional[str]): Name of the column to sort by. Defaults to id.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
List[RunRecord]: List of run records stored in the run storage.
"""
return self._run_storage.get_run_records(
filters, limit, order_by, ascending, cursor, bucket_by
)
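# Usage sketch (illustrative): fetching the five most recent failed runs as
# run records. `instance` is assumed to be a DagsterInstance.
#
#     from dagster import DagsterRunStatus, RunsFilter
#
#     records = instance.get_run_records(
#         filters=RunsFilter(statuses=[DagsterRunStatus.FAILURE]),
#         limit=5,
#     )
#     for record in records:
#         print(record.dagster_run.run_id, record.dagster_run.job_name)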
@traced
def get_run_partition_data(self, runs_filter: RunsFilter) -> Sequence[RunPartitionData]:
"""Get run partition data for a given partitioned job."""
return self._run_storage.get_run_partition_data(runs_filter)
def wipe(self) -> None:
self._run_storage.wipe()
self._event_storage.wipe()
@public
@traced
def delete_run(self, run_id: str) -> None:
"""Delete a run and all events generated by that from storage.
Args:
run_id (str): The id of the run to delete.
"""
self._run_storage.delete_run(run_id)
self._event_storage.delete_events(run_id)
# event storage
@traced
def logs_after(
self,
run_id: str,
cursor: Optional[int] = None,
of_type: Optional["DagsterEventType"] = None,
limit: Optional[int] = None,
) -> Sequence["EventLogEntry"]:
return self._event_storage.get_logs_for_run(
run_id,
cursor=cursor,
of_type=of_type,
limit=limit,
)
@traced
def all_logs(
self,
run_id: str,
of_type: Optional[Union["DagsterEventType", Set["DagsterEventType"]]] = None,
) -> Sequence["EventLogEntry"]:
return self._event_storage.get_logs_for_run(run_id, of_type=of_type)
@traced
def get_records_for_run(
self,
run_id: str,
cursor: Optional[str] = None,
of_type: Optional[Union["DagsterEventType", Set["DagsterEventType"]]] = None,
limit: Optional[int] = None,
ascending: bool = True,
) -> "EventLogConnection":
return self._event_storage.get_records_for_run(run_id, cursor, of_type, limit, ascending)
def watch_event_logs(self, run_id: str, cursor: Optional[str], cb: "EventHandlerFn") -> None:
return self._event_storage.watch(run_id, cursor, cb)
def end_watch_event_logs(self, run_id: str, cb: "EventHandlerFn") -> None:
return self._event_storage.end_watch(run_id, cb)
# asset storage
@traced
def can_read_asset_status_cache(self) -> bool:
return self._event_storage.can_read_asset_status_cache()
@traced
def update_asset_cached_status_data(
self, asset_key: AssetKey, cache_values: "AssetStatusCacheValue"
) -> None:
self._event_storage.update_asset_cached_status_data(asset_key, cache_values)
@traced
def wipe_asset_cached_status(self, asset_keys: Sequence[AssetKey]) -> None:
check.list_param(asset_keys, "asset_keys", of_type=AssetKey)
for asset_key in asset_keys:
self._event_storage.wipe_asset_cached_status(asset_key)
@traced
def all_asset_keys(self) -> Sequence[AssetKey]:
return self._event_storage.all_asset_keys()
@public
@traced
def get_asset_keys(
self,
prefix: Optional[Sequence[str]] = None,
limit: Optional[int] = None,
cursor: Optional[str] = None,
) -> Sequence[AssetKey]:
"""Return a filtered subset of asset keys managed by this instance.
Args:
prefix (Optional[Sequence[str]]): Return only assets having this key prefix.
limit (Optional[int]): Maximum number of keys to return.
cursor (Optional[str]): Cursor to use for pagination.
Returns:
Sequence[AssetKey]: List of asset keys.
"""
return self._event_storage.get_asset_keys(prefix=prefix, limit=limit, cursor=cursor)
@public
@traced
def has_asset_key(self, asset_key: AssetKey) -> bool:
"""Return true if this instance manages the given asset key.
Args:
asset_key (AssetKey): Asset key to check.
"""
return self._event_storage.has_asset_key(asset_key)
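# Usage sketch (illustrative): listing asset keys under a key prefix and
# checking for a specific key. The key names are hypothetical.
#
#     from dagster import AssetKey
#
#     keys = instance.get_asset_keys(prefix=["warehouse"], limit=100)
#     if instance.has_asset_key(AssetKey(["warehouse", "orders"])):
#         print("orders asset has been materialized or observed")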
@traced
def get_latest_materialization_events(
self, asset_keys: Iterable[AssetKey]
) -> Mapping[AssetKey, Optional["EventLogEntry"]]:
return self._event_storage.get_latest_materialization_events(asset_keys)
@public
@traced
def get_latest_materialization_event(self, asset_key: AssetKey) -> Optional["EventLogEntry"]:
"""Fetch the latest materialization event for the given asset key.
Args:
asset_key (AssetKey): Asset key to return materialization for.
Returns:
Optional[EventLogEntry]: The latest materialization event for the given asset
key, or `None` if the asset has not been materialized.
"""
return self._event_storage.get_latest_materialization_events([asset_key]).get(asset_key)
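# Usage sketch (illustrative): reading the latest materialization, if any,
# for an asset key. `instance` is assumed to be a DagsterInstance.
#
#     from dagster import AssetKey
#
#     entry = instance.get_latest_materialization_event(AssetKey("my_table"))
#     if entry is not None:
#         materialization = entry.asset_materialization
#         print(materialization.metadata if materialization else None)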
@traced
def get_latest_asset_check_evaluation_record(
self, asset_check_key: "AssetCheckKey"
) -> Optional["AssetCheckExecutionRecord"]:
return self._event_storage.get_latest_asset_check_execution_by_key([asset_check_key]).get(
asset_check_key
)
@public
@traced
def get_event_records(
self,
event_records_filter: "EventRecordsFilter",
limit: Optional[int] = None,
ascending: bool = False,
) -> Sequence["EventLogRecord"]:
"""Return a list of event records stored in the event log storage.
Args:
event_records_filter (Optional[EventRecordsFilter]): the filter by which to filter event
records.
limit (Optional[int]): Number of results to get. Defaults to infinite.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
List[EventLogRecord]: List of event log records stored in the event log storage.
"""
from dagster._core.events import DagsterEventType
if (
event_records_filter.event_type == DagsterEventType.ASSET_MATERIALIZATION_PLANNED
and event_records_filter.asset_partitions
):
warnings.warn(
"Asset materialization planned events with partitions subsets will not be "
"returned when the event records filter contains the asset_partitions argument"
)
return self._event_storage.get_event_records(event_records_filter, limit, ascending)
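# Usage sketch (illustrative): querying raw event records by event type.
#
#     from dagster import DagsterEventType, EventRecordsFilter
#
#     records = instance.get_event_records(
#         EventRecordsFilter(event_type=DagsterEventType.RUN_SUCCESS),
#         limit=10,
#     )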
@public
@traced
def fetch_materializations(
self,
records_filter: Union[AssetKey, "AssetRecordsFilter"],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> "EventRecordsResult":
"""Return a list of materialization records stored in the event log storage.
Args:
records_filter (Union[AssetKey, AssetRecordsFilter]): the filter by which to
filter event records.
limit (int): Number of results to get.
cursor (Optional[str]): Cursor to use for pagination. Defaults to None.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
EventRecordsResult: Object containing a list of event log records and a cursor string
"""
return self._event_storage.fetch_materializations(records_filter, limit, cursor, ascending)
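# Usage sketch (illustrative): cursor pagination over materialization records,
# using the `records`/`cursor`/`has_more` fields of EventRecordsResult.
#
#     from dagster import AssetKey
#
#     cursor = None
#     while True:
#         result = instance.fetch_materializations(
#             AssetKey("my_table"), limit=100, cursor=cursor
#         )
#         for record in result.records:
#             ...  # process each materialization record
#         if not result.has_more:
#             break
#         cursor = result.cursor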
@traced
@deprecated(breaking_version="2.0")
def fetch_planned_materializations(
self,
records_filter: Union[AssetKey, "AssetRecordsFilter"],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> "EventRecordsResult":
"""Return a list of planned materialization records stored in the event log storage.
Args:
records_filter (Optional[Union[AssetKey, AssetRecordsFilter]]): the filter by which to
filter event records.
limit (int): Number of results to get.
cursor (Optional[str]): Cursor to use for pagination. Defaults to None.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
EventRecordsResult: Object containing a list of event log records and a cursor string
"""
from dagster._core.event_api import EventLogCursor
from dagster._core.events import DagsterEventType
from dagster._core.storage.event_log.base import EventRecordsFilter, EventRecordsResult
event_records_filter = (
EventRecordsFilter(DagsterEventType.ASSET_MATERIALIZATION_PLANNED, records_filter)
if isinstance(records_filter, AssetKey)
else records_filter.to_event_records_filter(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED, cursor=cursor, ascending=ascending
)
)
records = self._event_storage.get_event_records(
event_records_filter, limit=limit, ascending=ascending
)
if records:
new_cursor = EventLogCursor.from_storage_id(records[-1].storage_id).to_string()
elif cursor:
new_cursor = cursor
else:
new_cursor = EventLogCursor.from_storage_id(-1).to_string()
has_more = len(records) == limit
return EventRecordsResult(records, cursor=new_cursor, has_more=has_more)
@public
@traced
def fetch_observations(
self,
records_filter: Union[AssetKey, "AssetRecordsFilter"],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> "EventRecordsResult":
"""Return a list of observation records stored in the event log storage.
Args:
records_filter (Optional[Union[AssetKey, AssetRecordsFilter]]): the filter by which to
filter event records.
limit (int): Number of results to get.
cursor (Optional[str]): Cursor to use for pagination. Defaults to None.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
EventRecordsResult: Object containing a list of event log records and a cursor string
"""
return self._event_storage.fetch_observations(records_filter, limit, cursor, ascending)
@public
@traced
def fetch_run_status_changes(
self,
records_filter: Union["DagsterEventType", "RunStatusChangeRecordsFilter"],
limit: int,
cursor: Optional[str] = None,
ascending: bool = False,
) -> "EventRecordsResult":
"""Return a list of run_status_event records stored in the event log storage.
Args:
records_filter (Optional[Union[DagsterEventType, RunStatusChangeRecordsFilter]]): the
filter by which to filter event records.
limit (int): Number of results to get.
cursor (Optional[str]): Cursor to use for pagination. Defaults to None.
ascending (Optional[bool]): Sort the result in ascending order if True, descending
otherwise. Defaults to descending.
Returns:
EventRecordsResult: Object containing a list of event log records and a cursor string
"""
return self._event_storage.fetch_run_status_changes(
records_filter, limit, cursor, ascending
)
@public
@traced
def get_status_by_partition(
self,
asset_key: AssetKey,
partition_keys: Sequence[str],
partitions_def: "PartitionsDefinition",
) -> Optional[Mapping[str, "AssetPartitionStatus"]]:
"""Get the current status of provided partition_keys for the provided asset.
Args:
asset_key (AssetKey): The asset to get per-partition status for.
partition_keys (Sequence[str]): The partitions to get status for.
partitions_def (PartitionsDefinition): The PartitionsDefinition of the asset to get
per-partition status for.
Returns:
Optional[Mapping[str, AssetPartitionStatus]]: status for each partition key
"""
from dagster._core.storage.partition_status_cache import (
AssetPartitionStatus,
AssetStatusCacheValue,
get_and_update_asset_status_cache_value,
)
cached_value = get_and_update_asset_status_cache_value(self, asset_key, partitions_def)
if isinstance(cached_value, AssetStatusCacheValue):
materialized_partitions = cached_value.deserialize_materialized_partition_subsets(
partitions_def
)
failed_partitions = cached_value.deserialize_failed_partition_subsets(partitions_def)
in_progress_partitions = cached_value.deserialize_in_progress_partition_subsets(
partitions_def
)
status_by_partition = {}
for partition_key in partition_keys:
if partition_key in in_progress_partitions:
status_by_partition[partition_key] = AssetPartitionStatus.IN_PROGRESS
elif partition_key in failed_partitions:
status_by_partition[partition_key] = AssetPartitionStatus.FAILED
elif partition_key in materialized_partitions:
status_by_partition[partition_key] = AssetPartitionStatus.MATERIALIZED
else:
status_by_partition[partition_key] = None
return status_by_partition
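# Usage sketch (illustrative): checking per-partition status for a statically
# partitioned asset. The asset key and partition names are hypothetical.
#
#     from dagster import AssetKey, StaticPartitionsDefinition
#
#     partitions_def = StaticPartitionsDefinition(["us", "eu"])
#     status_by_partition = instance.get_status_by_partition(
#         AssetKey("regional_table"), ["us", "eu"], partitions_def
#     )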
@public
@traced
def get_asset_records(
self, asset_keys: Optional[Sequence[AssetKey]] = None
) -> Sequence["AssetRecord"]:
"""Return an `AssetRecord` for each of the given asset keys.
Args:
asset_keys (Optional[Sequence[AssetKey]]): List of asset keys to retrieve records for.
Returns:
Sequence[AssetRecord]: List of asset records.
"""
return self._event_storage.get_asset_records(asset_keys)
@traced
def get_event_tags_for_asset(
self,
asset_key: AssetKey,
filter_tags: Optional[Mapping[str, str]] = None,
filter_event_id: Optional[int] = None,
) -> Sequence[Mapping[str, str]]:
"""Fetches asset event tags for the given asset key.
If filter_tags is provided, searches for events containing all of the filter tags. Then,
returns all tags for those events. This enables searching for multipartitioned asset
partition tags with a fixed dimension value, e.g. all of the tags for events where
"country" == "US".
If filter_event_id is provided, searches for the event with the provided event_id.
Returns a list of dicts, where each dict is a mapping of tag key to tag value for a
single event.
"""
return self._event_storage.get_event_tags_for_asset(asset_key, filter_tags, filter_event_id)
@public
@traced
def wipe_assets(self, asset_keys: Sequence[AssetKey]) -> None:
"""Wipes asset event history from the event log for the given asset keys.
Args:
asset_keys (Sequence[AssetKey]): Asset keys to wipe.
"""
check.list_param(asset_keys, "asset_keys", of_type=AssetKey)
for asset_key in asset_keys:
self._event_storage.wipe_asset(asset_key)
def wipe_asset_partitions(
self,
asset_key: AssetKey,
partition_keys: Sequence[str],
) -> None:
"""Wipes asset event history from the event log for the given asset key and partition keys.
Args:
asset_key (AssetKey): Asset key to wipe.
partition_keys (Sequence[str]): Partition keys to wipe.
"""
self._event_storage.wipe_asset_partitions(asset_key, partition_keys)
@traced
def get_materialized_partitions(
self,
asset_key: AssetKey,
before_cursor: Optional[int] = None,
after_cursor: Optional[int] = None,
) -> Set[str]:
return self._event_storage.get_materialized_partitions(
asset_key, before_cursor=before_cursor, after_cursor=after_cursor
)
@traced
def get_latest_storage_id_by_partition(
self,
asset_key: AssetKey,
event_type: "DagsterEventType",
partitions: Optional[Set[str]] = None,
) -> Mapping[str, int]:
"""Fetch the latest materialzation storage id for each partition for a given asset key.
Returns a mapping of partition to storage id.
"""
return self._event_storage.get_latest_storage_id_by_partition(
asset_key, event_type, partitions
)
@traced
def get_latest_planned_materialization_info(
self,
asset_key: AssetKey,
partition: Optional[str] = None,
) -> Optional["PlannedMaterializationInfo"]:
return self._event_storage.get_latest_planned_materialization_info(asset_key, partition)
@public
@traced
def get_dynamic_partitions(self, partitions_def_name: str) -> Sequence[str]:
"""Get the set of partition keys for the specified :py:class:`DynamicPartitionsDefinition`.
Args:
partitions_def_name (str): The name of the `DynamicPartitionsDefinition`.
"""
check.str_param(partitions_def_name, "partitions_def_name")
return self._event_storage.get_dynamic_partitions(partitions_def_name)
@public
@traced
def add_dynamic_partitions(
self, partitions_def_name: str, partition_keys: Sequence[str]
) -> None:
"""Add partitions to the specified :py:class:`DynamicPartitionsDefinition` idempotently.
Does not add any partitions that already exist.
Args:
partitions_def_name (str): The name of the `DynamicPartitionsDefinition`.
partition_keys (Sequence[str]): Partition keys to add.
"""
from dagster._core.definitions.partition import (
raise_error_on_invalid_partition_key_substring,
)
check.str_param(partitions_def_name, "partitions_def_name")
check.sequence_param(partition_keys, "partition_keys", of_type=str)
if isinstance(partition_keys, str):
# Guard against a single string being passed in `partition_keys`
raise DagsterInvalidInvocationError("partition_keys must be a sequence of strings")
raise_error_on_invalid_partition_key_substring(partition_keys)
return self._event_storage.add_dynamic_partitions(partitions_def_name, partition_keys)
@public
@traced
def delete_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> None:
"""Delete a partition for the specified :py:class:`DynamicPartitionsDefinition`.
If the partition does not exist, exits silently.
Args:
partitions_def_name (str): The name of the `DynamicPartitionsDefinition`.
partition_key (str): Partition key to delete.
"""
check.str_param(partitions_def_name, "partitions_def_name")
check.str_param(partition_key, "partition_key")
self._event_storage.delete_dynamic_partition(partitions_def_name, partition_key)
@public
@traced
def has_dynamic_partition(self, partitions_def_name: str, partition_key: str) -> bool:
"""Check if a partition key exists for the :py:class:`DynamicPartitionsDefinition`.
Args:
partitions_def_name (str): The name of the `DynamicPartitionsDefinition`.
partition_key (str): Partition key to check.
"""
check.str_param(partitions_def_name, "partitions_def_name")
check.str_param(partition_key, "partition_key")
return self._event_storage.has_dynamic_partition(partitions_def_name, partition_key)
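# Usage sketch (illustrative): the dynamic partitions workflow end to end.
# "customers" is a hypothetical DynamicPartitionsDefinition name.
#
#     instance.add_dynamic_partitions("customers", ["acme", "globex"])
#     assert instance.has_dynamic_partition("customers", "acme")
#     print(instance.get_dynamic_partitions("customers"))
#     instance.delete_dynamic_partition("customers", "globex")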
# event subscriptions
def _get_yaml_python_handlers(self) -> Sequence[logging.Handler]:
if self._settings:
logging_config = self.get_settings("python_logs").get("dagster_handler_config", {})
if logging_config:
experimental_warning("Handling yaml-defined logging configuration")
# Handlers can only be retrieved from dictConfig configuration if they are attached
# to a logger. We add a dummy logger to the configuration that allows us to access user
# defined handlers.
handler_names = logging_config.get("handlers", {}).keys()
dagster_dummy_logger_name = "dagster_dummy_logger"
processed_dict_conf = {
"version": 1,
"disable_existing_loggers": False,
"loggers": {dagster_dummy_logger_name: {"handlers": handler_names}},
}
processed_dict_conf.update(logging_config)
logging.config.dictConfig(processed_dict_conf)
dummy_logger = logging.getLogger(dagster_dummy_logger_name)
return dummy_logger.handlers
return []
def _get_event_log_handler(self) -> _EventListenerLogHandler:
event_log_handler = _EventListenerLogHandler(self)
event_log_handler.setLevel(logging.DEBUG)
return event_log_handler
def get_handlers(self) -> Sequence[logging.Handler]:
handlers: List[logging.Handler] = [self._get_event_log_handler()]
handlers.extend(self._get_yaml_python_handlers())
return handlers
def store_event(self, event: "EventLogEntry") -> None:
self._event_storage.store_event(event)
def handle_new_event(
self,
event: "EventLogEntry",
*,
batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
) -> None:
"""Handle a new event by storing it and notifying subscribers.
Events may optionally be sent with `batch_metadata`. If batch writing is enabled, then
events sent with `batch_metadata` will not trigger an immediate write. Instead, they will be
kept in a batch-specific buffer (identified by `batch_metadata.id`) until either the buffer
reaches the event batch size or the end of the batch is reached (signaled by
`batch_metadata.is_end`). When this point is reached, all events in the buffer will be sent
to the storage layer in a single batch. If an error occurs during batch writing, then we
fall back to iterative individual event writes.
Args:
event (EventLogEntry): The event to handle.
batch_metadata (Optional[DagsterEventBatchMetadata]): Metadata for batch writing.
"""
if batch_metadata is None or not _is_batch_writing_enabled():
events = [event]
else:
batch_id, is_batch_end = batch_metadata.id, batch_metadata.is_end
self._event_buffer[batch_id].append(event)
if is_batch_end or len(self._event_buffer[batch_id]) == _get_event_batch_size():
events = self._event_buffer[batch_id]
del self._event_buffer[batch_id]
else:
return
if len(events) == 1:
self._event_storage.store_event(events[0])
else:
try:
self._event_storage.store_event_batch(events)
# Fall back to storing events one by one if writing a batch fails. We catch a generic
# Exception because that is the parent class of the actually received error,
# dagster_cloud_cli.core.errors.GraphQLStorageError, which we cannot import here due to
# it living in a cloud package.
except Exception as e:
sys.stderr.write(f"Exception while storing event batch: {e}\n")
sys.stderr.write(
"Falling back to storing multiple single-event storage requests...\n"
)
for event in events:
self._event_storage.store_event(event)
for event in events:
run_id = event.run_id
if (
not self._event_storage.handles_run_events_in_store_event
and event.is_dagster_event
and event.get_dagster_event().is_job_event
):
self._run_storage.handle_run_event(run_id, event.get_dagster_event())
for sub in self._subscribers[run_id]:
sub(event)
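# Batching sketch (illustrative): callers opt into buffered writes by tagging
# each event with the same batch id and marking the final event. Only the `id`
# and `is_end` fields of DagsterEventBatchMetadata are documented above; the
# keyword-argument construction below is an assumption.
#
#     for i, entry in enumerate(entries):
#         instance.handle_new_event(
#             entry,
#             batch_metadata=DagsterEventBatchMetadata(
#                 id=batch_id, is_end=(i == len(entries) - 1)
#             ),
#         )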
def add_event_listener(self, run_id: str, cb) -> None:
self._subscribers[run_id].append(cb)
def report_engine_event(
self,
message: str,
dagster_run: Optional[DagsterRun] = None,
engine_event_data: Optional["EngineEventData"] = None,
cls: Optional[Type[object]] = None,
step_key: Optional[str] = None,
job_name: Optional[str] = None,
run_id: Optional[str] = None,
) -> "DagsterEvent":
"""Report a EngineEvent that occurred outside of a job execution context."""
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
check.opt_class_param(cls, "cls")
check.str_param(message, "message")
check.opt_inst_param(dagster_run, "dagster_run", DagsterRun)
check.opt_str_param(run_id, "run_id")
check.opt_str_param(job_name, "job_name")
check.invariant(
dagster_run or (job_name and run_id),
"Must include either dagster_run or job_name and run_id",
)
run_id = run_id if run_id else dagster_run.run_id # type: ignore
job_name = job_name if job_name else dagster_run.job_name # type: ignore
engine_event_data = check.opt_inst_param(
engine_event_data,
"engine_event_data",
EngineEventData,
EngineEventData({}),
)
if cls:
message = f"[{cls.__name__}] {message}"
log_level = logging.INFO
if engine_event_data and engine_event_data.error:
log_level = logging.ERROR
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.ENGINE_EVENT.value,
job_name=job_name,
message=message,
event_specific_data=engine_event_data,
step_key=step_key,
)
self.report_dagster_event(dagster_event, run_id=run_id, log_level=log_level)
return dagster_event
def report_dagster_event(
self,
dagster_event: "DagsterEvent",
run_id: str,
log_level: Union[str, int] = logging.INFO,
) -> None:
"""Takes a DagsterEvent and stores it in persistent storage for the corresponding DagsterRun."""
from dagster._core.events.log import EventLogEntry
event_record = EventLogEntry(
user_message="",
level=log_level,
job_name=dagster_event.job_name,
run_id=run_id,
error_info=None,
timestamp=get_current_timestamp(),
step_key=dagster_event.step_key,
dagster_event=dagster_event,
)
self.handle_new_event(event_record)
def report_run_canceling(self, run: DagsterRun, message: Optional[str] = None):
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(run, "run", DagsterRun)
message = check.opt_str_param(
message,
"message",
"Sending run termination request.",
)
canceling_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_CANCELING.value,
job_name=run.job_name,
message=message,
)
self.report_dagster_event(canceling_event, run_id=run.run_id)
def report_run_canceled(
self,
dagster_run: DagsterRun,
message: Optional[str] = None,
) -> "DagsterEvent":
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(dagster_run, "dagster_run", DagsterRun)
message = check.opt_str_param(
message,
"mesage",
"This run has been marked as canceled from outside the execution context.",
)
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_CANCELED.value,
job_name=dagster_run.job_name,
message=message,
)
self.report_dagster_event(dagster_event, run_id=dagster_run.run_id, log_level=logging.ERROR)
return dagster_event
def report_run_failed(
self,
dagster_run: DagsterRun,
message: Optional[str] = None,
job_failure_data: Optional["JobFailureData"] = None,
) -> "DagsterEvent":
from dagster._core.events import DagsterEvent, DagsterEventType
check.inst_param(dagster_run, "dagster_run", DagsterRun)
message = check.opt_str_param(
message,
"message",
"This run has been marked as failed from outside the execution context.",
)
dagster_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_FAILURE.value,
job_name=dagster_run.job_name,
message=message,
event_specific_data=job_failure_data,
)
self.report_dagster_event(dagster_event, run_id=dagster_run.run_id, log_level=logging.ERROR)
return dagster_event
# directories
def file_manager_directory(self, run_id: str) -> str:
return self._local_artifact_storage.file_manager_dir(run_id)
def storage_directory(self) -> str:
return self._local_artifact_storage.storage_dir
def schedules_directory(self) -> str:
return self._local_artifact_storage.schedules_dir
# Runs coordinator
def submit_run(self, run_id: str, workspace: "BaseWorkspaceRequestContext") -> DagsterRun:
"""Submit a pipeline run to the coordinator.
This method delegates to the ``RunCoordinator``, configured on the instance, and will
call its implementation of ``RunCoordinator.submit_run()`` to send the run to the
coordinator for execution. Runs should be created in the instance (e.g., by calling
``DagsterInstance.create_run()``) *before* this method is called, and
should be in the ``PipelineRunStatus.NOT_STARTED`` state. They also must have a non-null
ExternalPipelineOrigin.
Args:
run_id (str): The id of the run.
"""
from dagster._core.run_coordinator import SubmitRunContext
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to submit_run"
)
check.not_none(
run.remote_job_origin,
"External pipeline origin must be set for submitted runs",
)
check.not_none(
run.job_code_origin,
"Python origin must be set for submitted runs",
)
try:
submitted_run = self.run_coordinator.submit_run(
SubmitRunContext(run, workspace=workspace)
)
except:
from dagster._core.events import EngineEventData
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return submitted_run
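# Usage sketch (illustrative): the create-then-submit flow described above.
# `instance`, `workspace`, and the create_run arguments are assumed to come
# from the caller's context.
#
#     run = instance.create_run(...)  # status NOT_STARTED, origins set
#     instance.submit_run(run.run_id, workspace)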
# Run launcher
def launch_run(self, run_id: str, workspace: "BaseWorkspaceRequestContext") -> DagsterRun:
"""Launch a pipeline run.
This method is typically called using `instance.submit_run` rather than being invoked
directly. This method delegates to the ``RunLauncher``, if any, configured on the instance,
and will call its implementation of ``RunLauncher.launch_run()`` to begin the execution of
the specified run. Runs should be created in the instance (e.g., by calling
``DagsterInstance.create_run()``) *before* this method is called, and should be in the
``PipelineRunStatus.NOT_STARTED`` state.
Args:
run_id (str): The id of the run the launch.
"""
from dagster._core.events import DagsterEvent, DagsterEventType, EngineEventData
from dagster._core.launcher import LaunchRunContext
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to launch_run"
)
launch_started_event = DagsterEvent(
event_type_value=DagsterEventType.PIPELINE_STARTING.value,
job_name=run.job_name,
)
self.report_dagster_event(launch_started_event, run_id=run.run_id)
run = self.get_run_by_id(run_id)
if run is None:
check.failed(f"Failed to reload run {run_id}")
try:
self.run_launcher.launch_run(LaunchRunContext(dagster_run=run, workspace=workspace))
except:
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return run
def resume_run(
self, run_id: str, workspace: "BaseWorkspaceRequestContext", attempt_number: int
) -> DagsterRun:
"""Resume a pipeline run.
This method should be called on runs which have already been launched, but whose run workers
have died.
Args:
run_id (str): The id of the run the launch.
"""
from dagster._core.events import EngineEventData
from dagster._core.launcher import ResumeRunContext
from dagster._daemon.monitoring import RESUME_RUN_LOG_MESSAGE
run = self.get_run_by_id(run_id)
if run is None:
raise DagsterInvariantViolationError(
f"Could not load run {run_id} that was passed to resume_run"
)
if run.status not in IN_PROGRESS_RUN_STATUSES:
raise DagsterInvariantViolationError(
f"Run {run_id} is not in a state that can be resumed"
)
self.report_engine_event(
RESUME_RUN_LOG_MESSAGE,
run,
)
try:
self.run_launcher.resume_run(
ResumeRunContext(
dagster_run=run,
workspace=workspace,
resume_attempt_number=attempt_number,
)
)
except:
error = serializable_error_info_from_exc_info(sys.exc_info())
self.report_engine_event(
error.message,
run,
EngineEventData.engine_error(error),
)
self.report_run_failed(run)
raise
return run
def count_resume_run_attempts(self, run_id: str) -> int:
from dagster._daemon.monitoring import count_resume_run_attempts
return count_resume_run_attempts(self, run_id)
def run_will_resume(self, run_id: str) -> bool:
if not self.run_monitoring_enabled:
return False
return self.count_resume_run_attempts(run_id) < self.run_monitoring_max_resume_run_attempts
# Scheduler
def start_schedule(self, remote_schedule: "RemoteSchedule") -> "InstigatorState":
return self._scheduler.start_schedule(self, remote_schedule) # type: ignore
def stop_schedule(
self,
schedule_origin_id: str,
schedule_selector_id: str,
remote_schedule: Optional["RemoteSchedule"],
) -> "InstigatorState":
return self._scheduler.stop_schedule( # type: ignore
self, schedule_origin_id, schedule_selector_id, remote_schedule
)
def reset_schedule(self, remote_schedule: "RemoteSchedule") -> "InstigatorState":
return self._scheduler.reset_schedule(self, remote_schedule) # type: ignore
def scheduler_debug_info(self) -> "SchedulerDebugInfo":
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler import SchedulerDebugInfo
errors = []
schedules: List[str] = []
for schedule_state in self.all_instigator_state(instigator_type=InstigatorType.SCHEDULE):
schedule_info: Mapping[str, Mapping[str, object]] = {
schedule_state.instigator_name: {
"status": schedule_state.status.value,
"cron_schedule": schedule_state.instigator_data.cron_schedule, # type: ignore
"schedule_origin_id": schedule_state.instigator_origin_id,
"repository_origin_id": schedule_state.repository_origin_id,
}
}
schedules.append(yaml.safe_dump(schedule_info, default_flow_style=False))
return SchedulerDebugInfo(
scheduler_config_info=self._info_str_for_component("Scheduler", self.scheduler),
scheduler_info=self.scheduler.debug_info(), # type: ignore
schedule_storage=schedules,
errors=errors,
)
# Schedule / Sensor Storage
def start_sensor(self, remote_sensor: "RemoteSensor") -> "InstigatorState":
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)
stored_state = self.get_instigator_state(
remote_sensor.get_remote_origin_id(), remote_sensor.selector_id
)
computed_state = remote_sensor.get_current_instigator_state(stored_state)
if computed_state.is_running:
return computed_state
if not stored_state:
return self.add_instigator_state(
InstigatorState(
remote_sensor.get_remote_origin(),
InstigatorType.SENSOR,
InstigatorStatus.RUNNING,
SensorInstigatorData(
min_interval=remote_sensor.min_interval_seconds,
last_sensor_start_timestamp=get_current_timestamp(),
sensor_type=remote_sensor.sensor_type,
),
)
)
else:
data = cast(SensorInstigatorData, stored_state.instigator_data)
return self.update_instigator_state(
stored_state.with_status(InstigatorStatus.RUNNING).with_data(
data.with_sensor_start_timestamp(get_current_timestamp())
)
)
def stop_sensor(
self,
instigator_origin_id: str,
selector_id: str,
remote_sensor: Optional["RemoteSensor"],
) -> "InstigatorState":
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)
stored_state = self.get_instigator_state(instigator_origin_id, selector_id)
computed_state: InstigatorState
if remote_sensor:
computed_state = remote_sensor.get_current_instigator_state(stored_state)
else:
computed_state = check.not_none(stored_state)
if not computed_state.is_running:
return computed_state
if not stored_state:
assert remote_sensor
return self.add_instigator_state(
InstigatorState(
remote_sensor.get_remote_origin(),
InstigatorType.SENSOR,
InstigatorStatus.STOPPED,
SensorInstigatorData(
min_interval=remote_sensor.min_interval_seconds,
sensor_type=remote_sensor.sensor_type,
),
)
)
else:
return self.update_instigator_state(stored_state.with_status(InstigatorStatus.STOPPED))
def reset_sensor(self, remote_sensor: "RemoteSensor") -> "InstigatorState":
"""If the given sensor has a default sensor status, then update the status to
`InstigatorStatus.DECLARED_IN_CODE` in instigator storage.
Args:
remote_sensor (RemoteSensor): The sensor to reset.
"""
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.scheduler.instigation import (
InstigatorState,
InstigatorStatus,
SensorInstigatorData,
)
stored_state = self.get_instigator_state(
remote_sensor.get_remote_origin_id(), remote_sensor.selector_id
)
new_status = InstigatorStatus.DECLARED_IN_CODE
if not stored_state:
new_instigator_data = SensorInstigatorData(
min_interval=remote_sensor.min_interval_seconds,
sensor_type=remote_sensor.sensor_type,
)
reset_state = self.add_instigator_state(
state=InstigatorState(
remote_sensor.get_remote_origin(),
InstigatorType.SENSOR,
new_status,
new_instigator_data,
)
)
else:
reset_state = self.update_instigator_state(state=stored_state.with_status(new_status))
return reset_state
@traced
def all_instigator_state(
self,
repository_origin_id: Optional[str] = None,
repository_selector_id: Optional[str] = None,
instigator_type: Optional["InstigatorType"] = None,
instigator_statuses: Optional[Set["InstigatorStatus"]] = None,
):
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.all_instigator_state(
repository_origin_id, repository_selector_id, instigator_type, instigator_statuses
)
@traced
def get_instigator_state(self, origin_id: str, selector_id: str) -> Optional["InstigatorState"]:
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.get_instigator_state(origin_id, selector_id)
def add_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.add_instigator_state(state)
def update_instigator_state(self, state: "InstigatorState") -> "InstigatorState":
if not self._schedule_storage:
check.failed("Schedule storage not available")
return self._schedule_storage.update_instigator_state(state)
def delete_instigator_state(self, origin_id: str, selector_id: str) -> None:
return self._schedule_storage.delete_instigator_state(origin_id, selector_id) # type: ignore # (possible none)
@property
def supports_batch_tick_queries(self) -> bool:
return self._schedule_storage and self._schedule_storage.supports_batch_queries # type: ignore # (possible none)
@traced
def get_batch_ticks(
self,
selector_ids: Sequence[str],
limit: Optional[int] = None,
statuses: Optional[Sequence["TickStatus"]] = None,
) -> Mapping[str, Sequence["InstigatorTick"]]:
if not self._schedule_storage:
return {}
return self._schedule_storage.get_batch_ticks(selector_ids, limit, statuses)
@traced
def get_tick(
self, origin_id: str, selector_id: str, timestamp: float
) -> Optional["InstigatorTick"]:
matches = self._schedule_storage.get_ticks( # type: ignore # (possible none)
origin_id, selector_id, before=timestamp + 1, after=timestamp - 1, limit=1
)
return matches[0] if len(matches) else None
@traced
def get_ticks(
self,
origin_id: str,
selector_id: str,
before: Optional[float] = None,
after: Optional[float] = None,
limit: Optional[int] = None,
statuses: Optional[Sequence["TickStatus"]] = None,
) -> Sequence["InstigatorTick"]:
return self._schedule_storage.get_ticks( # type: ignore # (possible none)
origin_id, selector_id, before=before, after=after, limit=limit, statuses=statuses
)
def create_tick(self, tick_data: "TickData") -> "InstigatorTick":
return check.not_none(self._schedule_storage).create_tick(tick_data)
def update_tick(self, tick: "InstigatorTick"):
return check.not_none(self._schedule_storage).update_tick(tick)
def purge_ticks(
self,
origin_id: str,
selector_id: str,
before: float,
tick_statuses: Optional[Sequence["TickStatus"]] = None,
) -> None:
self._schedule_storage.purge_ticks(origin_id, selector_id, before, tick_statuses) # type: ignore # (possible none)
def wipe_all_schedules(self) -> None:
if self._scheduler:
self._scheduler.wipe(self) # type: ignore # (possible none)
self._schedule_storage.wipe() # type: ignore # (possible none)
def logs_path_for_schedule(self, schedule_origin_id: str) -> str:
return self._scheduler.get_logs_path(self, schedule_origin_id) # type: ignore # (possible none)
def __enter__(self) -> Self:
return self
def __exit__(
self,
exception_type: Optional[Type[BaseException]],
exception_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> None:
self.dispose()
# dagster daemon
def add_daemon_heartbeat(self, daemon_heartbeat: "DaemonHeartbeat") -> None:
"""Called on a regular interval by the daemon."""
self._run_storage.add_daemon_heartbeat(daemon_heartbeat)
def get_daemon_heartbeats(self) -> Mapping[str, "DaemonHeartbeat"]:
"""Latest heartbeats of all daemon types."""
return self._run_storage.get_daemon_heartbeats()
def wipe_daemon_heartbeats(self) -> None:
self._run_storage.wipe_daemon_heartbeats()
def get_required_daemon_types(self) -> Sequence[str]:
from dagster._core.run_coordinator import QueuedRunCoordinator
from dagster._core.scheduler import DagsterDaemonScheduler
from dagster._daemon.asset_daemon import AssetDaemon
from dagster._daemon.auto_run_reexecution.event_log_consumer import EventLogConsumerDaemon
from dagster._daemon.daemon import (
BackfillDaemon,
MonitoringDaemon,
SchedulerDaemon,
SensorDaemon,
)
from dagster._daemon.run_coordinator.queued_run_coordinator_daemon import (
QueuedRunCoordinatorDaemon,
)
if self.is_ephemeral:
return []
daemons = [SensorDaemon.daemon_type(), BackfillDaemon.daemon_type()]
if isinstance(self.scheduler, DagsterDaemonScheduler):
daemons.append(SchedulerDaemon.daemon_type())
if isinstance(self.run_coordinator, QueuedRunCoordinator):
daemons.append(QueuedRunCoordinatorDaemon.daemon_type())
if self.run_monitoring_enabled:
daemons.append(MonitoringDaemon.daemon_type())
if self.run_retries_enabled:
daemons.append(EventLogConsumerDaemon.daemon_type())
if self.auto_materialize_enabled or self.auto_materialize_use_sensors:
daemons.append(AssetDaemon.daemon_type())
return daemons
def get_daemon_statuses(
self, daemon_types: Optional[Sequence[str]] = None
) -> Mapping[str, "DaemonStatus"]:
"""Get the current status of the daemons. If daemon_types aren't provided, defaults to all
required types. Returns a dict of daemon type to status.
"""
from dagster._daemon.controller import get_daemon_statuses
check.opt_sequence_param(daemon_types, "daemon_types", of_type=str)
return get_daemon_statuses(
self, daemon_types=daemon_types or self.get_required_daemon_types(), ignore_errors=True
)
@property
def daemon_skip_heartbeats_without_errors(self) -> bool:
# If enabled, daemon threads won't write heartbeats unless they encounter an error. This is
# enabled in cloud, where we don't need to use heartbeats to check if daemons are running, but
# do need to surface errors to users. This is an optimization to reduce DB writes.
return False
# backfill
def get_backfills(
self,
filters: Optional["BulkActionsFilter"] = None,
cursor: Optional[str] = None,
limit: Optional[int] = None,
status: Optional["BulkActionStatus"] = None,
) -> Sequence["PartitionBackfill"]:
return self._run_storage.get_backfills(
status=status, cursor=cursor, limit=limit, filters=filters
)
def get_backfills_count(self, filters: Optional["BulkActionsFilter"] = None) -> int:
return self._run_storage.get_backfills_count(filters=filters)
def get_backfill(self, backfill_id: str) -> Optional["PartitionBackfill"]:
return self._run_storage.get_backfill(backfill_id)
def add_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.add_backfill(partition_backfill)
def update_backfill(self, partition_backfill: "PartitionBackfill") -> None:
self._run_storage.update_backfill(partition_backfill)
@property
def should_start_background_run_thread(self) -> bool:
"""Gate on an experimental feature to start a thread that monitors for if the run should be canceled."""
return False
def get_tick_retention_settings(
self, instigator_type: "InstigatorType"
) -> Mapping["TickStatus", int]:
from dagster._core.definitions.run_request import InstigatorType
retention_settings = self.get_settings("retention")
if instigator_type == InstigatorType.SCHEDULE:
tick_settings = retention_settings.get("schedule")
elif instigator_type == InstigatorType.SENSOR:
tick_settings = retention_settings.get("sensor")
elif instigator_type == InstigatorType.AUTO_MATERIALIZE:
tick_settings = retention_settings.get("auto_materialize")
else:
raise Exception(f"Unexpected instigator type {instigator_type}")
default_tick_settings = get_default_tick_retention_settings(instigator_type)
return get_tick_retention_settings(tick_settings, default_tick_settings)
def inject_env_vars(self, location_name: Optional[str]) -> None:
if not self._secrets_loader:
return
new_env = self._secrets_loader.get_secrets_for_environment(location_name)
for k, v in new_env.items():
os.environ[k] = v
def get_latest_data_version_record(
self,
key: AssetKey,
is_source: Optional[bool] = None,
partition_key: Optional[str] = None,
before_cursor: Optional[int] = None,
after_cursor: Optional[int] = None,
) -> Optional["EventLogRecord"]:
from dagster._core.storage.event_log.base import AssetRecordsFilter
records_filter = AssetRecordsFilter(
asset_key=key,
asset_partitions=[partition_key] if partition_key else None,
before_storage_id=before_cursor,
after_storage_id=after_cursor,
)
if is_source is True:
# this is a source asset, fetch latest observation record
return next(iter(self.fetch_observations(records_filter, limit=1).records), None)
elif is_source is False:
# this is not a source asset, fetch latest materialization record
return next(iter(self.fetch_materializations(records_filter, limit=1).records), None)
else:
assert is_source is None
# if is_source is None, the requested key could correspond to either a source asset or
# materializable asset. If there is a non-null materialization, we are dealing with a
# materializable asset and should just return that. If not, we should check for any
# observation records that may match.
materialization = next(
iter(self.fetch_materializations(records_filter, limit=1).records), None
)
if materialization:
return materialization
return next(iter(self.fetch_observations(records_filter, limit=1).records), None)
@public
def get_latest_materialization_code_versions(
self, asset_keys: Iterable[AssetKey]
) -> Mapping[AssetKey, Optional[str]]:
"""Returns the code version used for the latest materialization of each of the provided
assets.
Args:
asset_keys (Iterable[AssetKey]): The asset keys to find latest materialization code
versions for.
Returns:
Mapping[AssetKey, Optional[str]]: A dictionary with a key for each of the provided asset
keys. The values will be None if the asset has no materializations. If an asset does
not have a code version explicitly assigned to its definitions, but was
materialized, Dagster assigns the run ID as its code version.
"""
result: Dict[AssetKey, Optional[str]] = {}
latest_materialization_events = self.get_latest_materialization_events(asset_keys)
for asset_key in asset_keys:
event_log_entry = latest_materialization_events.get(asset_key)
if event_log_entry is None:
result[asset_key] = None
else:
data_provenance = extract_data_provenance_from_entry(event_log_entry)
result[asset_key] = data_provenance.code_version if data_provenance else None
return result
@experimental
@public
def report_runless_asset_event(
self,
asset_event: Union["AssetMaterialization", "AssetObservation", "AssetCheckEvaluation"],
):
"""Record an event log entry related to assets that does not belong to a Dagster run."""
from dagster._core.events import (
AssetMaterialization,
AssetObservationData,
DagsterEvent,
DagsterEventType,
StepMaterializationData,
)
if isinstance(asset_event, AssetMaterialization):
event_type_value = DagsterEventType.ASSET_MATERIALIZATION.value
data_payload = StepMaterializationData(asset_event)
elif isinstance(asset_event, AssetCheckEvaluation):
event_type_value = DagsterEventType.ASSET_CHECK_EVALUATION.value
data_payload = asset_event
elif isinstance(asset_event, AssetObservation):
event_type_value = DagsterEventType.ASSET_OBSERVATION.value
data_payload = AssetObservationData(asset_event)
else:
raise DagsterInvariantViolationError(
f"Received unexpected asset event type {asset_event}, expected"
" AssetMaterialization, AssetObservation or AssetCheckEvaluation"
)
return self.report_dagster_event(
run_id=RUNLESS_RUN_ID,
dagster_event=DagsterEvent(
event_type_value=event_type_value,
event_specific_data=data_payload,
job_name=RUNLESS_JOB_NAME,
),
)
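# Usage sketch (illustrative): recording an external materialization that did
# not happen in a Dagster run. The asset key is hypothetical.
#
#     from dagster import AssetKey, AssetMaterialization
#
#     instance.report_runless_asset_event(
#         AssetMaterialization(
#             asset_key=AssetKey("my_table"),
#             description="backfilled externally",
#         )
#     )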
def get_asset_check_support(self) -> "AssetCheckInstanceSupport":
from dagster._core.storage.asset_check_execution_record import AssetCheckInstanceSupport
return (
AssetCheckInstanceSupport.SUPPORTED
if self.event_log_storage.supports_asset_checks
else AssetCheckInstanceSupport.NEEDS_MIGRATION
)
def backfill_log_storage_enabled(self) -> bool:
return False
def da_request_backfills(self) -> bool:
return False