Ask AI

Source code for dagster_aws.pipes.clients.lambda_

import json
from typing import Any, Mapping, Optional, Union

import boto3
from dagster import PipesClient
from dagster._annotations import public
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.execution.context.asset_execution_context import AssetExecutionContext
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.pipes.client import (
    PipesClientCompletedInvocation,
    PipesContextInjector,
    PipesMessageReader,
)
from dagster._core.pipes.utils import open_pipes_session

from dagster_aws.pipes.context_injectors import PipesLambdaEventContextInjector
from dagster_aws.pipes.message_readers import PipesLambdaLogsMessageReader


[docs] class PipesLambdaClient(PipesClient, TreatAsResourceParam): """A pipes client for invoking AWS lambda. By default context is injected via the lambda input event and messages are parsed out of the 4k tail of logs. Args: client (boto3.client): The boto lambda client used to call invoke. context_injector (Optional[PipesContextInjector]): A context injector to use to inject context into the lambda function. Defaults to :py:class:`PipesLambdaEventContextInjector`. message_reader (Optional[PipesMessageReader]): A message reader to use to read messages from the lambda function. Defaults to :py:class:`PipesLambdaLogsMessageReader`. """ def __init__( self, client: Optional[boto3.client] = None, # pyright: ignore (reportGeneralTypeIssues) context_injector: Optional[PipesContextInjector] = None, message_reader: Optional[PipesMessageReader] = None, ): self._client = client or boto3.client("lambda") self._message_reader = message_reader or PipesLambdaLogsMessageReader() self._context_injector = context_injector or PipesLambdaEventContextInjector() @classmethod def _is_dagster_maintained(cls) -> bool: return True
[docs] @public def run( self, *, function_name: str, event: Mapping[str, Any], context: Union[OpExecutionContext, AssetExecutionContext], ) -> PipesClientCompletedInvocation: """Synchronously invoke a lambda function, enriched with the pipes protocol. Args: function_name (str): The name of the function to use. event (Mapping[str, Any]): A JSON serializable object to pass as input to the lambda. context (Union[OpExecutionContext, AssetExecutionContext]): The context of the currently executing Dagster op or asset. Returns: PipesClientCompletedInvocation: Wrapper containing results reported by the external process. """ with open_pipes_session( context=context, message_reader=self._message_reader, context_injector=self._context_injector, ) as session: other_kwargs = {} if isinstance(self._message_reader, PipesLambdaLogsMessageReader): other_kwargs["LogType"] = "Tail" if isinstance(self._context_injector, PipesLambdaEventContextInjector): payload_data = { **event, **session.get_bootstrap_env_vars(), } else: payload_data = event response = self._client.invoke( FunctionName=function_name, InvocationType="RequestResponse", Payload=json.dumps(payload_data), **other_kwargs, ) if isinstance(self._message_reader, PipesLambdaLogsMessageReader): self._message_reader.consume_lambda_logs(response) if "FunctionError" in response: err_payload = json.loads(response["Payload"].read().decode("utf-8")) raise Exception( f"Lambda Function Error ({response['FunctionError']}):\n{json.dumps(err_payload, indent=2)}" ) # should probably have a way to return the lambda result payload return PipesClientCompletedInvocation(session)