Ask AI

Source code for dagster._core.instance_for_test

import os
import sys
import tempfile
from contextlib import ExitStack, contextmanager
from typing import Any, Iterator, Mapping, Optional

import yaml

from dagster._utils.error import serializable_error_info_from_exc_info

from .._utils.env import environ
from .._utils.merger import merge_dicts
from .instance import DagsterInstance


[docs]@contextmanager def instance_for_test( overrides: Optional[Mapping[str, Any]] = None, set_dagster_home: bool = True, temp_dir: Optional[str] = None, ) -> Iterator[DagsterInstance]: """Creates a persistent :py:class:`~dagster.DagsterInstance` available within a context manager. When a context manager is opened, if no `temp_dir` parameter is set, a new temporary directory will be created for the duration of the context manager's opening. If the `set_dagster_home` parameter is set to True (True by default), the `$DAGSTER_HOME` environment variable will be overridden to be this directory (or the directory passed in by `temp_dir`) for the duration of the context manager being open. Args: overrides (Optional[Mapping[str, Any]]): Config to provide to instance (config format follows that typically found in an `instance.yaml` file). set_dagster_home (Optional[bool]): If set to True, the `$DAGSTER_HOME` environment variable will be overridden to be the directory used by this instance for the duration that the context manager is open. Upon the context manager closing, the `$DAGSTER_HOME` variable will be re-set to the original value. (Defaults to True). temp_dir (Optional[str]): The directory to use for storing local artifacts produced by the instance. If not set, a temporary directory will be created for the duration of the context manager being open, and all artifacts will be torn down afterward. """ with ExitStack() as stack: if not temp_dir: temp_dir = stack.enter_context(tempfile.TemporaryDirectory()) # wait for any grpc processes that created runs during test disposal to finish, # since they might also be using this instance's tempdir (and to keep each test # isolated / avoid race conditions in newer versions of grpcio when servers are # shutting down and spinning up at the same time) instance_overrides = merge_dicts( { "telemetry": {"enabled": False}, "code_servers": {"wait_for_local_processes_on_shutdown": True}, }, (overrides if overrides else {}), ) if set_dagster_home: stack.enter_context( environ({"DAGSTER_HOME": temp_dir, "DAGSTER_DISABLE_TELEMETRY": "yes"}) ) with open(os.path.join(temp_dir, "dagster.yaml"), "w", encoding="utf8") as fd: yaml.dump(instance_overrides, fd, default_flow_style=False) with DagsterInstance.from_config(temp_dir) as instance: try: yield instance except: sys.stderr.write( "Test raised an exception, attempting to clean up instance:" + serializable_error_info_from_exc_info(sys.exc_info()).to_string() + "\n" ) raise finally: cleanup_test_instance(instance)
def cleanup_test_instance(instance: DagsterInstance) -> None: # To avoid filesystem contention when we close the temporary directory, wait for # all runs to reach a terminal state, and close any subprocesses or threads # that might be accessing the run history DB. # Since launcher is lazy loaded, we don't need to do anyting if it's None if instance._run_launcher: # noqa: SLF001 instance._run_launcher.join() # noqa: SLF001