Ask AI

Source code for dagster_mysql.event_log.event_log

from typing import ContextManager, Optional, cast

import dagster._check as check
import sqlalchemy as db
import sqlalchemy.dialects as db_dialects
import sqlalchemy.exc as db_exc
import sqlalchemy.pool as db_pool
from dagster._config.config_schema import UserConfigSchema
from dagster._core.event_api import EventHandlerFn
from dagster._core.events.log import EventLogEntry
from dagster._core.storage.config import MySqlStorageConfig, mysql_config
from dagster._core.storage.event_log import (
    AssetKeyTable,
    SqlEventLogStorage,
    SqlEventLogStorageMetadata,
    SqlPollingEventWatcher,
)
from dagster._core.storage.event_log.base import EventLogCursor
from dagster._core.storage.event_log.migration import ASSET_KEY_INDEX_COLS
from dagster._core.storage.sql import (
    AlembicVersion,
    check_alembic_revision,
    create_engine,
    run_alembic_upgrade,
    stamp_alembic_rev,
)
from dagster._serdes import ConfigurableClass, ConfigurableClassData
from sqlalchemy.engine import Connection

from ..utils import (
    create_mysql_connection,
    mysql_alembic_config,
    mysql_isolation_level,
    mysql_url_from_config,
    retry_mysql_connection_fn,
    retry_mysql_creation_fn,
)


class MySQLEventLogStorage(SqlEventLogStorage, ConfigurableClass):
    """MySQL-backed event log storage.

    Users should not directly instantiate this class; it is instantiated by internal machinery when
    ``dagster-webserver`` and ``dagster-graphql`` load, based on the values in the ``dagster.yaml`` file in
    ``$DAGSTER_HOME``. Configuration of this class should be done by setting values in that file.

    .. literalinclude:: ../../../../../../examples/docs_snippets/docs_snippets/deploying/dagster-mysql-legacy.yaml
       :caption: dagster.yaml
       :start-after: start_marker_event_log
       :end-before: end_marker_event_log
       :language: YAML

    Note that the fields in this config are :py:class:`~dagster.StringSource` and
    :py:class:`~dagster.IntSource` and can be configured from environment variables.
    """

    def __init__(self, mysql_url: str, inst_data: Optional[ConfigurableClassData] = None):
        """Connect to MySQL and create the event log schema if it is missing.

        Args:
            mysql_url: Connection URL passed to ``create_engine``.
            inst_data: Optional ``ConfigurableClassData`` exposed via :py:attr:`inst_data`.
        """
        self._inst_data = check.opt_inst_param(inst_data, "inst_data", ConfigurableClassData)
        self.mysql_url = check.str_param(mysql_url, "mysql_url")
        self._disposed = False

        self._event_watcher = SqlPollingEventWatcher(self)

        # Default to not holding any connections open to prevent accumulating connections per DagsterInstance
        self._engine = create_engine(
            self.mysql_url,
            isolation_level=mysql_isolation_level(),
            poolclass=db_pool.NullPool,
        )
        # Memoizes has_secondary_index() answers by index name; entries are
        # dropped again in enable_secondary_index().
        self._secondary_index_cache = {}

        table_names = retry_mysql_connection_fn(db.inspect(self._engine).get_table_names)

        # Stamp and create tables if the main table does not exist (we can't check alembic
        # revision because alembic config may be shared with other storage classes)
        if "event_logs" not in table_names:
            retry_mysql_creation_fn(self._init_db)
            # mark all secondary indexes to be used
            self.reindex_events()
            self.reindex_assets()

        self._mysql_version = self.get_server_version()
        super().__init__()

    def _init_db(self) -> None:
        """Create every table in the event log metadata and stamp the alembic revision."""
        with self._connect() as conn:
            SqlEventLogStorageMetadata.create_all(conn)
            stamp_alembic_rev(mysql_alembic_config(__file__), conn)

    def optimize_for_webserver(self, statement_timeout: int, pool_recycle: int) -> None:
        """Swap the engine for one that keeps a single pooled connection open.

        Args:
            statement_timeout: Accepted for interface compatibility; not used by
                this implementation.
            pool_recycle: Forwarded to ``create_engine`` as ``pool_recycle``.
        """
        # When running in dagster-webserver, hold an open connection
        # https://github.com/dagster-io/dagster/issues/3719
        self._engine = create_engine(
            self.mysql_url,
            isolation_level=mysql_isolation_level(),
            pool_size=1,
            pool_recycle=pool_recycle,
        )

    def upgrade(self) -> None:
        """Run the alembic upgrade for this storage over a fresh connection."""
        alembic_config = mysql_alembic_config(__file__)
        with self._connect() as conn:
            run_alembic_upgrade(alembic_config, conn)

    @property
    def inst_data(self) -> Optional[ConfigurableClassData]:
        """The ``ConfigurableClassData`` supplied at construction, if any."""
        return self._inst_data

    @classmethod
    def config_type(cls) -> UserConfigSchema:
        """Return the config schema produced by ``mysql_config()``."""
        return mysql_config()

    @classmethod
    def from_config_value(
        cls, inst_data: Optional[ConfigurableClassData], config_value: MySqlStorageConfig
    ) -> "MySQLEventLogStorage":
        """Build a storage instance from a resolved config value.

        Args:
            inst_data: Passed through to the constructor.
            config_value: Config converted to a URL by ``mysql_url_from_config``.
        """
        return MySQLEventLogStorage(
            inst_data=inst_data, mysql_url=mysql_url_from_config(config_value)
        )

    @staticmethod
    def wipe_storage(mysql_url: str) -> None:
        """Drop every table in the event log metadata from the given database.

        Args:
            mysql_url: Connection URL of the database to wipe.
        """
        engine = create_engine(
            mysql_url, isolation_level=mysql_isolation_level(), poolclass=db_pool.NullPool
        )
        try:
            SqlEventLogStorageMetadata.drop_all(engine)
        finally:
            # Always release the throwaway engine, even if the drop fails.
            engine.dispose()

    @staticmethod
    def create_clean_storage(conn_string: str) -> "MySQLEventLogStorage":
        """Wipe the target database, then return a new storage bound to it.

        Args:
            conn_string: Connection URL used for both the wipe and the new storage.
        """
        MySQLEventLogStorage.wipe_storage(conn_string)
        return MySQLEventLogStorage(conn_string)

    def get_server_version(self) -> Optional[str]:
        """Return the result of ``select version()``, or ``None`` if no row comes back."""
        with self.index_connection() as conn:
            row = conn.execute(db.text("select version()")).fetchone()

        if not row:
            return None

        return cast(str, row[0])

    def store_asset_event(self, event: EventLogEntry, event_id: int) -> None:
        """Insert or update the ``AssetKeyTable`` row for the event's asset key.

        Args:
            event: Event log entry whose ``dagster_event.asset_key`` identifies the row.
            event_id: Passed to ``_get_asset_entry_values`` together with the event.
        """
        # last_materialization_timestamp is updated upon observation, materialization, materialization_planned
        # See SqlEventLogStorage.store_asset_event method for more details

        values = self._get_asset_entry_values(
            event, event_id, self.has_secondary_index(ASSET_KEY_INDEX_COLS)
        )
        with self.index_connection() as conn:
            asset_key_str = event.dagster_event.asset_key.to_string()  # type: ignore  # (possible none)
            if values:
                # Upsert: write the new column values whether or not the row exists.
                conn.execute(
                    db_dialects.mysql.insert(AssetKeyTable)
                    .values(asset_key=asset_key_str, **values)
                    .on_duplicate_key_update(**values)
                )
            else:
                # Nothing to update, so only make sure a row exists. An
                # IntegrityError from the insert is deliberately ignored
                # (presumably the row is already present).
                try:
                    conn.execute(
                        db_dialects.mysql.insert(AssetKeyTable).values(asset_key=asset_key_str)
                    )
                except db_exc.IntegrityError:
                    pass

    def _connect(self) -> ContextManager[Connection]:
        """Return a connection context manager for the current engine."""
        return create_mysql_connection(self._engine, __file__, "event log")

    def run_connection(self, run_id: Optional[str] = None) -> ContextManager[Connection]:
        """Return a connection for run-scoped queries; ``run_id`` is not used here."""
        return self._connect()

    def index_connection(self) -> ContextManager[Connection]:
        """Return a connection for index-scoped queries (same engine as run queries)."""
        return self._connect()

    def has_table(self, table_name: str) -> bool:
        """Return whether ``table_name`` appears in the database's table names."""
        with self._connect() as conn:
            return table_name in db.inspect(conn).get_table_names()

    def has_secondary_index(self, name: str) -> bool:
        """Return whether the named secondary index is enabled, memoizing the answer.

        Both positive and negative answers are cached until
        :py:meth:`enable_secondary_index` is called for the same name.
        """
        if name not in self._secondary_index_cache:
            self._secondary_index_cache[name] = super().has_secondary_index(name)
        return self._secondary_index_cache[name]

    def enable_secondary_index(self, name: str) -> None:
        """Enable the named secondary index and invalidate its cached status."""
        super().enable_secondary_index(name)
        # Drop any memoized answer so the next lookup re-queries the parent.
        self._secondary_index_cache.pop(name, None)

    def watch(self, run_id: str, cursor: Optional[str], callback: EventHandlerFn) -> None:
        """Register ``callback`` with the polling watcher for ``run_id``.

        Args:
            run_id: Run to watch.
            cursor: Optional cursor string; offset cursors are rejected.
            callback: Handler registered with the event watcher.

        Raises:
            Whatever ``check.failed`` raises when ``cursor`` parses as an offset cursor.
        """
        if cursor and EventLogCursor.parse(cursor).is_offset_cursor():
            check.failed("Cannot call `watch` with an offset cursor")
        self._event_watcher.watch_run(run_id, cursor, callback)

    def end_watch(self, run_id: str, handler: EventHandlerFn) -> None:
        """Unregister ``handler`` from the polling watcher for ``run_id``."""
        self._event_watcher.unwatch_run(run_id, handler)

    @property
    def event_watcher(self) -> SqlPollingEventWatcher:
        """The polling event watcher created in ``__init__``."""
        return self._event_watcher

    def __del__(self) -> None:
        self.dispose()

    def dispose(self) -> None:
        """Close the event watcher once; later calls are no-ops.

        This is also invoked from ``__del__``, which Python runs even when
        ``__init__`` raised part-way through. Attributes are therefore read
        defensively so finalizing a half-built instance does not raise
        ``AttributeError``.
        """
        if getattr(self, "_disposed", False):
            return
        self._disposed = True
        watcher = getattr(self, "_event_watcher", None)
        if watcher is not None:
            watcher.close()
        # NOTE(review): the engine's pool is not disposed here, so a connection
        # held after optimize_for_webserver() stays open until the engine is
        # garbage collected -- confirm whether that is intended.

    def alembic_version(self) -> AlembicVersion:
        """Return the result of ``check_alembic_revision`` over a fresh connection."""
        alembic_config = mysql_alembic_config(__file__)
        with self._connect() as conn:
            return check_alembic_revision(alembic_config, conn)