Source code for dagster_aws.pipes.context_injectors
import json
import os
import random
import string
from contextlib import contextmanager
from typing import TYPE_CHECKING, Iterator
import boto3
import dagster._check as check
from dagster._core.pipes.client import PipesContextInjector, PipesParams
from dagster._core.pipes.utils import PipesEnvContextInjector
if TYPE_CHECKING:
from dagster_pipes import PipesContextData
_CONTEXT_FILENAME = "context.json"
[docs]
class PipesS3ContextInjector(PipesContextInjector):
    """A context injector that injects context by writing to a temporary S3 location.

    The context is serialized to JSON and uploaded to ``<random prefix>/context.json`` in
    the configured bucket. The random prefix is generated anew for every injection, and the
    uploaded object is deleted when the injection context manager exits.

    Args:
        bucket (str): The S3 bucket to write to.
        client (boto3.client): A boto3 client to use to write to S3.
    """

    def __init__(self, *, bucket: str, client: boto3.client):  # pyright: ignore (reportGeneralTypeIssues)
        super().__init__()
        self.bucket = check.str_param(bucket, "bucket")
        self.client = client

    @contextmanager
    def inject_context(self, context: "PipesContextData") -> Iterator[PipesParams]:
        """Upload the context to S3 and yield the parameters needed to locate it.

        Args:
            context (PipesContextData): The context payload; it is JSON-serialized and
                UTF-8 encoded before upload.

        Yields:
            PipesParams: A mapping with the ``bucket`` and ``key`` of the uploaded object.
        """
        key_prefix = "".join(random.choices(string.ascii_letters, k=30))
        # S3 keys always use "/" as the delimiter, so build the key explicitly instead of
        # relying on os.path.join (which would insert "\\" on Windows).
        key = f"{key_prefix}/{_CONTEXT_FILENAME}"
        self.client.put_object(
            Body=json.dumps(context).encode("utf-8"), Bucket=self.bucket, Key=key
        )
        try:
            yield {"bucket": self.bucket, "key": key}
        finally:
            # Remove the temporary object even when the body of the `with` block raises,
            # so failed runs do not leave context files behind in the bucket.
            self.client.delete_object(Bucket=self.bucket, Key=key)

    def no_messages_debug_text(self) -> str:
        """Return a hint describing how this injector attempted to pass context."""
        return (
            "Attempted to inject context via a temporary file in s3. Expected"
            " PipesS3ContextLoader to be explicitly passed to open_dagster_pipes in the external"
            " process."
        )
[docs]
class PipesLambdaEventContextInjector(PipesEnvContextInjector):
    """Injects context via AWS Lambda event input.

    Should be paired with :py:class:`~dagster_pipes.PipesMappingParamsLoader` on the Lambda
    side.
    """

    def no_messages_debug_text(self) -> str:
        """Return a hint describing how this injector attempted to pass context."""
        debug_text = "Attempted to inject context via the lambda event input."
        return debug_text