import os
from typing import TYPE_CHECKING, Any, Mapping, NamedTuple, Optional, Sequence, Type
import yaml
import dagster._check as check
from dagster._core.instance.config import DAGSTER_CONFIG_YAML_FILENAME, dagster_instance_config
from dagster._serdes import ConfigurableClassData, class_from_code_pointer, whitelist_for_serdes
if TYPE_CHECKING:
from dagster._core.instance import DagsterInstance, DagsterInstanceOverrides
from dagster._core.launcher.base import RunLauncher
from dagster._core.run_coordinator.base import RunCoordinator
from dagster._core.scheduler.scheduler import Scheduler
from dagster._core.secrets.loader import SecretsLoader
from dagster._core.storage.base_storage import DagsterStorage
from dagster._core.storage.compute_log_manager import ComputeLogManager
from dagster._core.storage.event_log.base import EventLogStorage
from dagster._core.storage.root import LocalArtifactStorage
from dagster._core.storage.runs.base import RunStorage
from dagster._core.storage.schedules.base import ScheduleStorage
def compute_logs_directory(base: str) -> str:
return os.path.join(base, "storage")
def _runs_directory(base: str) -> str:
return os.path.join(base, "history", "")
def _event_logs_directory(base: str) -> str:
return os.path.join(base, "history", "runs", "")
def _schedule_directory(base: str) -> str:
return os.path.join(base, "schedules")
def configurable_class_data(config_field: Mapping[str, Any]) -> ConfigurableClassData:
return ConfigurableClassData(
check.str_elem(config_field, "module"),
check.str_elem(config_field, "class"),
yaml.dump(check.opt_dict_elem(config_field, "config"), default_flow_style=False),
)
def configurable_class_data_or_default(
config_value: Mapping[str, Any], field_name: str, default: Optional[ConfigurableClassData]
) -> Optional[ConfigurableClassData]:
return (
configurable_class_data(config_value[field_name])
if config_value.get(field_name)
else default
)
def configurable_secrets_loader_data(
config_field: Mapping[str, Any], default: Optional[ConfigurableClassData]
) -> Optional[ConfigurableClassData]:
if not config_field:
return default
elif "custom" in config_field:
return configurable_class_data(config_field["custom"])
else:
return None
def configurable_storage_data(
config_field: Mapping[str, Any], defaults: Mapping[str, Optional[ConfigurableClassData]]
) -> Sequence[Optional[ConfigurableClassData]]:
storage_data: ConfigurableClassData
run_storage_data: Optional[ConfigurableClassData]
event_storage_data: Optional[ConfigurableClassData]
schedule_storage_data: Optional[ConfigurableClassData]
if not config_field:
storage_data = check.not_none(defaults.get("storage"))
run_storage_data = check.not_none(defaults.get("run_storage"))
event_storage_data = check.not_none(defaults.get("event_log_storage"))
schedule_storage_data = check.not_none(defaults.get("schedule_storage"))
elif "postgres" in config_field:
config_yaml = yaml.dump(config_field["postgres"], default_flow_style=False)
storage_data = ConfigurableClassData(
module_name="dagster_postgres",
class_name="DagsterPostgresStorage",
config_yaml=config_yaml,
)
# for backwards compatibility
run_storage_data = ConfigurableClassData(
module_name="dagster_postgres",
class_name="PostgresRunStorage",
config_yaml=config_yaml,
)
event_storage_data = ConfigurableClassData(
module_name="dagster_postgres",
class_name="PostgresEventLogStorage",
config_yaml=config_yaml,
)
schedule_storage_data = ConfigurableClassData(
module_name="dagster_postgres",
class_name="PostgresScheduleStorage",
config_yaml=config_yaml,
)
elif "mysql" in config_field:
config_yaml = yaml.dump(config_field["mysql"], default_flow_style=False)
storage_data = ConfigurableClassData(
module_name="dagster_mysql",
class_name="DagsterMySQLStorage",
config_yaml=config_yaml,
)
# for backwards compatibility
run_storage_data = ConfigurableClassData(
module_name="dagster_mysql",
class_name="MySQLRunStorage",
config_yaml=config_yaml,
)
event_storage_data = ConfigurableClassData(
module_name="dagster_mysql",
class_name="MySQLEventLogStorage",
config_yaml=config_yaml,
)
schedule_storage_data = ConfigurableClassData(
module_name="dagster_mysql",
class_name="MySQLScheduleStorage",
config_yaml=config_yaml,
)
elif "sqlite" in config_field:
base_dir = config_field["sqlite"]["base_dir"]
storage_data = ConfigurableClassData(
"dagster._core.storage.sqlite_storage",
"DagsterSqliteStorage",
yaml.dump({"base_dir": base_dir}, default_flow_style=False),
)
# Back-compat fo the legacy storage field only works if the base_dir is a string
# (env var doesn't work since each storage has a different value for the base_dir field)
if isinstance(base_dir, str):
run_storage_data = ConfigurableClassData(
"dagster._core.storage.runs",
"SqliteRunStorage",
yaml.dump({"base_dir": _runs_directory(base_dir)}, default_flow_style=False),
)
event_storage_data = ConfigurableClassData(
"dagster._core.storage.event_log",
"SqliteEventLogStorage",
yaml.dump({"base_dir": _event_logs_directory(base_dir)}, default_flow_style=False),
)
schedule_storage_data = ConfigurableClassData(
"dagster._core.storage.schedules",
"SqliteScheduleStorage",
yaml.dump({"base_dir": _schedule_directory(base_dir)}, default_flow_style=False),
)
else:
run_storage_data = None
event_storage_data = None
schedule_storage_data = None
else:
storage_data = configurable_class_data(config_field["custom"])
storage_config_yaml = yaml.dump(
{
"module_name": storage_data.module_name,
"class_name": storage_data.class_name,
"config_yaml": storage_data.config_yaml,
},
default_flow_style=False,
)
run_storage_data = ConfigurableClassData(
"dagster._core.storage.legacy_storage", "LegacyRunStorage", storage_config_yaml
)
event_storage_data = ConfigurableClassData(
"dagster._core.storage.legacy_storage", "LegacyEventLogStorage", storage_config_yaml
)
schedule_storage_data = ConfigurableClassData(
"dagster._core.storage.legacy_storage", "LegacyScheduleStorage", storage_config_yaml
)
return [storage_data, run_storage_data, event_storage_data, schedule_storage_data]
[docs]
@whitelist_for_serdes
class InstanceRef(
NamedTuple(
"_InstanceRef",
[
("local_artifact_storage_data", ConfigurableClassData),
("compute_logs_data", ConfigurableClassData),
("scheduler_data", Optional[ConfigurableClassData]),
("run_coordinator_data", Optional[ConfigurableClassData]),
("run_launcher_data", Optional[ConfigurableClassData]),
("settings", Mapping[str, object]),
# Required for backwards compatibility, but going forward will be unused by new versions
# of DagsterInstance, which instead will instead grab the constituent storages from the
# unified `storage_data`, if it is populated.
("run_storage_data", Optional[ConfigurableClassData]),
("event_storage_data", Optional[ConfigurableClassData]),
("schedule_storage_data", Optional[ConfigurableClassData]),
("custom_instance_class_data", Optional[ConfigurableClassData]),
# unified storage field
("storage_data", Optional[ConfigurableClassData]),
("secrets_loader_data", Optional[ConfigurableClassData]),
],
)
):
"""Serializable representation of a :py:class:`DagsterInstance`.
Users should not instantiate this class directly.
"""
def __new__(
cls,
local_artifact_storage_data: ConfigurableClassData,
compute_logs_data: ConfigurableClassData,
scheduler_data: Optional[ConfigurableClassData],
run_coordinator_data: Optional[ConfigurableClassData],
run_launcher_data: Optional[ConfigurableClassData],
settings: Mapping[str, object],
run_storage_data: Optional[ConfigurableClassData],
event_storage_data: Optional[ConfigurableClassData],
schedule_storage_data: Optional[ConfigurableClassData],
custom_instance_class_data: Optional[ConfigurableClassData] = None,
storage_data: Optional[ConfigurableClassData] = None,
secrets_loader_data: Optional[ConfigurableClassData] = None,
):
return super(cls, InstanceRef).__new__(
cls,
local_artifact_storage_data=check.inst_param(
local_artifact_storage_data, "local_artifact_storage_data", ConfigurableClassData
),
compute_logs_data=check.inst_param(
compute_logs_data, "compute_logs_data", ConfigurableClassData
),
scheduler_data=check.opt_inst_param(
scheduler_data, "scheduler_data", ConfigurableClassData
),
run_coordinator_data=check.opt_inst_param(
run_coordinator_data, "run_coordinator_data", ConfigurableClassData
),
run_launcher_data=check.opt_inst_param(
run_launcher_data, "run_launcher_data", ConfigurableClassData
),
settings=check.opt_mapping_param(settings, "settings", key_type=str),
run_storage_data=check.opt_inst_param(
run_storage_data, "run_storage_data", ConfigurableClassData
),
event_storage_data=check.opt_inst_param(
event_storage_data, "event_storage_data", ConfigurableClassData
),
schedule_storage_data=check.opt_inst_param(
schedule_storage_data, "schedule_storage_data", ConfigurableClassData
),
custom_instance_class_data=check.opt_inst_param(
custom_instance_class_data,
"instance_class",
ConfigurableClassData,
),
storage_data=check.opt_inst_param(storage_data, "storage_data", ConfigurableClassData),
secrets_loader_data=check.opt_inst_param(
secrets_loader_data, "secrets_loader_data", ConfigurableClassData
),
)
@staticmethod
def config_defaults(base_dir: str) -> Mapping[str, Optional[ConfigurableClassData]]:
default_run_storage_data = ConfigurableClassData(
"dagster._core.storage.runs",
"SqliteRunStorage",
yaml.dump({"base_dir": _runs_directory(base_dir)}, default_flow_style=False),
)
default_event_log_storage_data = ConfigurableClassData(
"dagster._core.storage.event_log",
"SqliteEventLogStorage",
yaml.dump({"base_dir": _event_logs_directory(base_dir)}, default_flow_style=False),
)
default_schedule_storage_data = ConfigurableClassData(
"dagster._core.storage.schedules",
"SqliteScheduleStorage",
yaml.dump({"base_dir": _schedule_directory(base_dir)}, default_flow_style=False),
)
return {
"local_artifact_storage": ConfigurableClassData(
"dagster._core.storage.root",
"LocalArtifactStorage",
yaml.dump({"base_dir": base_dir}, default_flow_style=False),
),
"storage": ConfigurableClassData(
"dagster._core.storage.sqlite_storage",
"DagsterSqliteStorage",
yaml.dump({"base_dir": base_dir}, default_flow_style=False),
),
"compute_logs": ConfigurableClassData(
"dagster._core.storage.local_compute_log_manager",
"LocalComputeLogManager",
yaml.dump({"base_dir": compute_logs_directory(base_dir)}, default_flow_style=False),
),
"scheduler": ConfigurableClassData(
"dagster._core.scheduler",
"DagsterDaemonScheduler",
yaml.dump({}),
),
"run_coordinator": ConfigurableClassData(
"dagster._core.run_coordinator", "DefaultRunCoordinator", yaml.dump({})
),
"run_launcher": ConfigurableClassData(
"dagster",
"DefaultRunLauncher",
yaml.dump({}),
),
# For back-compat, the default is actually set in the secrets_loader property above,
# so that old clients loading new config don't try to load a class that they
# don't recognize
"secrets": None,
# LEGACY DEFAULTS
"run_storage": default_run_storage_data,
"event_log_storage": default_event_log_storage_data,
"schedule_storage": default_schedule_storage_data,
}
@staticmethod
def from_dir(
base_dir: str,
*,
config_dir: Optional[str] = None,
config_filename: str = DAGSTER_CONFIG_YAML_FILENAME,
overrides: Optional["DagsterInstanceOverrides"] = None,
) -> "InstanceRef":
if config_dir is None:
config_dir = base_dir
overrides = check.opt_mapping_param(overrides, "overrides")
config_value, custom_instance_class = dagster_instance_config(
config_dir, config_filename=config_filename, overrides=overrides
)
if custom_instance_class:
config_keys = set(custom_instance_class.config_schema().keys()) # type: ignore # (undefined method)
custom_instance_class_config = {
key: val for key, val in config_value.items() if key in config_keys
}
custom_instance_class_data = ConfigurableClassData(
config_value["instance_class"]["module"],
config_value["instance_class"]["class"],
yaml.dump(custom_instance_class_config, default_flow_style=False),
)
defaults = custom_instance_class.config_defaults(base_dir) # type: ignore # (undefined method)
else:
custom_instance_class_data = None
defaults = InstanceRef.config_defaults(base_dir)
local_artifact_storage_data = configurable_class_data_or_default(
config_value, "local_artifact_storage", defaults["local_artifact_storage"]
)
compute_logs_data = configurable_class_data_or_default(
config_value,
"compute_logs",
defaults["compute_logs"],
)
if (
config_value.get("run_storage")
or config_value.get("event_log_storage")
or config_value.get("schedule_storage")
):
# using legacy config, specifying config for each of the constituent storages, make sure
# to create a composite storage
run_storage_data = configurable_class_data_or_default(
config_value, "run_storage", defaults["run_storage"]
)
event_storage_data = configurable_class_data_or_default(
config_value, "event_log_storage", defaults["event_log_storage"]
)
schedule_storage_data = configurable_class_data_or_default(
config_value, "schedule_storage", defaults["schedule_storage"]
)
storage_data = ConfigurableClassData(
module_name="dagster._core.storage.legacy_storage",
class_name="CompositeStorage",
config_yaml=yaml.dump(
{
"run_storage": {
"module_name": run_storage_data.module_name, # type: ignore # (possible none)
"class_name": run_storage_data.class_name, # type: ignore # (possible none)
"config_yaml": run_storage_data.config_yaml, # type: ignore # (possible none)
},
"event_log_storage": {
"module_name": event_storage_data.module_name, # type: ignore # (possible none)
"class_name": event_storage_data.class_name, # type: ignore # (possible none)
"config_yaml": event_storage_data.config_yaml, # type: ignore # (possible none)
},
"schedule_storage": {
"module_name": schedule_storage_data.module_name, # type: ignore # (possible none)
"class_name": schedule_storage_data.class_name, # type: ignore # (possible none)
"config_yaml": schedule_storage_data.config_yaml, # type: ignore # (possible none)
},
},
default_flow_style=False,
),
)
else:
[
storage_data,
run_storage_data,
event_storage_data,
schedule_storage_data,
] = configurable_storage_data(
config_value.get("storage"), # type: ignore # (possible none)
defaults,
)
scheduler_data = configurable_class_data_or_default(
config_value, "scheduler", defaults["scheduler"]
)
if config_value.get("run_queue"):
run_coordinator_data = configurable_class_data(
{
"module": "dagster.core.run_coordinator",
"class": "QueuedRunCoordinator",
"config": config_value["run_queue"],
}
)
else:
run_coordinator_data = configurable_class_data_or_default(
config_value,
"run_coordinator",
defaults["run_coordinator"],
)
run_launcher_data = configurable_class_data_or_default(
config_value,
"run_launcher",
defaults["run_launcher"],
)
secrets_loader_data = configurable_secrets_loader_data(
config_value.get("secrets"), # type: ignore # (possible none)
defaults["secrets"],
)
settings_keys = {
"telemetry",
"python_logs",
"run_monitoring",
"run_retries",
"code_servers",
"retention",
"backfills",
"sensors",
"schedules",
"nux",
"auto_materialize",
"concurrency",
}
settings = {key: config_value.get(key) for key in settings_keys if config_value.get(key)}
return InstanceRef(
local_artifact_storage_data=local_artifact_storage_data, # type: ignore # (possible none)
run_storage_data=run_storage_data,
event_storage_data=event_storage_data,
compute_logs_data=compute_logs_data, # type: ignore # (possible none)
schedule_storage_data=schedule_storage_data,
scheduler_data=scheduler_data,
run_coordinator_data=run_coordinator_data,
run_launcher_data=run_launcher_data,
settings=settings,
custom_instance_class_data=custom_instance_class_data,
storage_data=storage_data,
secrets_loader_data=secrets_loader_data,
)
@staticmethod
def from_dict(instance_ref_dict):
def value_for_ref_item(k, v):
if v is None:
return None
if k == "settings":
return v
return ConfigurableClassData(*v)
return InstanceRef(**{k: value_for_ref_item(k, v) for k, v in instance_ref_dict.items()}) # pyright: ignore[reportArgumentType]
@property
def local_artifact_storage(self) -> "LocalArtifactStorage":
from dagster._core.storage.root import LocalArtifactStorage
return self.local_artifact_storage_data.rehydrate(as_type=LocalArtifactStorage)
@property
def storage(self) -> Optional["DagsterStorage"]:
from dagster._core.storage.base_storage import DagsterStorage
return self.storage_data.rehydrate(as_type=DagsterStorage) if self.storage_data else None
@property
def run_storage(self) -> Optional["RunStorage"]:
from dagster._core.storage.runs.base import RunStorage
return (
self.run_storage_data.rehydrate(as_type=RunStorage) if self.run_storage_data else None
)
@property
def event_storage(self) -> Optional["EventLogStorage"]:
from dagster._core.storage.event_log.base import EventLogStorage
return (
self.event_storage_data.rehydrate(as_type=EventLogStorage)
if self.event_storage_data
else None
)
@property
def schedule_storage(self) -> Optional["ScheduleStorage"]:
from dagster._core.storage.schedules.base import ScheduleStorage
return (
self.schedule_storage_data.rehydrate(as_type=ScheduleStorage)
if self.schedule_storage_data
else None
)
@property
def compute_log_manager(self) -> "ComputeLogManager":
from dagster._core.storage.compute_log_manager import ComputeLogManager
return self.compute_logs_data.rehydrate(as_type=ComputeLogManager)
@property
def scheduler(self) -> Optional["Scheduler"]:
from dagster._core.scheduler.scheduler import Scheduler
return self.scheduler_data.rehydrate(as_type=Scheduler) if self.scheduler_data else None
@property
def run_coordinator(self) -> Optional["RunCoordinator"]:
from dagster._core.run_coordinator.base import RunCoordinator
return (
self.run_coordinator_data.rehydrate(as_type=RunCoordinator)
if self.run_coordinator_data
else None
)
@property
def run_launcher(self) -> Optional["RunLauncher"]:
from dagster._core.launcher.base import RunLauncher
return (
self.run_launcher_data.rehydrate(as_type=RunLauncher)
if self.run_launcher_data
else None
)
@property
def secrets_loader(self) -> Optional["SecretsLoader"]:
from dagster._core.secrets.loader import SecretsLoader
# Defining a default here rather than in stored config to avoid
# back-compat issues when loading the config on older versions where
# EnvFileLoader was not defined
return (
self.secrets_loader_data.rehydrate(as_type=SecretsLoader)
if self.secrets_loader_data
else None
)
@property
def custom_instance_class(self) -> Type["DagsterInstance"]:
return ( # type: ignore # (ambiguous return type)
class_from_code_pointer(
self.custom_instance_class_data.module_name,
self.custom_instance_class_data.class_name,
)
if self.custom_instance_class_data
else None
)
@property
def custom_instance_class_config(self) -> Mapping[str, Any]:
return (
self.custom_instance_class_data.config_dict if self.custom_instance_class_data else {}
)
def to_dict(self) -> Mapping[str, Any]:
return self._asdict()