import os
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator, Optional
from urllib.parse import urljoin, urlparse
import sqlalchemy as db
from sqlalchemy.engine import Connection
from sqlalchemy.pool import NullPool
from typing_extensions import Self
from dagster import (
StringSource,
_check as check,
)
from dagster._config.config_schema import UserConfigSchema
from dagster._core.storage.runs.schema import (
InstanceInfo,
RunsTable,
RunStorageSqlMetadata,
RunTagsTable,
)
from dagster._core.storage.runs.sql_run_storage import SqlRunStorage
from dagster._core.storage.sql import (
AlembicVersion,
check_alembic_revision,
create_engine,
get_alembic_config,
run_alembic_downgrade,
run_alembic_upgrade,
stamp_alembic_rev,
)
from dagster._core.storage.sqlite import create_db_conn_string
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from dagster._utils import mkdir_p
if TYPE_CHECKING:
    # Needed by static type checkers only; this file uses the name in a quoted annotation, so
    # the import never has to run at runtime.
    from dagster._core.storage.sqlite_storage import SqliteStorageConfig

# NOTE(review): not referenced in the visible part of this file. Presumably the minimum SQLite
# version (major, minor, patch) required for bucketed queries -- confirm against its importers.
MINIMUM_SQLITE_BUCKET_VERSION = [3, 25, 0]
class SqliteRunStorage(SqlRunStorage, ConfigurableClass):
    """Run storage backed by a SQLite database on local disk.

    This class is not meant to be instantiated directly by users. Internal machinery creates it
    when ``dagster-webserver`` and ``dagster-graphql`` load, using the values found in the
    ``dagster.yaml`` file in ``$DAGSTER_HOME``; configure it by editing that file.

    When ``dagster.yaml`` does not specify a run storage, this one is used by default. To select
    SQLite run storage explicitly, add a block like the following to ``dagster.yaml``:

    .. code-block:: YAML

        run_storage:
          module: dagster._core.storage.runs
          class: SqliteRunStorage
          config:
            base_dir: /path/to/dir

    ``base_dir`` is the directory on disk in which the run storage keeps its database.
    """

    def __init__(self, conn_string: str, inst_data: Optional[ConfigurableClassData] = None):
        """Bind the storage to the database identified by ``conn_string``.

        Args:
            conn_string: Connection string of the SQLite database.
            inst_data: Optional ``ConfigurableClassData`` describing how this instance was
                configured.
        """
        check.str_param(conn_string, "conn_string")
        self._conn_string = conn_string
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
        super().__init__()

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` passed at construction time, if any."""
        return self._inst_data

    @classmethod
    def config_type(cls) -> UserConfigSchema:
        """Return the config schema: a single ``base_dir`` string source."""
        return {"base_dir": StringSource}

    @classmethod
    def from_config_value(
        cls, inst_data: Optional[ConfigurableClassData], config_value: "SqliteStorageConfig"
    ) -> "SqliteRunStorage":
        """Build the storage from a config mapping by forwarding its entries to ``from_local``."""
        return SqliteRunStorage.from_local(inst_data=inst_data, **config_value)

    @classmethod
    def from_local(cls, base_dir: str, inst_data: Optional[ConfigurableClassData] = None) -> Self:
        """Open, initializing where necessary, the ``runs`` database under ``base_dir``.

        Args:
            base_dir: Directory that holds the database; ``mkdir_p`` is called on it first.
            inst_data: Optional ``ConfigurableClassData`` handed on to the constructor.

        Returns:
            A storage instance bound to the database's connection string.
        """
        check.str_param(base_dir, "base_dir")
        mkdir_p(base_dir)

        db_url = create_db_conn_string(base_dir, "runs")
        setup_engine = create_engine(db_url, poolclass=NullPool)
        alembic_cfg = get_alembic_config(__file__)

        with setup_engine.connect() as setup_conn:
            db_rev, head_rev = check_alembic_revision(alembic_cfg, setup_conn)
            # Either revision missing: create the run storage tables, switch the journal mode to
            # WAL and stamp the alembic revision.
            needs_schema_init = not (db_rev and head_rev)
            if needs_schema_init:
                RunStorageSqlMetadata.create_all(setup_engine)
                setup_conn.execute(db.text("PRAGMA journal_mode=WAL;"))
                stamp_alembic_rev(alembic_cfg, setup_conn)

            # The instance_info table is created separately whenever it is absent.
            if "instance_info" not in db.inspect(setup_engine).get_table_names():
                InstanceInfo.create(setup_engine)

        storage = cls(db_url, inst_data)
        if needs_schema_init:
            # Only a database whose schema was just created gets migrate() and optimize().
            storage.migrate()
            storage.optimize()
        return storage

    @contextmanager
    def connect(self) -> Iterator[Connection]:
        """Yield a connection with a transaction begun on it.

        A new engine using ``NullPool`` is created on every call.
        """
        engine = create_engine(self._conn_string, poolclass=NullPool)
        with engine.connect() as conn, conn.begin():
            yield conn

    def _alembic_upgrade(self, rev: str = "head") -> None:
        """Run the alembic upgrade to ``rev`` over a connection from ``connect``."""
        alembic_cfg = get_alembic_config(__file__)
        with self.connect() as conn:
            run_alembic_upgrade(alembic_cfg, conn, rev=rev)

    def _alembic_downgrade(self, rev: str = "head") -> None:
        """Run the alembic downgrade to ``rev`` over a connection from ``connect``."""
        alembic_cfg = get_alembic_config(__file__)
        with self.connect() as conn:
            run_alembic_downgrade(alembic_cfg, conn, rev=rev)

    def upgrade(self) -> None:
        """Perform the 0.6.6 on-disk layout migration if needed, then the alembic upgrade."""
        self._check_for_version_066_migration_and_perform()
        self._alembic_upgrade()

    def _check_for_version_066_migration_and_perform(self) -> None:
        """Move runs out of a pre-0.6.6 database into this storage, if one exists.

        In version 0.6.6 the layout of the SQLite dbs on disk changed: the database moved from
        DAGSTER_HOME/runs.db to DAGSTER_HOME/history/runs.db. This method checks for a database
        at the old location, re-adds each of its runs to this storage, and then deletes the old
        file.
        """
        current_db_path = urlparse(self._conn_string).path
        legacy_conn_string = "sqlite://" + urljoin(current_db_path, "../runs.db")
        legacy_db_path = urlparse(legacy_conn_string).path

        # sqlite URLs look like `sqlite:///foo/bar/baz` on Unix/Mac, but on Windows they look
        # like `sqlite:///D:/foo/bar/baz` (or `sqlite:///D:\foo\bar\baz`), so the leading slash
        # has to go before the path can be used on the filesystem.
        if os.name == "nt":
            legacy_db_path = legacy_db_path.lstrip("/")

        if not os.path.exists(legacy_db_path):
            return

        legacy_storage = SqliteRunStorage(legacy_conn_string)
        for legacy_run in legacy_storage.get_runs():
            self.add_run(legacy_run)
        os.unlink(legacy_db_path)

    def delete_run(self, run_id: str) -> None:
        """Delete the run with ``run_id`` together with its tags.

        Overrides the default SQL implementation until cascading deletes are fully supported:
        the run's tag rows are deleted first, then the run row itself.
        """
        check.str_param(run_id, "run_id")
        # Tags table first, runs table second; the statements are executed in this order.
        deletions = [
            db.delete(table).where(table.c.run_id == run_id)
            for table in (RunTagsTable, RunsTable)
        ]
        with self.connect() as conn:
            for deletion in deletions:
                conn.execute(deletion)

    def alembic_version(self) -> AlembicVersion:
        """Return the result of ``check_alembic_revision`` for this storage's database."""
        alembic_cfg = get_alembic_config(__file__)
        with self.connect() as conn:
            return check_alembic_revision(alembic_cfg, conn)