
Utilities

dagster.file_relative_path(dunderfile, relative_path)

Get a path relative to the currently executing Python file.

This function is useful when you need to load a file whose location is relative to the current source file, for example when you encode a configuration file path in a source file and want the code to run from any current working directory.

Parameters
  • dunderfile (str) – Should always be __file__.

  • relative_path (str) – Path to get relative to the currently executing file.

Examples:

from dagster import file_relative_path

file_relative_path(__file__, 'path/relative/to/file')
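The description implies that the result depends only on the location of `dunderfile`, not on the current working directory. The sketch below is a minimal stand-in under the assumption that the function joins the directory of `dunderfile` with `relative_path`; `relative_to_file` is a hypothetical name, not part of Dagster.

```python
import os


def relative_to_file(dunderfile: str, relative_path: str) -> str:
    """Hypothetical stand-in for file_relative_path (assumed behavior, not Dagster's code).

    Resolves relative_path against the directory containing dunderfile,
    so the result does not depend on os.getcwd().
    """
    return os.path.join(os.path.dirname(dunderfile), relative_path)


# Inside a real module you would pass __file__; a fixed path keeps the example deterministic.
config_path = relative_to_file("/repo/pipelines/my_pipeline.py", "configs/local.yaml")
print(config_path)  # /repo/pipelines/configs/local.yaml (on POSIX)
```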
dagster.config_from_files(config_files)

Constructs run config from YAML files.

Parameters

config_files (List[str]) – List of paths or glob patterns for yaml files to load and parse as the run config.
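Because `config_files` accepts glob patterns as well as plain paths, each entry has to be expanded into concrete files before anything is parsed (a plain path to an existing file is simply a pattern that matches itself). The sketch below covers only that expansion step, using the standard `glob` module; `expand_config_paths` is a hypothetical name, and sorting the matches and raising `ValueError` on an empty match are choices made here, not documented Dagster behavior.

```python
import glob
from typing import List


def expand_config_paths(config_files: List[str]) -> List[str]:
    """Hypothetical sketch: expand paths and glob patterns into concrete file paths."""
    paths: List[str] = []
    for pattern in config_files:
        # Sorted only so the order is deterministic; glob itself guarantees no order.
        matches = sorted(glob.glob(pattern))
        if not matches:
            # Failing loudly on a pattern that matches nothing is this sketch's choice.
            raise ValueError(f"File or glob pattern {pattern!r} matched no files")
        paths.extend(matches)
    return paths
```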

Returns

A run config dictionary constructed from provided YAML files.

Return type

Dict[str, Any]

Raises

DagsterInvariantViolationError – When one of the YAML files is invalid and has a parse error.

dagster.config_from_pkg_resources(pkg_resource_defs)

Load a run config from a package resource, using pkg_resources.resource_string().

Example:

from dagster import config_from_pkg_resources

run_config = config_from_pkg_resources(
    pkg_resource_defs=[
        ('dagster_examples.airline_demo.environments', 'local_base.yaml'),
        ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
    ],
)
Parameters

pkg_resource_defs (List[Tuple[str, str]]) – List of (package, filename) pairs identifying the package resources to load and parse as the run config.

Returns

A run config dictionary constructed from the provided YAML resources.

Return type

Dict[str, Any]

Raises

DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
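`pkg_resources` ships with setuptools and is deprecated in recent setuptools releases; the standard library's `importlib.resources` offers an equivalent way to read a data file bundled inside a package. The sketch below reads each (package, filename) pair as text with `importlib.resources.files()` (Python 3.9+) instead of `pkg_resources.resource_string()`. This is a swapped-in technique, and `read_resource_strings` is a hypothetical helper, not Dagster's implementation; the strings it returns are YAML text that could then be passed to `config_from_yaml_strings`.

```python
from importlib import resources
from typing import List, Tuple


def read_resource_strings(pkg_resource_defs: List[Tuple[str, str]]) -> List[str]:
    """Hypothetical helper: read each (package, filename) resource as UTF-8 text."""
    yaml_strings = []
    for package, filename in pkg_resource_defs:
        # files() imports the package and returns a Traversable rooted at its directory.
        resource = resources.files(package).joinpath(filename)
        yaml_strings.append(resource.read_text(encoding="utf-8"))
    return yaml_strings


# Usage with the package from the example above (it must be importable):
# read_resource_strings([
#     ("dagster_examples.airline_demo.environments", "local_base.yaml"),
# ])
```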

dagster.config_from_yaml_strings(yaml_strings)

Static constructor for run configs from YAML strings.

Parameters

yaml_strings (List[str]) – List of yaml strings to parse as the run config.

Returns

A run config dictionary constructed from the provided YAML strings.

Return type

Dict[str, Any]

Raises

DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
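When more than one YAML string is supplied, the parsed documents have to be combined into a single run config. The sketch below assumes a recursive (deep) merge in which later documents override earlier ones key by key; that merge rule is an assumption, not something stated on this page, and `deep_merge` and `merge_run_configs` are hypothetical names. The two dictionaries stand in for the output of `yaml.safe_load` on two YAML strings, so the sketch needs only the standard library.

```python
from typing import Any, Dict, List


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict: nested dicts merge recursively, other values from override win."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_run_configs(documents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold the parsed documents left to right, so later documents take precedence."""
    run_config: Dict[str, Any] = {}
    for document in documents:
        run_config = deep_merge(run_config, document)
    return run_config


# Hypothetical stand-ins for yaml.safe_load() applied to two YAML strings.
base = {"solids": {"load": {"config": {"path": "data.csv", "limit": 10}}}}
override = {"solids": {"load": {"config": {"limit": 100}}}}

print(merge_run_configs([base, override]))
# {'solids': {'load': {'config': {'path': 'data.csv', 'limit': 100}}}}
```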

class dagster.ExperimentalWarning

class dagster.utils.forked_pdb.ForkedPdb(completekey='tab', stdin=None, stdout=None, skip=None, nosigint=False, readrc=True)

A pdb subclass that can be used from a child process forked by multiprocessing.

Examples:

from dagster import solid
from dagster.utils.forked_pdb import ForkedPdb

@solid
def complex_solid(_):
    # some complicated stuff

    ForkedPdb().set_trace()

    # some other complicated stuff

You can launch pipeline execution from Dagit and use the pdb debugger to examine and step through execution at the breakpoint.
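A plain `pdb.Pdb` is awkward in this setting because `multiprocessing` points a child process's `sys.stdin` at `os.devnull`, so the debugger prompt would have nothing to read. A widely used recipe, sketched below, temporarily reopens the terminal through `/dev/stdin` while the prompt is active. `ForkedPdbSketch` is a hypothetical name, the recipe is POSIX-only, and it is an assumption that Dagster's `ForkedPdb` works the same way.

```python
import pdb
import sys


class ForkedPdbSketch(pdb.Pdb):
    """pdb subclass for use inside a forked child process (POSIX only).

    multiprocessing points a child's sys.stdin at os.devnull, so this class
    reopens the controlling terminal's stdin for the duration of each prompt.
    """

    def interaction(self, *args, **kwargs):
        original_stdin = sys.stdin
        try:
            with open("/dev/stdin") as terminal_stdin:
                # With stdin=None, Pdb reads via input(), which uses the current sys.stdin.
                sys.stdin = terminal_stdin
                super().interaction(*args, **kwargs)
        finally:
            sys.stdin = original_stdin


# Usage inside code that runs in a forked child:
#     ForkedPdbSketch().set_trace()
```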

dagster.utils.make_email_on_run_failure_sensor(email_from, email_password, email_to, email_body_fn=<function _default_failure_email_body>, email_subject_fn=<function _default_failure_email_subject>, smtp_host='smtp.gmail.com', smtp_type='SSL', smtp_port=None, name=None, dagit_base_url=None, job_selection=None)

Create a pipeline failure sensor that sends email via the SMTP protocol.

Parameters
  • email_from (str) – The sender email address to send the message from.

  • email_password (str) – The password of the sender.

  • email_to (List[str]) – The recipient email addresses to send the message to.

  • email_body_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function that takes the RunFailureSensorContext and returns the email body to send. Defaults to plain text containing the error message, pipeline name, and run ID.

  • email_subject_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function that takes the RunFailureSensorContext and returns the email subject to send. Defaults to “Dagster Pipeline Failed: <pipeline_name>”.

  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.

  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.

  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.

  • name (Optional[str]) – The name of the sensor. Defaults to “email_on_pipeline_failure”.

  • dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to include deep links to the failed run in the messages.

  • job_selection (Optional[List[Union[PipelineDefinition, GraphDefinition]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.

Examples

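import os

from dagster import repository
from dagster.utils import make_email_on_run_failure_sensor

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

@repository
def my_repo():
    # my_job is a job defined elsewhere in your code.
    return [my_job, email_on_run_failure]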

import os

from dagster import RunFailureSensorContext
from dagster.utils import make_email_on_run_failure_sensor

def my_message_fn(context: RunFailureSensorContext) -> str:
    return (
        f"Job {context.pipeline_run.pipeline_name} failed! "
        f"Error: {context.failure_event.message}"
    )

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    dagit_base_url="http://mycoolsite.com",
)
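The `smtp_type` and `smtp_port` parameters correspond to the two usual ways of securing SMTP with Python's `smtplib`: TLS from the first byte (`SMTP_SSL`, conventionally port 465) or a plaintext connection upgraded with STARTTLS (conventionally port 587). The sketch below shows how such a sender could be written. `resolve_port`, `build_message`, and `send_email` are hypothetical helpers, not Dagster's implementation; only the host default and the port defaults come from the parameter list above.

```python
import smtplib
import ssl
from email.message import EmailMessage
from typing import List, Optional

# Default ports taken from the smtp_port parameter description.
DEFAULT_PORTS = {"SSL": 465, "STARTTLS": 587}


def resolve_port(smtp_type: str, smtp_port: Optional[int]) -> int:
    """Use the explicit port if given, otherwise the conventional default for the protocol."""
    if smtp_type not in DEFAULT_PORTS:
        raise ValueError(f"smtp_type must be 'SSL' or 'STARTTLS', got {smtp_type!r}")
    return smtp_port if smtp_port is not None else DEFAULT_PORTS[smtp_type]


def build_message(email_from: str, email_to: List[str], subject: str, body: str) -> EmailMessage:
    """Plain-text message addressed to every recipient."""
    message = EmailMessage()
    message["From"] = email_from
    message["To"] = ", ".join(email_to)
    message["Subject"] = subject
    message.set_content(body)
    return message


def send_email(
    message: EmailMessage,
    email_from: str,
    email_password: str,
    smtp_host: str = "smtp.gmail.com",
    smtp_type: str = "SSL",
    smtp_port: Optional[int] = None,
) -> None:
    """Hypothetical sender: TLS from the start for "SSL", upgraded connection for "STARTTLS"."""
    port = resolve_port(smtp_type, smtp_port)
    context = ssl.create_default_context()
    if smtp_type == "SSL":
        with smtplib.SMTP_SSL(smtp_host, port, context=context) as server:
            server.login(email_from, email_password)
            server.send_message(message)
    else:
        with smtplib.SMTP(smtp_host, port) as server:
            server.starttls(context=context)
            server.login(email_from, email_password)
            server.send_message(message)
```

Calling `send_email` needs network access and valid credentials, so only `resolve_port` and `build_message` can be exercised offline.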
dagster.utils.make_email_on_pipeline_failure_sensor(email_from, email_password, email_to, email_body_fn=<function _default_failure_email_body>, email_subject_fn=<function _default_failure_email_subject>, smtp_host='smtp.gmail.com', smtp_type='SSL', smtp_port=None, pipeline_selection=None, name=None, dagit_base_url=None)

Create a pipeline failure sensor that sends email via the SMTP protocol.

Parameters
  • email_from (str) – The sender email address to send the message from.

  • email_password (str) – The password of the sender.

  • email_to (List[str]) – The recipient email addresses to send the message to.

  • email_body_fn (Optional[Callable[[PipelineFailureSensorContext], str]]) – Function that takes the PipelineFailureSensorContext and returns the email body to send. Defaults to plain text containing the error message, pipeline name, and run ID.

  • email_subject_fn (Optional[Callable[[PipelineFailureSensorContext], str]]) – Function that takes the PipelineFailureSensorContext and returns the email subject to send. Defaults to “Dagster Pipeline Failed: <pipeline_name>”.

  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.

  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.

  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.

  • pipeline_selection (Optional[List[str]]) – Names of the pipelines that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.

  • name (Optional[str]) – The name of the sensor. Defaults to “email_on_pipeline_failure”.

  • dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to include deep links to the failed pipeline run in the messages.

Examples

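import os

from dagster import repository
from dagster.utils import make_email_on_pipeline_failure_sensor

email_on_pipeline_failure = make_email_on_pipeline_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

@repository
def my_repo():
    # my_pipeline is a pipeline defined elsewhere in your code.
    return [my_pipeline, email_on_pipeline_failure]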

import os

from dagster import PipelineFailureSensorContext
from dagster.utils import make_email_on_pipeline_failure_sensor

def my_message_fn(context: PipelineFailureSensorContext) -> str:
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )

email_on_pipeline_failure = make_email_on_pipeline_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    dagit_base_url="http://mycoolsite.com",
)
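Because `email_body_fn` and `email_subject_fn` are plain functions of the sensor context, they can be checked without running a sensor. The sketch below passes a `types.SimpleNamespace` stand-in that exposes only the attributes the functions read: `pipeline_run.pipeline_name` and `failure_event.message` appear in the example above, while `pipeline_run.run_id` is an assumption about the run object. The stand-in is hypothetical and is not a real `PipelineFailureSensorContext`.

```python
from types import SimpleNamespace


def my_body_fn(context) -> str:
    """Plain-text body with the pipeline name, run ID, and error message."""
    run = context.pipeline_run
    return (
        f"Pipeline {run.pipeline_name} failed!\n"
        f"Run ID: {run.run_id}\n"
        f"Error: {context.failure_event.message}"
    )


def my_subject_fn(context) -> str:
    """Subject line in the same shape as the documented default."""
    return f"Dagster Pipeline Failed: {context.pipeline_run.pipeline_name}"


# Hypothetical stand-in exposing only the attributes the two functions read.
fake_context = SimpleNamespace(
    pipeline_run=SimpleNamespace(pipeline_name="my_pipeline", run_id="abc123"),
    failure_event=SimpleNamespace(message="Execution of step 'load' failed."),
)

print(my_subject_fn(fake_context))  # Dagster Pipeline Failed: my_pipeline
print(my_body_fn(fake_context))
```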
dagster.utils.log.get_dagster_logger(name=None)

Creates a Python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.

This can be used as a more convenient alternative to context.log in most cases. If the log level is not set explicitly, it defaults to DEBUG.

Parameters

name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.

Returns

A logger whose output will be captured by Dagster.

Return type

logging.Logger

Example

from dagster import op
from dagster.utils.log import get_dagster_logger

@op
def hello_op():
    log = get_dagster_logger()
    for i in range(5):
        # do something
        log.info(f"Did {i+1} things!")
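The `name` parameter relies on the standard `logging` hierarchy: a logger named `dagster.builtin.{name}` is a child of `dagster.builtin`, so it inherits the parent's effective level and its records propagate to the parent's handlers. The sketch below demonstrates that mechanism with the standard library alone; the `StringIO` handler is a hypothetical stand-in for the capture Dagster performs, not Dagster's own handler.

```python
import io
import logging

# Hypothetical stand-in for Dagster's capture: a handler on the parent logger writing to a buffer.
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter("%(name)s %(levelname)s %(message)s"))

parent = logging.getLogger("dagster.builtin")
parent.setLevel(logging.DEBUG)
parent.addHandler(handler)

# A named child, shaped like the logger get_dagster_logger("my_op") is documented to return.
child = logging.getLogger("dagster.builtin.my_op")
child.debug("Did 1 thing!")  # child has no level of its own, so DEBUG is inherited

print(buffer.getvalue(), end="")  # dagster.builtin.my_op DEBUG Did 1 thing!
```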