Limiting concurrency in data pipelines#

Concurrency is an essential concept in modern programming, particularly when working with data pipelines. While concurrency can improve the efficiency of your pipelines, too many processes executing at once can cause issues. Limiting concurrency in your data pipelines can help prevent performance problems and downtime.

By the end of this guide, you'll:

  • Understand the basics of concurrency
  • Learn about the options Dagster offers for limiting concurrency
  • Learn how to configure concurrency in Dagster
  • Understand how to troubleshoot run queueing issues

Understanding the basics#

Concurrency is the ability of a system to execute multiple processes in parallel. Before we go any further, let's go over the terminology in this guide:

Term | Definition
Op | The core unit of computation in Dagster, an op is a function that performs a task, for example sending an email or kicking off a job in dbt.
Software-defined asset | A software-defined asset is a Dagster object that couples an asset to the function and upstream assets used to produce its contents. An asset is an object in persistent storage, such as a table, file, or persisted machine learning model.
Job | A job is a graph of ops or assets connected via data dependencies. Jobs are the main unit of execution and monitoring in Dagster.
Run | A run is a single execution of a job in Dagster.
Run queue | A sequence of Dagster runs waiting to be executed.
Executor | Executors are responsible for executing steps within a job run. Executors can range from single-process serial executors to managing per-step computational resources with a control plane.

Understanding Dagster’s concurrency limit options#

Dagster supports limiting the number of processes that can be in progress at one time, at both the run level and the op level. Depending on your needs, one or more of the following options may be suitable:

  • Run concurrency controls the total number of runs in a deployment that can execute at the same time. Run concurrency applies to both op and asset-based jobs and all code locations in a single deployment. Any runs beyond the limit will be queued and won’t use any compute.
  • Op/asset concurrency controls the number of ops/assets that can execute in parallel in a single run.
  • Global op/asset concurrency (experimental) controls the number of ops/assets that can execute in parallel across all runs.

By limiting the number of parallel processes Dagster executes, you can ensure that resources are not overwhelmed and that each process has the resources it needs to run efficiently. This can lead to faster and more reliable pipeline execution, as well as easier monitoring and debugging of issues.


Configuring run-level concurrency#

Run concurrency can be configured by:

  • Limiting the overall number of runs in a deployment
  • Specifying limits using tags, which is useful for applying limits to specific jobs. For example, jobs that share a resource could all have the same tag.

These methods of limiting concurrency can be used individually or together. For each queued run, Dagster will check that launching the run will satisfy all conditions. For example, if launching a run would exceed the maximum number of concurrent runs, Dagster will queue the run until another run finishes.
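
The check described above can be modeled in a few lines of plain Python. This is an illustrative sketch only, not Dagster's queue implementation: the function name can_launch, the representation of in-progress runs as tag dictionaries, and the key/value/limit shape of each tag rule are assumptions made for the example.

```python
from typing import Dict, List


def can_launch(
    run_tags: Dict[str, str],
    in_progress: List[Dict[str, str]],
    max_concurrent_runs: int,
    tag_limits: List[dict],
) -> bool:
    """Return True if a queued run may launch without exceeding any limit.

    Hypothetical model: `in_progress` holds the tag dict of each running run,
    and each rule looks like {"key": "database", "value": "redshift", "limit": 2}.
    """
    # Overall limit; -1 means "no limit" in this sketch.
    if max_concurrent_runs != -1 and len(in_progress) >= max_concurrent_runs:
        return False

    for rule in tag_limits:
        key, value, limit = rule["key"], rule.get("value"), rule["limit"]

        def matches(tags: Dict[str, str]) -> bool:
            # A rule without a value matches any run that carries the key.
            return key in tags and (value is None or tags[key] == value)

        if matches(run_tags):
            running = sum(1 for tags in in_progress if matches(tags))
            if running >= limit:
                return False  # this tag limit is already saturated
    return True


running_runs = [{"database": "redshift"}, {"database": "redshift"}, {}]
limits = [{"key": "database", "value": "redshift", "limit": 2}]

print(can_launch({"database": "redshift"}, running_runs, 15, limits))  # False
print(can_launch({"team": "analytics"}, running_runs, 15, limits))     # True
```

In this model a run stays queued as long as any single condition fails, which is why the two kinds of limit can be combined freely.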

Note: Run-level concurrency rules can't currently be used to prevent a run from being submitted to the queue; they only limit how many queued runs are launched at once. If you want to prevent runs from ever being launched if another job is already running, check out this GitHub discussion for an example.

Limiting overall runs#

How you limit the overall number of concurrent runs in a deployment depends on whether you're using Dagster Cloud or Dagster Open Source:

To enable this limit, use run_queue.max_concurrent_runs. For example, the following would limit the number of concurrent runs for the deployment to 15:

run_queue:
  max_concurrent_runs: 15

When defining a value for max_concurrent_runs, keep the following in mind:

  • This setting defaults to 10
  • Disable the setting with a -1 value, which means no limit will be enforced. Note: All other negative numbers are invalid, and disabling this setting isn't supported for Dagster Cloud.
  • A value of 0 prevents any runs from launching
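
The rules above can be summarized as a small function. This is a sketch of how the value is interpreted according to this guide, not code taken from Dagster; effective_run_limit and DEFAULT_MAX_CONCURRENT_RUNS are hypothetical names.

```python
from typing import Optional

DEFAULT_MAX_CONCURRENT_RUNS = 10  # default stated in this guide


def effective_run_limit(value: Optional[int] = None) -> Optional[int]:
    """Return the enforced run limit, or None when no limit is enforced."""
    if value is None:
        return DEFAULT_MAX_CONCURRENT_RUNS  # setting omitted
    if value == -1:
        return None  # limit disabled
    if value < 0:
        raise ValueError("max_concurrent_runs must be -1 or a non-negative integer")
    return value  # note that 0 means no run can launch


print(effective_run_limit())    # 10
print(effective_run_limit(-1))  # None
print(effective_run_limit(0))   # 0
```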

Configuring op/asset-level concurrency#

Looking for global op/asset concurrency? Check out Dagster's experimental support for global op/asset concurrency limits. For more fine-grained control, you can also check out our integration with Celery.

Utilizing op/asset-level concurrency provides fine-grained control for the maximum number of ops/assets that can be executed at once within a single run, ensuring that shared resources aren't overwhelmed.

Op and asset-level concurrency is enabled by configuring a job executor. Using the executor, you can:

  • Limit the number of ops/assets that can execute in a single run at once
  • Specify limits using tags

Limiting overall concurrency in a job#

To limit concurrency for ops and assets in jobs, use max_concurrent in the job’s config, either in Python or using the Launchpad in the Dagster UI.

Note: The examples in this section use the multiprocess_executor, which uses the multiprocess key. This key will change depending on the type of executor you're using. Refer to the Job executor documentation for more info.

While max_concurrent is used to limit concurrency in both asset and op-based jobs, how jobs are defined differs between assets and ops. The example below shows an asset-based job.

Asset-based jobs#

Asset jobs are defined using define_asset_job. In this example, using the multiprocess_executor, the job will execute up to three assets at once:

from dagster import define_asset_job

# asset job that runs at most three assets at the same time
assets_job = define_asset_job(
    name="assets_job",
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,      # limits concurrent assets to 3
                },
            }
        }
    }
)
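
Because the same nested dictionary is used for both asset and op-based jobs, it can be built once and reused. A minimal sketch in plain Python: execution_config is a hypothetical helper, not part of Dagster, and the executor_key parameter simply reflects the note above about the key changing with the executor type, so check the Job executor documentation for the key your executor expects.

```python
def execution_config(max_concurrent: int, executor_key: str = "multiprocess") -> dict:
    """Build the nested `execution` config for a job (hypothetical helper)."""
    return {
        "execution": {
            "config": {
                executor_key: {"max_concurrent": max_concurrent},
            }
        }
    }


config = execution_config(3)
print(config)

# With Dagster installed, the same dict could be passed to either job style:
#   define_asset_job(name="assets_job", config=config)   # asset-based job
#   @job(config=config)                                   # op-based job
```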

Limiting concurrency using tags#

Limits can be specified for all ops/assets with a specific tag key or key-value pair. If any limit would be exceeded by launching an op/asset, then the op/asset will be queued.

To specify limits on the number of ops/assets with a specific tag, use tag_concurrency_limits in the job’s config, either in Python or using the Launchpad in the Dagster UI.

Note: The examples in this section use the multiprocess_executor, which uses the multiprocess key. This key will change depending on the type of executor you're using. Refer to the Job executor documentation for more info.

While tag_concurrency_limits is used to limit concurrency in both asset and op-based jobs, how jobs are defined differs between assets and ops. The example below shows an asset-based job.

Asset-based jobs#

Unlike op-based jobs, asset jobs use the op_tags field on each asset when checking them for tag concurrency limits. In this example, using the multiprocess_executor, the job will execute up to three assets at once with the database tag equal to snowflake:

from dagster import asset, define_asset_job

# example asset with tags, specified using op_tags
@asset(op_tags={"database": "snowflake"})
def asset1():
    ...

# asset job that runs at most three assets tagged database=snowflake at once
assets_job = define_asset_job(
    name="assets_job",
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "snowflake",
                            "limit": 3,
                        }
                    ],
                },
            }
        }
    }
)
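
To see how a tag limit affects which steps may start, the following plain-Python sketch walks a list of pending steps and admits those that fit. It is an illustrative model rather than the executor's real scheduling code: admit_steps is a hypothetical name, each step is represented only by its tag dictionary, and nothing is assumed to be running beforehand.

```python
from typing import Dict, List, Optional


def admit_steps(
    pending: List[Dict[str, str]],
    tag_limits: List[dict],
    max_concurrent: Optional[int] = None,
) -> List[int]:
    """Return the indexes of pending steps that could start now."""
    counts = [0] * len(tag_limits)  # steps admitted so far, per rule
    started: List[int] = []

    for index, tags in enumerate(pending):
        if max_concurrent is not None and len(started) >= max_concurrent:
            break  # overall per-run limit reached
        # A rule applies when the key matches, and the value too if one is given.
        applicable = [
            i for i, rule in enumerate(tag_limits)
            if rule["key"] in tags
            and ("value" not in rule or tags[rule["key"]] == rule["value"])
        ]
        if all(counts[i] < tag_limits[i]["limit"] for i in applicable):
            for i in applicable:
                counts[i] += 1
            started.append(index)
    return started


steps = [
    {"database": "snowflake"},
    {"database": "snowflake"},
    {"database": "snowflake"},
    {"database": "snowflake"},  # fourth snowflake step has to wait
    {},                          # untagged step is not limited by the rule
]
limits = [{"key": "database", "value": "snowflake", "limit": 3}]

print(admit_steps(steps, limits))  # [0, 1, 2, 4]
```

Dropping "value" from a rule in this sketch makes the limit apply to every step carrying the key, which mirrors the "tag key or key-value pair" distinction described above.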

Limiting op/asset concurrency across runs#

This feature is experimental and is only supported with Postgres/MySQL storages.

Limits can be specified on the Dagster instance using the special op tag dagster/concurrency_key. If this instance limit would be exceeded by launching an op/asset, then the op/asset will be queued.

For example, to globally limit the number of running ops touching Redshift to two, the op/asset must be first tagged with the global concurrency key:

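from dagster import asset, op

# op tagged with the global concurrency key "redshift"
@op(tags={"dagster/concurrency_key": "redshift"})
def my_redshift_op():
    ...


# asset tagged with the same key, using op_tags
@asset(op_tags={"dagster/concurrency_key": "redshift"})
def my_redshift_table():
    ...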

To specify limits on the number of ops/assets tagged with a particular concurrency key, you can use either the dagster CLI or the Dagster UI:

  • To specify a global concurrency limit using the CLI, use:

    dagster instance concurrency set <concurrency_key> <limit>
    
  • To specify a global concurrency limit using the Dagster UI, navigate to the Concurrency limits tab on the Deployment page.

The concurrency key should match the name that the op/asset is tagged with. For example, if the op/asset is tagged with dagster/concurrency_key: redshift, then the concurrency key should be redshift.

A default concurrency limit can be configured for the instance, for any concurrency keys that do not have an explicit limit set:

To enable this default value, use concurrency.default_op_concurrency_limit. For example, the following would set the default concurrency value for the deployment to 1:

concurrency:
  default_op_concurrency_limit: 1
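
The interaction between explicit limits and the default can be pictured with a toy slot tracker. This is a sketch under stated assumptions, not how Dagster stores or claims slots: the ConcurrencySlots class and its methods are hypothetical, and treating a missing default as "unlimited" is an assumption of the model.

```python
from collections import Counter
from typing import Dict, Optional


class ConcurrencySlots:
    """Toy model of instance-wide slots, grouped by concurrency key."""

    def __init__(self, limits: Dict[str, int], default_limit: Optional[int] = None):
        self.limits = limits
        self.default_limit = default_limit
        self.in_use: Counter = Counter()

    def limit_for(self, key: str) -> Optional[int]:
        # Explicit limits win; otherwise fall back to the default
        # (None is treated as unlimited in this sketch).
        return self.limits.get(key, self.default_limit)

    def claim(self, key: str) -> bool:
        limit = self.limit_for(key)
        if limit is not None and self.in_use[key] >= limit:
            return False  # the step would be queued until a slot frees up
        self.in_use[key] += 1
        return True

    def release(self, key: str) -> None:
        if self.in_use[key] > 0:
            self.in_use[key] -= 1


slots = ConcurrencySlots(limits={"redshift": 2}, default_limit=1)
print([slots.claim("redshift") for _ in range(3)])   # [True, True, False]
print([slots.claim("snowflake") for _ in range(2)])  # [True, False]
slots.release("redshift")
print(slots.claim("redshift"))                       # True
```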

Setting priority for ops/assets#

By default, slots are allocated to op/asset execution on a first-in-first-out basis. However, steps from high-priority runs take precedence over steps from low-priority runs, as determined by the priority tag on the individual run.

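from dagster import job, schedule

# runs of this job get a priority of 3
@job(tags={"dagster/priority": "3"})
def important_job():
    ...


# runs launched by this schedule get a priority of -1
@schedule(
    cron_schedule="* * * * *",
    job_name="important_job",
    execution_timezone="US/Central",
    tags={"dagster/priority": "-1"},
)
def less_important_schedule(_):
    ...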

Priority can also be assigned based on the priority tag of the op/asset definition.

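from dagster import asset, op

# priority set on the op definition
@op(tags={"dagster/concurrency_key": "foo", "dagster/priority": "3"})
def my_op():
    ...


# priority set on the asset definition, using op_tags
@asset(op_tags={"dagster/concurrency_key": "foo", "dagster/priority": "3"})
def my_asset():
    ...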

When the priority tag is set on both the run and the asset/op definition, the step priority will be the sum of the two values.
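
The ordering rules in this section can be sketched with a priority queue. This is a plain-Python illustration, not Dagster's allocation code: step_priority and allocation_order are hypothetical names, and treating a missing priority tag as 0 is an assumption of the sketch.

```python
import heapq
from typing import Dict, List, Tuple

PRIORITY_TAG = "dagster/priority"


def step_priority(run_tags: Dict[str, str], op_tags: Dict[str, str]) -> int:
    """Sum the run-level and op-level priority tags (missing tags count as 0)."""
    return int(run_tags.get(PRIORITY_TAG, "0")) + int(op_tags.get(PRIORITY_TAG, "0"))


def allocation_order(steps: List[Tuple[str, Dict[str, str], Dict[str, str]]]) -> List[str]:
    """Order waiting steps: highest priority first, first-in-first-out within a tie."""
    heap: list = []
    for arrival, (name, run_tags, op_tags) in enumerate(steps):
        # heapq is a min-heap, so the priority is negated; arrival order breaks ties.
        heapq.heappush(heap, (-step_priority(run_tags, op_tags), arrival, name))
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]


waiting = [
    ("nightly_step", {PRIORITY_TAG: "-1"}, {}),                    # priority -1
    ("plain_step", {}, {}),                                        # priority 0
    ("important_step", {PRIORITY_TAG: "3"}, {PRIORITY_TAG: "3"}),  # priority 3 + 3 = 6
    ("another_plain_step", {}, {}),                                # priority 0
]

print(allocation_order(waiting))
# ['important_step', 'plain_step', 'another_plain_step', 'nightly_step']
```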

Freeing concurrency slots#

With a global concurrency limit set, it is useful to configure your instance to automatically free slots for canceled/failed runs. Certain execution errors can cause steps to exit without freeing an occupied concurrency slot.

To automatically recover from this state, you can configure your instance's run monitoring to automatically free concurrency slots for runs that have reached some terminal state (e.g. failed or canceled runs).

In dagster.yaml, you can configure the interval between the end of a run and the slot cleanup like so:

run_monitoring:
  enabled: true
  free_slots_after_run_end_seconds: 300 # free any hanging concurrency slots after 5 minutes from the end of a run
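
As a rough mental model of what this setting expresses, the check below decides whether the slots of a finished run are old enough to be reclaimed. It is a sketch, not the run monitoring implementation; should_free_slots is a hypothetical function whose last parameter borrows the setting's name.

```python
from datetime import datetime, timedelta
from typing import Optional


def should_free_slots(
    run_ended_at: Optional[datetime],
    now: datetime,
    free_slots_after_run_end_seconds: int = 300,
) -> bool:
    """Return True once a finished run's slots are old enough to be reclaimed."""
    if run_ended_at is None:
        return False  # the run has not reached a terminal state yet
    grace_period = timedelta(seconds=free_slots_after_run_end_seconds)
    return now - run_ended_at >= grace_period


ended = datetime(2024, 1, 1, 12, 0, 0)
print(should_free_slots(ended, datetime(2024, 1, 1, 12, 4, 0)))  # False, only 4 minutes
print(should_free_slots(ended, datetime(2024, 1, 1, 12, 5, 0)))  # True, 5 minutes passed
```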

Troubleshooting#

When limiting concurrency, you might run into some issues until you get the configuration right.

Runs going to STARTED status and skipping QUEUED#

This only applies to Dagster Open Source.

The run_queue key may not be set in your instance's settings. In the Dagster UI, navigate to Deployment > Configuration and verify that the run_queue key is set.

Runs remaining in QUEUED status#

The possible causes for runs remaining in QUEUED status depend on whether you're using Dagster Cloud or Dagster Open Source.

If runs aren't being dequeued in Dagster Cloud, the root causes could be:

  • If using a Hybrid deployment, the agent serving the deployment may be down. In this situation, runs will be paused.
  • Dagster Cloud is experiencing downtime. Check the status page for the latest on potential outages.