Concurrency is an essential concept in modern programming, particularly when working with data pipelines. While concurrency can improve the efficiency of your pipelines, too many processes executing at once can cause issues. Limiting concurrency in your data pipelines can help prevent performance problems and downtime.
By the end of this guide, you'll:
Understand the basics of concurrency
Learn about the options Dagster offers for limiting concurrency
Learn how to configure concurrency in Dagster
Understand how to troubleshoot run queueing issues
Concurrency is the ability of a system to execute multiple processes in parallel. Before we go any further, let's go over the terminology in this guide:
An asset definition is a Dagster object that couples an asset to the function and upstream assets used to produce its contents. An asset is an object in persistent storage, such as a table, file, or persisted machine learning model.
Executors are responsible for executing steps within a job run. They range from single-process serial executors to executors that manage per-step computational resources via a control plane.
Dagster supports placing limits on the number of processes that can be in progress at a single time, at both the run level and the op/asset level. Depending on your needs, one or both types may be suitable:
Run concurrency controls the total number of runs in a deployment that can execute at the same time. Run concurrency applies to both op and asset-based jobs and all code locations in a single deployment. Any runs beyond the limit will be queued and won’t use any compute.
Op/asset concurrency controls the number of ops/assets that can execute in parallel in a single run.
Global op/asset concurrency (experimental) controls the number of ops/assets that can execute in parallel across all runs.
By limiting the number of parallel processes Dagster executes, you can ensure that resources are not overwhelmed and that each process has the resources it needs to run efficiently. This can lead to faster and more reliable pipeline execution, as well as easier monitoring and debugging of issues.
Run concurrency can be limited by:
Limiting the overall number of runs in a deployment
Specifying limits using tags, which is useful for applying limits to specific jobs. For example, jobs that share a resource could all have the same tag.
These methods of limiting concurrency can be used individually or together. For each queued run, Dagster will check that launching the run will satisfy all conditions. For example, if launching a run would exceed the maximum number of concurrent runs, Dagster will queue the run until another run finishes.
Note: Run-level concurrency rules can't currently be used to prevent a run from being submitted to the queue; they only limit the number of queued runs that will be launched at once. If you want to prevent runs from ever being launched while another job is already running, check out this GitHub discussion for an example.
Dagster Open Source: Use your instance's dagster.yaml
To enable this limit, use run_queue.max_concurrent_runs. For example, the following would limit the number of concurrent runs for the deployment to 15:
```yaml
run_queue:
  max_concurrent_runs: 15
```
When defining a value for max_concurrent_runs, keep the following in mind:
This setting defaults to 10
Disable the setting with a -1 value, which means no limit will be enforced (see the example below). Note: All other negative numbers are invalid, and disabling this setting isn't supported for Dagster+.
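For example, a minimal sketch of disabling the limit in dagster.yaml for a Dagster Open Source deployment:

```yaml
run_queue:
  max_concurrent_runs: -1 # no limit is enforced; not supported in Dagster+
```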
Dagster Open Source: Use your instance's dagster.yaml
To enable this limit, use run_queue.tag_concurrency_limits. This key accepts a list of tags and their corresponding concurrency limits.
```yaml
run_queue:
  max_concurrent_runs: 15
  tag_concurrency_limits:
    - key: "database"
      value: "redshift" # applies when the `database` tag has a value of `redshift`
      limit: 4
    - key: "dagster/backfill" # applies when the `dagster/backfill` tag is present, regardless of value
      limit: 10
Let’s review what this configuration will do:
For runs with a database tag with a value of redshift, a maximum of four runs can execute concurrently
For runs with a dagster/backfill tag, a maximum of 10 runs can execute concurrently. Note that this implementation applies to any value of the dagster/backfill tag, whereas the database: redshift example only applies the limit when there’s a specific tag value.
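Limits can also be applied per unique value of a tag. A minimal sketch of such a configuration, assuming the applyLimitPerUniqueValue option (the team tag and limit of three are illustrative):

```yaml
run_queue:
  tag_concurrency_limits:
    - key: "team"
      value:
        applyLimitPerUniqueValue: true # apply the limit to each unique value of the `team` tag
      limit: 3
```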
With this configuration, each unique value of the team tag will be limited to three concurrent runs. For example, runs tagged team: marketing and runs tagged team: sales will each be limited to three concurrent runs.
Utilizing op/asset-level concurrency provides fine-grained control for the maximum number of ops/assets that can be executed at once within a single run, ensuring that shared resources aren't overwhelmed.
Op and asset-level concurrency is enabled by configuring a job executor. Using the executor, you can:
Limit the number of ops/assets that can execute in a single run at once
Specify limits for ops/assets with specific tags
To limit concurrency for ops and assets in jobs, use max_concurrent in the job’s config, either in Python or using the Launchpad in the Dagster UI.
Note: The examples in this section use the multiprocess_executor, which uses the multiprocess key. This key will change depending on the type of executor you're using. Refer to the Job executor documentation for more info.
While max_concurrent is used to limit concurrency in both asset and op-based jobs, how jobs are defined differs between assets and ops. Click the tabs below for examples.
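For example, a sketch of the asset-based job case, limiting execution to three concurrent assets (the asset and job names are illustrative):

```python
from dagster import asset, define_asset_job


@asset
def asset1(): ...


@asset
def asset2(): ...


# Limit this job to executing at most three assets at once
assets_job = define_asset_job(
    name="assets_job",
    selection=[asset1, asset2],
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "max_concurrent": 3,
                },
            }
        }
    },
)
```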
In the Dagster UI, navigate to the details page for a job, then the Launchpad tab. In this example, using the multiprocess_executor, the job will execute up to three ops/assets at once:
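A sketch of that Launchpad run config, assuming the multiprocess executor:

```yaml
execution:
  config:
    multiprocess:
      max_concurrent: 3 # limits this run to executing 3 ops/assets at once
```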
Limits can be specified for all ops/assets with a specific tag key or key-value pair. If any limit would be exceeded by launching an op/asset, then the op/asset will be queued.
To specify limits on the number of ops/assets with a specific tag, use tag_concurrency_limits in the job’s config, either in Python or using the Launchpad in the Dagster UI.
Note: The examples in this section use the multiprocess_executor, which uses the multiprocess key. This key will change depending on the type of executor you're using. Refer to the Job executor documentation for more info.
While tag_concurrency_limits is used to limit concurrency in both asset and op-based jobs, how jobs are defined differs between assets and ops. Click the tabs below for examples.
Unlike op-based jobs, asset jobs use the op_tags field on each asset when checking them for tag concurrency limits. In this example, using the multiprocess_executor, the job will execute up to three assets at once with the database tag equal to snowflake:
```python
from dagster import asset, define_asset_job


# example asset with tags, specified using op_tags
@asset(op_tags={"database": "snowflake"})
def asset1(): ...


assets_job = define_asset_job(
    name="assets_job",
    config={
        "execution": {
            "config": {
                "multiprocess": {
                    "tag_concurrency_limits": [
                        {
                            "key": "database",
                            "value": "snowflake",
                            "limit": 3,
                        }
                    ],
                },
            }
        }
    },
)
```
In the Dagster UI, navigate to the details page for a job, then the Launchpad tab. In this example, using the multiprocess_executor, the job will execute up to two ops/assets at once with the database tag equal to redshift:
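A sketch of the corresponding Launchpad run config, assuming the multiprocess executor:

```yaml
execution:
  config:
    multiprocess:
      tag_concurrency_limits:
        - key: "database"
          value: "redshift"
          limit: 2 # at most 2 ops/assets with database=redshift at once in this run
```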
Limits can be specified on the Dagster instance using the special op tag dagster/concurrency_key. If this instance limit would be exceeded by launching an op/asset, then the op/asset will be queued.
For example, to globally limit the number of running ops touching Redshift to two, the op/asset must first be tagged with the global concurrency key:
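A minimal sketch of tagging an asset with the dagster/concurrency_key tag (the asset name is illustrative):

```python
from dagster import asset


# Tag this asset with the global concurrency key `redshift`
@asset(op_tags={"dagster/concurrency_key": "redshift"})
def redshift_asset(): ...
```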
To specify a global concurrency limit using the Dagster UI, navigate to the Concurrency limits tab on the Deployment page.
The concurrency key should match the name that the op/asset is tagged with. For example, if the op/asset is tagged with dagster/concurrency_key: redshift, then the concurrency key should be redshift.
Dagster Open Source: Use your instance's dagster.yaml
To enable this default value, use concurrency.default_op_concurrency_limit. For example, the following would set the default concurrency value for the deployment to 1:
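A sketch of the corresponding dagster.yaml entry:

```yaml
concurrency:
  default_op_concurrency_limit: 1
```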
By default, slots are allocated to op/asset execution on a first-in, first-out basis. However, steps from higher-priority runs will take precedence over steps from lower-priority runs, as determined by the priority tag on the individual run.
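For example, a run's priority can be raised by tagging it; a sketch assuming the priority tag is dagster/priority (higher values take precedence) and an illustrative job name:

```python
from dagster import define_asset_job

# Runs launched from this job are prioritized when competing for concurrency slots
important_job = define_asset_job(
    name="important_job",
    tags={"dagster/priority": "3"},
)
```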
With a global concurrency limit set, it is useful to configure your instance to automatically free slots for canceled/failed runs. Certain execution errors can cause steps to exit without freeing an occupied concurrency slot.
To automatically recover from this state, you can configure your instance's run monitoring to automatically free concurrency slots for runs that have reached some terminal state (e.g. failed or canceled runs).
In dagster.yaml, you can configure the interval between a run end and the slot clean up like so:
```yaml
run_monitoring:
  enabled: true
  free_slots_after_run_end_seconds: 300 # free any hanging concurrency slots after 5 minutes from the end of a run
```
If a global concurrency limit is set, some runs may be active without any executing ops due to concurrency constraints. When this occurs, runs can unnecessarily consume resources and make it difficult to see whether other runs are progressing.
You can throttle these runs and block their step progression to mitigate this issue. To do this, configure the run queue to keep such runs in a QUEUED state until a slot becomes free for one of their steps:
```yaml
run_queue:
  block_op_concurrency_limited_runs:
    enabled: true
    op_concurrency_slot_buffer: 1 # a buffer of 1 means 1 run might be started even though there are no slots available
```
The run_queue key may not be set in your instance's settings. In the Dagster UI, navigate to Deployment > Configuration and verify that the run_queue key is set.
The possible causes for runs remaining in QUEUED status depend on whether you're using Dagster+ or Dagster Open Source.
If runs aren't being dequeued in Dagster+, the root causes could be:
If using a Hybrid deployment, the agent serving the deployment may be down. In this situation, runs will be paused.
Dagster+ is experiencing downtime. Check the status page for the latest on potential outages.
If runs aren’t being dequeued in Dagster Open Source, the root cause is likely an issue with the Dagster daemon or the run queue configuration.
Troubleshoot the Dagster daemon:
Verify the Dagster daemon is set up and running. In the Dagster UI, navigate to Deployment > Daemons and verify that the daemon is running. The Run queue should also be running. If you used dagster dev to start the Dagster UI, the daemon should have been started for you.
If the daemon isn’t running, proceed to step 2.
Verify the Dagster daemon can access the same storage as the Dagster webserver process. Both the webserver process and the Dagster daemon should access the same storage, meaning they should use the same dagster.yaml. Locally, this means both processes should have the same DAGSTER_HOME environment variable set. If you used dagster dev to start the Dagster UI, both processes should be using the same storage.
Troubleshoot the run queue configuration: If the daemon is running, runs may intentionally be left in the queue due to concurrency rules. To investigate, you can:
Check the output logged from the daemon process, as this will include skipped runs.
Check the max_concurrent_runs setting in your instance’s dagster.yaml. If set to 0, this may block the queue. You can check this setting in the Dagster UI by navigating to Deployment > Configuration and locating the run_queue.max_concurrent_runs setting. Refer to the Limiting overall runs section for more info.
Check the state of your run queue. In some cases, the queue may be blocked by some number of in-progress runs. To view the status of your run queue, click Runs in the top navigation of the Dagster UI and then open the Queued and In Progress tabs.
If there are queued or in-progress runs blocking the queue, you can terminate them to allow other runs to proceed.