Sensors#

Sensors allow you to instigate runs based on any external state change.

Relevant APIs#

  • @sensor: The decorator used to define a sensor. The decorated function is called the evaluation function. The decorator returns a SensorDefinition.
  • RunRequest: The sensor evaluation function can yield one or more run requests. Each run request creates a pipeline run.
  • SkipReason: If a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed.
  • SensorDefinition: Class for sensors. You almost never want to initialize this class directly. Instead, you should use the @sensor decorator, which returns a SensorDefinition.
  • SensorEvaluationContext: The context object passed to a sensor evaluation function.
  • build_sensor_context: A function that constructs an instance of SensorEvaluationContext. This is intended to be used to test a sensor.
  • @asset_sensor: The decorator used to define an asset sensor. The decorated function is an evaluation function that takes in a SensorEvaluationContext and an asset materialization event. The decorator returns an AssetSensorDefinition.
  • AssetSensorDefinition: A special sensor definition class for asset sensors. You almost never want to initialize this class directly. Instead, you should use the @asset_sensor decorator, which returns an AssetSensorDefinition.

Overview#

Sensors are definitions in Dagster that allow you to instigate runs based on some external state change automatically. For example, you can:

  • Launch a run whenever a file appears in an s3 bucket
  • Launch a run whenever another pipeline materializes a specific asset
  • Launch a run whenever an external system is down

Sensors have several important properties:

  • Each sensor targets a specific pipeline
  • A sensor optionally defines tags, a mode, and a solid selection for the targeted pipeline.
  • A sensor defines an evaluation function that returns either:
    • One or more RunRequest objects. Each run request launches a run.
    • An optional SkipReason, which specifies a message which describes why no runs were requested.

The Dagster Daemon runs each sensor evaluation function on a tight loop. If you are using sensors, make sure to follow the instructions on the Dagster Daemon page to run your sensors.

Defining a sensor#

To define a sensor, use the @sensor decorator. The decorated function is called the evaluation function and can optionally take a context as its first argument. The context is a SensorEvaluationContext.

Let's say you have a pipeline that logs a filename that is specified in the solid configuration of the process_file solid:

from dagster import solid, pipeline


@solid(config_schema={"filename": str})
def process_file(context):
    filename = context.solid_config["filename"]
    context.log.info(filename)


@pipeline
def log_file_pipeline():
    process_file()

You can write a sensor that watches for new files in a specific directory and yields a RunRequest for each new file in the directory. By default, this sensor runs every 30 seconds.

import os

from dagster import sensor, RunRequest

# Placeholder path; point this at the directory you want to watch
MY_DIRECTORY = "/path/to/my_directory"


@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor():
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )

This sensor iterates through all the files in MY_DIRECTORY and yields a RunRequest for each file.

Once my_directory_sensor is added to a repository with log_file_pipeline, it can be enabled and used.
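
For reference, here is a minimal sketch of that registration (the repository name here is arbitrary):

from dagster import repository


@repository
def my_repository():
    # Registering both the pipeline and the sensor makes the sensor available to turn on
    return [log_file_pipeline, my_directory_sensor]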

Idempotence and Cursors#

When instigating runs based on external events, you usually want to run exactly one pipeline run for each event. There are two ways to define your sensors to avoid creating duplicate runs for your events: using run_key and using a cursor.

Idempotence using run keys#

In the example sensor above, the RunRequest is constructed with a run_key.

yield RunRequest(
    run_key=filename,
    run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
)

Dagster guarantees that for a given sensor, at most one run is created for each RunRequest with a unique run_key. If a sensor yields a new run request with a previously used run_key, Dagster skips processing the new run request.

In the example, a RunRequest is yielded for each file during every sensor evaluation. Therefore, any file that existed during a previous evaluation already has a run associated with its run_key. Dagster skips these duplicate run requests and launches runs only for the files added since the last evaluation. The result is exactly one run per file.

Run keys allow you to write sensor evaluation functions that declaratively describe which pipeline runs should exist, and help you avoid more complex logic that manages state. However, when dealing with high-volume external events, some state-tracking optimizations might be necessary.

Sensor optimizations using cursors#

When writing a sensor that deals with high-volume events, it might not be feasible to yield a RunRequest during every sensor evaluation. For example, you may have an s3 storage bucket that contains thousands of files.

When writing a sensor for such event sources, you can maintain a cursor so that previously processed events do not produce new run requests. The sensor context, provided to every sensor evaluation function, has a cursor property and an update_cursor method that sensors can use to track state across evaluations.

  • cursor: A cursor field on SensorEvaluationContext that returns the last persisted cursor value from a previous evaluation.
  • update_cursor: A method on SensorEvaluationContext that takes a string to persist and make available to future evaluations.

Here is a somewhat contrived example of our directory sensor that uses a cursor to only request runs for files modified since the last evaluation.

@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_cursor(context):
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{str(file_mtime)}"
            run_config = {"solids": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))

For sensors that consume multiple event streams, you may need to serialize and deserialize a more complex data structure in and out of the cursor string to keep track of the sensor's progress over the multiple streams.
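
As a minimal, hypothetical sketch (the pipeline name, stream keys, and sensor name here are placeholders, not part of the example above), a sensor can round-trip a dictionary of per-stream cursors through the single cursor string with json:

import json

from dagster import RunRequest, sensor


@sensor(pipeline_name="my_pipeline")
def multi_stream_sensor(context):
    # Deserialize the cursor string into a dict of per-stream cursors
    cursors = json.loads(context.cursor) if context.cursor else {"stream_a": 0, "stream_b": 0}

    # ... check each stream against its cursor and yield run requests as needed ...
    yield RunRequest(run_key=None, run_config={})

    # Serialize the (possibly updated) per-stream cursors back into one string
    context.update_cursor(json.dumps(cursors))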

Evaluation Interval#

By default, the Dagster Daemon runs a sensor 30 seconds after the previous sensor evaluation finishes executing. You can configure the interval using the minimum_interval_seconds argument on the @sensor decorator.

It's important to note that this interval represents the minimum time between sensor evaluations, not the exact frequency at which the sensor runs. If you have a sensor that takes 2 minutes to complete but set minimum_interval_seconds to 5 seconds, the fastest the Dagster Daemon can run the sensor is once every 2 minutes and 5 seconds. The minimum_interval_seconds only guarantees that the sensor is not evaluated more frequently than the given interval.

For example, here are two sensors that specify two different minimum intervals:

@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=30)
def sensor_A():
    yield RunRequest(run_key=None, run_config={})


@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=45)
def sensor_B():
    yield RunRequest(run_key=None, run_config={})

These sensor definitions are short, so they run in less than a second. Therefore, you can expect these sensors to run consistently around every 30 and 45 seconds, respectively.

Skipping sensor evaluations#

For debugging purposes, it is often useful to describe why a sensor might not yield any runs for a given evaluation. The sensor evaluation function can yield a SkipReason with a string description that will be displayed in Dagit.

For example, here is our directory sensor that now provides a SkipReason when no files are encountered:

from dagster import SkipReason


@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_with_skip_reasons():
    has_files = False
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )
            has_files = True
    if not has_files:
        yield SkipReason(f"No files found in {MY_DIRECTORY}.")

Testing sensors#

To quickly preview what an existing sensor would generate when evaluated, you can run the CLI command dagster sensor preview my_sensor_name.
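
For example, previewing the directory sensor defined above would look like this (workspace and repository flags omitted):

dagster sensor preview my_directory_sensor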

To unit test a sensor, you can invoke it directly. This returns all the run requests yielded by the sensor, and the run config from each request can be validated using the validate_run_config function.

from dagster import validate_run_config


@sensor(pipeline_name="log_file_pipeline")
def sensor_to_test():
    yield RunRequest(
        run_key="foo",
        run_config={"solids": {"process_file": {"config": {"filename": "foo"}}}},
    )


def test_sensor():
    for run_request in sensor_to_test():
        assert validate_run_config(log_file_pipeline, run_request.run_config)

Notice that since we did not use the context argument in our sensor, we don't have to provide a context object. However, if we do in fact need the context object for our sensor, we can provide it via build_sensor_context. Consider again the my_directory_sensor_cursor example.

@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor_cursor(context):
    last_mtime = float(context.cursor) if context.cursor else 0

    max_mtime = last_mtime
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        if os.path.isfile(filepath):
            fstats = os.stat(filepath)
            file_mtime = fstats.st_mtime
            if file_mtime <= last_mtime:
                continue

            # the run key should include mtime if we want to kick off new runs based on file modifications
            run_key = f"{filename}:{str(file_mtime)}"
            run_config = {"solids": {"process_file": {"config": {"filename": filename}}}}
            yield RunRequest(run_key=run_key, run_config=run_config)
            max_mtime = max(max_mtime, file_mtime)

    context.update_cursor(str(max_mtime))

This sensor makes use of the context argument, and thus to invoke it, we need to provide one.

from dagster import build_sensor_context


def test_my_directory_sensor_cursor():
    context = build_sensor_context(cursor="0")
    for run_request in my_directory_sensor_cursor(context):
        assert validate_run_config(log_file_pipeline, run_request.run_config)

Monitoring sensors in Dagit#

You can monitor and operate sensors in Dagit. There are multiple views that help with observing sensor evaluations, skip reasons, and errors.

To view the sensors page, click "All Sensors" in the left-hand navigation pane. Here you can turn sensors on and off using the toggle.

[Screenshot: the All Sensors page]

If you click on any sensor, you can monitor all sensor evaluations and runs created:

[Screenshot: evaluation history for Sensor A]

If your sensor throws an error or yields a skip reason, the sensor timeline view will display more information about the errors and skips:

[Screenshot: My Directory Sensor timeline view]

Asset sensors#

A useful pattern is to create a sensor that checks for new AssetMaterialization events for a particular asset key. This can be used to kick off a pipeline that computes downstream assets or notifies appropriate stakeholders.

One benefit of this pattern is that it enables cross-pipeline and even cross-repository dependencies. Each pipeline run instigated by an asset sensor is agnostic to the pipeline that caused it.

Dagster provides a special asset sensor definition format for sensors that fire a single RunRequest based on a single asset materialization. Here is an example of a sensor that generates a RunRequest for every materialization for the asset key my_table:

from dagster import AssetKey, asset_sensor


@asset_sensor(asset_key=AssetKey("my_table"), pipeline_name="my_pipeline")
def my_asset_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "solids": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.asset_key.path,
                        "pipeline": asset_event.pipeline_name,
                    }
                }
            }
        },
    )

Multi-asset sensors#

Multi-asset sensors, which trigger pipeline executions based on some combination of states across multiple asset materialization event streams, can be built with the base sensor definition plus manual cursor management over those event streams. The asset event streams can be queried using the instance attribute of the SensorEvaluationContext object.

import json
from dagster import EventRecordsFilter, DagsterEventType


@sensor(pipeline_name="my_pipeline")
def multi_asset_sensor(context):
    cursor_dict = json.loads(context.cursor) if context.cursor else {}
    a_cursor = cursor_dict.get("a")
    b_cursor = cursor_dict.get("b")

    a_event_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("table_a"),
            after_cursor=a_cursor,
        ),
        ascending=False,
        limit=1,
    )
    b_event_records = context.instance.get_event_records(
        EventRecordsFilter(
            event_type=DagsterEventType.ASSET_MATERIALIZATION,
            asset_key=AssetKey("table_b"),
            after_cursor=b_cursor,
        ),
        ascending=False,
        limit=1,
    )

    if not a_event_records or not b_event_records:
        return

    # make sure we only generate events if both table_a and table_b have been materialized since
    # the last evaluation.
    yield RunRequest(run_key=None)

    # update the sensor cursor by combining the individual event cursors from the two separate
    # asset event streams
    context.update_cursor(
        json.dumps(
            {
                "a": a_event_records[0].storage_id,
                "b": b_event_records[0].storage_id,
            }
        )
    )

Run status sensors#

If you want to act on the status of a pipeline run, Dagster provides a way to create a sensor that reacts to run statuses. You can use run_status_sensor with a specified PipelineRunStatus to decorate a function that will run when the given status occurs. This can be used to send alerts to a monitoring service on pipeline failure or report a run success.

Here is an example of a sensor that reports pipeline success:

import os

from dagster import run_status_sensor, RunStatusSensorContext, PipelineRunStatus
from slack_sdk import WebClient


@run_status_sensor(pipeline_run_status=PipelineRunStatus.SUCCESS)
def my_slack_on_pipeline_success(context: RunStatusSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Pipeline "{context.pipeline_run.pipeline_name}" succeeded.',
    )

Then, you can add the sensor to a repository so it can be enabled and used the same as other sensors:

from dagster import repository


@repository
def my_repository():
    return my_pipelines + [my_slack_on_pipeline_success]

Pipeline failure sensor#

Dagster provides a set of special run status sensor decorators for defining sensors that monitor pipeline failure events. You can use pipeline_failure_sensor to decorate a function that will run when a pipeline run fails.

For example, using this decorator, you can write a sensor that sends a Slack message whenever a pipeline run fails:

import os
from dagster import pipeline_failure_sensor, PipelineFailureSensorContext
from slack_sdk import WebClient


@pipeline_failure_sensor
def my_slack_on_pipeline_failure(context: PipelineFailureSensorContext):
    slack_client = WebClient(token=os.environ["SLACK_DAGSTER_ETL_BOT_TOKEN"])

    slack_client.chat_postMessage(
        channel="#alert-channel",
        text=f'Pipeline "{context.pipeline_run.pipeline_name}" failed. Error: {context.failure_event.message}',
    )

Dagster also provides the following out-of-the-box pipeline failure sensors:

make_slack_on_pipeline_failure_sensor helps you create a pipeline failure sensor that will message a given Slack channel:

from dagster_slack import make_slack_on_pipeline_failure_sensor

slack_on_pipeline_failure = make_slack_on_pipeline_failure_sensor(
    "#my_channel", os.getenv("MY_SLACK_TOKEN")
)

make_email_on_pipeline_failure_sensor helps you create a pipeline failure sensor that will send emails via the SMTP protocol:

from dagster.utils import make_email_on_pipeline_failure_sensor


email_on_pipeline_failure = make_email_on_pipeline_failure_sensor(
    email_from="no-reply@example.com",
    email_password=os.getenv("ALERT_EMAIL_PASSWORD"),
    email_to=["xxx@example.com", "xyz@example.com"],
)

In addition, if you would like to set up success or failure handling policies on individual solids, you can find more information on the Solid Hooks page.

Examples#

S3 sensors#

For pipelines that should initiate new runs for new paths in an s3 bucket, the dagster-aws package provides the useful helper function get_s3_keys.

Here is an example of a sensor that listens to a particular s3 bucket my_s3_bucket:

from dagster_aws.s3.sensor import get_s3_keys


@sensor(pipeline_name="my_pipeline")
def my_s3_sensor(context):
    new_s3_keys = get_s3_keys("my_s3_bucket", since_key=context.last_run_key)
    if not new_s3_keys:
        yield SkipReason("No new s3 files found for bucket my_s3_bucket.")
        return
    for s3_key in new_s3_keys:
        yield RunRequest(run_key=s3_key, run_config={})