Get a path relative to the currently executing Python file.
This function is useful when you need to load a file located relative to the current source file, for example when a configuration file path is encoded in a source file and the code must run from any current working directory.
dunderfile (str) – Should always be __file__.
relative_path (str) – Path to get relative to the currently executing file.
Examples:
file_relative_path(__file__, 'path/relative/to/file')
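The behavior described above can be approximated with the standard library alone. The following is a minimal sketch, not Dagster's implementation; the name file_relative_path_sketch and the sample paths are hypothetical.

```python
import os


def file_relative_path_sketch(dunderfile: str, relative_path: str) -> str:
    """Hypothetical stand-in: join relative_path onto the directory of dunderfile."""
    # The directory containing the source file becomes the base, so the
    # result does not depend on the current working directory.
    return os.path.join(os.path.dirname(dunderfile), relative_path)


# Sample paths are illustrative only.
resolved = file_relative_path_sketch(
    os.path.join("project", "jobs", "etl.py"), "config.yaml"
)
```

In real code the first argument would be __file__, as in the example above.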
Constructs run config from YAML files.
config_files (List[str]) – List of paths or glob patterns for yaml files to load and parse as the run config.
A run config dictionary constructed from provided YAML files.
Dict[str, Any]
FileNotFoundError – When a config file produces no results
DagsterInvariantViolationError – When one of the YAML files is invalid and has a parse error.
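The FileNotFoundError condition above can be illustrated with a small glob-expansion sketch. It assumes every entry is expanded as a glob pattern and that a pattern with no matches is an error; expand_config_patterns is a hypothetical helper, not part of Dagster.

```python
import glob
import os
import tempfile
from typing import List


def expand_config_patterns(config_files: List[str]) -> List[str]:
    """Hypothetical helper: expand each path or glob pattern to concrete files."""
    resolved: List[str] = []
    for pattern in config_files:
        matches = sorted(glob.glob(pattern))
        if not matches:
            # Mirrors the documented behavior: a pattern with no results is an error.
            raise FileNotFoundError(f"Pattern {pattern!r} produced no results.")
        resolved.extend(matches)
    return resolved


with tempfile.TemporaryDirectory() as tmp:
    # Create two placeholder YAML files to match against.
    for file_name in ("base.yaml", "warehouse.yaml"):
        with open(os.path.join(tmp, file_name), "w") as handle:
            handle.write("ops: {}\n")

    found = expand_config_patterns([os.path.join(tmp, "*.yaml")])
    found_names = [os.path.basename(path) for path in found]

    try:
        expand_config_patterns([os.path.join(tmp, "missing-*.yaml")])
        missing_raised = False
    except FileNotFoundError:
        missing_raised = True
```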
Load a run config from a package resource, using pkg_resources.resource_string().
Example
config_from_pkg_resources(
    pkg_resource_defs=[
        ('dagster_examples.airline_demo.environments', 'local_base.yaml'),
        ('dagster_examples.airline_demo.environments', 'local_warehouse.yaml'),
    ],
)
pkg_resource_defs (List[(str, str)]) – List of pkg_resource modules/files to load as the run config.
A run config dictionary constructed from the provided yaml strings
Dict[str, Any]
DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
Static constructor for run configs from YAML strings.
yaml_strings (List[str]) – List of yaml strings to parse as the run config.
A run config dictionary constructed from the provided yaml strings
Dict[str, Any]
DagsterInvariantViolationError – When one of the YAML documents is invalid and has a parse error.
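When several YAML documents are combined into one run config, the result depends on how overlapping keys are handled. The sketch below assumes a recursive merge in which later documents override earlier ones; that merge rule is an assumption here, not something stated above. It works on already-parsed dictionaries, so no YAML parser is needed, and deep_merge and merge_documents are hypothetical names.

```python
from typing import Any, Dict, List


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict with override merged recursively into base."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge them key by key.
            merged[key] = deep_merge(merged[key], value)
        else:
            # Otherwise the later value wins (assumed rule).
            merged[key] = value
    return merged


def merge_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Fold parsed documents left to right into one run config dictionary."""
    result: Dict[str, Any] = {}
    for document in documents:
        result = deep_merge(result, document)
    return result


# Stand-ins for two parsed YAML documents.
parsed = [
    {"ops": {"load": {"config": {"path": "a.csv"}}}},
    {"ops": {"load": {"config": {"limit": 10}}}, "loggers": {"console": {}}},
]
run_config = merge_documents(parsed)
```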
Creates a python logger whose output messages will be captured and converted into Dagster log messages. This means they will have structured information such as the step_key, run_id, etc. embedded into them, and will show up in the Dagster event log.
This can be used as a more convenient alternative to context.log in most cases. If log level is not set explicitly, defaults to DEBUG.
name (Optional[str]) – If supplied, will create a logger with the name “dagster.builtin.{name}”, with properties inherited from the base Dagster logger. If omitted, the returned logger will be named “dagster.builtin”.
A logger whose output will be captured by Dagster.
logging.Logger
Example
from dagster import get_dagster_logger, op

@op
def hello_op():
    log = get_dagster_logger()
    for i in range(5):
        # do something
        log.info(f"Did {i+1} things!")
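The naming and inheritance rules described above follow the standard library's dotted logger hierarchy. The sketch below uses only the logging module to show how a child logger named "dagster.builtin.{name}" picks up the level of its "dagster.builtin" parent. It illustrates the naming scheme only; the capture of messages into the Dagster event log is not modeled, and named_logger is a hypothetical helper.

```python
import logging
from typing import Optional

# Stand-in for the base logger; the level is set to DEBUG to match the
# documented default.
base_logger = logging.getLogger("dagster.builtin")
base_logger.setLevel(logging.DEBUG)


def named_logger(name: Optional[str] = None) -> logging.Logger:
    """Hypothetical helper: return the base logger or a named child of it."""
    if name:
        return logging.getLogger(f"dagster.builtin.{name}")
    return base_logger


child = named_logger("my_module")
```

Because the child sets no level of its own, its effective level is taken from the parent.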
Create a job failure sensor that sends email via the SMTP protocol.
email_from (str) – The sender email address to send the message from.
email_password (str) – The password of the sender.
email_to (List[str]) – The recipient email addresses to send the message to.
email_body_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function that takes in the RunFailureSensorContext and outputs the email body you want to send. Defaults to plain text containing the error message, job name, and run ID.
email_subject_fn (Optional(Callable[[RunFailureSensorContext], str])) – Function that takes in the RunFailureSensorContext and outputs the email subject you want to send. Defaults to “Dagster Run Failed: <job_name>”.
smtp_host (str) – The hostname of the SMTP server. Defaults to “smtp.gmail.com”.
smtp_type (str) – The protocol; either “SSL” or “STARTTLS”. Defaults to SSL.
smtp_port (Optional[int]) – The SMTP port. Defaults to 465 for SSL, 587 for STARTTLS.
smtp_user (Optional[str]) – The SMTP user for authentication with the SMTP server. Defaults to the value of email_from.
name (Optional[str]) – The name of the sensor. Defaults to “email_on_job_failure”.
webserver_base_url (Optional[str]) – The base URL of your dagster-webserver instance. Specify this to allow messages to include deeplinks to the failed run.
monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector]]]) – The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails. To monitor jobs in external repositories, use RepositorySelector and JobSelector.
monitor_all_code_locations (bool) – If set to True, the sensor will monitor all runs in the Dagster deployment. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector]]]) – (deprecated) This parameter will be removed in version 2.0. Use monitored_jobs instead. The jobs that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from the Dagster UI or via the GraphQL API.
monitor_all_repositories (bool) – (deprecated) This parameter will be removed in version 2.0. Use monitor_all_code_locations instead. If set to True, the sensor will monitor all runs in the Dagster instance. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
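The defaulting rules for smtp_port and smtp_user listed above can be summarized in a few lines. This is a sketch of the documented defaults only, not the sensor's implementation; resolve_smtp_settings and DEFAULT_PORTS are hypothetical names.

```python
from typing import Optional, Tuple

# Documented default ports per protocol.
DEFAULT_PORTS = {"SSL": 465, "STARTTLS": 587}


def resolve_smtp_settings(
    email_from: str,
    smtp_type: str = "SSL",
    smtp_port: Optional[int] = None,
    smtp_user: Optional[str] = None,
) -> Tuple[int, str]:
    """Hypothetical helper: apply the documented defaults for port and user."""
    if smtp_type not in DEFAULT_PORTS:
        raise ValueError(f"smtp_type must be 'SSL' or 'STARTTLS', got {smtp_type!r}")
    # An explicit port wins; otherwise the protocol decides.
    port = smtp_port if smtp_port is not None else DEFAULT_PORTS[smtp_type]
    # The SMTP user falls back to the sender address.
    user = smtp_user if smtp_user is not None else email_from
    return port, user


ssl_settings = resolve_smtp_settings("no-reply@example.com")
starttls_settings = resolve_smtp_settings(
    "no-reply@example.com", smtp_type="STARTTLS", smtp_user="mailer"
)
```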
Examples
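import os
from dagster import make_email_on_run_failure_sensor, repository

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
)

@repository
def my_repo():
    # my_job is assumed to be a job defined elsewhere
    return [my_job, email_on_run_failure]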
def my_message_fn(context: RunFailureSensorContext) -> str:
    # Adjacent f-strings are concatenated, so separate them with a newline
    return (
        f"Job {context.dagster_run.job_name} failed!\n"
        f"Error: {context.failure_event.message}"
    )

email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com"],
    email_body_fn=my_message_fn,
    email_subject_fn=lambda _: "Dagster Alert",
    webserver_base_url="http://mycoolsite.com",
)
A pdb subclass that may be used from a forked multiprocessing child.
Examples:
from dagster import op
from dagster._utils.forked_pdb import ForkedPdb

@op
def complex_op(_):
    # some complicated stuff

    ForkedPdb().set_trace()

    # some other complicated stuff
You can initiate job execution via the webserver and use the pdb debugger to examine and step through execution at the breakpoint.
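A forked child process usually does not have a usable sys.stdin, which is why plain pdb fails there. One common recipe, sketched below, is to subclass pdb.Pdb and temporarily reopen /dev/stdin around each interaction. This is an illustration of the general technique under the assumption of a POSIX system, not a copy of ForkedPdb; the class name StdinReopeningPdb is hypothetical.

```python
import pdb
import sys


class StdinReopeningPdb(pdb.Pdb):
    """Hypothetical pdb subclass that reattaches stdin for each prompt."""

    def interaction(self, *args, **kwargs):
        saved_stdin = sys.stdin
        try:
            # Reopen the terminal's stdin; assumes /dev/stdin exists (POSIX).
            sys.stdin = open("/dev/stdin")
            super().interaction(*args, **kwargs)
        finally:
            # Close the handle we opened, if any, and restore the original.
            if sys.stdin is not saved_stdin:
                sys.stdin.close()
            sys.stdin = saved_stdin
```

Calling StdinReopeningPdb().set_trace() inside the child process would then open a prompt on the controlling terminal.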