Utilities

dagster.file_relative_path

Get a path relative to the currently executing Python file.

This function is useful when you need to load a file located relative to the current source file, for example when a configuration file path is encoded in a source file and the code should be runnable from any current working directory.

Parameters:

  • dunderfile (str) – Should always be __file__.
  • relative_path (str) – Path to get relative to the currently executing file.

Examples:

file_relative_path(__file__, 'path/relative/to/file')
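
To make the behavior concrete, here is a minimal standard-library sketch of what such a helper plausibly does: it joins the directory of the given file with the relative path. The name file_relative_path_sketch and the sample paths are hypothetical, and this is an illustration under that assumption rather than Dagster's own implementation.

```python
import os


def file_relative_path_sketch(dunderfile: str, relative_path: str) -> str:
    """Hypothetical stand-in: resolve relative_path against dunderfile's directory."""
    return os.path.join(os.path.dirname(dunderfile), relative_path)


# The result depends only on where the source file lives,
# not on the current working directory of the process.
example = file_relative_path_sketch("/opt/project/defs/jobs.py", "config/base.yaml")
```

Because the directory is taken from the file path that is passed in, the same call resolves to the same location regardless of where the process was started.
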
dagster.config_from_files

Constructs run config from YAML files.

Parameters:

  • config_files (List[str]) – List of paths or glob patterns for YAML files to load and parse as the run config.

Returns: A run config dictionary constructed from the provided YAML files.

Return type: Dict[str, Any]

Raises:

  • FileNotFoundError – When a config file path or glob pattern produces no results.
  • DagsterInvariantViolationError – When one of the YAML files is invalid and has a parse error.
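
The "produces no results" condition can be illustrated with the standard library alone. The sketch below expands each path or glob pattern and raises FileNotFoundError when nothing matches. The helper name expand_config_files, the error message, and the file names are hypothetical. It approximates the documented behavior and is not Dagster's code.

```python
import glob
import os
import tempfile
from typing import List


def expand_config_files(config_files: List[str]) -> List[str]:
    """Hypothetical helper: expand each path or glob, failing on empty matches."""
    resolved: List[str] = []
    for pattern in config_files:
        matches = sorted(glob.glob(pattern))
        if not matches:
            raise FileNotFoundError(
                f'File or glob pattern "{pattern}" produced no results.'
            )
        resolved.extend(matches)
    return resolved


# Demonstration with throwaway files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("base.yaml", "local.yaml"):
        with open(os.path.join(tmp, name), "w") as f:
            f.write("ops: {}\n")

    pattern = os.path.join(tmp, "*.yaml")
    found = [os.path.basename(p) for p in expand_config_files([pattern])]

    try:
        expand_config_files([os.path.join(tmp, "missing.yaml")])
        missing_raised = False
    except FileNotFoundError:
        missing_raised = True
```
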
dagster.config_from_pkg_resources

Load a run config from a package resource, using pkg_resources.resource_string().

Example:

config_from_pkg_resources(
    pkg_resource_defs=[
        ('dagster_examples.airline_demo.environments', 'local_base.yaml'),
        ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
    ],
)

Parameters:

  • pkg_resource_defs (List[(str, str)]) – List of pkg_resource module/file pairs to load as the run config.

Returns: A run config dictionary constructed from the provided YAML strings.

Return type: Dict[str, Any]

Raises:

  • DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.

dagster.config_from_yaml_strings

Static constructor for run configs from YAML strings.

Parameters:

  • yaml_strings (List[str]) – List of YAML strings to parse as the run config.

Returns: A run config dictionary constructed from the provided YAML strings.

Return type: Dict[str, Any]

Raises:

  • DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
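
When several YAML documents are supplied, they end up in a single run config dictionary. The sketch below shows one plausible way to combine them, assuming a recursive merge in which later documents override earlier ones key by key. That merge order is an assumption, and the names deep_merge and merge_parsed_documents are hypothetical. To stay dependency-free it works on dictionaries that have already been parsed, not on raw YAML text.

```python
from typing import Any, Dict, List


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict: nested dicts are merged, other values are overridden."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged


def merge_parsed_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold the documents left to right (assumed order) into one run config."""
    run_config: Dict[str, Any] = {}
    for document in documents:
        run_config = deep_merge(run_config, document)
    return run_config


# Two already-parsed documents with overlapping "ops" sections.
base_doc = {"ops": {"load": {"config": {"path": "data.csv"}}}}
override_doc = {"ops": {"load": {"config": {"limit": 10}}}, "loggers": {"console": {}}}
run_config = merge_parsed_documents([base_doc, override_doc])
```
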

dagster.get_dagster_logger

Creates a python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.

This can be used as a more convenient alternative to context.log in most cases. If the log level is not set explicitly, it defaults to DEBUG.

Parameters:

  • name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.

Returns: A logger whose output will be captured by Dagster.

Return type: logging.Logger

Example:

from dagster import get_dagster_logger, op

@op
def hello_op():
    log = get_dagster_logger()
    for i in range(5):
        # do something
        log.info(f"Did {i+1} things!")
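
The inheritance of properties by named loggers follows from how Python's logging module treats dotted names. The standard-library sketch below uses a hypothetical namespace, "example.builtin", in place of the Dagster one, so it only illustrates the naming mechanism and says nothing about how Dagster captures messages.

```python
import logging

# Hypothetical namespace standing in for the base logger.
base_logger = logging.getLogger("example.builtin")
base_logger.setLevel(logging.DEBUG)

# A dotted child name makes this logger a descendant of the base logger.
child_logger = logging.getLogger("example.builtin.my_module")

# The child has no level of its own, so it falls back to the parent's level.
inherits_level = child_logger.getEffectiveLevel() == logging.DEBUG
is_child = child_logger.parent is base_logger
```
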
class dagster._utils.warnings.PreviewWarning
class dagster._utils.warnings.BetaWarning
class dagster._utils.warnings.SupersessionWarning
dagster.make_email_on_run_failure_sensor

Create a job failure sensor that sends email via the SMTP protocol.

Parameters:

  • email_from (str) – The sender email address to send the message from.
  • email_password (str) – The password of the sender.
  • email_to (List[str]) – The recipient email addresses to send the message to.
  • email_body_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function that takes in the RunFailureSensorContext and outputs the email body you want to send. Defaults to plain text that contains the error message, job name, and run ID.
  • email_subject_fn (Optional[Callable[[RunFailureSensorContext], str]]) – Function that takes in the RunFailureSensorContext and outputs the email subject you want to send. Defaults to “Dagster Run Failed: <job_name>”.
  • smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.
  • smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.
  • smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.
  • smtp_user (Optional[str]) – The SMTP user for authentication in the SMTP server. Defaults to the value of email_from.
  • name (Optional[str]) – The name of the sensor. Defaults to “email_on_job_failure”.
  • webserver_base_url (Optional[str]) – The base URL of your dagster-webserver instance. Specify this to allow messages to include deeplinks to the failed run.
  • monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails. To monitor jobs in external repositories, use RepositorySelector and JobSelector.
  • monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the Dagster deployment. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
  • job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector]]]) – (deprecated in favor of monitored_jobs) The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
  • default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.
  • monitor_all_repositories (bool) – (deprecated in favor of monitor_all_code_locations) If set to True, the sensor will monitor all runs in the Dagster instance. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
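
The interaction between smtp_type, smtp_port, and smtp_user can be summarized in a few lines. The sketch below mirrors the documented defaults (465 for SSL, 587 for STARTTLS, and the sender address as the user). The helper resolve_smtp_settings is hypothetical, and rejecting an unknown smtp_type with ValueError is an assumption of this sketch, not documented behavior.

```python
from typing import Optional, Tuple


def resolve_smtp_settings(
    email_from: str,
    smtp_type: str = "SSL",
    smtp_port: Optional[int] = None,
    smtp_user: Optional[str] = None,
) -> Tuple[int, str]:
    """Hypothetical helper mirroring the documented port and user defaults."""
    default_ports = {"SSL": 465, "STARTTLS": 587}
    if smtp_type not in default_ports:
        # Assumption: this sketch rejects any other protocol name.
        raise ValueError(f'smtp_type must be "SSL" or "STARTTLS", got {smtp_type!r}')
    port = smtp_port if smtp_port is not None else default_ports[smtp_type]
    user = smtp_user if smtp_user is not None else email_from
    return port, user


ssl_settings = resolve_smtp_settings("no-reply@example.com")
starttls_settings = resolve_smtp_settings("no-reply@example.com", smtp_type="STARTTLS")
custom_settings = resolve_smtp_settings(
    "no-reply@example.com", smtp_port=2525, smtp_user="relay-user"
)
```
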

Examples:

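import os

from dagster import make_email_on_run_failure_sensor, repository

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

# my_job is assumed to be a job defined elsewhere.
@repository
def my_repo():
    return [my_job, email_on_run_failure]
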
import os

from dagster import RunFailureSensorContext, make_email_on_run_failure_sensor

def my_message_fn(context: RunFailureSensorContext) -> str:
    # Adjacent f-strings are concatenated, so keep a space between the sentences.
    return (
        f"Job {context.dagster_run.job_name} failed! "
        f"Error: {context.failure_event.message}"
    )

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    webserver_base_url="http://mycoolsite.com",
)
class dagster._utils.forked_pdb.ForkedPdb

A pdb subclass that may be used from a forked multiprocessing child.

Examples:

from dagster import op
from dagster._utils.forked_pdb import ForkedPdb

@op
def complex_op(_):
    # some complicated stuff

    ForkedPdb().set_trace()

    # some other complicated stuff

You can initiate job execution via the webserver and use the pdb debugger to examine and step through execution at the breakpoint.
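
For readers curious why a plain pdb breakpoint tends not to work in a multiprocessing child, the sketch below shows one commonly used approach. The child's standard input is usually closed or redirected, so the debugger temporarily reopens the terminal device while it interacts with the user. The class name StdinReopeningPdb is hypothetical, the /dev/stdin path assumes a POSIX system, and this is not presented as the actual ForkedPdb source.

```python
import pdb
import sys


class StdinReopeningPdb(pdb.Pdb):
    """Hypothetical sketch: a Pdb that reattaches to the terminal while interacting."""

    def interaction(self, *args, **kwargs):
        original_stdin = sys.stdin
        # Assumption: /dev/stdin exists and points at the controlling terminal.
        reopened = open("/dev/stdin")
        sys.stdin = reopened
        try:
            super().interaction(*args, **kwargs)
        finally:
            # Restore whatever stdin the child process had before the breakpoint.
            sys.stdin = original_stdin
            reopened.close()
```

Under these assumptions it would be used like the example above, by calling StdinReopeningPdb().set_trace() inside the function that runs in the child process.
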