Source code for dagster._core.definitions.backfill_policy
from enum import Enum
from typing import Iterable, NamedTuple, Optional
import dagster._check as check
from dagster._annotations import experimental, public
from dagster._serdes import whitelist_for_serdes
from dagster._utils.warnings import disable_dagster_warnings
class BackfillPolicyType(Enum):
    """The kinds of backfill strategy a ``BackfillPolicy`` can represent."""

    # All requested partitions are backfilled by one run.
    SINGLE_RUN = "SINGLE_RUN"
    # The requested partitions are spread over several runs.
    MULTI_RUN = "MULTI_RUN"
[docs]
@experimental
@whitelist_for_serdes
class BackfillPolicy(
    NamedTuple(
        "_BackfillPolicy",
        [
            ("max_partitions_per_run", Optional[int]),
        ],
    )
):
    """A BackfillPolicy specifies how Dagster should attempt to backfill a partitioned asset.

    There are two main kinds of backfill policies: single-run and multi-run.

    An asset with a single-run backfill policy will take a single run to backfill all of its
    partitions at once.

    An asset with a multi-run backfill policy will take multiple runs to backfill all of its
    partitions. Each run will backfill a subset of the partitions. The number of partitions to
    backfill in each run is controlled by the `max_partitions_per_run` parameter.

    For example:

    - If an asset has 100 partitions, and the `max_partitions_per_run` is set to 10, then it will
      be backfilled in 10 runs; each run will backfill 10 partitions.

    - If an asset has 100 partitions, and the `max_partitions_per_run` is set to 11, then it will
      be backfilled in 10 runs; the first 9 runs will backfill 11 partitions each, and the last
      run will backfill the remaining 1 partition.

    **Warning:**

        Constructing a BackfillPolicy directly is not recommended as the API is subject to change.
        BackfillPolicy.single_run() and BackfillPolicy.multi_run(max_partitions_per_run=x) are the
        recommended APIs.
    """

    def __new__(cls, max_partitions_per_run: Optional[int] = 1):
        # ``None`` encodes "no per-run limit" (single-run); an integer is the per-run cap.
        return super(BackfillPolicy, cls).__new__(
            cls,
            max_partitions_per_run=max_partitions_per_run,
        )

    @public
    @staticmethod
    def single_run() -> "BackfillPolicy":
        """Creates a BackfillPolicy that executes the entire backfill in a single run."""
        return BackfillPolicy(max_partitions_per_run=None)

    @public
    @staticmethod
    def multi_run(max_partitions_per_run: int = 1) -> "BackfillPolicy":
        """Creates a BackfillPolicy that executes the entire backfill in multiple runs.

        Each run will backfill at most [max_partitions_per_run] partitions.

        Args:
            max_partitions_per_run (int): The maximum number of partitions in each run of
                the multiple runs. Defaults to 1.
        """
        return BackfillPolicy(
            max_partitions_per_run=check.int_param(max_partitions_per_run, "max_partitions_per_run")
        )

    @property
    def policy_type(self) -> BackfillPolicyType:
        """The kind of policy: SINGLE_RUN when no per-run limit is set, otherwise MULTI_RUN."""
        # Compare against None explicitly: a truthiness test would misreport an integer
        # limit of 0 as single-run, disagreeing with how policies are ordered elsewhere.
        if self.max_partitions_per_run is not None:
            return BackfillPolicyType.MULTI_RUN
        else:
            return BackfillPolicyType.SINGLE_RUN

    def __str__(self):
        """Render the policy as the factory call that would recreate it."""
        return (
            "BackfillPolicy.single_run()"
            if self.policy_type == BackfillPolicyType.SINGLE_RUN
            else (f"BackfillPolicy.multi_run(max_partitions_per_run={self.max_partitions_per_run})")
        )
# In situations where multiple backfill policies are specified, call this to resolve a canonical
# policy, which is the policy with the minimum max_partitions_per_run.
def resolve_backfill_policy(
    backfill_policies: Iterable[Optional[BackfillPolicy]],
) -> BackfillPolicy:
    """Pick the canonical policy out of several candidates.

    The winner is the candidate with the smallest sort key as computed by
    ``_backfill_policy_sort_key``; on ties the earliest candidate wins.

    Args:
        backfill_policies: Candidate policies; entries may be ``None``.

    Returns:
        The selected policy, or ``BackfillPolicy.multi_run(1)`` when the input is empty
        or the selected entry is ``None``.
    """
    # min() returns the first minimal element, which is exactly what taking the head of a
    # stable sort yields, without sorting the whole input.
    strictest = min(backfill_policies, key=_backfill_policy_sort_key, default=None)
    if strictest is not None:
        return strictest
    # Only the fallback constructs a new policy, so only it needs warnings suppressed
    # (presumably the experimental-API warning attached to BackfillPolicy).
    with disable_dagster_warnings():
        return BackfillPolicy.multi_run(1)
def _backfill_policy_sort_key(bp: Optional[BackfillPolicy]) -> float:
    """Return the ordering key for a policy: smaller keys sort first.

    A missing policy ranks like ``max_partitions_per_run=1``; a policy without a
    per-run limit ranks last (infinity); any other policy ranks by its limit.
    """
    if bp is None:
        # No policy at all is treated the same as max_partitions_per_run=1.
        return 1

    limit = bp.max_partitions_per_run
    return float("inf") if limit is None else limit