Ask AI

Partitions Definitions

class dagster.PartitionsDefinition[source]

Defines a set of partitions, which can be attached to a software-defined asset or job.

Abstract class with implementations for different kinds of partitions.

abstract get_partition_keys(current_time=None, dynamic_partitions_store=None)[source]

Returns a list of strings representing the partition keys of the PartitionsDefinition.

Parameters:
  • current_time (Optional[datetime]) – A datetime object representing the current time, only applicable to time-based partitions definitions.

  • dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore object that is responsible for fetching dynamic partitions. Required when the partitions definition is a DynamicPartitionsDefinition with a name defined. Users can pass the DagsterInstance fetched via context.instance to this argument.

Returns:

Sequence[str]

class dagster.HourlyPartitionsDefinition(start_date, end_date=None, minute_offset=0, timezone=None, fmt=None, end_offset=0)[source]

A set of hourly partitions.

The first partition in the set will start on the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d. Note that if a non-UTC timezone is used, the date format must include a timezone offset to disambiguate between multiple instances of the same time before and after the Fall DST transition. If the format does not contain this offset, the second instance of the ambiguous time partition key will have the UTC offset automatically appended to it.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

HourlyPartitionsDefinition(start_date=datetime(2022, 03, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
class dagster.DailyPartitionsDefinition(start_date, end_date=None, minute_offset=0, hour_offset=0, timezone=None, fmt=None, end_offset=0)[source]

A set of daily partitions.

The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

DailyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

DailyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
class dagster.WeeklyPartitionsDefinition(start_date, end_date=None, minute_offset=0, hour_offset=0, day_offset=0, timezone=None, fmt=None, end_offset=0)[source]

Defines a set of weekly partitions.

The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format.

  • end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

WeeklyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...

WeeklyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
class dagster.MonthlyPartitionsDefinition(start_date, end_date=None, minute_offset=0, hour_offset=0, day_offset=1, timezone=None, fmt=None, end_offset=0)[source]

A set of monthly partitions.

The first partition in the set will start at the soonest first of the month after start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be midnight the sonnest first of the month following start_date. Can provide in either a datetime or string format.

  • end_date (Union[datetime.datetime, str, None]) – The last date(excluding) in the set of partitions. Default is None. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the month to “split” the partition. Defaults to 1.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

MonthlyPartitionsDefinition(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...

MonthlyPartitionsDefinition(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
class dagster.TimeWindowPartitionsDefinition(start, fmt, end=None, schedule_type=None, timezone=None, end_offset=0, minute_offset=None, hour_offset=None, day_offset=None, cron_schedule=None)[source]

A set of partitions where each partition corresponds to a time window.

The provided cron_schedule determines the bounds of the time windows. E.g. a cron_schedule of “0 0 \* \* \*” will result in daily partitions that start at midnight and end at midnight of the following day.

The string partition_key associated with each partition corresponds to the start of the partition’s time window.

The first partition in the set will start on at the first cron_schedule tick that is equal to or after the given start datetime. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number.

We recommended limiting partition counts for each asset to 25,000 partitions or fewer.

Parameters:
  • cron_schedule (str) – Determines the bounds of the time windows.

  • start (datetime) – The first partition in the set will start on at the first cron_schedule tick that is equal to or after this value.

  • timezone (Optional[str]) – The timezone in which each time should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • end (datetime) – The last partition (excluding) in the set.

  • fmt (str) – The date format to use for partition_keys. Note that if a non-UTC timezone is used, and the cron schedule repeats every hour or faster, the date format must include a timezone offset to disambiguate between multiple instances of the same time before and after the Fall DST transition. If the format does not contain this offset, the second instance of the ambiguous time partition key will have the UTC offset automatically appended to it.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

property day_offset

For a weekly or monthly partitions definition, returns the day to “split” partitions by. Each partition will start on this day, and end before this day in the following week/month. Returns 0 if the day_offset parameter is unset in the WeeklyPartitionsDefinition, MonthlyPartitionsDefinition, or the provided cron schedule.

For weekly partitions, returns a value between 0 (representing Sunday) and 6 (representing Saturday). Providing a value of 1 means that a partition will exist weekly from Monday to the following Sunday.

For monthly partitions, returns a value between 0 (the first day of the month) and 31 (the last possible day of the month).

Type:

int

get_cron_schedule(minute_of_hour=None, hour_of_day=None, day_of_week=None, day_of_month=None)[source]

The schedule executes at the cadence specified by the partitioning, but may overwrite the minute/hour/day offset of the partitioning.

This is useful e.g. if you have partitions that span midnight to midnight but you want to schedule a job that runs at 2 am.

property hour_offset

Number of hours past 00:00 to “split” partitions. Defaults to 0.

For example, returns 1 if each partition starts at 01:00.

Type:

int

property minute_offset

Number of minutes past the hour to “split” partitions. Defaults to 0.

For example, returns 15 if each partition starts at 15 minutes past the hour.

Type:

int

property schedule_type

An enum representing the partition cadence (hourly, daily, weekly, or monthly).

Type:

Optional[ScheduleType]

class dagster.TimeWindow(start, end)[source]

An interval that is closed at the start and open at the end.

start

A pendulum datetime that marks the start of the window.

Type:

datetime

end

A pendulum datetime that marks the end of the window.

Type:

datetime

class dagster.StaticPartitionsDefinition(partition_keys)[source]

A statically-defined set of partitions.

We recommended limiting partition counts for each asset to 25,000 partitions or fewer.

Example

from dagster import StaticPartitionsDefinition, asset

oceans_partitions_def = StaticPartitionsDefinition(
    ["arctic", "atlantic", "indian", "pacific", "southern"]
)

@asset(partitions_def=oceans_partitions_defs)
def ml_model_for_each_ocean():
    ...
get_partition_keys(current_time=None, dynamic_partitions_store=None)[source]

Returns a list of strings representing the partition keys of the PartitionsDefinition.

Parameters:
  • current_time (Optional[datetime]) – A datetime object representing the current time, only applicable to time-based partitions definitions.

  • dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore object that is responsible for fetching dynamic partitions. Only applicable to DynamicPartitionsDefinitions.

Returns:

Sequence[str]

class dagster.MultiPartitionsDefinition(partitions_defs)[source]

Takes the cross-product of partitions from two partitions definitions.

For example, with a static partitions definition where the partitions are [“a”, “b”, “c”] and a daily partitions definition, this partitions definition will have the following partitions:

2020-01-01|a 2020-01-01|b 2020-01-01|c 2020-01-02|a 2020-01-02|b …

We recommended limiting partition counts for each asset to 25,000 partitions or fewer.

Parameters:

partitions_defs (Mapping[str, PartitionsDefinition]) – A mapping of dimension name to partitions definition. The total set of partitions will be the cross-product of the partitions from each PartitionsDefinition.

partitions_defs

A sequence of PartitionDimensionDefinition objects, each of which contains a dimension name and a PartitionsDefinition. The total set of partitions will be the cross-product of the partitions from each PartitionsDefinition. This sequence is ordered by dimension name, to ensure consistent ordering of the partitions.

Type:

Sequence[PartitionDimensionDefinition]

get_partition_keys(current_time=None, dynamic_partitions_store=None)[source]

Returns a list of MultiPartitionKeys representing the partition keys of the PartitionsDefinition.

Parameters:
  • current_time (Optional[datetime]) – A datetime object representing the current time, only applicable to time-based partition dimensions.

  • dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore object that is responsible for fetching dynamic partitions. Required when a dimension is a DynamicPartitionsDefinition with a name defined. Users can pass the DagsterInstance fetched via context.instance to this argument.

Returns:

Sequence[MultiPartitionKey]

class dagster.MultiPartitionKey(keys_by_dimension)[source]

A multi-dimensional partition key stores the partition key for each dimension. Subclasses the string class to keep partition key type as a string.

Contains additional methods to access the partition key for each dimension. Creates a string representation of the partition key for each dimension, separated by a pipe (|). Orders the dimensions by name, to ensure consistent string representation.

class dagster.DynamicPartitionsDefinition(partition_fn=None, name=None)[source]

A partitions definition whose partition keys can be dynamically added and removed.

This is useful for cases where the set of partitions is not known at definition time, but is instead determined at runtime.

Partitions can be added and removed using instance.add_dynamic_partitions and instance.delete_dynamic_partition methods.

We recommended limiting partition counts for each asset to 25,000 partitions or fewer.

Parameters:
  • name (Optional[str]) – The name of the partitions definition.

  • partition_fn (Optional[Callable[[Optional[datetime]], Union[Sequence[Partition], Sequence[str]]]]) – deprecated (This parameter will be removed in version 2.0. Provide partition definition name instead.) A function that returns the current set of partitions. This argument is deprecated and will be removed in 2.0.0.

Examples

fruits = DynamicPartitionsDefinition(name="fruits")

@sensor(job=my_job)
def my_sensor(context):
    return SensorResult(
        run_requests=[RunRequest(partition_key="apple")],
        dynamic_partitions_requests=[fruits.build_add_request(["apple"])]
    )
get_partition_keys(current_time=None, dynamic_partitions_store=None)[source]

Returns a list of strings representing the partition keys of the PartitionsDefinition.

Parameters:
  • current_time (Optional[datetime]) – A datetime object representing the current time, only applicable to time-based partitions definitions.

  • dynamic_partitions_store (Optional[DynamicPartitionsStore]) – The DynamicPartitionsStore object that is responsible for fetching dynamic partitions. Required when the partitions definition is a DynamicPartitionsDefinition with a name defined. Users can pass the DagsterInstance fetched via context.instance to this argument.

Returns:

Sequence[str]

class dagster.PartitionKeyRange(start, end)[source]

Defines a range of partitions.

start

The starting partition key in the range (inclusive).

Type:

str

end

The ending partition key in the range (inclusive).

Type:

str

Examples

partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d"])
partition_key_range = PartitionKeyRange(start="a", end="c") # Represents ["a", "b", "c"]

Partitioned Schedules

dagster.build_schedule_from_partitioned_job(job, description=None, name=None, minute_of_hour=None, hour_of_day=None, day_of_week=None, day_of_month=None, default_status=DefaultScheduleStatus.STOPPED, tags=None, cron_schedule=None, execution_timezone=None)[source]

Creates a schedule from a time window-partitioned job a job that targets time window-partitioned or statically-partitioned assets. The job can also be multipartitioned, as long as one of the partitions dimensions is time-partitioned.

The schedule executes at the cadence specified by the time partitioning of the job or assets.

Examples

######################################
# Job that targets partitioned assets
######################################

from dagster import (
    DailyPartitionsDefinition,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def asset1():
    ...

asset1_job = define_asset_job("asset1_job", selection=[asset1])

# The created schedule will fire daily
asset1_job_schedule = build_schedule_from_partitioned_job(asset1_job)

defs = Definitions(assets=[asset1], schedules=[asset1_job_schedule])

################
# Non-asset job
################

from dagster import DailyPartitionsDefinition, build_schedule_from_partitioned_job, jog


@job(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def do_stuff_partitioned():
    ...

# The created schedule will fire daily
do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)

defs = Definitions(schedules=[do_stuff_partitioned_schedule])

Partition Mapping

class dagster.PartitionMapping[source]

Defines a correspondence between the partitions in an asset and the partitions in an asset that it depends on.

Overriding PartitionMapping outside of Dagster is not supported. The abstract methods of this class may change at any time.

abstract get_downstream_partitions_for_partitions(upstream_partitions_subset, upstream_partitions_def, downstream_partitions_def, current_time=None, dynamic_partitions_store=None)[source]

Returns the subset of partition keys in the downstream asset that use the data in the given partition key subset of the upstream asset.

Parameters:
  • upstream_partitions_subset (Union[PartitionKeyRange, PartitionsSubset]) – The subset of partition keys in the upstream asset.

  • downstream_partitions_def (PartitionsDefinition) – The partitions definition for the downstream asset.

abstract get_upstream_mapped_partitions_result_for_partitions(downstream_partitions_subset, downstream_partitions_def, upstream_partitions_def, current_time=None, dynamic_partitions_store=None)[source]

Returns a UpstreamPartitionsResult object containing the partition keys the downstream partitions subset was mapped to in the upstream partitions definition.

Valid upstream partitions will be included in UpstreamPartitionsResult.partitions_subset. Invalid upstream partitions will be included in UpstreamPartitionsResult.required_but_nonexistent_partition_keys.

For example, if an upstream asset is time-partitioned and starts in June 2023, and the downstream asset is time-partitioned and starts in May 2023, this function would return a UpstreamPartitionsResult(PartitionsSubset(“2023-06-01”), required_but_nonexistent_partition_keys=[“2023-05-01”]) when downstream_partitions_subset contains 2023-05-01 and 2023-06-01.

class dagster.TimeWindowPartitionMapping(start_offset=0, end_offset=0, allow_nonexistent_upstream_partitions=False)[source]

The default mapping between two TimeWindowPartitionsDefinitions.

A partition in the downstream partitions definition is mapped to all partitions in the upstream asset whose time windows overlap it.

This means that, if the upstream and downstream partitions definitions share the same time period, then this mapping is essentially the identity partition mapping - plus conversion of datetime formats.

If the upstream time period is coarser than the downstream time period, then each partition in the downstream asset will map to a single (larger) upstream partition. E.g. if the downstream is hourly and the upstream is daily, then each hourly partition in the downstream will map to the daily partition in the upstream that contains that hour.

If the upstream time period is finer than the downstream time period, then each partition in the downstream asset will map to multiple upstream partitions. E.g. if the downstream is daily and the upstream is hourly, then each daily partition in the downstream asset will map to the 24 hourly partitions in the upstream that occur on that day.

start_offset

If not 0, then the starts of the upstream windows are shifted by this offset relative to the starts of the downstream windows. For example, if start_offset=-1 and end_offset=0, then the downstream partition “2022-07-04” would map to the upstream partitions “2022-07-03” and “2022-07-04”. Only permitted to be non-zero when the upstream and downstream PartitionsDefinitions are the same. Defaults to 0.

Type:

int

end_offset

If not 0, then the ends of the upstream windows are shifted by this offset relative to the ends of the downstream windows. For example, if start_offset=0 and end_offset=1, then the downstream partition “2022-07-04” would map to the upstream partitions “2022-07-04” and “2022-07-05”. Only permitted to be non-zero when the upstream and downstream PartitionsDefinitions are the same. Defaults to 0.

Type:

int

allow_nonexistent_upstream_partitions

Defaults to false. If true, does not raise an error when mapped upstream partitions fall outside the start-end time window of the partitions def. For example, if the upstream partitions def starts on “2023-01-01” but the downstream starts on “2022-01-01”, setting this bool to true would return no partition keys when get_upstream_partitions_for_partitions is called with “2022-06-01”. When set to false, would raise an error.

Type:

bool

Examples

from dagster import DailyPartitionsDefinition, TimeWindowPartitionMapping, AssetIn, asset

partitions_def = DailyPartitionsDefinition(start_date="2020-01-01")

@asset(partitions_def=partitions_def)
def asset1():
    ...

@asset(
    partitions_def=partitions_def,
    ins={
        "asset1": AssetIn(
            partition_mapping=TimeWindowPartitionMapping(start_offset=-1)
        )
    }
)
def asset2(asset1):
    ...
class dagster.IdentityPartitionMapping[source]

Expects that the upstream and downstream assets are partitioned in the same way, and maps partitions in the downstream asset to the same partition in the upstream asset.

class dagster.AllPartitionMapping[source]

Maps every partition in the downstream asset to every partition in the upstream asset.

Commonly used in the case when the downstream asset is not partitioned, in which the entire downstream asset depends on all partitions of the usptream asset.

class dagster.LastPartitionMapping[source]

Maps all dependencies to the last partition in the upstream asset.

Commonly used in the case when the downstream asset is not partitioned, in which the entire downstream asset depends on the last partition of the upstream asset.

class dagster.StaticPartitionMapping(downstream_partition_keys_by_upstream_partition_key)[source]

Define an explicit correspondence between two StaticPartitionsDefinitions.

Parameters:

downstream_partition_keys_by_upstream_partition_key (Dict[str, str | Collection[str]]) – The single or multi-valued correspondence from upstream keys to downstream keys.

class dagster.SpecificPartitionsPartitionMapping(partition_keys)[source]

Maps to a specific subset of partitions in the upstream asset.

Example

from dagster import SpecificPartitionsPartitionMapping, StaticPartitionsDefinition, asset

@asset(partitions_def=StaticPartitionsDefinition(["a", "b", "c"]))
def upstream():
    ...

@asset(
    ins={
        "upstream": AssetIn(partition_mapping=SpecificPartitionsPartitionMapping(["a"]))
    }
)
def a_downstream(upstream):
    ...
class dagster.MultiToSingleDimensionPartitionMapping(partition_dimension_name=None)[source]

experimental This API may break in future versions, even between dot releases.

Defines a correspondence between an single-dimensional partitions definition and a MultiPartitionsDefinition. The single-dimensional partitions definition must be a dimension of the MultiPartitionsDefinition.

This class handles the case where the upstream asset is multipartitioned and the downstream asset is single dimensional, and vice versa.

For a partition key X, this partition mapping assumes that any multi-partition key with X in the selected dimension is a dependency.

Parameters:

partition_dimension_name (Optional[str]) – The name of the partition dimension in the MultiPartitionsDefinition that matches the single-dimension partitions definition.

class dagster.MultiPartitionMapping(downstream_mappings_by_upstream_dimension)[source]

experimental This API may break in future versions, even between dot releases.

Defines a correspondence between two MultiPartitionsDefinitions.

Accepts a mapping of upstream dimension name to downstream DimensionPartitionMapping, representing the explicit correspondence between the upstream and downstream MultiPartitions dimensions and the partition mapping used to calculate the downstream partitions.

Examples

weekly_abc = MultiPartitionsDefinition(
    {
        "abc": StaticPartitionsDefinition(["a", "b", "c"]),
        "weekly": WeeklyPartitionsDefinition("2023-01-01"),
    }
)
daily_123 = MultiPartitionsDefinition(
    {
        "123": StaticPartitionsDefinition(["1", "2", "3"]),
        "daily": DailyPartitionsDefinition("2023-01-01"),
    }
)

MultiPartitionMapping(
    {
        "abc": DimensionPartitionMapping(
            dimension_name="123",
            partition_mapping=StaticPartitionMapping({"a": "1", "b": "2", "c": "3"}),
        ),
        "weekly": DimensionPartitionMapping(
            dimension_name="daily",
            partition_mapping=TimeWindowPartitionMapping(),
        )
    }
)

For upstream or downstream dimensions not explicitly defined in the mapping, Dagster will assume an AllPartitionsMapping, meaning that all upstream partitions in those dimensions will be mapped to all downstream partitions in those dimensions.

Examples

weekly_abc = MultiPartitionsDefinition(
    {
        "abc": StaticPartitionsDefinition(["a", "b", "c"]),
        "daily": DailyPartitionsDefinition("2023-01-01"),
    }
)
daily_123 = MultiPartitionsDefinition(
    {
        "123": StaticPartitionsDefinition(["1", "2", "3"]),
        "daily": DailyPartitionsDefinition("2023-01-01"),
    }
)

MultiPartitionMapping(
    {
        "daily": DimensionPartitionMapping(
            dimension_name="daily",
            partition_mapping=IdentityPartitionMapping(),
        )
    }
)

# Will map `daily_123` partition key {"123": "1", "daily": "2023-01-01"} to the upstream:
# {"abc": "a", "daily": "2023-01-01"}
# {"abc": "b", "daily": "2023-01-01"}
# {"abc": "c", "daily": "2023-01-01"}
Parameters:

downstream_mappings_by_upstream_dimension (Mapping[str, DimensionPartitionMapping]) – A mapping that defines an explicit correspondence between one dimension of the upstream MultiPartitionsDefinition and one dimension of the downstream MultiPartitionsDefinition. Maps a string representing upstream dimension name to downstream DimensionPartitionMapping, containing the downstream dimension name and partition mapping.

Backfill Policy (Experimental)

class dagster.BackfillPolicy(max_partitions_per_run=1)[source]

experimental This API may break in future versions, even between dot releases.

A BackfillPolicy specifies how Dagster should attempt to backfill a partitioned asset.

There are two main kinds of backfill policies: single-run and multi-run.

An asset with a single-run backfill policy will take a single run to backfill all of its partitions at once.

An asset with a multi-run backfill policy will take multiple runs to backfill all of its partitions. Each run will backfill a subset of the partitions. The number of partitions to backfill in each run is controlled by the max_partitions_per_run parameter.

For example:

  • If an asset has 100 partitions, and the max_partitions_per_run is set to 10, then it will be backfilled in 10 runs; each run will backfill 10 partitions.

  • If an asset has 100 partitions, and the max_partitions_per_run is set to 11, then it will be backfilled in 10 runs; the first 9 runs will backfill 11 partitions, and the last one run will backfill the remaining 9 partitions.

Warning:

Constructing an BackfillPolicy directly is not recommended as the API is subject to change. BackfillPolicy.single_run() and BackfillPolicy.multi_run(max_partitions_per_run=x) are the recommended APIs.

static multi_run(max_partitions_per_run=1)[source]

Creates a BackfillPolicy that executes the entire backfill in multiple runs. Each run will backfill [max_partitions_per_run] number of partitions.

Parameters:

max_partitions_per_run (Optional[int]) – The maximum number of partitions in each run of the multiple runs. Defaults to 1.

static single_run()[source]

Creates a BackfillPolicy that executes the entire backfill in a single run.

Partitioned Config

class dagster.PartitionedConfig(partitions_def, run_config_for_partition_fn=None, decorated_fn=None, tags_for_partition_fn=None, run_config_for_partition_key_fn=None, tags_for_partition_key_fn=None)[source]

Defines a way of configuring a job where the job can be run on one of a discrete set of partitions, and each partition corresponds to run configuration for the job.

Setting PartitionedConfig as the config for a job allows you to launch backfills for that job and view the run history across partitions.

get_partition_keys(current_time=None)[source]

Returns a list of partition keys, representing the full set of partitions that config can be applied to.

Parameters:

current_time (Optional[datetime]) – A datetime object representing the current time. Only applicable to time-based partitions definitions.

Returns:

Sequence[str]

property partitions_def

The partitions definition associated with this PartitionedConfig.

Type:

T_PartitionsDefinition

property run_config_for_partition_fn

deprecated This API will be removed in version 2.0.

Use run_config_for_partition_key_fn instead..

A function that accepts a partition and returns a dictionary representing the config to attach to runs for that partition. Deprecated as of 1.3.3.

Type:

Optional[Callable[[Partition], Mapping[str, Any]]]

property run_config_for_partition_key_fn

A function that accepts a partition key and returns a dictionary representing the config to attach to runs for that partition.

Type:

Optional[Callable[[str], Mapping[str, Any]]]

property tags_for_partition_fn

deprecated This API will be removed in version 2.0.

Use tags_for_partition_key_fn instead..

A function that accepts a partition and returns a dictionary of tags to attach to runs for that partition. Deprecated as of 1.3.3.

Type:

Optional[Callable[[Partition], Mapping[str, str]]]

property tags_for_partition_key_fn

A function that accepts a partition key and returns a dictionary of tags to attach to runs for that partition.

Type:

Optional[Callable[[str], Mapping[str, str]]]

dagster.static_partitioned_config(partition_keys, tags_for_partition_fn=None, tags_for_partition_key_fn=None)[source]

Creates a static partitioned config for a job.

The provided partition_keys is a static list of strings identifying the set of partitions. The list of partitions is static, so while the run config returned by the decorated function may change over time, the list of valid partition keys does not.

This has performance advantages over dynamic_partitioned_config in terms of loading different partition views in the Dagster UI.

The decorated function takes in a partition key and returns a valid run config for a particular target job.

Parameters:
  • partition_keys (Sequence[str]) – A list of valid partition keys, which serve as the range of values that can be provided to the decorated run config function.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – deprecated (This parameter will be removed in version 2.0. Use tags_for_partition_key_fn instead.) A function that accepts a partition key and returns a dictionary of tags to attach to runs for that partition.

  • tags_for_partition_key_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition key and returns a dictionary of tags to attach to runs for that partition.

Returns:

PartitionedConfig

dagster.dynamic_partitioned_config(partition_fn, tags_for_partition_fn=None, tags_for_partition_key_fn=None)[source]

Creates a dynamic partitioned config for a job.

The provided partition_fn returns a list of strings identifying the set of partitions, given an optional datetime argument (representing the current time). The list of partitions returned may change over time.

The decorated function takes in a partition key and returns a valid run config for a particular target job.

Parameters:
  • partition_fn (Callable[[datetime.datetime], Sequence[str]]) – A function that generates a list of valid partition keys, which serve as the range of values that can be provided to the decorated run config function.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – deprecated (This parameter will be removed in version 2.0. Use tags_for_partition_key_fn instead.) A function that accepts a partition key and returns a dictionary of tags to attach to runs for that partition.

Returns:

PartitionedConfig

dagster.hourly_partitioned_config(start_date, minute_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of hourly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.

@hourly_partitioned_config(start_date=datetime(2022, 03, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

@hourly_partitioned_config(start_date=datetime(2022, 03, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
dagster.daily_partitioned_config(start_date, minute_offset=0, hour_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of daily partitions.

The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.

@daily_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

@daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
dagster.weekly_partitioned_config(start_date, minute_offset=0, hour_offset=0, day_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of weekly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.

@weekly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...

@weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
dagster.monthly_partitioned_config(start_date, minute_offset=0, hour_offset=0, day_offset=1, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of monthly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters:
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be midnight the sonnest first of the month following start_date. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the month to “split” the partition. Defaults to 1.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

  • tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.

@monthly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...

@monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...