Ask AI

Source code for dagster_aws.pipes.clients.glue

import time
from typing import TYPE_CHECKING, Any, Dict, Optional, Union, cast

import boto3
import dagster._check as check
from botocore.exceptions import ClientError
from dagster import MetadataValue, PipesClient
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.pipes.client import (
    PipesClientCompletedInvocation,
    PipesContextInjector,
    PipesMessageReader,
)
from dagster._core.pipes.utils import open_pipes_session

from dagster_aws.pipes.message_readers import PipesCloudWatchLogReader, PipesCloudWatchMessageReader

if TYPE_CHECKING:
    from mypy_boto3_glue.client import GlueClient
    from mypy_boto3_glue.type_defs import GetJobRunResponseTypeDef, StartJobRunRequestRequestTypeDef


[docs] @experimental class PipesGlueClient(PipesClient, TreatAsResourceParam): """A pipes client for invoking AWS Glue jobs. Args: context_injector (Optional[PipesContextInjector]): A context injector to use to inject context into the Glue job, for example, :py:class:`PipesS3ContextInjector`. message_reader (Optional[PipesMessageReader]): A message reader to use to read messages from the glue job run. Defaults to :py:class:`PipesCloudWatchsMessageReader`. When provided with :py:class:`PipesCloudWatchMessageReader`, it will be used to recieve logs and events from the ``.../output/<job-run-id>`` CloudWatch log stream created by AWS Glue. Note that AWS Glue routes both ``stderr`` and ``stdout`` from the main job process into this LogStream. client (Optional[boto3.client]): The boto Glue client used to launch the Glue job forward_termination (bool): Whether to cancel the Glue job run when the Dagster process receives a termination signal. """ def __init__( self, context_injector: PipesContextInjector, message_reader: Optional[PipesMessageReader] = None, client: Optional["GlueClient"] = None, forward_termination: bool = True, ): self._client: GlueClient = client or boto3.client("glue") self._context_injector = context_injector self._message_reader = message_reader or PipesCloudWatchMessageReader() self.forward_termination = check.bool_param(forward_termination, "forward_termination") @classmethod def _is_dagster_maintained(cls) -> bool: return True
[docs] @public def run( self, *, context: Union[OpExecutionContext, AssetExecutionContext], start_job_run_params: "StartJobRunRequestRequestTypeDef", extras: Optional[Dict[str, Any]] = None, ) -> PipesClientCompletedInvocation: """Start a Glue job, enriched with the pipes protocol. See also: `AWS API Documentation <https://docs.aws.amazon.com/goto/WebAPI/glue-2017-03-31/StartJobRun>`_ Args: context (Union[OpExecutionContext, AssetExecutionContext]): The context of the currently executing Dagster op or asset. start_job_run_params (Dict): Parameters for the ``start_job_run`` boto3 Glue client call. extras (Optional[Dict[str, Any]]): Additional Dagster metadata to pass to the Glue job. Returns: PipesClientCompletedInvocation: Wrapper containing results reported by the external process. """ params = start_job_run_params params["Arguments"] = params.get("Arguments") or {} job_name = cast(str, params["JobName"]) with open_pipes_session( context=context, message_reader=self._message_reader, context_injector=self._context_injector, extras=extras, ) as session: pipes_args = session.get_bootstrap_cli_arguments() params["Arguments"].update(pipes_args) # pyright: ignore (reportAttributeAccessIssue) # Note: AWS Glue doesn't have a concept of Tags so we can't inject Dagster tags from session.default_remote_invocation_info try: run_id = self._client.start_job_run(**params)["JobRunId"] except ClientError as err: context.log.error( "Couldn't create job %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], # pyright: ignore (reportTypedDictNotRequiredAccess) err.response["Error"]["Message"], # pyright: ignore (reportTypedDictNotRequiredAccess) ) raise response = self._client.get_job_run(JobName=job_name, RunId=run_id) context.log.info(f"Started AWS Glue job {job_name} run: {run_id}") try: response = self._wait_for_job_run_completion(job_name, run_id) except DagsterExecutionInterruptedError: if self.forward_termination: self._terminate_job_run(context=context, job_name=job_name, run_id=run_id) raise status = response["JobRun"]["JobRunState"] # pyright: ignore (reportOptionalSubscript) if status != "SUCCEEDED": raise RuntimeError( f"Glue job {job_name} run {run_id} completed with status {status} :\n{response['JobRun'].get('ErrorMessage')}" ) else: context.log.info(f"Glue job {job_name} run {run_id} completed successfully") # Glue is dumping both stdout and stderr to the same log group called */output if isinstance(self._message_reader, PipesCloudWatchMessageReader): log_group = response.get("JobRun", {}).get("LogGroupName") if log_group is None: context.log.warning( f"CloudWatch logging is not enabled for Glue job {job_name}. Messages and logs will not be available." ) else: session.report_launched( { "extras": {"log_group": f"{log_group}/output", "log_stream": run_id}, } ) self._message_reader.add_log_reader( PipesCloudWatchLogReader( client=self._message_reader.client, log_group=f"{log_group}/output", log_stream=run_id, start_time=int(session.created_at.timestamp() * 1000), ), ) return PipesClientCompletedInvocation( session, metadata=self._extract_dagster_metadata(response) )
def _wait_for_job_run_completion( self, job_name: str, run_id: str ) -> "GetJobRunResponseTypeDef": while True: response = self._client.get_job_run(JobName=job_name, RunId=run_id) # https://docs.aws.amazon.com/glue/latest/dg/job-run-statuses.html if response["JobRun"]["JobRunState"] in [ # pyright: ignore (reportTypedDictNotRequiredAccess) "FAILED", "SUCCEEDED", "STOPPED", "TIMEOUT", "ERROR", ]: return response time.sleep(5) def _extract_dagster_metadata(self, response: "GetJobRunResponseTypeDef") -> RawMetadataMapping: job_run = response["JobRun"] metadata: RawMetadataMapping = {} if job_run_id := job_run.get("Id"): metadata["AWS Glue Job Run ID"] = job_run_id if job_name := job_run.get("JobName"): metadata["AWS Glue Job Name"] = job_name if job_run_id is not None and job_name is not None: metadata["AWS Glue Job Run URL"] = MetadataValue.url( f"https://{self._client.meta.region_name}.console.aws.amazon.com/gluestudio/home?region={self._client.meta.region_name}#/job/{job_name}/run/{job_run_id}" ) return metadata def _terminate_job_run( self, context: Union[OpExecutionContext, AssetExecutionContext], job_name: str, run_id: str ): """Creates a handler which will gracefully stop the Run in case of external termination. It will stop the Glue job before doing so. """ context.log.warning(f"[pipes] execution interrupted, stopping Glue job run {run_id}...") response = self._client.batch_stop_job_run(JobName=job_name, JobRunIds=[run_id]) runs = response["SuccessfulSubmissions"] if len(runs) > 0: context.log.warning(f"Successfully stopped Glue job run {run_id}.") else: context.log.warning( f"Something went wrong during Glue job run termination: {response['errors']}" # pyright: ignore (reportGeneralTypeIssues) )