AWS (dagster_aws)

S3

class dagster_aws.s3.S3ComputeLogManager(bucket, local_dir=None, inst_data=None, prefix='dagster', use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None)

Logs solid compute function stdout and stderr to S3.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    use_ssl: true
    verify: true
    verify_cert_path: "/path/to/cert/bundle.pem"
    endpoint_url: "http://alternate-s3-host.io"
Parameters
  • bucket (str) – The name of the S3 bucket to which logs are written.

  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster.seven.get_system_temp_directory().

  • prefix (Optional[str]) – Prefix for the log file keys.

  • use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.

  • verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.

  • verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if verify is set to True.

  • endpoint_url (Optional[str]) – Override for the S3 endpoint url.

  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when it is instantiated from config.

class dagster_aws.s3.S3FileCache(s3_bucket, s3_key, s3_session, overwrite=False)
class dagster_aws.s3.S3FileHandle(s3_bucket, s3_key)
property path_desc

This property returns a representation of the path for display purposes. It should not be used in a programmatically meaningful way beyond display.
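
As a rough illustration of a display-only path description, the sketch below uses a stand-in class. `SketchS3FileHandle` is hypothetical and is not the library's implementation, and the `s3://bucket/key` form of the string is an assumption made for this example.

```python
class SketchS3FileHandle:
    """Hypothetical stand-in for illustration; not dagster_aws.s3.S3FileHandle."""

    def __init__(self, s3_bucket, s3_key):
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key

    @property
    def path_desc(self):
        # Assumption: the display string is an s3:// URI. Treat it as a label
        # for logs and UIs, not as something to parse.
        return "s3://{bucket}/{key}".format(bucket=self.s3_bucket, key=self.s3_key)


handle = SketchS3FileHandle("my-bucket", "some/key.txt")
print(handle.path_desc)
```

Code that needs the bucket or key should read those attributes directly rather than splitting the description string.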

dagster_aws.s3.s3_resource ResourceDefinition

Resource that gives solids access to S3.

The underlying S3 session is created by calling boto3.resource('s3').

Attach this resource definition to a ModeDefinition in order to make it available to your solids.

Example

from dagster import ModeDefinition, execute_solid, solid
from dagster_aws.s3 import s3_resource

@solid(required_resource_keys={'s3'})
def example_s3_solid(context):
    return context.resources.s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

result = execute_solid(
    example_s3_solid,
    run_config={
        'resources': {
            's3': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    },
    mode_def=ModeDefinition(resource_defs={'s3': s3_resource}),
)

Note that your solids must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may configure this resource as follows:

resources:
  s3:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
      # through the ordinary boto credential chain.
      use_unsigned_session: false
      # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: True
      endpoint_url: "http://localhost"
      # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
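
When the same settings are built in Python rather than YAML, a small helper can assemble the `resources` section of `run_config`. The sketch below is a minimal example using only the three option names listed above. The helper name `build_s3_run_config` is hypothetical and is not part of dagster_aws.

```python
def build_s3_run_config(region_name=None, use_unsigned_session=None, endpoint_url=None):
    """Hypothetical helper: assemble the 'resources' section of run_config."""
    options = {
        "region_name": region_name,
        "use_unsigned_session": use_unsigned_session,
        "endpoint_url": endpoint_url,
    }
    # Drop unset options so the resource falls back to its own defaults.
    config = {key: value for key, value in options.items() if value is not None}
    return {"resources": {"s3": {"config": config}}}


run_config = build_s3_run_config(region_name="us-west-1", endpoint_url="http://localhost")
print(run_config)
```

The resulting dict has the same shape as the `run_config` argument passed to `execute_solid` in the example above.
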
dagster_aws.s3.S3Coordinate DagsterType

A dagster.DagsterType intended to make it easier to pass information about files on S3 from solid to solid. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:

inputs:
  s3_file:
    value:
      bucket: my-bucket
      key: my-key
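
The dict shape described above can be checked before a value is handed to a solid. The following is a minimal sketch, assuming only that a coordinate is a dict with 'bucket' and 'key' keys. The function `check_s3_coordinate` is a hypothetical helper and is not how the library performs its own type check.

```python
def check_s3_coordinate(value):
    """Hypothetical validator: return value if it looks like an S3 coordinate."""
    if not isinstance(value, dict):
        raise TypeError("expected a dict, got {}".format(type(value).__name__))
    # Both keys named in the type description must be present.
    missing = [name for name in ("bucket", "key") if name not in value]
    if missing:
        raise ValueError("missing required keys: {}".format(", ".join(missing)))
    return value


coordinate = check_s3_coordinate({"bucket": "my-bucket", "key": "my-key"})
print(coordinate["bucket"], coordinate["key"])
```
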
dagster_aws.s3.s3_system_storage SystemStorageDefinition

Persistent system storage using S3 for storage.

Suitable for intermediates storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.

Attach this system storage definition, as well as the s3_resource it requires, to a ModeDefinition in order to make it available to your pipeline:

pipeline_def = PipelineDefinition(
    mode_defs=[
        ModeDefinition(
            resource_defs={'s3': s3_resource, ...},
            system_storage_defs=[s3_system_storage],
            ...
        ), ...
    ], ...
)

You may configure this storage as follows:

storage:
  s3:
    config:
      s3_bucket: my-cool-bucket
      s3_prefix: good/prefix-for-files-
dagster_aws.s3.s3_plus_default_storage_defs List[SystemStorageDefinition]

The default system storages available on any ModeDefinition that does not provide custom system storages, i.e., default_system_storage_defs plus the s3_system_storage.

Redshift

dagster_aws.redshift.redshift_resource ResourceDefinition

This resource enables connecting to a Redshift cluster and issuing queries against that cluster.

Example

from dagster import ModeDefinition, execute_solid, solid
from dagster_aws.redshift import redshift_resource

@solid(required_resource_keys={'redshift'})
def example_redshift_solid(context):
    return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)

result = execute_solid(
    example_redshift_solid,
    run_config={
        'resources': {
            'redshift': {
                'config': {
                    'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
                    'port': 5439,
                    'user': 'dagster',
                    'password': 'dagster',
                    'database': 'dev',
                }
            }
        }
    },
    mode_def=ModeDefinition(resource_defs={'redshift': redshift_resource}),
)
assert result.output_value() == [(1,)]

Testing

dagster_aws.redshift.fake_redshift_resource ResourceDefinition

EMR

dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition
  • spark_config:

  • cluster_id: Name of the job flow (cluster) on which to execute.

  • region_name: The AWS region that the cluster is in.

  • action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html

  • staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.

  • staging_prefix: S3 key prefix inside the staging_bucket to use for files passed between the plan process and the EMR process.

  • wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.

  • local_pipeline_package_path: Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local filesystem of the process executing the pipeline. The expectation is that this package will also be available on the Python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on S3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.

  • deploy_local_pipeline_package: If set, before every step run, the launcher will zip up all the code in local_pipeline_package_path, upload it to S3, and pass it to spark-submit's --py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_pipeline_package_path should not also be set.

  • s3_pipeline_package_path: If set, this path will be passed to the --py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.
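
The last two options are mutually exclusive, which is easy to get wrong when config is assembled from several places. The sketch below shows one way to check the combination up front. `check_package_options` is a hypothetical helper written for this example and is not part of dagster_aws; the returned strings only paraphrase the option descriptions above.

```python
def check_package_options(deploy_local_pipeline_package=False, s3_pipeline_package_path=None):
    """Hypothetical check: describe how user code is expected to reach the cluster."""
    if deploy_local_pipeline_package and s3_pipeline_package_path is not None:
        # The two options should not be combined, per their descriptions.
        raise ValueError(
            "Set either deploy_local_pipeline_package or "
            "s3_pipeline_package_path, not both."
        )
    if deploy_local_pipeline_package:
        return "zip local_pipeline_package_path and upload it on every step run"
    if s3_pipeline_package_path is not None:
        return "pass {} to --py-files".format(s3_pipeline_package_path)
    return "rely on another mechanism, such as bootstrap actions"


print(check_package_options(deploy_local_pipeline_package=True))
```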

class dagster_aws.emr.EmrJobRunner(region, check_cluster_every=30, aws_access_key_id=None, aws_secret_access_key=None)
class dagster_aws.emr.EmrError
dagster_aws.emr.EmrClusterState = <enum 'EmrClusterState'>

An enumeration.

dagster_aws.emr.EmrStepState = <enum 'EmrStepState'>

An enumeration.
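
The members of these two enumerations are not listed here. As a hedged sketch, the stand-in enums below use the cluster and step state names from the AWS EMR API. That the library's enums mirror these names is an assumption, and `SketchClusterState`, `SketchStepState`, and `step_is_finished` are hypothetical names introduced for this example.

```python
from enum import Enum


class SketchClusterState(Enum):
    # Stand-in enum; names follow the cluster states in the AWS EMR API.
    STARTING = "STARTING"
    BOOTSTRAPPING = "BOOTSTRAPPING"
    RUNNING = "RUNNING"
    WAITING = "WAITING"
    TERMINATING = "TERMINATING"
    TERMINATED = "TERMINATED"
    TERMINATED_WITH_ERRORS = "TERMINATED_WITH_ERRORS"


class SketchStepState(Enum):
    # Stand-in enum; names follow the step states in the AWS EMR API.
    PENDING = "PENDING"
    CANCEL_PENDING = "CANCEL_PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"
    INTERRUPTED = "INTERRUPTED"


# Step states after which no further progress is expected.
FINISHED_STEP_STATES = {
    SketchStepState.COMPLETED,
    SketchStepState.CANCELLED,
    SketchStepState.FAILED,
    SketchStepState.INTERRUPTED,
}


def step_is_finished(raw_state):
    """Hypothetical helper: True once a step has reached a final state."""
    return SketchStepState(raw_state) in FINISHED_STEP_STATES


print(step_is_finished("RUNNING"), step_is_finished("COMPLETED"))
```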

CloudWatch

dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition

Core class for defining loggers.

Loggers are pipeline-scoped logging handlers, which will be automatically invoked whenever solids in a pipeline log messages.

Parameters
  • logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to instantiate the logger. This logger will be automatically invoked whenever the methods on context.log are called from within solid compute logic.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.logger_config.

  • description (Optional[str]) – A human-readable description of this logger.

  • _configured_config_mapping_fn – This argument is for internal use only. Users should not specify this field. To preconfigure a logger, use the configured() API.
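
The logger_fn contract described above (take an init context, return a logging.Logger) can be sketched with the standard library alone. In the example below, `console_logger` is a hypothetical logger function, the `name` and `log_level` config keys are assumptions chosen for illustration, and a `SimpleNamespace` stands in for InitLoggerContext by exposing only `logger_config`.

```python
import logging
from types import SimpleNamespace


def console_logger(init_context):
    """Example logger_fn: build a logging.Logger from init_context.logger_config."""
    config = init_context.logger_config
    logger = logging.getLogger(config.get("name", "sketch_logger"))
    # Translate a level name such as "DEBUG" into the logging module's constant.
    logger.setLevel(getattr(logging, config.get("log_level", "INFO")))
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    return logger


# Stand-in for InitLoggerContext, exposing only logger_config (assumption for this sketch).
fake_context = SimpleNamespace(logger_config={"name": "my_pipeline", "log_level": "DEBUG"})
logger = console_logger(fake_context)
logger.debug("hello from the sketch logger")
```

A function with this shape is what the logger_fn parameter expects; the config keys it reads would be declared through config_schema.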