Utilities

dagster.file_relative_path(dunderfile, relative_path)[source]

Get a path relative to the currently executing Python file.

This function is useful when you need to load a file whose location is relative to the current file, for example when a configuration file path is encoded in a source file and the code must be runnable from any current working directory.

Parameters
  • dunderfile (str) – Should always be __file__.

  • relative_path (str) – Path to get relative to the currently executing file.

Examples:

file_relative_path(__file__, 'path/relative/to/file')
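
The behavior described above can be approximated with the standard library. The following is a minimal sketch, not the library's source: it assumes the result is the directory of dunderfile joined with relative_path. The name file_relative_path_sketch and the example paths are hypothetical.

```python
import os


def file_relative_path_sketch(dunderfile: str, relative_path: str) -> str:
    """Resolve relative_path against the directory that contains dunderfile.

    Assumption: this mirrors the documented behavior; it is an illustration,
    not Dagster's implementation.
    """
    return os.path.join(os.path.dirname(dunderfile), relative_path)


# Hypothetical module location and config file, used only for illustration.
config_path = file_relative_path_sketch(
    "/opt/project/pipelines/repo.py", "config/dev.yaml"
)
```

Because the path is anchored to the source file rather than to the current working directory, the same file is found regardless of where the process is launched.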

class dagster.ExperimentalWarning[source]

class dagster.utils.forked_pdb.ForkedPdb(completekey='tab', stdin=None, stdout=None, skip=None, nosigint=False, readrc=True)[source]

A pdb subclass that may be used from a forked multiprocessing child process.

Examples:

from dagster import solid
from dagster.utils.forked_pdb import ForkedPdb

@solid
def complex_solid(_):
    # some complicated stuff

    ForkedPdb().set_trace()

    # some other complicated stuff

You can initiate pipeline execution via dagit and use the pdb debugger to examine/step through execution at the breakpoint.
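
To illustrate why a dedicated subclass helps, here is a minimal sketch of one common way to make pdb usable in a forked child: temporarily reattaching standard input while the debugger is interactive. The class name ForkedPdbSketch is hypothetical, the "/dev/stdin" path assumes a POSIX system, and this should not be read as the exact source of dagster.utils.forked_pdb.ForkedPdb.

```python
import pdb
import sys


class ForkedPdbSketch(pdb.Pdb):
    """Illustrative pdb subclass for use inside a forked child process."""

    def interaction(self, *args, **kwargs):
        original_stdin = sys.stdin
        try:
            # A forked multiprocessing child usually does not have a usable
            # stdin, so reopen it for the duration of the debugging session.
            # Assumption: "/dev/stdin" exists (POSIX only).
            with open("/dev/stdin") as child_stdin:
                sys.stdin = child_stdin
                super().interaction(*args, **kwargs)
        finally:
            # Always restore the previous stdin, even if opening fails.
            sys.stdin = original_stdin
```

Calling ForkedPdbSketch().set_trace() inside the child would then behave like an ordinary pdb breakpoint, reading commands from the terminal.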

dagster.utils.make_email_on_pipeline_failure_sensor(email_from, email_password, email_to, email_body_fn=<function _default_failure_email_body>, email_subject_fn=<function _default_failure_email_subject>, smtp_host='smtp.gmail.com', smtp_type='SSL', smtp_port=None, pipeline_selection=None, name=None, dagit_base_url=None)[source]

Create a pipeline failure sensor that sends email via the SMTP protocol.

Parameters
  • email_from (str) – The sender email address to send the message from.

  • email_password (str) – The password of the sender.

  • email_to (List[str]) – The recipient email addresses to send the message to.

  • email_body_fn (Optional[Callable[[PipelineFailureSensorContext], str]]) – Function that takes the PipelineFailureSensorContext and returns the email body you want to send. Defaults to plain text containing the error message, pipeline name, and run ID.

  • email_subject_fn (Optional[Callable[[PipelineFailureSensorContext], str]]) – Function that takes the PipelineFailureSensorContext and returns the email subject you want to send. Defaults to “Dagster Pipeline Failed: <pipeline_name>”.

  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.

  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to “SSL”.

  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.

  • pipeline_selection (Optional[List[str]]) – Names of the pipelines that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any pipeline in the repository fails.

  • name (Optional[str]) – The name of the sensor. Defaults to “email_on_pipeline_failure”.

  • dagit_base_url (Optional[str]) – The base URL of your Dagit instance. Specify this to allow messages to include deep links to the failed pipeline run.
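
The interaction between smtp_type and smtp_port described above can be sketched as follows. This is an illustration of the documented defaults only: the helper name resolve_smtp_port and the ValueError raised for an unrecognized protocol are assumptions, not part of the Dagster API.

```python
from typing import Optional

# Default ports as documented: 465 for SSL, 587 for STARTTLS.
DEFAULT_SMTP_PORTS = {"SSL": 465, "STARTTLS": 587}


def resolve_smtp_port(smtp_type: str = "SSL", smtp_port: Optional[int] = None) -> int:
    """Return the explicit port if one is given, else the default for smtp_type."""
    if smtp_type not in DEFAULT_SMTP_PORTS:
        # Assumption: rejecting unknown protocols is a choice made for this sketch.
        raise ValueError(f"smtp_type must be 'SSL' or 'STARTTLS', got {smtp_type!r}")
    if smtp_port is not None:
        return smtp_port
    return DEFAULT_SMTP_PORTS[smtp_type]
```

For example, leaving smtp_port unset with smtp_type="STARTTLS" resolves to 587, while an explicitly passed port always takes precedence.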

Examples:

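import os

from dagster import repository
from dagster.utils import make_email_on_pipeline_failure_sensor

email_on_pipeline_failure = make_email_on_pipeline_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

@repository
def my_repo():
    # my_pipeline is assumed to be defined elsewhere; the pipeline and the sensor are separate list entries
    return [my_pipeline, email_on_pipeline_failure]

from dagster import PipelineFailureSensorContext

def my_message_fn(context: PipelineFailureSensorContext) -> str: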
    return "Pipeline {pipeline_name} failed! Error: {error}".format(
        pipeline_name=context.pipeline_run.pipeline_name,
        error=context.failure_event.message,
    )

email_on_pipeline_failure = make_email_on_pipeline_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    dagit_base_url="http://mycoolsite.com",
)