
Source code for dagster._core.definitions.time_window_partitions

import functools
import hashlib
import json
import re
from abc import abstractmethod, abstractproperty
from datetime import date, datetime, timedelta
from enum import Enum
from functools import cached_property
from typing import (
    AbstractSet,
    Any,
    Callable,
    FrozenSet,
    Iterable,
    List,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)

import dagster._check as check
from dagster._annotations import PublicAttr, public
from dagster._core.definitions.partition import (
    DEFAULT_DATE_FORMAT,
    AllPartitionsSubset,
    PartitionedConfig,
    PartitionsDefinition,
    PartitionsSubset,
    ScheduleType,
    cron_schedule_from_schedule_type_and_offsets,
)
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.definitions.timestamp import TimestampWithTimezone
from dagster._core.errors import (
    DagsterInvalidDefinitionError,
    DagsterInvalidDeserializationVersionError,
)
from dagster._core.instance import DynamicPartitionsStore
from dagster._record import IHaveNew, record_custom
from dagster._serdes import whitelist_for_serdes
from dagster._serdes.serdes import NamedTupleSerializer
from dagster._time import (
    create_datetime,
    datetime_from_timestamp,
    get_current_timestamp,
    get_timezone,
)
from dagster._utils.cronstring import get_fixed_minute_interval, is_basic_daily, is_basic_hourly
from dagster._utils.partitions import DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE
from dagster._utils.schedules import (
    cron_string_iterator,
    cron_string_repeats_every_hour,
    is_valid_cron_schedule,
    reverse_cron_string_iterator,
)


def is_second_ambiguous_time(dt: datetime, tz: str):
    """Returns if a datetime is the second instance of an ambiguous time in the given timezone due
    to DST transitions. Assumes that dt is alraedy in the specified timezone.
    """
    # UTC is never ambiguous
    if tz.upper() == "UTC":
        return False

    # Ensure that the datetime is in the correct timezone
    tzinfo = check.not_none(dt.tzinfo)

    # Only interested in the second instance of an ambiguous time
    if dt.fold == 0:
        return False

    offset_before = cast(
        timedelta,
        (tzinfo.utcoffset(dt.replace(fold=0)) if dt.fold else tzinfo.utcoffset(dt)),
    )
    offset_after = cast(
        timedelta,
        (tzinfo.utcoffset(dt) if dt.fold else tzinfo.utcoffset(dt.replace(fold=1))),
    )
    return offset_before > offset_after
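

# Illustrative sketch (not part of the original module): during the US 2023 "fall back"
# transition, 2023-11-05 01:30 America/Los_Angeles occurs twice. Only the fold=1 datetime
# (the second occurrence) counts as the second ambiguous instance. Uses the standard-library
# zoneinfo module rather than dagster's timezone helpers.
def _example_is_second_ambiguous_time() -> None:
    from zoneinfo import ZoneInfo

    tz = "America/Los_Angeles"
    first = datetime(2023, 11, 5, 1, 30, fold=0, tzinfo=ZoneInfo(tz))
    second = datetime(2023, 11, 5, 1, 30, fold=1, tzinfo=ZoneInfo(tz))
    assert not is_second_ambiguous_time(first, tz)
    assert is_second_ambiguous_time(second, tz)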


def dst_safe_fmt(fmt: str) -> str:
    """Adds UTC offset information to a datetime format string to disambiguate timestamps around DST
    transitions.
    """
    if "%z" in fmt:
        return fmt
    return fmt + "%z"


def dst_safe_strftime(dt: datetime, tz: str, fmt: str, cron_schedule: str) -> str:
    """A method for converting a datetime to a string which will append a suffix in cases where
    the resulting timestamp would be ambiguous due to DST transitions.

    Assumes that dt is already in the specified timezone.
    """
    # if the format already includes a UTC offset, then we don't need to do anything
    if "%z" in fmt:
        return dt.strftime(fmt)

    # only need to handle ambiguous times for cron schedules which repeat every hour
    if not cron_string_repeats_every_hour(cron_schedule):
        return dt.strftime(fmt)

    # if the datetime is the second instance of an ambiguous time, then we append the UTC offset
    if is_second_ambiguous_time(dt, tz):
        return dt.strftime(dst_safe_fmt(fmt))
    return dt.strftime(fmt)
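

# Illustrative sketch (not part of the original module): for a format without "%z", the second
# occurrence of an ambiguous wall-clock time gets the UTC offset appended so the two partition
# keys stay distinct. Assumes the hourly cron "0 * * * *" is detected as repeating every hour;
# the format string and the 2023 US DST transition date are example values.
def _example_dst_safe_strftime() -> None:
    from zoneinfo import ZoneInfo

    tz = "America/Los_Angeles"
    fmt = "%Y-%m-%d-%H:%M"
    first = datetime(2023, 11, 5, 1, 0, fold=0, tzinfo=ZoneInfo(tz))
    second = datetime(2023, 11, 5, 1, 0, fold=1, tzinfo=ZoneInfo(tz))
    # the first occurrence keeps the plain format; the second gets the "-0800" offset appended
    assert dst_safe_strftime(first, tz, fmt, "0 * * * *") == "2023-11-05-01:00"
    assert dst_safe_strftime(second, tz, fmt, "0 * * * *") == "2023-11-05-01:00-0800"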


def dst_safe_strptime(date_string: str, tz: str, fmt: str) -> datetime:
    """A method for parsing a datetime created with the dst_safe_strftime() method."""
    try:
        # first, try to parse the datetime in the normal format
        dt = datetime.strptime(date_string, fmt)
    except ValueError:
        # if this fails, try to parse the datetime with a UTC offset added
        dt = datetime.strptime(date_string, dst_safe_fmt(fmt))

    # the datetime object may have timezone information on it, depending on the format used. If it
    # does, we simply ensure that this timestamp is in the correct timezone.
    if dt.tzinfo:
        return datetime.fromtimestamp(dt.timestamp(), tz=get_timezone(tz))
    # otherwise, ensure that we assume the pre-transition timezone
    else:
        return create_datetime(
            year=dt.year,
            month=dt.month,
            day=dt.day,
            hour=dt.hour,
            minute=dt.minute,
            second=dt.second,
            microsecond=dt.microsecond,
            tz=get_timezone(tz),
            fold=0,
        )
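

# Illustrative sketch (not part of the original module): dst_safe_strptime parses keys produced
# by dst_safe_strftime, whether or not the UTC-offset suffix was appended. A key without an
# offset is resolved to the pre-transition (fold=0) wall-clock time in the given timezone.
def _example_dst_safe_strptime() -> None:
    tz = "America/Los_Angeles"
    fmt = "%Y-%m-%d-%H:%M"
    # plain key: parsed in the given timezone, assuming the pre-transition offset (PDT here)
    dt = dst_safe_strptime("2023-11-05-01:00", tz, fmt)
    # key with an explicit offset: parsed unambiguously from the offset itself (PST here)
    dt_second = dst_safe_strptime("2023-11-05-01:00-0800", tz, fmt)
    assert dt_second.timestamp() - dt.timestamp() == 3600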


class TimeWindow(NamedTuple):
    """An interval that is closed at the start and open at the end.

    Attributes:
        start (datetime): A datetime that marks the start of the window.
        end (datetime): A datetime that marks the end of the window.
    """

    start: PublicAttr[datetime]
    end: PublicAttr[datetime]


@whitelist_for_serdes(
    storage_name="TimeWindow",  # For back-compat with existing serdes
)
class PersistedTimeWindow(
    NamedTuple(
        "_PersistedTimeWindow", [("start", TimestampWithTimezone), ("end", TimestampWithTimezone)]
    )
):
    """Internal serialized representation of a time interval that is closed at the start and open
    at the end.
    """

    def __new__(
        cls,
        start: TimestampWithTimezone,
        end: TimestampWithTimezone,
    ):
        return super(cls, PersistedTimeWindow).__new__(
            cls,
            start=check.inst_param(start, "start", TimestampWithTimezone),
            end=check.inst_param(end, "end", TimestampWithTimezone),
        )

    @cached_property
    def start(self) -> datetime:
        start_timestamp_with_timezone = self._asdict()["start"]
        return datetime.fromtimestamp(
            start_timestamp_with_timezone.timestamp,
            tz=get_timezone(start_timestamp_with_timezone.timezone),
        )

    @cached_property
    def end(self) -> datetime:
        end_timestamp_with_timezone = self._asdict()["end"]
        return datetime.fromtimestamp(
            end_timestamp_with_timezone.timestamp,
            tz=get_timezone(end_timestamp_with_timezone.timezone),
        )

    @staticmethod
    def from_public_time_window(tw: TimeWindow, timezone: str):
        return PersistedTimeWindow(
            TimestampWithTimezone(tw.start.timestamp(), timezone),
            TimestampWithTimezone(tw.end.timestamp(), timezone),
        )

    def subtract(self, other: "PersistedTimeWindow") -> Sequence["PersistedTimeWindow"]:
        other_start_timestamp = other.start.timestamp()
        start_timestamp = self.start.timestamp()
        other_end_timestamp = other.end.timestamp()
        end_timestamp = self.end.timestamp()

        # Case where the two don't intersect at all - just return self
        # Note that this assumes end is exclusive
        if end_timestamp <= other_start_timestamp or other_end_timestamp <= start_timestamp:
            return [self]

        windows = []

        if other_start_timestamp > start_timestamp:
            windows.append(
                PersistedTimeWindow(start=self._asdict()["start"], end=other._asdict()["start"]),
            )

        if other_end_timestamp < end_timestamp:
            windows.append(
                PersistedTimeWindow(start=other._asdict()["end"], end=self._asdict()["end"])
            )

        return windows

    def to_public_time_window(self) -> TimeWindow:
        """Used for exposing TimeWindows over the public Dagster API."""
        return TimeWindow(start=self.start, end=self.end)
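

# Illustrative sketch (not part of the original module): subtracting an overlapping window
# yields the non-overlapping remainder(s), treating each end as exclusive. The epoch timestamps
# below are arbitrary values chosen for the example.
def _example_persisted_time_window_subtract() -> None:
    window = PersistedTimeWindow(
        TimestampWithTimezone(0.0, "UTC"), TimestampWithTimezone(100.0, "UTC")
    )
    other = PersistedTimeWindow(
        TimestampWithTimezone(40.0, "UTC"), TimestampWithTimezone(60.0, "UTC")
    )
    remainder = window.subtract(other)
    # two windows remain: [0, 40) and [60, 100)
    assert [(w.start.timestamp(), w.end.timestamp()) for w in remainder] == [
        (0.0, 40.0),
        (60.0, 100.0),
    ]

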
@whitelist_for_serdes
@record_custom(
    field_to_new_mapping={
        "start_ts": "start",
        "end_ts": "end",
    }
)
class TimeWindowPartitionsDefinition(PartitionsDefinition, IHaveNew):
    r"""A set of partitions where each partition corresponds to a time window.

    The provided cron_schedule determines the bounds of the time windows. E.g. a cron_schedule of
    "0 0 \\* \\* \\*" will result in daily partitions that start at midnight and end at midnight
    of the following day.

    The string partition_key associated with each partition corresponds to the start of the
    partition's time window.

    The first partition in the set will start at the first cron_schedule tick that is equal to or
    after the given start datetime. The last partition in the set will end before the current
    time, unless the end_offset argument is set to a positive number.

    We recommend limiting partition counts for each asset to 25,000 partitions or fewer.

    Args:
        cron_schedule (str): Determines the bounds of the time windows.
        start (datetime): The first partition in the set will start at the first cron_schedule
            tick that is equal to or after this value.
        timezone (Optional[str]): The timezone in which each time should exist.
            Supported strings for timezones are the ones provided by the
            `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles".
        end (datetime): The last partition (exclusive) in the set.
        fmt (str): The date format to use for partition_keys. Note that if a non-UTC timezone is
            used, and the cron schedule repeats every hour or faster, the date format must include
            a timezone offset to disambiguate between multiple instances of the same time before
            and after the Fall DST transition. If the format does not contain this offset, the
            second instance of the ambiguous time partition key will have the UTC offset
            automatically appended to it.
        end_offset (int): Extends the partition set by a number of partitions equal to the value
            passed. If end_offset is 0 (the default), the last partition ends before the current
            time. If end_offset is 1, the second-to-last partition ends before the current time,
            and so on.
""" start_ts: TimestampWithTimezone timezone: PublicAttr[str] end_ts: Optional[TimestampWithTimezone] fmt: PublicAttr[str] end_offset: PublicAttr[int] cron_schedule: PublicAttr[str] def __new__( cls, start: Union[datetime, str, TimestampWithTimezone], fmt: str, end: Union[datetime, str, TimestampWithTimezone, None] = None, schedule_type: Optional[ScheduleType] = None, timezone: Optional[str] = None, end_offset: int = 0, minute_offset: Optional[int] = None, hour_offset: Optional[int] = None, day_offset: Optional[int] = None, cron_schedule: Optional[str] = None, ): check.opt_str_param(timezone, "timezone") timezone = timezone or "UTC" if isinstance(start, str): start_dt = dst_safe_strptime(start, timezone, fmt) start = TimestampWithTimezone(start_dt.timestamp(), timezone) elif isinstance(start, datetime): start_dt = start.replace(tzinfo=get_timezone(timezone)) start = TimestampWithTimezone(start_dt.timestamp(), timezone) if not end: end = None elif isinstance(end, str): end_dt = dst_safe_strptime(end, timezone, fmt) end = TimestampWithTimezone(end_dt.timestamp(), timezone) elif isinstance(end, datetime): end_dt = end.replace(tzinfo=get_timezone(timezone)) end = TimestampWithTimezone(end_dt.timestamp(), timezone) if cron_schedule is not None: check.invariant( schedule_type is None and not minute_offset and not hour_offset and not day_offset, "If cron_schedule argument is provided, then schedule_type, minute_offset, " "hour_offset, and day_offset can't also be provided", ) else: if schedule_type is None: check.failed("One of schedule_type and cron_schedule must be provided") cron_schedule = cron_schedule_from_schedule_type_and_offsets( schedule_type=schedule_type, minute_offset=minute_offset or 0, hour_offset=hour_offset or 0, day_offset=day_offset or 0, ) if not is_valid_cron_schedule(cron_schedule): raise DagsterInvalidDefinitionError( f"Found invalid cron schedule '{cron_schedule}' for a" " TimeWindowPartitionsDefinition." 
) return super().__new__( cls, start_ts=start, timezone=timezone, end_ts=end, fmt=fmt, end_offset=end_offset, cron_schedule=cron_schedule, ) @public @cached_property def start(self) -> datetime: start_timestamp_with_timezone = self.start_ts return datetime_from_timestamp( start_timestamp_with_timezone.timestamp, start_timestamp_with_timezone.timezone ) @public @cached_property def end(self) -> Optional[datetime]: end_timestamp_with_timezone = self.end_ts if not end_timestamp_with_timezone: return None return datetime.fromtimestamp( end_timestamp_with_timezone.timestamp, get_timezone(end_timestamp_with_timezone.timezone), ) def _get_current_timestamp(self, current_time: Optional[datetime]) -> float: if not current_time: return get_current_timestamp() # if a naive current time was passed in, assume it was the same timezone as the partition set if not current_time.tzinfo: current_time = current_time.replace(tzinfo=get_timezone(self.timezone)) return current_time.timestamp() def get_num_partitions_in_window(self, time_window: TimeWindow) -> int: if self.is_basic_daily: return ( date( time_window.end.year, time_window.end.month, time_window.end.day, ) - date( time_window.start.year, time_window.start.month, time_window.start.day, ) ).days fixed_minute_interval = get_fixed_minute_interval(self.cron_schedule) if fixed_minute_interval: minutes_in_window = (time_window.end.timestamp() - time_window.start.timestamp()) / 60 return int(minutes_in_window // fixed_minute_interval) return len(self.get_partition_keys_in_time_window(time_window)) def get_num_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> int: last_partition_window = self.get_last_partition_window(current_time) first_partition_window = self.get_first_partition_window(current_time) if not last_partition_window or not first_partition_window: return 0 return self.get_num_partitions_in_window( TimeWindow(start=first_partition_window.start, end=last_partition_window.end) ) def get_partition_keys_between_indexes( self, start_idx: int, end_idx: int, current_time: Optional[datetime] = None ) -> List[str]: # Fetches the partition keys between the given start and end indices. # Start index is inclusive, end index is exclusive. # Method added for performance reasons, to only string format # partition keys included within the indices. 
current_timestamp = self._get_current_timestamp(current_time=current_time) partitions_past_current_time = 0 partition_keys = [] reached_end = False for idx, time_window in enumerate(self._iterate_time_windows(self.start.timestamp())): if time_window.end.timestamp() >= current_timestamp: reached_end = True if self.end and time_window.end.timestamp() > self.end.timestamp(): reached_end = True if ( time_window.end.timestamp() <= current_timestamp or partitions_past_current_time < self.end_offset ): if idx >= start_idx and idx < end_idx: partition_keys.append( dst_safe_strftime( time_window.start, self.timezone, self.fmt, self.cron_schedule ) ) if time_window.end.timestamp() > current_timestamp: partitions_past_current_time += 1 else: break if len(partition_keys) >= end_idx - start_idx: break if reached_end and self.end_offset < 0: partition_keys = partition_keys[: self.end_offset] return partition_keys def get_partition_keys( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[str]: current_timestamp = self._get_current_timestamp(current_time=current_time) partitions_past_current_time = 0 partition_keys: List[str] = [] for time_window in self._iterate_time_windows(self.start.timestamp()): if self.end and time_window.end.timestamp() > self.end.timestamp(): break if ( time_window.end.timestamp() <= current_timestamp or partitions_past_current_time < self.end_offset ): partition_keys.append( dst_safe_strftime( time_window.start, self.timezone, self.fmt, self.cron_schedule ) ) if time_window.end.timestamp() > current_timestamp: partitions_past_current_time += 1 else: break if self.end_offset < 0: partition_keys = partition_keys[: self.end_offset] return partition_keys def __str__(self) -> str: schedule_str = ( self.schedule_type.value.capitalize() if self.schedule_type else self.cron_schedule ) partition_def_str = f"{schedule_str}, starting {dst_safe_strftime(self.start, self.timezone, self.fmt, self.cron_schedule)} {self.timezone}." if self.end_offset != 0: partition_def_str += ( " End offsetted by" f" {self.end_offset} partition{'' if self.end_offset == 1 else 's'}." ) return partition_def_str def __repr__(self): # Between python 3.8 and 3.9 the repr of a datetime object changed. # Replaces start time with timestamp as a workaround to make sure the repr is consistent across versions. # Make sure to update this __repr__ if any new fields are added to TimeWindowPartitionsDefinition. 
return ( f"TimeWindowPartitionsDefinition(start={self.start.timestamp()}," f" end={self.end.timestamp() if self.end else None}," f" timezone='{self.timezone}', fmt='{self.fmt}', end_offset={self.end_offset}," f" cron_schedule='{self.cron_schedule}')" ) def __hash__(self): return hash(tuple(self.__repr__())) @functools.lru_cache(maxsize=100) def time_window_for_partition_key(self, partition_key: str) -> TimeWindow: partition_key_dt = dst_safe_strptime(partition_key, self.timezone, self.fmt) return next(iter(self._iterate_time_windows(partition_key_dt.timestamp()))) @functools.lru_cache(maxsize=5) def time_windows_for_partition_keys( self, partition_keys: FrozenSet[str], validate: bool = True, ) -> Sequence[TimeWindow]: if len(partition_keys) == 0: return [] sorted_pks = sorted( partition_keys, key=lambda pk: dst_safe_strptime(pk, self.timezone, self.fmt).timestamp(), ) cur_windows_iterator = iter( self._iterate_time_windows( dst_safe_strptime(sorted_pks[0], self.timezone, self.fmt).timestamp() ) ) partition_key_time_windows: List[TimeWindow] = [] for partition_key in sorted_pks: next_window = next(cur_windows_iterator) if ( dst_safe_strftime(next_window.start, self.timezone, self.fmt, self.cron_schedule) == partition_key ): partition_key_time_windows.append(next_window) else: cur_windows_iterator = iter( self._iterate_time_windows( dst_safe_strptime(partition_key, self.timezone, self.fmt).timestamp() ) ) partition_key_time_windows.append(next(cur_windows_iterator)) if validate: start_time_window = self.get_first_partition_window() end_time_window = self.get_last_partition_window() if start_time_window is None or end_time_window is None: check.failed("No partitions in the PartitionsDefinition") start_timestamp = start_time_window.start.timestamp() end_timestamp = end_time_window.end.timestamp() partition_key_time_windows = [ tw for tw in partition_key_time_windows if tw.start.timestamp() >= start_timestamp and tw.end.timestamp() <= end_timestamp ] return partition_key_time_windows def start_time_for_partition_key(self, partition_key: str) -> datetime: partition_key_dt = dst_safe_strptime(partition_key, self.timezone, self.fmt) if self.is_basic_hourly or self.is_basic_daily: return partition_key_dt # the datetime format might not include granular components, so we need to recover them, # e.g. if cron_schedule="0 7 * * *" and fmt="%Y-%m-%d". # we make the assumption that the parsed partition key is <= the start datetime. 
return next(iter(self._iterate_time_windows(partition_key_dt.timestamp()))).start def get_next_partition_key( self, partition_key: str, current_time: Optional[datetime] = None ) -> Optional[str]: last_partition_window = self.get_last_partition_window(current_time) if last_partition_window is None: return None partition_key_dt = dst_safe_strptime(partition_key, self.timezone, self.fmt) windows_iter = iter(self._iterate_time_windows(partition_key_dt.timestamp())) next(windows_iter) start_time = next(windows_iter).start if start_time.timestamp() >= last_partition_window.end.timestamp(): return None else: return dst_safe_strftime(start_time, self.timezone, self.fmt, self.cron_schedule) def get_next_partition_window( self, end_dt: datetime, current_time: Optional[datetime] = None, respect_bounds: bool = True ) -> Optional[TimeWindow]: windows_iter = iter(self._iterate_time_windows(end_dt.timestamp())) next_window = next(windows_iter) if respect_bounds: last_partition_window = self.get_last_partition_window(current_time) if last_partition_window is None: return None if next_window.start.timestamp() >= last_partition_window.end.timestamp(): return None return next_window def get_prev_partition_window( self, start_dt: datetime, respect_bounds: bool = True ) -> Optional[TimeWindow]: windows_iter = iter(self._reverse_iterate_time_windows(start_dt.timestamp())) prev_window = next(windows_iter) if respect_bounds: first_partition_window = self.get_first_partition_window() if ( first_partition_window is None or prev_window.start.timestamp() < first_partition_window.start.timestamp() ): return None return prev_window @functools.lru_cache(maxsize=256) def _get_first_partition_window(self, *, current_timestamp: float) -> Optional[TimeWindow]: time_window = next(iter(self._iterate_time_windows(self.start.timestamp()))) if self.end_offset == 0: return time_window if time_window.end.timestamp() <= current_timestamp else None elif self.end_offset > 0: iterator = iter(self._iterate_time_windows(current_timestamp)) # first returned time window is time window of current time curr_window_plus_offset = next(iterator) for _ in range(self.end_offset): curr_window_plus_offset = next(iterator) return ( time_window if time_window.end.timestamp() <= curr_window_plus_offset.start.timestamp() else None ) else: # end offset < 0 end_window = None iterator = iter(self._reverse_iterate_time_windows(current_timestamp)) for _ in range(abs(self.end_offset)): end_window = next(iterator) if end_window is None: check.failed("end_window should not be None") return ( time_window if time_window.end.timestamp() <= end_window.start.timestamp() else None ) def get_first_partition_window( self, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: current_timestamp = self._get_current_timestamp(current_time) return self._get_first_partition_window(current_timestamp=current_timestamp) @functools.lru_cache(maxsize=256) def _get_last_partition_window(self, *, current_timestamp: float) -> Optional[TimeWindow]: if self._get_first_partition_window(current_timestamp=current_timestamp) is None: return None if self.end and self.end.timestamp() < current_timestamp: current_timestamp = self.end.timestamp() if self.end_offset == 0: return next(iter(self._reverse_iterate_time_windows(current_timestamp))) else: # TODO: make this efficient last_partition_key = super().get_last_partition_key( datetime.fromtimestamp(current_timestamp, tz=get_timezone(self.timezone)) ) return ( self.time_window_for_partition_key(last_partition_key) if 
last_partition_key else None ) def get_last_partition_window( self, current_time: Optional[datetime] = None ) -> Optional[TimeWindow]: current_timestamp = self._get_current_timestamp(current_time) return self._get_last_partition_window(current_timestamp=current_timestamp) def get_first_partition_key( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Optional[str]: first_window = self.get_first_partition_window(current_time) if first_window is None: return None return dst_safe_strftime(first_window.start, self.timezone, self.fmt, self.cron_schedule) def get_last_partition_key( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Optional[str]: last_window = self.get_last_partition_window(current_time) if last_window is None: return None return dst_safe_strftime(last_window.start, self.timezone, self.fmt, self.cron_schedule) def end_time_for_partition_key(self, partition_key: str) -> datetime: return self.time_window_for_partition_key(partition_key).end @functools.lru_cache(maxsize=5) def get_partition_keys_in_time_window(self, time_window: TimeWindow) -> Sequence[str]: result: List[str] = [] time_window_end_timestamp = time_window.end.timestamp() for partition_time_window in self._iterate_time_windows(time_window.start.timestamp()): if partition_time_window.start.timestamp() < time_window_end_timestamp: result.append( dst_safe_strftime( partition_time_window.start, self.timezone, self.fmt, self.cron_schedule ) ) else: break return result def get_partition_key_range_for_time_window(self, time_window: TimeWindow) -> PartitionKeyRange: start_partition_key = self.get_partition_key_for_timestamp(time_window.start.timestamp()) end_partition_key = self.get_partition_key_for_timestamp( check.not_none(self.get_prev_partition_window(time_window.end)).start.timestamp() ) return PartitionKeyRange(start_partition_key, end_partition_key) def get_partition_keys_in_range( self, partition_key_range: PartitionKeyRange, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[str]: start_time = self.start_time_for_partition_key(partition_key_range.start) check.invariant( start_time.timestamp() >= self.start.timestamp(), ( f"Partition key range start {partition_key_range.start} is before " f"the partitions definition start time {self.start}" ), ) end_time = self.end_time_for_partition_key(partition_key_range.end) if self.end: check.invariant( end_time.timestamp() <= self.end.timestamp(), ( f"Partition key range end {partition_key_range.end} is after the " f"partitions definition end time {self.end}" ), ) return self.get_partition_keys_in_time_window(TimeWindow(start_time, end_time)) @public @property def schedule_type(self) -> Optional[ScheduleType]: """Optional[ScheduleType]: An enum representing the partition cadence (hourly, daily, weekly, or monthly). """ if re.fullmatch(r"\d+ \* \* \* \*", self.cron_schedule): return ScheduleType.HOURLY elif re.fullmatch(r"\d+ \d+ \* \* \*", self.cron_schedule): return ScheduleType.DAILY elif re.fullmatch(r"\d+ \d+ \* \* \d+", self.cron_schedule): return ScheduleType.WEEKLY elif re.fullmatch(r"\d+ \d+ \d+ \* \*", self.cron_schedule): return ScheduleType.MONTHLY else: return None @public @property def minute_offset(self) -> int: """int: Number of minutes past the hour to "split" partitions. Defaults to 0. For example, returns 15 if each partition starts at 15 minutes past the hour. 
""" match = re.fullmatch(r"(\d+) (\d+|\*) (\d+|\*) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no minute offset") return int(match.groups()[0]) @public @property def hour_offset(self) -> int: """int: Number of hours past 00:00 to "split" partitions. Defaults to 0. For example, returns 1 if each partition starts at 01:00. """ match = re.fullmatch(r"(\d+|\*) (\d+) (\d+|\*) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no hour offset") return int(match.groups()[1]) @public @property def day_offset(self) -> int: """int: For a weekly or monthly partitions definition, returns the day to "split" partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule. For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday. For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month). """ schedule_type = self.schedule_type if schedule_type == ScheduleType.WEEKLY: match = re.fullmatch(r"(\d+|\*) (\d+|\*) (\d+|\*) (\d+|\*) (\d+)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no day offset") return int(match.groups()[4]) elif schedule_type == ScheduleType.MONTHLY: match = re.fullmatch(r"(\d+|\*) (\d+|\*) (\d+) (\d+|\*) (\d+|\*)", self.cron_schedule) if match is None: check.failed(f"{self.cron_schedule} has no day offset") return int(match.groups()[2]) else: check.failed(f"Unsupported schedule type for day_offset: {schedule_type}")
[docs] @public def get_cron_schedule( self, minute_of_hour: Optional[int] = None, hour_of_day: Optional[int] = None, day_of_week: Optional[int] = None, day_of_month: Optional[int] = None, ) -> str: """The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning. This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am. """ if ( minute_of_hour is None and hour_of_day is None and day_of_week is None and day_of_month is None ): return self.cron_schedule schedule_type = self.schedule_type if schedule_type is None: check.failed( f"{self.cron_schedule} does not support" " minute_of_hour/hour_of_day/day_of_week/day_of_month arguments" ) minute_of_hour = cast( int, check.opt_int_param(minute_of_hour, "minute_of_hour", default=self.minute_offset), ) if schedule_type == ScheduleType.HOURLY: check.invariant( hour_of_day is None, "Cannot set hour parameter with hourly partitions." ) else: hour_of_day = cast( int, check.opt_int_param(hour_of_day, "hour_of_day", default=self.hour_offset) ) if schedule_type == ScheduleType.DAILY: check.invariant( day_of_week is None, "Cannot set day of week parameter with daily partitions." ) check.invariant( day_of_month is None, "Cannot set day of month parameter with daily partitions." ) if schedule_type == ScheduleType.MONTHLY: default = self.day_offset or 1 day_offset = check.opt_int_param(day_of_month, "day_of_month", default=default) elif schedule_type == ScheduleType.WEEKLY: default = self.day_offset or 0 day_offset = check.opt_int_param(day_of_week, "day_of_week", default=default) else: day_offset = 0 return cron_schedule_from_schedule_type_and_offsets( schedule_type, minute_offset=minute_of_hour, hour_offset=hour_of_day or 0, day_offset=day_offset, )
def _iterate_time_windows(self, start_timestamp: float) -> Iterable[TimeWindow]: """Returns an infinite generator of time windows that start after the given start time.""" iterator = cron_string_iterator( start_timestamp=start_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ) prev_time = next(iterator) while prev_time.timestamp() < start_timestamp: prev_time = next(iterator) while True: next_time = next(iterator) yield TimeWindow(prev_time, next_time) prev_time = next_time def _reverse_iterate_time_windows(self, end_timestamp: float) -> Iterable[TimeWindow]: """Returns an infinite generator of time windows that end before the given end time.""" iterator = reverse_cron_string_iterator( end_timestamp=end_timestamp, cron_string=self.cron_schedule, execution_timezone=self.timezone, ) prev_time = next(iterator) while prev_time.timestamp() > end_timestamp: prev_time = next(iterator) while True: next_time = next(iterator) yield TimeWindow(next_time, prev_time) prev_time = next_time def get_partition_key_for_timestamp(self, timestamp: float, end_closed: bool = False) -> str: """Args: timestamp (float): Timestamp from the unix epoch, UTC. end_closed (bool): Whether the interval is closed at the end or at the beginning. """ iterator = cron_string_iterator( timestamp, self.cron_schedule, self.timezone, start_offset=-1 ) # prev will be < timestamp prev = next(iterator) # prev_next will be >= timestamp prev_next = next(iterator) if end_closed or prev_next.timestamp() > timestamp: return dst_safe_strftime(prev, self.timezone, self.fmt, self.cron_schedule) else: return dst_safe_strftime(prev_next, self.timezone, self.fmt, self.cron_schedule) def less_than(self, partition_key1: str, partition_key2: str) -> bool: """Returns true if the partition_key1 is earlier than partition_key2.""" return ( self.start_time_for_partition_key(partition_key1).timestamp() < self.start_time_for_partition_key(partition_key2).timestamp() ) @property def partitions_subset_class(self) -> Type["PartitionsSubset"]: return PartitionKeysTimeWindowPartitionsSubset def empty_subset(self) -> "PartitionsSubset": return self.partitions_subset_class.empty_subset(self) def subset_with_all_partitions( self, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> "PartitionsSubset": first_window = self.get_first_partition_window(current_time) last_window = self.get_last_partition_window(current_time) windows = ( [] if first_window is None or last_window is None else [TimeWindow(first_window.start, last_window.end)] ) return TimeWindowPartitionsSubset( partitions_def=self, num_partitions=None, included_time_windows=windows ) def get_serializable_unique_identifier( self, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None ) -> str: return hashlib.sha1(self.__repr__().encode("utf-8")).hexdigest() def has_partition_key( self, partition_key: str, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> bool: """Returns a boolean representing if the given partition key is valid.""" try: partition_start_time = self.start_time_for_partition_key(partition_key) partition_start_timestamp = partition_start_time.timestamp() except ValueError: # unparseable partition key return False first_partition_window = self.get_first_partition_window(current_time=current_time) last_partition_window = self.get_last_partition_window(current_time=current_time) return not ( # no partitions at all 
first_partition_window is None or last_partition_window is None # partition starts before the first valid partition or partition_start_timestamp < first_partition_window.start.timestamp() # partition starts after the last valid partition or partition_start_timestamp > last_partition_window.start.timestamp() # partition key string does not represent the start of an actual partition or dst_safe_strftime(partition_start_time, self.timezone, self.fmt, self.cron_schedule) != partition_key ) def equal_except_for_start_or_end(self, other: "TimeWindowPartitionsDefinition") -> bool: """Returns True iff this is identical to other, except they're allowed to have different start and end datetimes. """ return ( self.timezone == other.timezone and self.fmt == other.fmt and self.cron_schedule == other.cron_schedule and self.end_offset == other.end_offset ) @property def is_basic_daily(self) -> bool: return is_basic_daily(self.cron_schedule) @property def is_basic_hourly(self) -> bool: return is_basic_hourly(self.cron_schedule)
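

# Illustrative sketch (not part of the original module): constructing a
# TimeWindowPartitionsDefinition directly from a cron schedule and enumerating its partition
# keys up to a fixed current_time. The cron string, start date, and format are example values.
def _example_time_window_partitions_definition() -> None:
    partitions_def = TimeWindowPartitionsDefinition(
        cron_schedule="0 0 * * *",  # daily at midnight
        start="2024-01-01",
        fmt="%Y-%m-%d",
        timezone="UTC",
    )
    # windows are [2024-01-01, 2024-01-02), [2024-01-02, 2024-01-03), [2024-01-03, 2024-01-04);
    # each key is the window's start, and only windows ending at or before current_time count
    keys = partitions_def.get_partition_keys(current_time=datetime(2024, 1, 4))
    assert keys == ["2024-01-01", "2024-01-02", "2024-01-03"]

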
class DailyPartitionsDefinition(TimeWindowPartitionsDefinition):
    """A set of daily partitions.

    The first partition in the set will start at the start_date at midnight. The last partition
    in the set will end before the current time, unless the end_offset argument is set to a
    positive number.

    If minute_offset and/or hour_offset are used, the start and end times of each partition will
    be hour_offset:minute_offset of each day.

    Args:
        start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can
            provide in either a datetime or string format.
        end_date (Union[datetime.datetime, str, None]): The last date (exclusive) in the set of
            partitions. Default is None. Can provide in either a datetime or string format.
        minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults
            to 0.
        hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0.
        timezone (Optional[str]): The timezone in which each date should exist.
            Supported strings for timezones are the ones provided by the
            `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles".
        fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`.
        end_offset (int): Extends the partition set by a number of partitions equal to the value
            passed. If end_offset is 0 (the default), the last partition ends before the current
            time. If end_offset is 1, the second-to-last partition ends before the current time,
            and so on.

    .. code-block:: python

        DailyPartitionsDefinition(start_date="2022-03-12")
        # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

        DailyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=16)
        # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
    """

    # mapping for fields defined on TimeWindowPartitionsDefinition to this subclass's __new__
    __field_remap__ = {
        "start_ts": "start_date",
        "end_ts": "end_date",
    }

    def __new__(
        cls,
        start_date: Union[datetime, str],
        end_date: Union[datetime, str, None] = None,
        minute_offset: int = 0,
        hour_offset: int = 0,
        timezone: Optional[str] = None,
        fmt: Optional[str] = None,
        end_offset: int = 0,
        **kwargs,
    ):
        _fmt = fmt or DEFAULT_DATE_FORMAT

        schedule_type = ScheduleType.DAILY
        # We accept cron_schedule "hidden" via kwargs to support record copy()
        cron_schedule = kwargs.get("cron_schedule")
        if cron_schedule:
            schedule_type = None

        return super(DailyPartitionsDefinition, cls).__new__(
            cls,
            schedule_type=schedule_type,
            start=start_date,
            end=end_date,
            minute_offset=minute_offset,
            hour_offset=hour_offset,
            timezone=timezone,
            fmt=_fmt,
            end_offset=end_offset,
            cron_schedule=cron_schedule,
        )
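

# Illustrative sketch (not part of the original module): the daily subclass pre-fills the cron
# schedule and format, so each partition key maps to a midnight-to-midnight window in UTC by
# default. The get_cron_schedule result assumes the standard
# minute/hour/day-of-month/month/day-of-week cron ordering, and the isoformat rendering assumes
# UTC-aware datetimes.
def _example_daily_partitions_definition() -> None:
    partitions_def = DailyPartitionsDefinition(start_date="2022-03-12")
    window = partitions_def.time_window_for_partition_key("2022-03-12")
    assert (window.start.isoformat(), window.end.isoformat()) == (
        "2022-03-12T00:00:00+00:00",
        "2022-03-13T00:00:00+00:00",
    )
    # schedule a job over these partitions at 02:30 each day instead of midnight
    assert partitions_def.get_cron_schedule(minute_of_hour=30, hour_of_day=2) == "30 2 * * *"

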
def wrap_time_window_run_config_fn(
    run_config_fn: Optional[Callable[[datetime, datetime], Mapping[str, Any]]],
    partitions_def: TimeWindowPartitionsDefinition,
) -> Callable[[str], Mapping[str, Any]]:
    def _run_config_wrapper(key: str) -> Mapping[str, Any]:
        if not run_config_fn:
            return {}
        time_window = partitions_def.time_window_for_partition_key(key)
        return run_config_fn(time_window.start, time_window.end)

    return _run_config_wrapper


def wrap_time_window_tags_fn(
    tags_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]],
    partitions_def: TimeWindowPartitionsDefinition,
) -> Callable[[str], Mapping[str, str]]:
    def _tag_wrapper(key: str) -> Mapping[str, str]:
        if not tags_fn:
            return {}
        time_window = partitions_def.time_window_for_partition_key(key)
        return tags_fn(time_window.start, time_window.end)

    return _tag_wrapper


[docs] def daily_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[DailyPartitionsDefinition], ]: """Defines run config over a set of daily partitions. The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @daily_partitioned_config(start_date="2022-03-12") # creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ... @daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16) # creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]], ) -> PartitionedConfig[DailyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = DailyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
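

# Illustrative sketch (not part of the original module): using the decorator to build run config
# for a hypothetical op named "process_data". The op name and the run-config shape are example
# values; the decorator itself only requires that the function accept the window's start and end
# datetimes and return a run-config dictionary.
def _example_daily_partitioned_config() -> PartitionedConfig[DailyPartitionsDefinition]:
    @daily_partitioned_config(start_date="2022-03-12")
    def my_partitioned_config(start: datetime, end: datetime) -> Mapping[str, Any]:
        return {
            "ops": {
                "process_data": {
                    "config": {
                        "start": start.strftime("%Y-%m-%d"),
                        "end": end.strftime("%Y-%m-%d"),
                    }
                }
            }
        }

    # the decorated function is a PartitionedConfig and can be passed as a job's config
    return my_partitioned_config

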
[docs] class HourlyPartitionsDefinition(TimeWindowPartitionsDefinition): """A set of hourly partitions. The first partition in the set will start on the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. Note that if a non-UTC timezone is used, the date format must include a timezone offset to disambiguate between multiple instances of the same time before and after the Fall DST transition. If the format does not contain this offset, the second instance of the ambiguous time partition key will have the UTC offset automatically appended to it. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12)) # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ... HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12), minute_offset=15) # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ... """ # mapping for fields defined on TimeWindowPartitionsDefinition to this subclasses __new__ __field_remap__ = { "start_ts": "start_date", "end_ts": "end_date", } def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, **kwargs, ): _fmt = fmt or DEFAULT_HOURLY_FORMAT_WITHOUT_TIMEZONE schedule_type = ScheduleType.HOURLY # We accept cron_schedule "hidden" via kwargs to support record copy() cron_schedule = kwargs.get("cron_schedule") if cron_schedule: schedule_type = None return super(HourlyPartitionsDefinition, cls).__new__( cls, schedule_type=schedule_type, start=start_date, end=end_date, minute_offset=minute_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, cron_schedule=cron_schedule, )
[docs] def hourly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[HourlyPartitionsDefinition], ]: """Defines run config over a set of hourly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @hourly_partitioned_config(start_date=datetime(2022, 03, 12)) # creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ... @hourly_partitioned_config(start_date=datetime(2022, 03, 12), minute_offset=15) # creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]], ) -> PartitionedConfig[HourlyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = HourlyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
[docs] class MonthlyPartitionsDefinition(TimeWindowPartitionsDefinition): """A set of monthly partitions. The first partition in the set will start at the soonest first of the month after start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will be midnight the soonest first of the month following start_date. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the month to "split" the partition. Defaults to 1. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python MonthlyPartitionsDefinition(start_date="2022-03-12") # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ... MonthlyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5) # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ... """ # mapping for fields defined on TimeWindowPartitionsDefinition to this subclasses __new__ __field_remap__ = { "start_ts": "start_date", "end_ts": "end_date", } def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 1, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, **kwargs, ): _fmt = fmt or DEFAULT_DATE_FORMAT schedule_type = ScheduleType.MONTHLY # We accept cron_schedule "hidden" via kwargs to support record copy() cron_schedule = kwargs.get("cron_schedule") if cron_schedule: schedule_type = None check.invariant( day_offset == 1, f"Reconstruction by cron_schedule got unexpected day_offset {day_offset}, expected 1.", ) day_offset = 0 return super(MonthlyPartitionsDefinition, cls).__new__( cls, schedule_type=schedule_type, start=start_date, end=end_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, cron_schedule=cron_schedule, )
[docs] def monthly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 1, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[MonthlyPartitionsDefinition], ]: """Defines run config over a set of monthly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will be midnight the soonest first of the month following start_date. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the month to "split" the partition. Defaults to 1. timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @monthly_partitioned_config(start_date="2022-03-12") # creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ... @monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5) # creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]], ) -> PartitionedConfig[MonthlyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = MonthlyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
[docs] class WeeklyPartitionsDefinition(TimeWindowPartitionsDefinition): """Defines a set of weekly partitions. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format. end_date (Union[datetime.datetime, str, None]): The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the week to "split" the partition. Defaults to 0 (Sunday). timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. .. code-block:: python WeeklyPartitionsDefinition(start_date="2022-03-12") # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ... WeeklyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6) # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ... """ # mapping for fields defined on TimeWindowPartitionsDefinition to this subclasses __new__ __field_remap__ = { "start_ts": "start_date", "end_ts": "end_date", } def __new__( cls, start_date: Union[datetime, str], end_date: Union[datetime, str, None] = None, minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, **kwargs, ): _fmt = fmt or DEFAULT_DATE_FORMAT schedule_type = ScheduleType.WEEKLY # We accept cron_schedule "hidden" via kwargs to support record copy() cron_schedule = kwargs.get("cron_schedule") if cron_schedule: schedule_type = None return super(WeeklyPartitionsDefinition, cls).__new__( cls, schedule_type=schedule_type, start=start_date, end=end_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=_fmt, end_offset=end_offset, cron_schedule=cron_schedule, )
[docs] def weekly_partitioned_config( start_date: Union[datetime, str], minute_offset: int = 0, hour_offset: int = 0, day_offset: int = 0, timezone: Optional[str] = None, fmt: Optional[str] = None, end_offset: int = 0, tags_for_partition_fn: Optional[Callable[[datetime, datetime], Mapping[str, str]]] = None, ) -> Callable[ [Callable[[datetime, datetime], Mapping[str, Any]]], PartitionedConfig[WeeklyPartitionsDefinition], ]: """Defines run config over a set of weekly partitions. The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate. The decorated function should return a run config dictionary. The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day. Args: start_date (Union[datetime.datetime, str]): The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format. minute_offset (int): Number of minutes past the hour to "split" the partition. Defaults to 0. hour_offset (int): Number of hours past 00:00 to "split" the partition. Defaults to 0. day_offset (int): Day of the week to "split" the partition. Defaults to 0 (Sunday). timezone (Optional[str]): The timezone in which each date should exist. Supported strings for timezones are the ones provided by the `IANA time zone database <https://www.iana.org/time-zones>` - e.g. "America/Los_Angeles". fmt (Optional[str]): The date format to use. Defaults to `%Y-%m-%d`. end_offset (int): Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on. tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]): A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition. .. code-block:: python @weekly_partitioned_config(start_date="2022-03-12") # creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ... @weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6) # creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ... """ def inner( fn: Callable[[datetime, datetime], Mapping[str, Any]], ) -> PartitionedConfig[WeeklyPartitionsDefinition]: check.callable_param(fn, "fn") partitions_def = WeeklyPartitionsDefinition( start_date=start_date, minute_offset=minute_offset, hour_offset=hour_offset, day_offset=day_offset, timezone=timezone, fmt=fmt, end_offset=end_offset, ) return PartitionedConfig( run_config_for_partition_key_fn=wrap_time_window_run_config_fn(fn, partitions_def), partitions_def=partitions_def, decorated_fn=fn, tags_for_partition_key_fn=wrap_time_window_tags_fn( tags_for_partition_fn, partitions_def ), ) return inner
class BaseTimeWindowPartitionsSubset(PartitionsSubset): """A base class that represents PartitionSubsets for TimeWindowPartitionsDefinitions. Contains shared logic for time window partitions subsets, such as building time windows from partition keys. """ # Every time we change the serialization format, we should increment the version number. # This will ensure that we can gracefully degrade when deserializing old data. SERIALIZATION_VERSION = 1 @abstractproperty def included_time_windows(self) -> Sequence[PersistedTimeWindow]: ... @abstractproperty def num_partitions(self) -> int: ... @abstractproperty def partitions_def(self) -> TimeWindowPartitionsDefinition: ... def _get_partition_time_windows_not_in_subset( self, current_time: Optional[datetime] = None, ) -> Sequence[PersistedTimeWindow]: """Returns a list of partition time windows that are not in the subset. Each time window is a single partition. """ first_tw = cast( TimeWindowPartitionsDefinition, self.partitions_def ).get_first_partition_window(current_time=current_time) last_tw = cast( TimeWindowPartitionsDefinition, self.partitions_def ).get_last_partition_window(current_time=current_time) if not first_tw or not last_tw: # no partitions return [] last_tw_end_timestamp = last_tw.end.timestamp() first_tw_start_timestamp = first_tw.start.timestamp() if len(self.included_time_windows) == 0: return [ PersistedTimeWindow.from_public_time_window( TimeWindow(first_tw.start, last_tw.end), self.partitions_def.timezone ) ] time_windows = [] if first_tw_start_timestamp < self.included_time_windows[0].start.timestamp(): time_windows.append( PersistedTimeWindow.from_public_time_window( TimeWindow(first_tw.start, self.included_time_windows[0].start), self.partitions_def.timezone, ) ) for i in range(len(self.included_time_windows) - 1): if self.included_time_windows[i].start.timestamp() >= last_tw_end_timestamp: break if self.included_time_windows[i].end.timestamp() < last_tw_end_timestamp: if self.included_time_windows[i + 1].start.timestamp() <= last_tw_end_timestamp: time_windows.append( PersistedTimeWindow.from_public_time_window( TimeWindow( self.included_time_windows[i].end, self.included_time_windows[i + 1].start, ), self.partitions_def.timezone, ) ) else: time_windows.append( PersistedTimeWindow.from_public_time_window( TimeWindow( self.included_time_windows[i].end, last_tw.end, ), self.partitions_def.timezone, ) ) if last_tw_end_timestamp > self.included_time_windows[-1].end.timestamp(): time_windows.append( PersistedTimeWindow.from_public_time_window( TimeWindow(self.included_time_windows[-1].end, last_tw.end), self.partitions_def.timezone, ) ) return time_windows def get_partition_keys_not_in_subset( self, partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Iterable[str]: partition_keys: List[str] = [] for tw in self._get_partition_time_windows_not_in_subset(current_time): partition_keys.extend( cast( TimeWindowPartitionsDefinition, self.partitions_def ).get_partition_keys_in_time_window(tw) ) return partition_keys @abstractproperty def first_start(self) -> datetime: ... @abstractproperty def is_empty(self) -> bool: ... @abstractmethod def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool: ... @abstractmethod def with_partitions_def( self, partitions_def: TimeWindowPartitionsDefinition ) -> "BaseTimeWindowPartitionsSubset": ... 
def get_partition_key_ranges( self, partitions_def: PartitionsDefinition, current_time: Optional[datetime] = None, dynamic_partitions_store: Optional[DynamicPartitionsStore] = None, ) -> Sequence[PartitionKeyRange]: return [ cast( TimeWindowPartitionsDefinition, self.partitions_def ).get_partition_key_range_for_time_window(window.to_public_time_window()) for window in self.included_time_windows ] def _add_partitions_to_time_windows( self, initial_windows: Sequence[PersistedTimeWindow], partition_keys: Sequence[str], validate: bool = True, ) -> Tuple[Sequence[PersistedTimeWindow], int]: """Merges a set of partition keys into an existing set of time windows, returning the minimized set of time windows and the number of partitions added. """ result_windows = [*initial_windows] time_windows = cast( TimeWindowPartitionsDefinition, self.partitions_def ).time_windows_for_partition_keys(frozenset(partition_keys), validate=validate) num_added_partitions = 0 for window in sorted(time_windows, key=lambda tw: tw.start.timestamp()): window_start_timestamp = window.start.timestamp() # go in reverse order because it's more common to add partitions at the end than the # beginning for i in reversed(range(len(result_windows))): included_window = result_windows[i] lt_end_of_range = window_start_timestamp < included_window.end.timestamp() gte_start_of_range = window_start_timestamp >= included_window.start.timestamp() if lt_end_of_range and gte_start_of_range: break if not lt_end_of_range: merge_with_range = included_window.end.timestamp() == window_start_timestamp merge_with_later_range = i + 1 < len(result_windows) and ( window.end.timestamp() == result_windows[i + 1].start.timestamp() ) if merge_with_range and merge_with_later_range: result_windows[i] = PersistedTimeWindow.from_public_time_window( TimeWindow(included_window.start, result_windows[i + 1].end), self.partitions_def.timezone, ) del result_windows[i + 1] elif merge_with_range: result_windows[i] = PersistedTimeWindow.from_public_time_window( TimeWindow(included_window.start, window.end), self.partitions_def.timezone, ) elif merge_with_later_range: result_windows[i + 1] = PersistedTimeWindow.from_public_time_window( TimeWindow(window.start, result_windows[i + 1].end), self.partitions_def.timezone, ) else: result_windows.insert( i + 1, PersistedTimeWindow.from_public_time_window( window, self.partitions_def.timezone ), ) num_added_partitions += 1 break else: if result_windows and window_start_timestamp == result_windows[0].start.timestamp(): result_windows[0] = PersistedTimeWindow.from_public_time_window( TimeWindow(window.start, included_window.end), self.partitions_def.timezone ) elif ( result_windows and window.end.timestamp() == result_windows[0].start.timestamp() ): result_windows[0] = PersistedTimeWindow.from_public_time_window( TimeWindow(window.start, included_window.end), self.partitions_def.timezone ) else: result_windows.insert( 0, PersistedTimeWindow.from_public_time_window( window, self.partitions_def.timezone, ), ) num_added_partitions += 1 return result_windows, num_added_partitions def serialize(self) -> str: return json.dumps( { "version": self.SERIALIZATION_VERSION, # included_time_windows is already sorted, so no need to sort here to guarantee # stable serialization between identical subsets "time_windows": [ (window.start.timestamp(), window.end.timestamp()) for window in self.included_time_windows ], "num_partitions": self.num_partitions, } ) @classmethod def from_serialized( cls, partitions_def: PartitionsDefinition, 
serialized: str ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) loaded = json.loads(serialized) def tuples_to_time_windows(tuples): return [ PersistedTimeWindow( TimestampWithTimezone(tup[0], partitions_def.timezone), TimestampWithTimezone(tup[1], partitions_def.timezone), ) for tup in tuples ] if isinstance(loaded, list): # backwards compatibility time_windows = tuples_to_time_windows(loaded) num_partitions = sum( len(partitions_def.get_partition_keys_in_time_window(time_window)) for time_window in time_windows ) elif isinstance(loaded, dict) and ( "version" not in loaded or loaded["version"] == cls.SERIALIZATION_VERSION ): # version 1 time_windows = tuples_to_time_windows(loaded["time_windows"]) num_partitions = loaded["num_partitions"] else: raise DagsterInvalidDeserializationVersionError( f"Attempted to deserialize partition subset with version {loaded.get('version')}," f" but only version {cls.SERIALIZATION_VERSION} is supported." ) return TimeWindowPartitionsSubset( partitions_def, num_partitions=num_partitions, included_time_windows=time_windows ) @classmethod def can_deserialize( cls, partitions_def: PartitionsDefinition, serialized: str, serialized_partitions_def_unique_id: Optional[str], serialized_partitions_def_class_name: Optional[str], ) -> bool: if serialized_partitions_def_unique_id: return ( partitions_def.get_serializable_unique_identifier() == serialized_partitions_def_unique_id ) if ( serialized_partitions_def_class_name # note: all TimeWindowPartitionsDefinition subclasses will get serialized as raw # TimeWindowPartitionsDefinitions, so this class name check will not always pass, # hence the unique id check above and serialized_partitions_def_class_name != partitions_def.__class__.__name__ ): return False data = json.loads(serialized) return isinstance(data, list) or ( isinstance(data, dict) and data.get("time_windows") is not None and data.get("num_partitions") is not None ) def __len__(self) -> int: return self.num_partitions def __contains__(self, partition_key: Optional[str]) -> bool: if partition_key is None: return False try: time_window = cast( TimeWindowPartitionsDefinition, self.partitions_def ).time_window_for_partition_key(partition_key) except ValueError: # invalid partition key return False time_window_start_timestamp = time_window.start.timestamp() return any( time_window_start_timestamp >= included_time_window.start.timestamp() and time_window_start_timestamp < included_time_window.end.timestamp() for included_time_window in self.included_time_windows ) def __eq__(self, other): return ( isinstance(other, BaseTimeWindowPartitionsSubset) and self.partitions_def == other.partitions_def and self.included_time_windows == other.included_time_windows ) class PartitionKeysTimeWindowPartitionsSubset(BaseTimeWindowPartitionsSubset): """A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the included partitions using strings. 
""" def __init__( self, partitions_def: TimeWindowPartitionsDefinition, included_partition_keys: AbstractSet[str], ): self._partitions_def = check.inst_param( partitions_def, "partitions_def", TimeWindowPartitionsDefinition ) self._included_partition_keys = check.set_param( included_partition_keys, "included_partition_keys", of_type=str ) @property def partitions_def(self) -> TimeWindowPartitionsDefinition: return self._partitions_def def with_partition_keys( self, partition_keys: Iterable[str] ) -> "BaseTimeWindowPartitionsSubset": new_partitions = {*(self._included_partition_keys or []), *partition_keys} return PartitionKeysTimeWindowPartitionsSubset( self._partitions_def, included_partition_keys=new_partitions, ) @cached_property def included_time_windows(self) -> Sequence[PersistedTimeWindow]: result_time_windows, _ = self._add_partitions_to_time_windows( initial_windows=[], partition_keys=list(check.not_none(self._included_partition_keys)), validate=False, ) return result_time_windows @cached_property def num_partitions(self) -> int: return len(self._included_partition_keys) @public def get_partition_keys(self) -> Iterable[str]: return list(self._included_partition_keys) if self._included_partition_keys else [] @property def first_start(self) -> datetime: """The start datetime of the earliest partition in the subset.""" if len(self._included_partition_keys) == 1: # Avoid expensive conversion from partition keys to time windows if possible return self._partitions_def.start_time_for_partition_key( next(iter(self._included_partition_keys)) ) else: if len(self.included_time_windows) == 0: check.failed( f"Empty subset. self._included_partition_keys: {self._included_partition_keys}" ) return self.included_time_windows[0].start @property def is_empty(self) -> bool: return len(self._included_partition_keys) == 0 def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool: """Performs a cheap calculation that checks whether the latest window in this subset ends before the given dt. If this returns True, then it means the latest window definitely ends before the given dt. If this returns False, it means it may or may not end before the given dt. Args: dt_cron_schedule (str): A cron schedule that dt is on one of the ticks of. """ if len(self._included_partition_keys) == 1: # Getting just the partition start time is cheaper than invoking croniter to get the # full window. # If we know that the start time is earlier than dt and that the partitions are slim # enough that the end time can't put them past dt, then we know that the end time is # earlier than dt. 
if (self._partitions_def.cron_schedule == dt_cron_schedule) or ( self._partitions_def.is_basic_hourly and dt_cron_schedule in ["0 0 * * *", "0 * * * *"] ): return ( self._partitions_def.start_time_for_partition_key( next(iter(self._included_partition_keys)) ) < dt ) return False def __contains__(self, partition_key: str) -> bool: return partition_key in self._included_partition_keys def __eq__(self, other): return ( isinstance(other, PartitionKeysTimeWindowPartitionsSubset) and self._partitions_def == other._partitions_def and self._included_partition_keys == other._included_partition_keys ) or super(PartitionKeysTimeWindowPartitionsSubset, self).__eq__(other) @classmethod def empty_subset( cls, partitions_def: Optional[PartitionsDefinition] = None ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) return cls(partitions_def, set()) def with_partitions_def( self, partitions_def: TimeWindowPartitionsDefinition ) -> "BaseTimeWindowPartitionsSubset": check.invariant( partitions_def.cron_schedule == self._partitions_def.cron_schedule, "num_partitions would become inaccurate if the partitions_defs had different cron" " schedules", ) return PartitionKeysTimeWindowPartitionsSubset( partitions_def=partitions_def, included_partition_keys=self._included_partition_keys, ) def __repr__(self) -> str: return f"PartitionKeysTimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" def to_serializable_subset(self) -> "TimeWindowPartitionsSubset": from dagster._core.remote_representation.external_data import TimeWindowPartitionsSnap # in cases where we're dealing with (e.g.) HourlyPartitionsDefinition, we need to convert # this partitions definition into a raw TimeWindowPartitionsDefinition to make it # serializable. to do this, we just convert it to its external representation and back. 
        partitions_def = self.partitions_def
        if type(partitions_def) != TimeWindowPartitionsDefinition:
            partitions_def = TimeWindowPartitionsSnap.from_def(
                partitions_def
            ).get_partitions_definition()

        return TimeWindowPartitionsSubset(
            partitions_def, self.num_partitions, self.included_time_windows
        )

    def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset":
        if isinstance(other, PartitionKeysTimeWindowPartitionsSubset):
            return self.partitions_def.subset_with_partition_keys(
                set(self.get_partition_keys()) | set(other.get_partition_keys())
            )
        return _attempt_coerce_to_time_window_subset(self) | _attempt_coerce_to_time_window_subset(
            other
        )

    def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset":
        if isinstance(other, PartitionKeysTimeWindowPartitionsSubset):
            return self.partitions_def.subset_with_partition_keys(
                set(self.get_partition_keys()) - set(other.get_partition_keys())
            )
        return _attempt_coerce_to_time_window_subset(self) - _attempt_coerce_to_time_window_subset(
            other
        )

    def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset":
        if isinstance(other, PartitionKeysTimeWindowPartitionsSubset):
            return self.partitions_def.subset_with_partition_keys(
                set(self.get_partition_keys()) & set(other.get_partition_keys())
            )
        return _attempt_coerce_to_time_window_subset(self) & _attempt_coerce_to_time_window_subset(
            other
        )


class TimeWindowPartitionsSubsetSerializer(NamedTupleSerializer):
    # TimeWindowPartitionsSubsets have custom logic to delay calculating num_partitions until it
    # is needed to improve performance. When serializing, we want to serialize the number of
    # partitions, so we force calculation.
    def before_pack(self, value: "TimeWindowPartitionsSubset") -> "TimeWindowPartitionsSubset":
        # value.num_partitions will calculate the number of partitions if the field is None.
        # We want to check if the field is None and replace the value with the calculated value
        # for serialization
        if value._asdict()["num_partitions"] is None:
            return TimeWindowPartitionsSubset(
                partitions_def=value.partitions_def,
                num_partitions=value.num_partitions,
                included_time_windows=value.included_time_windows,
            )
        return value


@whitelist_for_serdes(serializer=TimeWindowPartitionsSubsetSerializer)
class TimeWindowPartitionsSubset(
    BaseTimeWindowPartitionsSubset,
    NamedTuple(
        "_TimeWindowPartitionsSubset",
        [
            ("partitions_def", TimeWindowPartitionsDefinition),
            ("num_partitions", Optional[int]),
            ("included_time_windows", Sequence[PersistedTimeWindow]),
        ],
    ),
):
    """A PartitionsSubset for a TimeWindowPartitionsDefinition, which internally represents the
    included partitions using TimeWindows.
""" def __new__( cls, partitions_def: TimeWindowPartitionsDefinition, num_partitions: Optional[int], included_time_windows: Sequence[Union[PersistedTimeWindow, TimeWindow]], ): included_time_windows = [ PersistedTimeWindow.from_public_time_window(tw, partitions_def.timezone) if isinstance(tw, TimeWindow) else tw for tw in included_time_windows ] return super(TimeWindowPartitionsSubset, cls).__new__( cls, partitions_def=check.inst_param( partitions_def, "partitions_def", TimeWindowPartitionsDefinition ), num_partitions=check.opt_int_param(num_partitions, "num_partitions"), included_time_windows=check.sequence_param( included_time_windows, "included_time_windows", of_type=PersistedTimeWindow ), ) @staticmethod def from_all_partitions_subset(subset: AllPartitionsSubset) -> "TimeWindowPartitionsSubset": partitions_def = check.inst( subset.partitions_def, TimeWindowPartitionsDefinition, "Provided subset must reference a TimeWindowPartitionsDefinition", ) first_window = partitions_def.get_first_partition_window(subset.current_time) last_window = partitions_def.get_last_partition_window(subset.current_time) return TimeWindowPartitionsSubset( partitions_def=partitions_def, included_time_windows=[ PersistedTimeWindow.from_public_time_window( TimeWindow(first_window.start, last_window.end), partitions_def.timezone ) ] if first_window and last_window else [], num_partitions=None, ) @cached_property def included_time_windows(self) -> Sequence[PersistedTimeWindow]: return self._asdict()["included_time_windows"] @property def partitions_def(self) -> TimeWindowPartitionsDefinition: return self._asdict()["partitions_def"] @property def first_start(self) -> datetime: """The start datetime of the earliest partition in the subset.""" if len(self.included_time_windows) == 0: check.failed("Empty subset") return self.included_time_windows[0].start @property def is_empty(self) -> bool: return len(self.included_time_windows) == 0 def cheap_ends_before(self, dt: datetime, dt_cron_schedule: str) -> bool: """Performs a cheap calculation that checks whether the latest window in this subset ends before the given dt. If this returns True, then it means the latest window definitely ends before the given dt. If this returns False, it means it may or may not end before the given dt. Args: dt_cron_schedule (str): A cron schedule that dt is on one of the ticks of. 
""" return self.included_time_windows[-1].end.timestamp() <= dt.timestamp() @cached_property def num_partitions(self) -> int: num_partitions_ = self._asdict()["num_partitions"] if num_partitions_ is None: return sum( self.partitions_def.get_num_partitions_in_window( time_window.to_public_time_window() ) for time_window in self.included_time_windows ) return num_partitions_ @classmethod def _num_partitions_from_time_windows( cls, partitions_def: TimeWindowPartitionsDefinition, time_windows: Sequence[PersistedTimeWindow], ) -> int: return sum( len(partitions_def.get_partition_keys_in_time_window(time_window)) for time_window in time_windows ) @public def get_partition_keys(self) -> Iterable[str]: return [ pk for time_window in self.included_time_windows for pk in self.partitions_def.get_partition_keys_in_time_window(time_window) ] def with_partition_keys(self, partition_keys: Iterable[str]) -> "TimeWindowPartitionsSubset": result_windows, added_partitions = self._add_partitions_to_time_windows( self.included_time_windows, list(partition_keys) ) return TimeWindowPartitionsSubset( self.partitions_def, num_partitions=self.num_partitions + added_partitions, included_time_windows=result_windows, ) @classmethod def empty_subset( cls, partitions_def: Optional[PartitionsDefinition] = None ) -> "PartitionsSubset": if not isinstance(partitions_def, TimeWindowPartitionsDefinition): check.failed("Partitions definition must be a TimeWindowPartitionsDefinition") partitions_def = cast(TimeWindowPartitionsDefinition, partitions_def) return cls(partitions_def, 0, []) def with_partitions_def( self, partitions_def: TimeWindowPartitionsDefinition ) -> "TimeWindowPartitionsSubset": check.invariant( partitions_def.cron_schedule == self.partitions_def.cron_schedule, "num_partitions would become inaccurate if the partitions_defs had different cron" " schedules", ) return TimeWindowPartitionsSubset( partitions_def=partitions_def, num_partitions=self.num_partitions, included_time_windows=self.included_time_windows, ) def __repr__(self) -> str: return f"TimeWindowPartitionsSubset({self.get_partition_key_ranges(self.partitions_def)})" def __and__(self, other: "PartitionsSubset") -> "PartitionsSubset": other = _attempt_coerce_to_time_window_subset(other) if not isinstance(other, TimeWindowPartitionsSubset): return super().__and__(other) self_time_windows_iter = iter( sorted(self.included_time_windows, key=lambda tw: tw.start.timestamp()) ) other_time_windows_iter = iter( sorted(other.included_time_windows, key=lambda tw: tw.start.timestamp()) ) result_windows = [] self_window = next(self_time_windows_iter, None) other_window = next(other_time_windows_iter, None) while self_window and other_window: # find the intersection between the current two windows start = max(self_window.start, other_window.start) end = min(self_window.end, other_window.end) # these windows intersect if start.timestamp() < end.timestamp(): result_windows.append( PersistedTimeWindow.from_public_time_window( TimeWindow(start=start, end=end), self.partitions_def.timezone ) ) # advance the iterator with the earliest end time to find the next potential intersection if self_window.end.timestamp() < other_window.end.timestamp(): self_window = next(self_time_windows_iter, None) else: other_window = next(other_time_windows_iter, None) return TimeWindowPartitionsSubset( partitions_def=self.partitions_def, num_partitions=None, # lazily calculated included_time_windows=result_windows, ) def __or__(self, other: "PartitionsSubset") -> "PartitionsSubset": other = 
_attempt_coerce_to_time_window_subset(other)
        if not isinstance(other, TimeWindowPartitionsSubset):
            return super().__or__(other)

        input_time_windows = sorted(
            [*self.included_time_windows, *other.included_time_windows],
            key=lambda tw: tw.start.timestamp(),
        )
        result_windows = [input_time_windows[0]] if len(input_time_windows) > 0 else []
        for window in input_time_windows[1:]:
            latest_window = result_windows[-1]
            if window.start.timestamp() <= latest_window.end.timestamp():
                # merge this window with the latest window
                result_windows[-1] = PersistedTimeWindow.from_public_time_window(
                    TimeWindow(latest_window.start, max(latest_window.end, window.end)),
                    self.partitions_def.timezone,
                )
            else:
                result_windows.append(window)

        return TimeWindowPartitionsSubset(
            partitions_def=self.partitions_def,
            num_partitions=None,  # lazily calculated
            included_time_windows=result_windows,
        )

    def __sub__(self, other: "PartitionsSubset") -> "PartitionsSubset":
        other = _attempt_coerce_to_time_window_subset(other)
        if not isinstance(other, TimeWindowPartitionsSubset):
            return super().__sub__(other)

        time_windows = sorted(self.included_time_windows, key=lambda tw: tw.start.timestamp())
        other_time_windows = sorted(
            other.included_time_windows, key=lambda tw: tw.start.timestamp()
        )

        next_time_window_index_to_process = 0
        next_other_window_index_to_process = 0

        # Slide through both sets of windows, moving to the next window once its start has passed
        # the end of the window it is being compared to
        while (next_time_window_index_to_process < len(time_windows)) and (
            next_other_window_index_to_process < len(other_time_windows)
        ):
            time_window = time_windows[next_time_window_index_to_process]
            other_time_window = other_time_windows[next_other_window_index_to_process]

            # Perform the subtraction and splice the 0, 1, or 2 result windows
            # back into the time_windows list
            subtracted_time_windows = time_window.subtract(other_time_window)

            time_windows[
                next_time_window_index_to_process : next_time_window_index_to_process + 1
            ] = subtracted_time_windows

            if len(subtracted_time_windows) == 0:
                # other_time_window fully consumed time_window
                # next_time_window_index_to_process can stay the same since everything has shifted over one
                pass
            else:
                updated_time_window = time_windows[next_time_window_index_to_process]
                if updated_time_window.end.timestamp() <= other_time_window.start.timestamp():
                    # Current subtractee window ends too early to intersect, can advance
                    next_time_window_index_to_process += 1
                elif other_time_window.end.timestamp() <= updated_time_window.start.timestamp():
                    # Current subtractor window ends too early to intersect, can advance
                    next_other_window_index_to_process += 1
                else:
                    check.failed(
                        "After subtraction, the new window should no longer intersect with the other window"
                    )

        return TimeWindowPartitionsSubset(
            partitions_def=self.partitions_def,
            num_partitions=None,
            included_time_windows=time_windows,
        )

    def to_serializable_subset(self) -> "TimeWindowPartitionsSubset":
        from dagster._core.remote_representation.external_data import TimeWindowPartitionsSnap

        # in cases where we're dealing with (e.g.) HourlyPartitionsDefinition, we need to convert
        # this partitions definition into a raw TimeWindowPartitionsDefinition to make it
        # serializable. to do this, we just convert it to its external representation and back.
        # note that we rarely serialize subsets on the user code side of a serialization boundary,
        # and so this conversion is rarely necessary.
        partitions_def = self.partitions_def
        if type(partitions_def) != TimeWindowPartitionsDefinition:
            partitions_def = TimeWindowPartitionsSnap.from_def(
                partitions_def
            ).get_partitions_definition()
            return self.with_partitions_def(partitions_def)

        return self


class PartitionRangeStatus(Enum):
    MATERIALIZING = "MATERIALIZING"
    MATERIALIZED = "MATERIALIZED"
    FAILED = "FAILED"


PARTITION_RANGE_STATUS_PRIORITY = [
    PartitionRangeStatus.MATERIALIZING,
    PartitionRangeStatus.FAILED,
    PartitionRangeStatus.MATERIALIZED,
]


class PartitionTimeWindowStatus:
    def __init__(self, time_window: TimeWindow, status: PartitionRangeStatus):
        self.time_window = time_window
        self.status = status

    def __repr__(self):
        return f"({self.time_window.start} - {self.time_window.end}): {self.status.value}"

    def __eq__(self, other):
        return (
            isinstance(other, PartitionTimeWindowStatus)
            and self.time_window == other.time_window
            and self.status == other.status
        )


def _flatten(
    high_pri_time_windows: List[PartitionTimeWindowStatus],
    low_pri_time_windows: List[PartitionTimeWindowStatus],
) -> List[PartitionTimeWindowStatus]:
    high_pri_time_windows = sorted(high_pri_time_windows, key=lambda t: t.time_window.start)
    low_pri_time_windows = sorted(low_pri_time_windows, key=lambda t: t.time_window.start)

    high_pri_idx = 0
    low_pri_idx = 0

    filtered_low_pri: List[PartitionTimeWindowStatus] = []

    # slice and dice the low pri time windows so there's no overlap with high pri
    while True:
        if low_pri_idx >= len(low_pri_time_windows):
            # reached the end of the low pri windows
            break
        if high_pri_idx >= len(high_pri_time_windows):
            # reached the end of the high pri windows; add all remaining low pri windows
            # because there's no overlap
            filtered_low_pri.extend(low_pri_time_windows[low_pri_idx:])
            break

        low_pri_tw = low_pri_time_windows[low_pri_idx]
        high_pri_tw = high_pri_time_windows[high_pri_idx]

        if low_pri_tw.time_window.start.timestamp() < high_pri_tw.time_window.start.timestamp():
            if low_pri_tw.time_window.end.timestamp() <= high_pri_tw.time_window.start.timestamp():
                # low_pri_tw is entirely before high pri
                filtered_low_pri.append(low_pri_tw)
                low_pri_idx += 1
            else:
                # high pri cuts the low pri short
                filtered_low_pri.append(
                    PartitionTimeWindowStatus(
                        TimeWindow(
                            low_pri_tw.time_window.start,
                            high_pri_tw.time_window.start,
                        ),
                        low_pri_tw.status,
                    )
                )

                if low_pri_tw.time_window.end.timestamp() > high_pri_tw.time_window.end.timestamp():
                    # the low pri time window will continue on the other end of the high pri
                    # and get split in two. Modify low_pri[low_pri_idx] to be
                    # the second half of the low pri time window. It will be added in the next iteration.
                    # (don't add it now, because we need to check if it overlaps with the next high pri)
                    low_pri_time_windows[low_pri_idx] = PartitionTimeWindowStatus(
                        TimeWindow(high_pri_tw.time_window.end, low_pri_tw.time_window.end),
                        low_pri_tw.status,
                    )
                    high_pri_idx += 1
                else:
                    # the rest of the low pri time window is inside the high pri time window
                    low_pri_idx += 1
        else:
            if low_pri_tw.time_window.start.timestamp() >= high_pri_tw.time_window.end.timestamp():
                # high pri is entirely before low pri. The next high pri may overlap
                high_pri_idx += 1
            elif low_pri_tw.time_window.end.timestamp() <= high_pri_tw.time_window.end.timestamp():
                # low pri is entirely within high pri, skip it
                low_pri_idx += 1
            else:
                # high pri cuts out the start of the low pri. It will continue on the other end.
                # Modify low_pri[low_pri_idx] to shorten the start. It will be added
                # in the next iteration.
                # (don't add it now, because we need to check if it overlaps with the next high pri)
                low_pri_time_windows[low_pri_idx] = PartitionTimeWindowStatus(
                    TimeWindow(high_pri_tw.time_window.end, low_pri_tw.time_window.end),
                    low_pri_tw.status,
                )
                high_pri_idx += 1

    # combine the high pri windows with the filtered low pri windows
    flattened_time_windows = high_pri_time_windows
    flattened_time_windows.extend(filtered_low_pri)
    flattened_time_windows.sort(key=lambda t: t.time_window.start)
    return flattened_time_windows


def fetch_flattened_time_window_ranges(
    subsets: Mapping[PartitionRangeStatus, BaseTimeWindowPartitionsSubset],
) -> Sequence[PartitionTimeWindowStatus]:
    """Given potentially overlapping subsets, return a flattened list of timewindows where the
    highest priority status wins on overlaps.
    """
    prioritized_subsets = sorted(
        [(status, subset) for status, subset in subsets.items()],
        key=lambda t: PARTITION_RANGE_STATUS_PRIORITY.index(t[0]),
    )

    # progressively add lower priority time windows to the list of higher priority time windows
    flattened_time_window_statuses = []
    for status, subset in prioritized_subsets:
        subset_time_window_statuses = [
            PartitionTimeWindowStatus(tw.to_public_time_window(), status)
            for tw in subset.included_time_windows
        ]
        flattened_time_window_statuses = _flatten(
            flattened_time_window_statuses, subset_time_window_statuses
        )

    return flattened_time_window_statuses


def has_one_dimension_time_window_partitioning(
    partitions_def: Optional[PartitionsDefinition],
) -> bool:
    from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition

    if isinstance(partitions_def, TimeWindowPartitionsDefinition):
        return True
    elif isinstance(partitions_def, MultiPartitionsDefinition):
        time_window_dims = [
            dim
            for dim in partitions_def.partitions_defs
            if isinstance(dim.partitions_def, TimeWindowPartitionsDefinition)
        ]
        if len(time_window_dims) == 1:
            return True

    return False


def get_time_partitions_def(
    partitions_def: Optional[PartitionsDefinition],
) -> Optional[TimeWindowPartitionsDefinition]:
    """For a given PartitionsDefinition, return the associated TimeWindowPartitionsDefinition if it
    exists.
""" from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition if partitions_def is None: return None elif isinstance(partitions_def, TimeWindowPartitionsDefinition): return partitions_def elif isinstance( partitions_def, MultiPartitionsDefinition ) and has_one_dimension_time_window_partitioning(partitions_def): return cast( TimeWindowPartitionsDefinition, partitions_def.time_window_dimension.partitions_def ) else: return None def get_time_partition_key( partitions_def: Optional[PartitionsDefinition], partition_key: Optional[str] ) -> str: from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition if partitions_def is None or partition_key is None: check.failed( "Cannot get time partitions key from when partitions def is None or partition key is" " None" ) elif isinstance(partitions_def, TimeWindowPartitionsDefinition): return partition_key elif isinstance(partitions_def, MultiPartitionsDefinition): return partitions_def.get_partition_key_from_str(partition_key).keys_by_dimension[ partitions_def.time_window_dimension.name ] else: check.failed(f"Cannot get time partition from non-time partitions def {partitions_def}") def _attempt_coerce_to_time_window_subset(subset: "PartitionsSubset") -> "PartitionsSubset": """Attempts to convert the input subset into a TimeWindowPartitionsSubset.""" if isinstance(subset, TimeWindowPartitionsSubset): return subset elif isinstance(subset, BaseTimeWindowPartitionsSubset): return TimeWindowPartitionsSubset( partitions_def=subset.partitions_def, num_partitions=subset.num_partitions, included_time_windows=subset.included_time_windows, ) elif isinstance(subset, AllPartitionsSubset): return TimeWindowPartitionsSubset.from_all_partitions_subset(subset) else: return subset