When an exception occurs during op execution, Dagster provides tools to retry that op within the same job run.
| Name | Description |
| --- | --- |
| `RetryRequested` | An exception that can be raised from the body of an op to request a retry |
| `RetryPolicy` | A declarative policy that can be attached to an op or job to request retries on exception |
| `Backoff` | A modification to the delay between retries based on the attempt number |
| `Jitter` | A random modification to the delay between retries |
In Dagster, code is executed within an op. Sometimes this code can fail for transient reasons, and the desired behavior is to retry and run the function again.
Here we start with an op that forces us to retry the whole job whenever it fails.
```python
from dagster import op


@op
def problematic():
    fails_sometimes()
```
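Throughout these examples, `fails_sometimes` is a stand-in for whatever flaky work the op performs; it is not a Dagster API. A minimal sketch, assuming a transient network-style failure, might look like:

```python
import random


# Hypothetical stand-in used by the examples in this section; not part of Dagster.
def fails_sometimes():
    # Simulate a transient failure roughly half the time.
    if random.random() < 0.5:
        raise ConnectionError("transient network failure")
```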
To get this op to retry when an exception occurs, we can attach a `RetryPolicy`.
```python
from dagster import RetryPolicy, op


@op(retry_policy=RetryPolicy())
def better():
    fails_sometimes()
```
This improves the situation, but we may need additional configuration to control how many times to retry and/or how long to wait between each retry.
```python
from dagster import Backoff, Jitter, RetryPolicy, op


@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def even_better():
    fails_sometimes()
```
In addition to being set directly on the op definition, the policy can also be set on specific invocations of an op, or on a `@job` to apply to all ops contained within it.
```python
from dagster import RetryPolicy, job


default_policy = RetryPolicy(max_retries=1)
flakey_op_policy = RetryPolicy(max_retries=10)


@job(op_retry_policy=default_policy)
def default_and_override_job():
    problematic.with_retry_policy(flakey_op_policy)()
```
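To try the policies out locally, the job can be run in process with `execute_in_process` (this sketch assumes the hypothetical `fails_sometimes` helper above is defined); the `problematic` op is retried according to its overridden policy before the run is marked failed.

```python
if __name__ == "__main__":
    # Launch a single in-process run; failed attempts of `problematic`
    # are retried per flakey_op_policy before the run fails.
    result = default_and_override_job.execute_in_process(raise_on_error=False)
    print("Run succeeded:", result.success)
```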
In certain more nuanced situations, we may need to evaluate code to determine whether we want to retry or not. For this we can raise a `RetryRequested` exception manually.
```python
from dagster import RetryRequested, op


@op
def manual():
    try:
        fails_sometimes()
    except Exception as e:
        if should_retry(e):
            raise RetryRequested(max_retries=1, seconds_to_wait=1) from e
        else:
            raise
```
Using `raise ... from` ensures the original exception's information is captured by Dagster.
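Exception chaining here is plain Python behavior rather than anything Dagster-specific. The sketch below defines a hypothetical `should_retry` helper like the one assumed above (not part of Dagster) and shows that `raise ... from` records the original error as `__cause__`, which is what lets its details travel with the `RetryRequested`:

```python
from dagster import RetryRequested


# Hypothetical helper assumed by the `manual` op above; not part of Dagster.
def should_retry(exc: Exception) -> bool:
    # Treat connection problems as transient and worth retrying.
    return isinstance(exc, ConnectionError)


# Plain-Python illustration: `raise ... from` chains the original error.
try:
    try:
        raise ConnectionError("transient network failure")
    except Exception as e:
        raise RetryRequested(max_retries=1, seconds_to_wait=1) from e
except RetryRequested as r:
    assert isinstance(r.__cause__, ConnectionError)  # original error preserved
```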