Sensors

Sensors allow you to instigate runs when some external state changes. For example, you might want to launch a run whenever a new file appears in an S3 bucket, or when an asset is materialized by another Dagster pipeline.

Each sensor:

  • Targets a pipeline
  • Defines an evaluation function that yields zero or more RunRequest objects, each corresponding to a requested pipeline run
  • Optionally defines mode and solid_selection for the target pipeline

Defining a Sensor

To create a sensor, define a SensorDefinition and include it in your repository.

The core of a sensor definition is the evaluation function. This function checks some external state and yields a RunRequest for each pipeline run that should be executed. Each RunRequest consists of a run config, a tags dictionary, and a run key string.

The dagster-daemon process is responsible for periodically invoking the evaluation function for each of your running sensors and launching runs from any RunRequests that are returned.

Example sensor

For example, consider a pipeline whose single solid reads a filename from its config and logs it:

pipeline.py
from dagster import pipeline, solid


@solid(config_schema={"filename": str})
def process_file(context):
    # Read the filename from the solid's config and log it.
    filename = context.solid_config["filename"]
    context.log.info(filename)


@pipeline
def log_file_pipeline():
    process_file()

Below is a sensor that iterates through the contents of a directory and requests a pipeline run for each file that exists in the directory.

sensor.py
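import os

from dagster import RunRequest, sensor

# Placeholder value: set this to the directory the sensor should watch.
MY_DIRECTORY = "./"


@sensor(pipeline_name="log_file_pipeline")
def my_directory_sensor(_context):
    for filename in os.listdir(MY_DIRECTORY):
        filepath = os.path.join(MY_DIRECTORY, filename)
        # Skip subdirectories; request one run per regular file.
        if os.path.isfile(filepath):
            yield RunRequest(
                run_key=filename,
                run_config={"solids": {"process_file": {"config": {"filename": filename}}}},
            )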

Adding a new file to the directory will cause a new run for that file to be created the next time that the sensor is evaluated.

Idempotence using run keys

In the above example, we use the run_key parameter to enforce idempotence for each filename. Dagster will ensure that only one run is created for each unique run_key value for a given sensor. In the example, even though the full directory contents are requested on every sensor evaluation, only new files will actually instigate new runs due to the run key.
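The deduplication described above can be pictured with a small toy model. This is only a sketch and not Dagster's implementation: RunKeyLedger and launch_new are hypothetical names, and the model simply remembers which run keys it has already seen for one sensor.

```python
class RunKeyLedger:
    """Toy stand-in for per-sensor run-key bookkeeping (hypothetical, not a Dagster class)."""

    def __init__(self):
        self._seen = set()

    def launch_new(self, requested_keys):
        """Return the keys that would start a run, in order, and record them as seen."""
        launched = []
        for key in requested_keys:
            if key not in self._seen:
                self._seen.add(key)
                launched.append(key)
        return launched


ledger = RunKeyLedger()
# First evaluation: both files are new, so both would start a run.
first = ledger.launch_new(["a.csv", "b.csv"])
# Second evaluation: the full directory is requested again, but only c.csv is new.
second = ledger.launch_new(["a.csv", "b.csv", "c.csv"])
print(first)
print(second)
```

In this model the second evaluation requests all three keys but only the unseen one results in a run, which is the behavior the sensor relies on when it lists the whole directory each time.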

Run keys enable a sensor evaluation function to declaratively describe what runs should exist, rather than keeping track of cursors or timestamps. However, the last_run_key and last_completion_time attributes on the SensorExecutionContext that's passed into the sensor evaluation function can also be used to keep track of timestamps and cursors.
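As a rough illustration of the timestamp-based alternative, the sketch below filters a directory by modification time. The helper files_modified_since is hypothetical and not part of Dagster, and it assumes that last_completion_time is a Unix timestamp in seconds, or None if the sensor has not been evaluated before. Inside a sensor, the value of last_completion_time from the context would be passed as the since argument.

```python
import os
import tempfile


def files_modified_since(directory, since):
    """Return sorted names of regular files in `directory` modified after `since`.

    `since` is a Unix timestamp in seconds, or None to return every file.
    (Hypothetical helper for illustration; not a Dagster API.)
    """
    names = []
    for name in os.listdir(directory):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue
        if since is None or os.stat(path).st_mtime > since:
            names.append(name)
    return sorted(names)


# Demonstration with a temporary directory and explicit modification times.
with tempfile.TemporaryDirectory() as tmp:
    for name, mtime in [("old.txt", 1000), ("new.txt", 2000)]:
        path = os.path.join(tmp, name)
        with open(path, "w"):
            pass
        os.utime(path, (mtime, mtime))
    all_files = files_modified_since(tmp, None)
    recent_files = files_modified_since(tmp, 1500)
```

Unlike run keys, this approach depends on the timestamps being reliable, so a file whose modification time predates the last evaluation would be missed.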

Sensors in Dagit

You can view and operate sensors in Dagit. To view the sensors page, click the "All sensors" button in the left-hand navigation pane.

Here, we can turn sensors on and off, and also see past executions of each sensor.

Testing sensors on the command-line

The dagster command-line tool contains a number of options to manage sensors. You can view the set of known sensors, turn them on and off, and preview the set of runs that will be requested in the next evaluation of the sensor.

You can view the full set of sensor command-line options by running dagster sensor --help.

Custom evaluation intervals

By default, sensors are configured to run roughly every 30 seconds, but they can be configured to run at a different interval by setting the minimum_interval_seconds parameter on the sensor decorator. The minimum interval specified on the sensor definition guarantees that the sensor will not be evaluated more frequently than that interval.

For example, here are two sensors that are defined with two different intervals:

sensor.py
from dagster import RunRequest, sensor


@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=30)
def sensor_A(_context):
    yield RunRequest(run_key=None, run_config={})


@sensor(pipeline_name="my_pipeline", minimum_interval_seconds=45)
def sensor_B(_context):
    yield RunRequest(run_key=None, run_config={})
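
To see why the interval is a minimum rather than an exact period, consider the toy simulation below. It is a sketch under a stated assumption, not a description of the daemon's actual scheduling: it assumes the daemon checks sensors on a fixed 30-second loop, and evaluation_times is a hypothetical helper.

```python
def evaluation_times(tick_times, minimum_interval_seconds):
    """Return the ticks at which a sensor would be evaluated.

    A tick evaluates the sensor only if at least `minimum_interval_seconds`
    have elapsed since the previous evaluation. (Hypothetical model.)
    """
    evaluated = []
    last = None
    for tick in tick_times:
        if last is None or tick - last >= minimum_interval_seconds:
            evaluated.append(tick)
            last = tick
    return evaluated


# Assumed daemon ticks every 30 seconds over three minutes.
ticks = list(range(0, 181, 30))
times_a = evaluation_times(ticks, 30)  # interval of sensor_A
times_b = evaluation_times(ticks, 45)  # interval of sensor_B
print(times_a)
print(times_b)
```

Under this assumption, sensor_A is evaluated on every tick, while sensor_B is evaluated on every other tick, that is, every 60 seconds rather than every 45. The guarantee is only that evaluations are never closer together than the configured interval.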