Ask AI

# Source code for dagster._core.definitions.backfill_policy

from enum import Enum
from typing import NamedTuple, Optional

import dagster._check as check
from dagster._annotations import experimental, public
from dagster._serdes import whitelist_for_serdes


class BackfillPolicyType(Enum):
    """The two kinds of backfill policy: everything in one run, or split across runs."""

    # The whole backfill is executed as one run.
    SINGLE_RUN = "SINGLE_RUN"
    # The backfill is divided over several runs.
    MULTI_RUN = "MULTI_RUN"


@experimental
@whitelist_for_serdes
class BackfillPolicy(
    NamedTuple(
        "_BackfillPolicy",
        [
            ("max_partitions_per_run", Optional[int]),
        ],
    )
):
    """A BackfillPolicy specifies how Dagster should attempt to backfill a partitioned asset.

    There are two main kinds of backfill policies: single-run and multi-run.

    An asset with a single-run backfill policy will take a single run to backfill all of its
    partitions at once.

    An asset with a multi-run backfill policy will take multiple runs to backfill all of its
    partitions. Each run will backfill a subset of the partitions. The number of partitions to
    backfill in each run is controlled by the `max_partitions_per_run` parameter.

    For example:

    - If an asset has 100 partitions, and the `max_partitions_per_run` is set to 10, then it will
      be backfilled in 10 runs; each run will backfill 10 partitions.

    - If an asset has 100 partitions, and the `max_partitions_per_run` is set to 11, then it will
      be backfilled in 10 runs; the first 9 runs will backfill 11 partitions, and the last run
      will backfill the remaining 1 partition.

    **Warning:**

    Constructing a BackfillPolicy directly is not recommended as the API is subject to change.
    BackfillPolicy.single_run() and BackfillPolicy.multi_run(max_partitions_per_run=x) are the
    recommended APIs.
    """

    def __new__(cls, max_partitions_per_run: Optional[int] = 1):
        # ``None`` encodes a single-run policy; an int encodes the per-run partition cap.
        return super().__new__(
            cls,
            max_partitions_per_run=max_partitions_per_run,
        )

    @public
    @staticmethod
    def single_run() -> "BackfillPolicy":
        """Creates a BackfillPolicy that executes the entire backfill in a single run."""
        return BackfillPolicy(max_partitions_per_run=None)

    @public
    @staticmethod
    def multi_run(max_partitions_per_run: int = 1) -> "BackfillPolicy":
        """Creates a BackfillPolicy that executes the entire backfill in multiple runs.

        Each run will backfill at most ``max_partitions_per_run`` partitions.

        Args:
            max_partitions_per_run (int): The maximum number of partitions in each run of
                the multiple runs. Must be a positive integer. Defaults to 1.

        Raises:
            CheckError: If ``max_partitions_per_run`` is not an int or is not positive.
        """
        partitions_per_run = check.int_param(max_partitions_per_run, "max_partitions_per_run")
        # Zero or negative caps cannot describe a multi-run backfill; previously they were
        # accepted and a cap of 0 was silently reported as a single-run policy.
        check.invariant(
            partitions_per_run > 0,
            f"max_partitions_per_run must be a positive integer, got {partitions_per_run}",
        )
        return BackfillPolicy(max_partitions_per_run=partitions_per_run)

    @property
    def policy_type(self) -> BackfillPolicyType:
        """The kind of policy: SINGLE_RUN when no per-run cap is set, otherwise MULTI_RUN."""
        # Compare against None explicitly: only ``None`` (as produced by ``single_run()``)
        # means single-run, and a truthiness test would misclassify a cap of 0.
        if self.max_partitions_per_run is None:
            return BackfillPolicyType.SINGLE_RUN
        return BackfillPolicyType.MULTI_RUN