Source code for dagster._core.storage.schedules.sqlite.sqlite_schedule_storage

from contextlib import contextmanager
from typing import Iterator, Optional

import sqlalchemy as db
from packaging.version import parse
from sqlalchemy.engine import Connection
from sqlalchemy.pool import NullPool

from dagster import (
    StringSource,
    _check as check,
)
from dagster._config.config_schema import UserConfigSchema
from dagster._core.storage.sql import (
    AlembicVersion,
    check_alembic_revision,
    create_engine,
    get_alembic_config,
    run_alembic_upgrade,
    stamp_alembic_rev,
)
from dagster._core.storage.sqlite import create_db_conn_string, get_sqlite_version
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from dagster._utils import mkdir_p

from ..schema import ScheduleStorageSqlMetadata
from ..sql_schedule_storage import SqlScheduleStorage

MINIMUM_SQLITE_BATCH_VERSION = "3.25.0"


class SqliteScheduleStorage(SqlScheduleStorage, ConfigurableClass):
    """Local SQLite backed schedule storage."""

    def __init__(self, conn_string: str, inst_data: Optional[ConfigurableClassData] = None):
        """
        Args:
            conn_string: SQLAlchemy connection string pointing at the sqlite db.
            inst_data: Optional serializable config metadata (ConfigurableClass plumbing).
        """
        check.str_param(conn_string, "conn_string")
        self._conn_string = conn_string
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)

        super().__init__()

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        return self._inst_data

    @classmethod
    def config_type(cls) -> UserConfigSchema:
        # Only config needed to reconstruct this storage is the directory
        # holding the sqlite file.
        return {"base_dir": StringSource}

    @classmethod
    def from_config_value(
        cls, inst_data: Optional[ConfigurableClassData], config_value
    ) -> "SqliteScheduleStorage":
        """Rehydrate storage from resolved config (see ``config_type``)."""
        return SqliteScheduleStorage.from_local(inst_data=inst_data, **config_value)

    @classmethod
    def from_local(
        cls, base_dir: str, inst_data: Optional[ConfigurableClassData] = None
    ) -> "SqliteScheduleStorage":
        """Create storage rooted at ``base_dir``, initializing the database on first use.

        On a fresh database (no alembic revision stamped), creates the schema,
        enables WAL journal mode, stamps the current alembic head, and runs
        data migrations/optimizations once.
        """
        check.str_param(base_dir, "base_dir")
        mkdir_p(base_dir)
        conn_string = create_db_conn_string(base_dir, "schedules")
        engine = create_engine(conn_string, poolclass=NullPool)
        alembic_config = get_alembic_config(__file__)

        should_migrate_data = False
        with engine.connect() as connection:
            db_revision, head_revision = check_alembic_revision(alembic_config, connection)
            if not (db_revision and head_revision):
                # Fresh database: build the full schema, switch sqlite to WAL
                # journaling, and stamp the alembic head so no schema
                # migrations are attempted against the just-created tables.
                ScheduleStorageSqlMetadata.create_all(engine)
                connection.execute(db.text("PRAGMA journal_mode=WAL;"))
                stamp_alembic_rev(alembic_config, connection)
                should_migrate_data = True

        schedule_storage = cls(conn_string, inst_data)
        if should_migrate_data:
            schedule_storage.migrate()
            schedule_storage.optimize()

        return schedule_storage

    @contextmanager
    def connect(self) -> Iterator[Connection]:
        """Yield a transaction-scoped connection on a fresh NullPool engine."""
        engine = create_engine(self._conn_string, poolclass=NullPool)
        with engine.connect() as conn:
            with conn.begin():
                yield conn

    @property
    def supports_batch_queries(self) -> bool:
        """Whether batched queries may be issued against this storage.

        Requires both the base storage's support and a sqlite runtime at or
        above MINIMUM_SQLITE_BATCH_VERSION.
        """
        if not super().supports_batch_queries:
            return False

        # NOTE: the original re-evaluated super().supports_batch_queries here,
        # which is always truthy after the guard above; dropped as redundant.
        return parse(get_sqlite_version()) >= parse(MINIMUM_SQLITE_BATCH_VERSION)

    def upgrade(self) -> None:
        """Run alembic schema migrations up to head."""
        alembic_config = get_alembic_config(__file__)
        with self.connect() as conn:
            run_alembic_upgrade(alembic_config, conn)

    def alembic_version(self) -> AlembicVersion:
        """Return the (db_revision, head_revision) pair for this database."""
        alembic_config = get_alembic_config(__file__)
        with self.connect() as conn:
            return check_alembic_revision(alembic_config, conn)