Ask AI

Source code for dagster._core.definitions.partitioned_schedule

from typing import Callable, Mapping, NamedTuple, Optional, Union, cast

import dagster._check as check
from dagster._core.errors import DagsterInvalidDefinitionError

from .decorators.schedule_decorator import schedule
from .job_definition import JobDefinition
from .multi_dimensional_partitions import MultiPartitionsDefinition
from .partition import (
    PartitionsDefinition,
    StaticPartitionsDefinition,
)
from .run_request import RunRequest, SkipReason
from .schedule_definition import (
    DefaultScheduleStatus,
    RunRequestIterator,
    ScheduleDefinition,
    ScheduleEvaluationContext,
)
from .time_window_partitions import (
    TimeWindowPartitionsDefinition,
    get_time_partitions_def,
    has_one_dimension_time_window_partitioning,
)
from .unresolved_asset_job_definition import UnresolvedAssetJobDefinition


class UnresolvedPartitionedAssetScheduleDefinition(NamedTuple):
    """Points to an unresolved asset job. The asset selection isn't resolved yet, so we can't resolve
    the PartitionsDefinition, so we can't resolve the schedule cadence.
    """

    name: str
    job: UnresolvedAssetJobDefinition
    description: Optional[str]
    default_status: DefaultScheduleStatus
    minute_of_hour: Optional[int]
    hour_of_day: Optional[int]
    day_of_week: Optional[int]
    day_of_month: Optional[int]
    tags: Optional[Mapping[str, str]]

    def resolve(self, resolved_job: JobDefinition) -> ScheduleDefinition:
        partitions_def = resolved_job.partitions_def
        if partitions_def is None:
            check.failed(
                f"Job '{resolved_job.name}' provided to build_schedule_from_partitioned_job must"
                " contain partitioned assets or a partitions definition."
            )

        partitions_def = _check_valid_schedule_partitions_def(partitions_def)
        time_partitions_def = check.not_none(get_time_partitions_def(partitions_def))

        return schedule(
            name=self.name,
            cron_schedule=time_partitions_def.get_cron_schedule(
                self.minute_of_hour, self.hour_of_day, self.day_of_week, self.day_of_month
            ),
            job=resolved_job,
            default_status=self.default_status,
            execution_timezone=time_partitions_def.timezone,
            description=self.description,
        )(_get_schedule_evaluation_fn(partitions_def, resolved_job, self.tags))


[docs]def build_schedule_from_partitioned_job( job: Union[JobDefinition, UnresolvedAssetJobDefinition], description: Optional[str] = None, name: Optional[str] = None, minute_of_hour: Optional[int] = None, hour_of_day: Optional[int] = None, day_of_week: Optional[int] = None, day_of_month: Optional[int] = None, default_status: DefaultScheduleStatus = DefaultScheduleStatus.STOPPED, tags: Optional[Mapping[str, str]] = None, cron_schedule: Optional[str] = None, execution_timezone: Optional[str] = None, ) -> Union[UnresolvedPartitionedAssetScheduleDefinition, ScheduleDefinition]: """Creates a schedule from a time window-partitioned job a job that targets time window-partitioned or statically-partitioned assets. The job can also be multipartitioned, as long as one of the partitions dimensions is time-partitioned. The schedule executes at the cadence specified by the time partitioning of the job or assets. Examples: .. code-block:: python ###################################### # Job that targets partitioned assets ###################################### from dagster import ( DailyPartitionsDefinition, asset, build_schedule_from_partitioned_job, define_asset_job, ) @asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01")) def asset1(): ... asset1_job = define_asset_job("asset1_job", selection=[asset1]) # The created schedule will fire daily asset1_job_schedule = build_schedule_from_partitioned_job(asset1_job) defs = Definitions(assets=[asset1], schedules=[asset1_job_schedule]) ################ # Non-asset job ################ from dagster import DailyPartitionsDefinition, build_schedule_from_partitioned_job, jog @job(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01")) def do_stuff_partitioned(): ... # The created schedule will fire daily do_stuff_partitioned_schedule = build_schedule_from_partitioned_job( do_stuff_partitioned, ) defs = Definitions(schedules=[do_stuff_partitioned_schedule]) """ check.invariant( not (day_of_week and day_of_month), "Cannot provide both day_of_month and day_of_week parameter to" " build_schedule_from_partitioned_job.", ) check.invariant( not ( (cron_schedule or execution_timezone) and ( day_of_month is not None or day_of_week is not None or hour_of_day is not None or minute_of_hour is not None ) ), "Cannot provide both cron_schedule / execution_timezone parameters and" " day_of_month / day_of_week / hour_of_day / minute_of_hour parameters to" " build_schedule_from_partitioned_job.", ) if isinstance(job, UnresolvedAssetJobDefinition) and job.partitions_def is None: return UnresolvedPartitionedAssetScheduleDefinition( job=job, default_status=default_status, name=check.opt_str_param(name, "name", f"{job.name}_schedule"), description=check.opt_str_param(description, "description"), minute_of_hour=minute_of_hour, hour_of_day=hour_of_day, day_of_week=day_of_week, day_of_month=day_of_month, tags=tags, ) else: partitions_def = job.partitions_def if partitions_def is None: check.failed("The provided job is not partitioned") partitions_def = _check_valid_schedule_partitions_def(partitions_def) if isinstance(partitions_def, StaticPartitionsDefinition): check.not_none( cron_schedule, "Creating a schedule from a static partitions definition requires a cron schedule", ) else: if cron_schedule or execution_timezone: check.failed( "Cannot provide cron_schedule or execution_timezone to" " build_schedule_from_partitioned_job for a time-partitioned job." ) time_partitions_def = check.not_none(get_time_partitions_def(partitions_def)) cron_schedule = time_partitions_def.get_cron_schedule( minute_of_hour, hour_of_day, day_of_week, day_of_month ) execution_timezone = time_partitions_def.timezone return schedule( cron_schedule=cron_schedule, # type: ignore[arg-type] job=job, default_status=default_status, execution_timezone=execution_timezone, name=check.opt_str_param(name, "name", f"{job.name}_schedule"), description=check.opt_str_param(description, "description"), )(_get_schedule_evaluation_fn(partitions_def, job, tags))
def _get_schedule_evaluation_fn( partitions_def: PartitionsDefinition, job: Union[JobDefinition, UnresolvedAssetJobDefinition], tags: Optional[Mapping[str, str]] = None, ) -> Callable[[ScheduleEvaluationContext], Union[SkipReason, RunRequest, RunRequestIterator]]: def schedule_fn(context): # Run for the latest partition. Prior partitions will have been handled by prior ticks. if isinstance(partitions_def, TimeWindowPartitionsDefinition): partition_key = partitions_def.get_last_partition_key(context.scheduled_execution_time) if partition_key is None: return SkipReason("The job's PartitionsDefinition has no partitions") return job.run_request_for_partition( partition_key=partition_key, run_key=partition_key, tags=tags, current_time=context.scheduled_execution_time, ) if isinstance(partitions_def, StaticPartitionsDefinition): return [ job.run_request_for_partition( partition_key=key, run_key=key, tags=tags, current_time=context.scheduled_execution_time, ) for key in partitions_def.get_partition_keys( current_time=context.scheduled_execution_time ) ] else: check.invariant(isinstance(partitions_def, MultiPartitionsDefinition)) time_window_dimension = partitions_def.time_window_dimension partition_key = time_window_dimension.partitions_def.get_last_partition_key( context.scheduled_execution_time ) if partition_key is None: return SkipReason("The job's PartitionsDefinition has no partitions") return [ job.run_request_for_partition( partition_key=key, run_key=key, tags=tags, current_time=context.scheduled_execution_time, dynamic_partitions_store=context.instance if context.instance_ref else None, ) for key in partitions_def.get_multipartition_keys_with_dimension_value( time_window_dimension.name, partition_key, dynamic_partitions_store=context.instance if context.instance_ref else None, ) ] return schedule_fn def _check_valid_schedule_partitions_def( partitions_def: PartitionsDefinition, ) -> Union[TimeWindowPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition]: if not has_one_dimension_time_window_partitioning(partitions_def) and not isinstance( partitions_def, StaticPartitionsDefinition ): raise DagsterInvalidDefinitionError( "Tried to build a partitioned schedule from an asset job, but received an invalid" " partitions definition. The permitted partitions definitions are: \n1." " TimeWindowPartitionsDefinition\n2. MultiPartitionsDefinition with a single" " TimeWindowPartitionsDefinition dimension\n3. StaticPartitionsDefinition" ) return cast( Union[ TimeWindowPartitionsDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition, ], partitions_def, ) schedule_from_partitions = build_schedule_from_partitioned_job