from typing import Any, Optional, TypeVar
from dagster import ConfigurableResource, IAttachDifferentObjectToOpContext, resource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from pydantic import Field
from dagster_aws.s3.file_manager import S3FileManager
from dagster_aws.s3.utils import construct_s3_client
T = TypeVar("T")
class ResourceWithS3Configuration(ConfigurableResource):
use_unsigned_session: bool = Field(
default=False, description="Specifies whether to use an unsigned S3 session."
)
region_name: Optional[str] = Field(
default=None, description="Specifies a custom region for the S3 session."
)
endpoint_url: Optional[str] = Field(
default=None, description="Specifies a custom endpoint for the S3 session."
)
max_attempts: int = Field(
default=5,
description=(
"This provides Boto3's retry handler with a value of maximum retry attempts, where the"
" initial call counts toward the max_attempts value that you provide."
),
)
profile_name: Optional[str] = Field(
default=None, description="Specifies a profile to connect that session."
)
use_ssl: bool = Field(
default=True, description="Whether or not to use SSL. By default, SSL is used."
)
verify: Optional[bool] = Field(
default=None,
description=(
"Whether or not to verify SSL certificates. By default SSL certificates are verified."
),
)
aws_access_key_id: Optional[str] = Field(
default=None, description="AWS access key ID to use when creating the boto3 session."
)
aws_secret_access_key: Optional[str] = Field(
default=None, description="AWS secret access key to use when creating the boto3 session."
)
aws_session_token: Optional[str] = Field(
default=None, description="AWS session token to use when creating the boto3 session."
)
[docs]
class S3Resource(ResourceWithS3Configuration, IAttachDifferentObjectToOpContext):
"""Resource that gives access to S3.
The underlying S3 session is created by calling
:py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`.
The returned resource object is an S3 client, an instance of `botocore.client.S3`.
Example:
.. code-block:: python
from dagster import job, op, Definitions
from dagster_aws.s3 import S3Resource
@op
def example_s3_op(s3: S3Resource):
return s3.get_client().list_objects_v2(
Bucket='my-bucket',
Prefix='some-key'
)
@job
def example_job():
example_s3_op()
defs = Definitions(
jobs=[example_job],
resources={'s3': S3Resource(region_name='us-west-1')}
)
"""
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True
def get_client(self) -> Any:
return construct_s3_client(
max_attempts=self.max_attempts,
region_name=self.region_name,
endpoint_url=self.endpoint_url,
use_unsigned_session=self.use_unsigned_session,
profile_name=self.profile_name,
use_ssl=self.use_ssl,
verify=self.verify,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
)
def get_object_to_set_on_execution_context(self) -> Any:
return self.get_client()
[docs]
@dagster_maintained_resource
@resource(config_schema=S3Resource.to_config_schema())
def s3_resource(context) -> Any:
"""Resource that gives access to S3.
The underlying S3 session is created by calling
:py:func:`boto3.session.Session(profile_name) <boto3:boto3.session>`.
The returned resource object is an S3 client, an instance of `botocore.client.S3`.
Example:
.. code-block:: python
from dagster import build_op_context, job, op
from dagster_aws.s3 import s3_resource
@op(required_resource_keys={'s3'})
def example_s3_op(context):
return context.resources.s3.list_objects_v2(
Bucket='my-bucket',
Prefix='some-key'
)
@job(resource_defs={'s3': s3_resource})
def example_job():
example_s3_op()
example_job.execute_in_process(
run_config={
'resources': {
's3': {
'config': {
'region_name': 'us-west-1',
}
}
}
}
)
Note that your ops must also declare that they require this resource with
`required_resource_keys`, or it will not be initialized for the execution of their compute
functions.
You may configure this resource as follows:
.. code-block:: YAML
resources:
s3:
config:
region_name: "us-west-1"
# Optional[str]: Specifies a custom region for the S3 session. Default is chosen
# through the ordinary boto credential chain.
use_unsigned_session: false
# Optional[bool]: Specifies whether to use an unsigned S3 session. Default: True
endpoint_url: "http://localhost"
# Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
profile_name: "dev"
# Optional[str]: Specifies a custom profile for S3 session. Default is default
# profile as specified in ~/.aws/credentials file
use_ssl: true
# Optional[bool]: Whether or not to use SSL. By default, SSL is used.
verify: None
# Optional[str]: Whether or not to verify SSL certificates. By default SSL certificates are verified.
# You can also specify this argument if you want to use a different CA cert bundle than the one used by botocore."
aws_access_key_id: None
# Optional[str]: The access key to use when creating the client.
aws_secret_access_key: None
# Optional[str]: The secret key to use when creating the client.
aws_session_token: None
# Optional[str]: The session token to use when creating the client.
"""
return S3Resource.from_resource_context(context).get_client()
[docs]
class S3FileManagerResource(ResourceWithS3Configuration, IAttachDifferentObjectToOpContext):
s3_bucket: str = Field(description="S3 bucket to use for the file manager.")
s3_prefix: str = Field(
default="dagster", description="Prefix to use for the S3 bucket for this file manager."
)
def get_client(self) -> S3FileManager:
return S3FileManager(
s3_session=construct_s3_client(
max_attempts=self.max_attempts,
region_name=self.region_name,
endpoint_url=self.endpoint_url,
use_unsigned_session=self.use_unsigned_session,
profile_name=self.profile_name,
use_ssl=self.use_ssl,
verify=self.verify,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
),
s3_bucket=self.s3_bucket,
s3_base_key=self.s3_prefix,
)
def get_object_to_set_on_execution_context(self) -> Any:
return self.get_client()
[docs]
@dagster_maintained_resource
@resource(
config_schema=S3FileManagerResource.to_config_schema(),
)
def s3_file_manager(context) -> S3FileManager:
"""FileManager that provides abstract access to S3.
Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API.
"""
return S3FileManagerResource.from_resource_context(context).get_client()