
Sensors#

Sensors are definitions in Dagster that allow you to instigate runs based on some external state change. For example, you can:

  • Launch a run whenever a file appears in an S3 bucket
  • Launch a run whenever another job materializes a specific asset
  • Launch a run whenever an external system is down

A sensor defines an evaluation function that returns either:

  • One or more RunRequest objects. Each run request launches a run.
  • An optional SkipReason, whose message describes why no runs were requested.

The Dagster daemon runs each sensor evaluation function on a tight loop. If using sensors, follow the instructions in the Dagster daemon documentation to run your sensors.


Relevant APIs#

Name | Description
@sensor | The decorator used to define a sensor. The decorated function is called the sensor's evaluation function. The decorator returns a SensorDefinition.
SensorDefinition | Class for sensors. You almost never want to initialize this class directly. Instead, you should use the @sensor decorator, which returns a SensorDefinition.
SensorEvaluationContext | The context object passed to a sensor evaluation function.
build_sensor_context | A function that constructs an instance of SensorEvaluationContext. This is intended to be used to test a sensor.
@run_status_sensor | The decorator used to define a run status sensor. The decorator returns a RunStatusSensorDefinition.
@run_failure_sensor | The decorator used to define a run failure sensor. The run failure sensor is a special case of a run status sensor that specifically detects run failures.
RunStatusSensorDefinition | Class for run status sensors. You almost never want to initialize this class directly. Instead, you should use the @run_status_sensor or @run_failure_sensor decorator.
RunStatusSensorContext | The context object passed to a run status sensor evaluation.
build_run_status_sensor_context | A function that constructs an instance of RunStatusSensorContext. This is intended to be used to test a run status sensor.

Defining a sensor#

To define a sensor, use the @sensor decorator. The decorated function can optionally have a context as the first argument. The context is a SensorEvaluationContext.

Let's say you have a job that logs a filename that is specified in the op configuration of the process_file op:

from dagster import op, job, Config, OpExecutionContext


class FileConfig(Config):
    filename: str


@op
def process_file(context: OpExecutionContext, config: FileConfig):
    context.log.info(config.filename)


@job
def log_file_job():
    process_file()

You can write a sensor that watches for new files in a specific directory and yields a RunRequest for each new file in the directory. By default, this sensor runs every 30 seconds.

import os
from dagster import sensor, RunRequest, RunConfig


@sensor(job=log_file_job)
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config=RunConfig(
                    ops={"process_file": FileConfig(filename=filename)}
                ),
            )

This sensor iterates through all the files in MY_DIRECTORY and yields a RunRequest for each file. Note that despite the yield syntax, the function will run to completion before any runs are submitted.

To write a sensor that materializes assets, you can build a job that materializes assets:

from dagster import RunRequest, define_asset_job, sensor

asset_job = define_asset_job("asset_job", "*")


@sensor(job=asset_job)
def materializes_asset_sensor():
    yield RunRequest(...)

Once a sensor is added to a Definitions object with the job it yields a RunRequest for, it can be started and will start creating runs. You can start or stop sensors in the Dagster UI, or by setting the default status to DefaultSensorStatus.RUNNING in code:

from dagster import DefaultSensorStatus, sensor


@sensor(job=asset_job, default_status=DefaultSensorStatus.RUNNING)
def my_running_sensor(): ...

If you manually start or stop a sensor in the UI, that will override any default status that is set in code.

Once your sensor is started, if you're running a Dagster daemon as part of your deployment, the sensor will begin executing immediately without needing to restart the dagster-daemon process.


Idempotence and cursors#

When instigating runs based on external events, you usually want to run exactly one job run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events:

Idempotence using run keys#

In the example sensor in the previous section, the RunRequest is constructed with a run_key:

yield RunRequest(
    run_key=filename,
    run_config={"ops": {"process_file": {"config": {"filename": filename}}}},
)

Dagster guarantees that for a given sensor, at most one run is created for each RunRequest with a unique run_key. If a sensor yields a new run request with a previously used run_key, Dagster skips processing the new run request.

In the example, a RunRequest is yielded for each file during every sensor evaluation. Therefore, for a given sensor evaluation, a RunRequest with the same run_key already exists for any file that was present during the previous sensor evaluation. Dagster skips processing duplicate run requests, so Dagster launches runs only for the files added since the last sensor evaluation. The result is exactly one run per file.

Run keys allow you to write sensor evaluation functions that declaratively describe what job runs should exist, and help you avoid the need for more complex logic that manages state. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.
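
The deduplication behavior described above can be illustrated with a small simulation that does not import Dagster. This is only a sketch of the semantics, not how Dagster implements them: evaluate_sensor, submit_new_runs, and the in-memory seen set are hypothetical names introduced for illustration.

```python
from typing import Iterable, List, Set


def evaluate_sensor(files: Iterable[str]) -> List[str]:
    """Hypothetical stand-in for a sensor evaluation: one run key per file."""
    return [filename for filename in files]


def submit_new_runs(run_keys: Iterable[str], seen_run_keys: Set[str]) -> List[str]:
    """Launch a run only for run keys that have not been used before."""
    launched = []
    for run_key in run_keys:
        if run_key in seen_run_keys:
            continue  # duplicate request: skipped, no new run
        seen_run_keys.add(run_key)
        launched.append(run_key)
    return launched


seen: Set[str] = set()

# First evaluation: two files exist, so two runs are launched.
first = submit_new_runs(evaluate_sensor(["a.csv", "b.csv"]), seen)

# Second evaluation: the sensor requests all three files again,
# but only the newly added file results in a run.
second = submit_new_runs(evaluate_sensor(["a.csv", "b.csv", "c.csv"]), seen)

print(first)   # ['a.csv', 'b.csv']
print(second)  # ['c.csv']
```

The sensor function itself stays stateless; all bookkeeping lives in the set of previously used run keys.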

Sensor optimizations using cursors#

When writing a sensor that deals with high-volume events, it might not be feasible to yield a RunRequest for every event during every sensor evaluation. For example, you may have an S3 bucket that contains thousands of files.

When writing a sensor for such event sources, you can maintain a cursor that limits the number of yielded run requests for previously processed events. The sensor context, provided to every sensor evaluation function, has a cursor property and an update_cursor method for sensors to track state across evaluations:

  • cursor - A cursor field on SensorEvaluationContext that returns the last persisted cursor value from a previous evaluation
  • update_cursor - A method on SensorEvaluationContext that takes a string to persist and make available to future evaluations

Here is a somewhat contrived example of our directory file sensor using a cursor for updated files:

@sensor(job=log_file_job)
def my_directory_sensor_cursor(context):
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{file_mtime}"
            run_config = {"ops": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))

For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams.
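
One possible way to keep per-stream progress in a single cursor string is to encode a dictionary as JSON. The sketch below uses only the standard library; the helper names load_cursor and dump_cursor and the stream names "orders" and "refunds" are assumptions made for illustration, not part of the Dagster API.

```python
import json
from typing import Dict, Optional


def load_cursor(raw_cursor: Optional[str]) -> Dict[str, int]:
    """Deserialize the cursor string; an unset cursor means no progress yet."""
    return json.loads(raw_cursor) if raw_cursor else {}


def dump_cursor(offsets: Dict[str, int]) -> str:
    """Serialize per-stream offsets; sort_keys keeps the string stable."""
    return json.dumps(offsets, sort_keys=True)


# Hypothetical stream names and offsets.
offsets = load_cursor(None)
offsets["orders"] = 120
offsets["refunds"] = 7

# In a sensor, this string would be passed to context.update_cursor(...).
cursor = dump_cursor(offsets)

# On the next evaluation, context.cursor would return the same string.
restored = load_cursor(cursor)
```

Because the cursor is an opaque string to Dagster, any serialization format works as long as the sensor reads and writes it consistently.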

Note also that in the example above, both a run_key and cursor are being used. This means that if the cursor is reset but the target files don't change, new runs will not be launched because the run keys have not changed. If you want the ability to fully reset the state of your sensor by resetting the cursor, then you should not set run_key on RunRequest.


Evaluation interval#

By default, the Dagster daemon runs a sensor 30 seconds after that sensor's previous evaluation finishes executing. You can configure the interval using the minimum_interval_seconds argument on the @sensor decorator.

It's important to note that this interval represents a minimum interval between runs of the sensor, not the exact frequency at which the sensor runs. If you have a sensor that takes 30 seconds to complete, but the minimum_interval_seconds is 5 seconds, the fastest the Dagster daemon will run the sensor is every 35 seconds. The minimum_interval_seconds only guarantees that the sensor is not evaluated more frequently than the given interval.

For example, here are two sensors that specify two different minimum intervals:

@sensor(job=my_job, minimum_interval_seconds=30)
def sensor_A():
    yield RunRequest(run_key=None, run_config={})


@sensor(job=my_job, minimum_interval_seconds=45)
def sensor_B():
    yield RunRequest(run_key=None, run_config={})

These sensor definitions are short, so they run in less than a second. Therefore, you can expect these sensors to run consistently around every 30 and 45 seconds, respectively.
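
The timing described in this section can be written out as simple arithmetic. The function below is a hypothetical helper, not a Dagster API. It assumes the model stated above (the next evaluation starts no sooner than minimum_interval_seconds after the previous one finishes) and ignores any scheduling overhead in the daemon.

```python
def earliest_period_seconds(
    evaluation_seconds: float, minimum_interval_seconds: float
) -> float:
    """Shortest time between the starts of two consecutive evaluations,
    assuming the wait begins when the previous evaluation finishes."""
    return evaluation_seconds + minimum_interval_seconds


# A sensor that takes 30 seconds with a 5 second minimum interval.
slow_sensor_period = earliest_period_seconds(30, 5)

# Short sensors like sensor_A and sensor_B, assumed to take about half a second.
sensor_a_period = earliest_period_seconds(0.5, 30)
sensor_b_period = earliest_period_seconds(0.5, 45)

print(slow_sensor_period)  # 35
print(sensor_a_period)     # 30.5
print(sensor_b_period)     # 45.5
```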

If a sensor evaluation function takes more than 60 seconds to return its results, the sensor evaluation will time out and the Dagster daemon will move on to the next sensor without submitting any runs. This 60 second timeout only applies to the time it takes to run the sensor function, not to the execution time of the runs submitted by the sensor. To avoid timeouts, slower sensors can break up their work into chunks, using cursors to let subsequent sensor calls pick up where the previous call left off.
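
The chunking approach mentioned above might look like the following sketch. To keep it runnable without Dagster, FakeSensorContext is a hypothetical stand-in that exposes only the cursor property and update_cursor method; CHUNK_SIZE and process_chunk are likewise illustrative names. In a real sensor, each item in the chunk would typically become a RunRequest.

```python
from typing import List, Optional


class FakeSensorContext:
    """Hypothetical stand-in exposing only cursor and update_cursor."""

    def __init__(self, cursor: Optional[str] = None) -> None:
        self.cursor = cursor

    def update_cursor(self, cursor: str) -> None:
        self.cursor = cursor


CHUNK_SIZE = 3  # hypothetical per-evaluation budget


def process_chunk(context: FakeSensorContext, events: List[str]) -> List[str]:
    """Handle at most CHUNK_SIZE events, then record progress in the cursor."""
    start = int(context.cursor) if context.cursor else 0
    chunk = events[start : start + CHUNK_SIZE]
    context.update_cursor(str(start + len(chunk)))
    return chunk


events = [f"event-{i}" for i in range(7)]
context = FakeSensorContext()

# Each loop iteration plays the role of one sensor evaluation.
batches = []
while True:
    batch = process_chunk(context, events)
    if not batch:
        break
    batches.append(batch)

print([len(batch) for batch in batches])  # [3, 3, 1]
print(context.cursor)                     # 7
```

Each evaluation does a bounded amount of work, and the cursor lets the next evaluation resume where the previous one stopped.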


Skipping sensor evaluations#

For debugging purposes, it's often useful to describe why a sensor might not yield any runs for a given evaluation. The sensor evaluation function can yield a SkipReason with a string description that will be displayed in the UI.

For example, here is our directory sensor that now provides a SkipReason when no files are encountered:

@sensor(job=log_file_job)
def my_directory_sensor_with_skip_reasons():
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={
                    "ops": {"process_file": {"config": {"filename": filename}}}
                },
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")

Using resources in sensors#

Dagster's resources system can be used with sensors to make it easier to call out to external systems and to make components of a sensor easier to plug in for testing purposes.

To specify resource dependencies, annotate the resource as a parameter to the sensor's function. Resources are provided by attaching them to your Definitions call.

Here, a resource provides access to an external API. The same resource could be used in the job that the sensor triggers.

from dagster import (
    sensor,
    RunRequest,
    SensorEvaluationContext,
    ConfigurableResource,
    job,
    Definitions,
    RunConfig,
)
import requests
from typing import List

class UsersAPI(ConfigurableResource):
    url: str

    def fetch_users(self) -> List[str]:
        return requests.get(self.url).json()

@job
def process_user(): ...

@sensor(job=process_user)
def process_new_users_sensor(
    context: SensorEvaluationContext,
    users_api: UsersAPI,
):
    last_user = int(context.cursor) if context.cursor else 0
    users = users_api.fetch_users()

    num_users = len(users)
    for user_id in users[last_user:]:
        yield RunRequest(
            run_key=user_id,
            tags={"user_id": user_id},
        )

    context.update_cursor(str(num_users))

defs = Definitions(
    jobs=[process_user],
    sensors=[process_new_users_sensor],
    resources={"users_api": UsersAPI(url="https://my-api.com/users")},
)

For more information on resources, refer to the Resources documentation. To see how to test sensors with resources, refer to the section on Testing sensors with resources.


Logging in sensors#

Sensor logs are stored in your Dagster instance's compute log storage. To view your sensor logs, make sure that compute log storage is configured for your instance.

Any sensor can emit log messages during its evaluation function:

@sensor(job=the_job)
def logs_then_skips(context):
    context.log.info("Logging from a sensor!")
    return SkipReason("Nothing to do")

These logs can be viewed when inspecting a tick in the tick history view on the corresponding sensor page.


Testing sensors#

Via the Dagster UI#

Before you test: Test evaluations of sensors run the sensor's underlying Python function, meaning that any side effects contained within that sensor's function may be executed.

In the UI, you can manually trigger a test evaluation of a sensor and view the results.

  1. Click Overview > Sensors.

  2. Click the sensor you want to test.

  3. Click the Test Sensor button, located near the top right corner of the page.

  4. You'll be prompted to provide a cursor value. You can use the existing cursor for the sensor (which will be prepopulated) or enter a different value. If you're not using cursors, leave this field blank.

  5. Click Evaluate to fire the sensor. A window displays the result of the evaluation, which is either run requests, a skip reason, or a Python error.

    If the evaluation was successful, then for each produced run request, you can open the launchpad pre-scaffolded with the config produced by that run request. You'll also see a new computed cursor value from the evaluation, with the option to persist the value.


Monitoring sensors in the Dagster UI#

Using the UI, you can monitor and operate sensors. The UI provides multiple views that help with observing sensor evaluations, skip reasons, and errors.

To view all sensors, navigate to Overview > Sensors. Here, you can start and stop sensors, and view their frequency, last tick, and last run.

Click on any sensor to test the sensor, monitor all sensor evaluations on a timeline, and view a table of runs launched by the sensor.


Run status sensors#

If you want to act on the status of a job run, Dagster provides a way to create a sensor that reacts to run statuses. You can use run_status_sensor with a specified DagsterRunStatus to decorate a function that will run when the given status occurs. This can be used to launch runs of other jobs, send alerts to a monitoring service on run failure, or report a run success.

Here is an example of a run status sensor that launches a run of status_reporting_job if a run is successful:

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,
)
def report_status_sensor(context):
    # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
    if context.dagster_run.job_name != status_reporting_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")

request_job is the job that will be run when the RunRequest is returned.

Note that in report_status_sensor we conditionally return a RunRequest. This ensures that when report_status_sensor runs status_reporting_job it doesn't enter an infinite loop where the success of status_reporting_job triggers another run of status_reporting_job, which triggers another run, and so on.

Here is an example of a sensor that reports job success in a Slack message:

import os
from dagster import run_status_sensor, RunStatusSensorContext, DagsterRunStatus
from slack_sdk import WebClient


@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_slack_on_run_success(context: RunStatusSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Job "{context.dagster_run.job_name}" succeeded.',
    )

When a run status sensor is triggered by a job run but doesn't return anything, Dagster will report an event back to the run to indicate that the sensor ran.

Once you have written your sensor, you can add the sensor to a Definitions object so it can be enabled and used the same as other sensors:

from dagster import Definitions


defs = Definitions(jobs=[my_sensor_job], sensors=[my_slack_on_run_success])

Run failure sensor#

Dagster provides a set of special run status sensor decorators for defining sensors that monitor run failure events. You can use run_failure_sensor to decorate a function that will run when a run fails:

@run_failure_sensor(request_job=status_reporting_job)
def report_failure_sensor(context):
    run_config = {
        "ops": {"status_report": {"config": {"job_name": context.dagster_run.job_name}}}
    }
    return RunRequest(run_key=None, run_config=run_config)

This run failure sensor sends a Slack message when it runs:

import os
from dagster import run_failure_sensor, RunFailureSensorContext
from slack_sdk import WebClient


@run_failure_sensor
def my_slack_on_run_failure(context: RunFailureSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=(
            f'Job "{context.dagster_run.job_name}" failed. Error:'
            f" {context.failure_event.message}"
        ),
    )

Out-of-box run failure sensors#

Dagster also provides the following out-of-box run failure sensors:

make_slack_on_run_failure_sensor#

make_slack_on_run_failure_sensor helps you create a run failure sensor that will message a given Slack channel:

import os
from dagster_slack import make_slack_on_run_failure_sensor

slack_on_run_failure = make_slack_on_run_failure_sensor(
    "#my_channel",
    os.getenv("MY_SLACK_TOKEN"),
)

make_email_on_run_failure_sensor#

make_email_on_run_failure_sensor helps you create a run failure sensor that will send emails via SMTP:

import os
from dagster import make_email_on_run_failure_sensor


email_on_run_failure = make_email_on_run_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com", "xyz@example.com"],
)

To set up success or failure handling policies on ops, refer to the Op hooks documentation.

Cross-code location run status sensors#

Sometimes, you may want to monitor jobs in a code location other than the one where the sensor is defined. You can use special identifiers CodeLocationSelector and JobSelector to tell a run status sensor to monitor jobs in another code location:

@run_status_sensor(
    monitored_jobs=[CodeLocationSelector(location_name="defs")],
    run_status=DagsterRunStatus.SUCCESS,
)
def code_location_a_sensor():
    # when any job in code_location_a succeeds, this sensor will trigger
    send_slack_alert()


@run_failure_sensor(
    monitored_jobs=[
        JobSelector(
            location_name="defs",
            repository_name="code_location_a",
            job_name="data_update",
        )
    ],
)
def code_location_a_data_update_failure_sensor():
    # when the data_update job in code_location_a fails, this sensor will trigger
    send_slack_alert()

You can also monitor every job in your Dagster deployment by specifying monitor_all_code_locations=True on the sensor decorator. Note that monitor_all_code_locations cannot be used along with jobs specified via monitored_jobs.

@run_status_sensor(
    monitor_all_code_locations=True,
    run_status=DagsterRunStatus.SUCCESS,
)
def sensor_monitor_all_code_locations():
    # when any job in the Dagster deployment succeeds, this sensor will trigger
    send_slack_alert()

Testing run status sensors#

As with other sensors, you can directly invoke run status sensors. However, the context provided via run_status_sensor and run_failure_sensor contains objects that are typically only available during run time. Below you'll find code snippets that demonstrate how to build the context so that you can directly invoke your function in unit tests.

Suppose you have written a run status sensor like this (assuming the function email_alert is implemented elsewhere):

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_email_sensor(context: RunStatusSensorContext):
    message = f'Job "{context.dagster_run.job_name}" succeeded.'
    email_alert(message)

We can first write a simple job that will succeed:

@op
def succeeds():
    return 1


@job
def my_job_succeeds():
    succeeds()

Then we can execute this job and pull the attributes we need to build the context. We provide a function build_run_status_sensor_context that will return the correct context object:

from dagster import DagsterInstance, build_run_status_sensor_context

# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_succeeds.execute_in_process(instance=instance)

# retrieve the DagsterRun
dagster_run = result.dagster_run

# retrieve a success event from the completed execution
dagster_event = result.get_job_success_event()

# create the context
run_status_sensor_context = build_run_status_sensor_context(
    sensor_name="my_email_sensor",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
)

# run the sensor
my_email_sensor(run_status_sensor_context)

We have provided convenience functions ExecuteInProcessResult.get_job_success_event and ExecuteInProcessResult.get_job_failure_event for retrieving DagsterRunStatus.SUCCESS and DagsterRunStatus.FAILURE events, respectively. If you have a run status sensor triggered on another status, you can retrieve all events from result and filter based on your event type.

We can use the same pattern to build the context for run_failure_sensor. If we wanted to test this run failure sensor:

@run_failure_sensor
def my_email_failure_sensor(context: RunFailureSensorContext):
    message = (
        f'Job "{context.dagster_run.job_name}" failed. Error:'
        f" {context.failure_event.message}"
    )
    email_alert(message)

We first need to make a simple job that will fail:

from dagster import op, job


@op
def fails():
    raise Exception("failure!")


@job
def my_job_fails():
    fails()

Then we can execute the job and create our context:

from dagster import DagsterInstance, build_run_status_sensor_context

# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_fails.execute_in_process(instance=instance, raise_on_error=False)

# retrieve the DagsterRun
dagster_run = result.dagster_run

# retrieve a failure event from the completed job execution
dagster_event = result.get_job_failure_event()

# create the context
run_failure_sensor_context = build_run_status_sensor_context(
    sensor_name="my_email_failure_sensor",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
).for_run_failure()

# run the sensor
my_email_failure_sensor(run_failure_sensor_context)

Note the additional function call RunStatusSensorContext.for_run_failure after creating the context. The context provided by run_failure_sensor is a subclass of the context provided by run_status_sensor and can be built using this additional call.


Examples#

S3 sensors#

For jobs that should initiate new runs for new paths in an S3 bucket, the dagster-aws package provides the useful helper function get_s3_keys.

Here is an example of a sensor that listens to a particular S3 bucket my_s3_bucket:

from dagster import RunRequest, SkipReason, sensor
from dagster_aws.s3.sensor import get_s3_keys


@sensor(job=my_job)
def my_s3_sensor(context):
    since_key = context.cursor or None
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=since_key)
    if not new_s3_keys:
        return SkipReason("No new s3 files found for bucket my_s3_bucket.")
    last_key = new_s3_keys[-1]
    run_requests = [RunRequest(run_key=s3_key, run_config={}) for s3_key in new_s3_keys]
    context.update_cursor(last_key)
    return run_requests

See it in action#

For more examples of sensors, check out our Hacker News example.