Ask AI

Source code for dagster._core.pipes.subprocess

import os
import signal
from subprocess import PIPE, Popen
from typing import Mapping, Optional, Sequence, Union

from dagster_pipes import PipesExtras

from dagster import _check as check
from dagster._annotations import public
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError, DagsterPipesExecutionError
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.execution.context.op_execution_context import OpExecutionContext
from dagster._core.pipes.client import (
    PipesClient,
    PipesClientCompletedInvocation,
    PipesContextInjector,
    PipesMessageReader,
)
from dagster._core.pipes.utils import (
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    open_pipes_session,
)


[docs] class PipesSubprocessClient(PipesClient, TreatAsResourceParam): """A pipes client that runs a subprocess with the given command and environment. By default parameters are injected via environment variables. Context is passed via a temp file, and structured messages are read from from a temp file. Args: env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess. cwd (Optional[str]): Working directory in which to launch the subprocess command. context_injector (Optional[PipesContextInjector]): A context injector to use to inject context into the subprocess. Defaults to :py:class:`PipesTempFileContextInjector`. message_reader (Optional[PipesMessageReader]): A message reader to use to read messages from the subprocess. Defaults to :py:class:`PipesTempFileMessageReader`. forward_termination (bool): Whether to send a SIGINT signal to the subprocess if the orchestration process is interrupted or canceled. Defaults to True. forward_stdio (bool): Whether to forward stdout and stderr from the subprocess to the orchestration process. Defaults to True. termination_timeout_seconds (float): How long to wait after forwarding termination for the subprocess to exit. Defaults to 20. """ def __init__( self, env: Optional[Mapping[str, str]] = None, cwd: Optional[str] = None, context_injector: Optional[PipesContextInjector] = None, message_reader: Optional[PipesMessageReader] = None, forward_termination: bool = True, forward_stdio: bool = True, termination_timeout_seconds: float = 20, ): self.env = check.opt_mapping_param(env, "env", key_type=str, value_type=str) self.cwd = check.opt_str_param(cwd, "cwd") self.context_injector = ( check.opt_inst_param( context_injector, "context_injector", PipesContextInjector, ) or PipesTempFileContextInjector() ) self.message_reader = ( check.opt_inst_param( message_reader, "message_reader", PipesMessageReader, ) or PipesTempFileMessageReader() ) self.forward_termination = check.bool_param(forward_termination, "forward_termination") self.forward_stdio = check.bool_param(forward_stdio, "forward_stdio") self.termination_timeout_seconds = check.numeric_param( termination_timeout_seconds, "termination_timeout_seconds" ) @classmethod def _is_dagster_maintained(cls) -> bool: return True
[docs] @public def run( self, *, context: Union[OpExecutionContext, AssetExecutionContext], extras: Optional[PipesExtras] = None, command: Union[str, Sequence[str]], env: Optional[Mapping[str, str]] = None, cwd: Optional[str] = None, ) -> PipesClientCompletedInvocation: """Synchronously execute a subprocess with in a pipes session. Args: command (Union[str, Sequence[str]]): The command to run. Will be passed to `subprocess.Popen()`. context (Union[OpExecutionContext, AssetExecutionContext]): The context from the executing op or asset. extras (Optional[PipesExtras]): An optional dict of extra parameters to pass to the subprocess. env (Optional[Mapping[str, str]]): An optional dict of environment variables to pass to the subprocess. cwd (Optional[str]): Working directory in which to launch the subprocess command. Returns: PipesClientCompletedInvocation: Wrapper containing results reported by the external process. """ with open_pipes_session( context=context, context_injector=self.context_injector, message_reader=self.message_reader, extras=extras, ) as pipes_session: process = Popen( command, cwd=cwd or self.cwd, env={ **os.environ, **self.env, **(env or {}), **pipes_session.get_bootstrap_env_vars(), }, stdout=None if self.forward_stdio else PIPE, stderr=None if self.forward_stdio else PIPE, ) try: process.wait() if process.returncode != 0: raise DagsterPipesExecutionError( f"External execution process failed with code {process.returncode}" ) except DagsterExecutionInterruptedError: if self.forward_termination: context.log.info("[pipes] execution interrupted, sending SIGINT to subprocess.") # send sigint to give external process chance to exit gracefully process.send_signal(signal.SIGINT) process.wait(timeout=self.termination_timeout_seconds) raise return PipesClientCompletedInvocation(pipes_session)