Managing concurrency of Dagster assets, jobs, and Dagster instances

You often want to control the number of concurrent runs for a Dagster job, a specific asset, or for a type of asset or job. Limiting concurrency in your data pipelines can help prevent performance problems and downtime.

note

This article assumes familiarity with assets and jobs.

Limit the number of total runs that can be in progress at the same time

concurrency:
  runs:
    max_concurrent_runs: 15

Limit the number of assets or ops actively executing across all runs

You can assign assets and ops to concurrency pools, which limit the number of in-progress op executions across all runs. First, assign your asset or op to a concurrency pool using the pool keyword argument.

Specifying pools on assets and ops
import dagster as dg


# Assets take the pool name through the `pool` keyword argument
@dg.asset(pool="foo")
def my_asset():
    pass


# Ops accept the same keyword argument
@dg.op(pool="bar")
def my_op():
    pass


@dg.op(pool="barbar")
def my_downstream_op(inp):
    return inp


# Each op inside a graph asset keeps its own pool
@dg.graph_asset
def my_graph_asset():
    return my_downstream_op(my_op())


defs = dg.Definitions(
    assets=[my_asset, my_graph_asset],
)

To verify that you have set the pool correctly, view the details pane for the asset or op in the Dagster UI.

Viewing the pool tag

Once you have assigned your assets and ops to a concurrency pool, you can configure a pool limit for that pool in your deployment by using the Dagster UI or the Dagster CLI.

To specify a limit for the pool "database" using the UI, navigate to the Deployments > Concurrency settings page and click the Add pool limit button:

Setting the pool limit

To specify a limit for the pool "database" using the CLI, use:

dagster instance concurrency set database 1

Limit the number of runs that can be in progress for a set of ops

You can also use concurrency pools to limit the number of in-progress runs containing those assets or ops. Follow the steps in the "Limit the number of assets or ops actively executing across all runs" section to assign your assets and ops to pools and to configure the desired limit.

Once you have assigned your assets and ops to your pool, you can change your deployment settings to set the pool enforcement granularity. To limit the total number of runs containing a specific op at any given time (instead of the total number of ops actively executing), set the pool granularity to run.

concurrency:
  pools:
    granularity: 'run'

If this setting is omitted, the granularity defaults to op. This means that for a pool foo with a limit of 1, only one op in that pool executes at a given time across all runs, but the number of runs in progress is unaffected by the pool limit.
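The difference between the two granularities can be sketched with a toy model in plain Python. This is not Dagster's implementation: the function name `admit` and its inputs are hypothetical, op dependencies are ignored, and the model assumes that with run granularity the pool limit is applied only when runs are admitted.

```python
def admit(runs: dict, limit: int, granularity: str):
    """Toy model of one concurrency pool (illustrative, not Dagster's code).

    `runs` maps a run id to the list of ops in that run that belong to the pool.
    Returns (run ids in progress, (run id, op) pairs allowed to execute).
    """
    if granularity == "run":
        # The limit caps how many runs containing pooled ops are in progress.
        started = list(runs)[:limit]
        # Assumption: ops inside an admitted run are not throttled further.
        executing = [(run_id, op) for run_id in started for op in runs[run_id]]
    elif granularity == "op":
        # Every run may start; the limit caps executing ops across all runs.
        started = list(runs)
        candidates = [(run_id, op) for run_id in started for op in runs[run_id]]
        executing = candidates[:limit]
    else:
        raise ValueError(f"unknown granularity: {granularity!r}")
    return started, executing


# Three runs, each containing one op assigned to the same pool.
runs = {"run_a": ["load"], "run_b": ["load"], "run_c": ["load"]}

op_started, op_executing = admit(runs, limit=1, granularity="op")
run_started, run_executing = admit(runs, limit=1, granularity="run")
```

With a limit of 1, op granularity lets all three runs be in progress while only one pooled op executes; run granularity lets only one of the runs be in progress.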

Setting a default limit for concurrency pools

concurrency:
  pools:
    default_limit: 1

Limit the number of runs that can be in progress by run tag

You can also limit the number of in-progress runs by run tag. This is useful for limiting sets of runs independent of which assets or ops they are executing. For example, you might want to limit the number of in-progress runs for a particular schedule. Or, you might want to limit the number of in-progress runs for all backfills.

concurrency:
  runs:
    tag_concurrency_limits:
      - key: "dagster/sensor_name"
        value: "my_cool_sensor"
        limit: 5
      - key: "dagster/backfill"
        limit: 10

Limit the number of runs that can be in progress by unique tag value

To apply separate limits to each unique value of a run tag, set a limit for each unique value using applyLimitPerUniqueValue. For example, instead of limiting the number of backfill runs across all backfills, you may want to limit the number of runs for each backfill in progress:

concurrency:
  runs:
    tag_concurrency_limits:
      - key: "dagster/backfill"
        value:
          applyLimitPerUniqueValue: true
        limit: 10
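To illustrate how a shared tag limit differs from a per-unique-value limit, here is a minimal plain-Python sketch of one way such rules could be evaluated. It is a model for intuition only, not Dagster's run coordinator: the function `tag_limit_allows` and the flattened `per_unique_value` flag (standing in for `applyLimitPerUniqueValue`) are hypothetical.

```python
def tag_limit_allows(new_run_tags, in_progress_tags, limits):
    """Return True if a candidate run fits under every matching tag limit.

    new_run_tags: dict of tag key -> value for the candidate run.
    in_progress_tags: list of tag dicts, one per in-progress run.
    limits: list of rule dicts with "key" and "limit", plus an optional
        "value" (exact match) or "per_unique_value" flag (hypothetical names).
    """
    for rule in limits:
        key = rule["key"]
        if key not in new_run_tags:
            continue  # the rule does not apply to this run
        value = new_run_tags[key]
        if rule.get("value") is not None and rule["value"] != value:
            continue  # the rule targets a different tag value
        if rule.get("value") is not None or rule.get("per_unique_value"):
            # Count only in-progress runs with this exact key/value pair.
            count = sum(1 for tags in in_progress_tags if tags.get(key) == value)
        else:
            # Count every in-progress run that carries the key at all.
            count = sum(1 for tags in in_progress_tags if key in tags)
        if count >= rule["limit"]:
            return False
    return True


# Two runs of backfill "a" are already in progress.
in_progress = [{"dagster/backfill": "a"}, {"dagster/backfill": "a"}]
shared = [{"key": "dagster/backfill", "limit": 2}]
per_value = [{"key": "dagster/backfill", "limit": 2, "per_unique_value": True}]

shared_allows_b = tag_limit_allows({"dagster/backfill": "b"}, in_progress, shared)
per_value_allows_b = tag_limit_allows({"dagster/backfill": "b"}, in_progress, per_value)
per_value_allows_a = tag_limit_allows({"dagster/backfill": "a"}, in_progress, per_value)
```

In this model, the shared limit blocks a run from backfill "b" because backfill "a" already uses both slots, while the per-unique-value limit gives each backfill its own budget of two runs.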

Prevent runs from starting if another run is already occurring (advanced)

You can use Dagster's rich metadata in a schedule or a sensor so that a run starts only when no other run of the same job is currently running.

No more than 1 running job from a schedule
import time

import dagster as dg


@dg.asset
def first_asset(context: dg.AssetExecutionContext):
    # sleep so that the asset takes some time to execute
    time.sleep(75)
    context.log.info("First asset executing")


my_job = dg.define_asset_job("my_job", [first_asset])


@dg.schedule(
    job=my_job,
    # Runs every minute to show the effect of the concurrency limit
    cron_schedule="* * * * *",
)
def my_schedule(context):
    # Find runs of the same job that are currently running
    run_records = context.instance.get_run_records(
        dg.RunsFilter(job_name="my_job", statuses=[dg.DagsterRunStatus.STARTED])
    )
    # skip a schedule run if another run of the same job is already running
    if len(run_records) > 0:
        return dg.SkipReason(
            "Skipping this run because another run of the same job is already running"
        )
    return dg.RunRequest()


defs = dg.Definitions(
    assets=[first_asset],
    jobs=[my_job],
    schedules=[my_schedule],
)
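The skip decision in the schedule can be separated from the instance query, which makes it easy to test without a running Dagster instance. The sketch below is plain Python and makes two assumptions that are not in the example above: the helper name `skip_reason` is hypothetical, and it treats QUEUED and STARTING runs as in progress in addition to STARTED, whereas the schedule above filters on STARTED only.

```python
# Run status names that this sketch counts as "in progress" (an assumption;
# the schedule above checks STARTED only).
IN_PROGRESS_STATUSES = frozenset({"QUEUED", "STARTING", "STARTED"})


def skip_reason(run_statuses, max_in_progress=1):
    """Return a skip message if too many runs are in progress, else None.

    run_statuses: iterable of status names for existing runs of the job.
    max_in_progress: hypothetical knob for allowing more than one run.
    """
    active = sum(1 for status in run_statuses if status in IN_PROGRESS_STATUSES)
    if active >= max_in_progress:
        return (
            f"Skipping: {active} run(s) of this job already in progress "
            f"(limit {max_in_progress})"
        )
    return None


no_runs = skip_reason([])
one_started = skip_reason(["STARTED"])
only_finished = skip_reason(["SUCCESS", "FAILURE"])
one_queued_limit_two = skip_reason(["QUEUED"], max_in_progress=2)
```

Inside a schedule, the returned message could be wrapped in dg.SkipReason, and a None result could lead to dg.RunRequest().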

Troubleshooting

When limiting concurrency, you might run into some issues until you get the configuration right.

Runs going to STARTED status and skipping QUEUED

info

This only applies to Dagster Open Source.

The run_queue key may not be set in your instance's settings. In the Dagster UI, navigate to Deployment > Configuration and verify that the run_queue key is set.

Runs remaining in QUEUED status

The possible causes for runs remaining in QUEUED status depend on whether you're using Dagster+ or Dagster Open Source.

If runs aren't being dequeued in Dagster+, the root causes could be:

  • If using a hybrid deployment, the agent serving the deployment may be down. In this situation, runs will be paused.
  • Dagster+ is experiencing downtime. Check the status page for the latest on potential outages.