DagsterDocs

AWS (dagster-aws)

S3

class dagster_aws.s3.S3ComputeLogManager(bucket, local_dir=None, inst_data=None, prefix='dagster', use_ssl=True, verify=True, verify_cert_path=None, endpoint_url=None, skip_empty_files=False)[source]

Logs compute function stdout and stderr to S3.

Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml such as the following:

compute_logs:
  module: dagster_aws.s3.compute_log_manager
  class: S3ComputeLogManager
  config:
    bucket: "mycorp-dagster-compute-logs"
    local_dir: "/tmp/cool"
    prefix: "dagster-test-"
    use_ssl: true
    verify: true
    verify_cert_path: "/path/to/cert/bundle.pem"
    endpoint_url: "http://alternate-s3-host.io"
    skip_empty_files: true
Parameters
  • bucket (str) – The name of the s3 bucket to which to log.

  • local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster.seven.get_system_temp_directory().

  • prefix (Optional[str]) – Prefix for the log file keys.

  • use_ssl (Optional[bool]) – Whether or not to use SSL. Default True.

  • verify (Optional[bool]) – Whether or not to verify SSL certificates. Default True.

  • verify_cert_path (Optional[str]) – A filename of the CA cert bundle to use. Only used if verify set to False.

  • endpoint_url (Optional[str]) – Override for the S3 endpoint url.

  • skip_empty_files – (Optional[bool]): Skip upload of empty log files.

  • inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when newed up from config.

class dagster_aws.s3.S3FileCache(s3_bucket, s3_key, s3_session, overwrite=False)[source]
class dagster_aws.s3.S3FileHandle(s3_bucket, s3_key)[source]

A reference to a file on S3.

property path_desc

The file’s S3 URL.

Type

str

property s3_bucket

The name of the S3 bucket.

Type

str

property s3_key

The S3 key.

Type

str

property s3_path

The file’s S3 URL.

Type

str

dagster_aws.s3.s3_file_manager ResourceDefinition[source]

FileManager that provides abstract access to S3.

Implements the FileManager API.

dagster_aws.s3.s3_resource ResourceDefinition[source]

Resource that gives access to S3.

The underlying S3 session is created by calling boto3.session.Session(profile_name). The returned resource object is an S3 client, an instance of botocore.client.S3.

Attach this resource definition to a ModeDefinition in order to make it available to your ops.

Example

from dagster import build_op_context, job, op
from dagster_aws.s3 import s3_resource

@op(required_resource_keys={'s3'})
def example_s3_op(context):
    return context.resources.s3.list_objects_v2(
        Bucket='my-bucket',
        Prefix='some-key'
    )

@job(resource_defs={'s3': s3_resource})
def example_job(context):
    example_s3_op()

example_job.execute_in_process(
    run_config={
        'resources': {
            's3': {
                'config': {
                    'region_name': 'us-west-1',
                }
            }
        }
    }
)

Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.

You may configure this resource as follows:

resources:
  s3:
    config:
      region_name: "us-west-1"
      # Optional[str]: Specifies a custom region for the S3 session. Default is chosen
      # through the ordinary boto credential chain.
      use_unsigned_session: false
      # Optional[bool]: Specifies whether to use an unsigned S3 session. Default: True
      endpoint_url: "http://localhost"
      # Optional[str]: Specifies a custom endpoint for the S3 session. Default is None.
      profile_name: "dev"
      # Optional[str]: Specifies a custom profile for S3 session. Default is default
      # profile as specified in ~/.aws/credentials file
dagster_aws.s3.S3Coordinate DagsterType

A dagster.DagsterType intended to make it easier to pass information about files on S3 from op to op. Objects of this type should be dicts with 'bucket' and 'key' keys, and may be hydrated from config in the intuitive way, e.g., for an input with the name s3_file:

inputs:
  s3_file:
    value:
      bucket: my-bucket
      key: my-key
dagster_aws.s3.s3_pickle_io_manager IOManagerDefinition[source]

Persistent IO manager using S3 for storage.

Serializes objects via pickling. Suitable for objects storage for distributed executors, so long as each execution node has network connectivity and credentials for S3 and the backing bucket.

Attach this resource definition to your job to make it available to your ops.

@job(resource_defs={'io_manager': s3_pickle_io_manager, "s3": s3_resource, ...})
def my_job():
    ...

You may configure this storage as follows:

resources:
    io_manager:
        config:
            s3_bucket: my-cool-bucket
            s3_prefix: good/prefix-for-files-

Redshift

dagster_aws.redshift.redshift_resource ResourceDefinition[source]

This resource enables connecting to a Redshift cluster and issuing queries against that cluster.

Example

from dagster import build_op_context, op
from dagster_aws.redshift import redshift_resource

@op(required_resource_keys={'redshift'})
def example_redshift_op(context):
    return context.resources.redshift.execute_query('SELECT 1', fetch_results=True)

redshift_configured = redshift_resource.configured({
    'host': 'my-redshift-cluster.us-east-1.redshift.amazonaws.com',
    'port': 5439,
    'user': 'dagster',
    'password': 'dagster',
    'database': 'dev',
})
context = build_op_context(resources={'redshift': redshift_configured})
assert example_redshift_op(context) == [(1,)]

Testing

dagster_aws.redshift.fake_redshift_resource ResourceDefinition[source]

EMR

dagster_aws.emr.emr_pyspark_step_launcher ResourceDefinition[source]
  • spark_config:

  • cluster_id: Name of the job flow (cluster) on which to execute.

  • region_name: The AWS region that the cluster is in.

  • action_on_failure: The EMR action to take when the cluster step fails: https://docs.aws.amazon.com/emr/latest/APIReference/API_StepConfig.html

  • staging_bucket: S3 bucket to use for passing files between the plan process and EMR process.

  • staging_prefix: S3 key prefix inside the staging_bucket to use for files passed the plan process and EMR process

  • wait_for_logs: If set, the system will wait for EMR logs to appear on S3. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime.

  • local_pipeline_package_path: Absolute path to the package that contains the job/pipeline definition(s) whose steps will execute remotely on EMR. This is a path on the local fileystem of the process executing the job/pipeline. The expectation is that this package will also be available on the python path of the launched process running the Spark step on EMR, either deployed on step launch via the deploy_local_pipeline_package option, referenced on s3 via the s3_pipeline_package_path option, or installed on the cluster via bootstrap actions.

  • deploy_local_pipeline_package: If set, before every step run, the launcher will zip up all the code in local_pipeline_package_path, upload it to s3, and pass it to spark-submit’s –py-files option. This gives the remote process access to up-to-date user code. If not set, the assumption is that some other mechanism is used for distributing code to the EMR cluster. If this option is set to True, s3_pipeline_package_path should not also be set.

  • s3_pipeline_package_path: If set, this path will be passed to the –py-files option of spark-submit. This should usually be a path to a zip file. If this option is set, deploy_local_pipeline_package should not be set to True.

class dagster_aws.emr.EmrJobRunner(region, check_cluster_every=30, aws_access_key_id=None, aws_secret_access_key=None)[source]
class dagster_aws.emr.EmrError[source]
dagster_aws.emr.EmrClusterState = <enum 'EmrClusterState'>[source]

An enumeration.

dagster_aws.emr.EmrStepState = <enum 'EmrStepState'>[source]

An enumeration.

CloudWatch

dagster_aws.cloudwatch.cloudwatch_logger LoggerDefinition

Core class for defining loggers.

Loggers are job-scoped logging handlers, which will be automatically invoked whenever dagster messages are logged from within a job.

Parameters
  • logger_fn (Callable[[InitLoggerContext], logging.Logger]) – User-provided function to instantiate the logger. This logger will be automatically invoked whenever the methods on context.log are called from within job/pipeline compute logic.

  • config_schema (Optional[ConfigSchema]) – The schema for the config. Configuration data available in init_context.logger_config. If not set, Dagster will accept any config provided.

  • description (Optional[str]) – A human-readable description of this logger.