Ask AI

Partitioning assets#

This page is specific to Software-defined Assets (SDAs). Looking for ops? Refer to the Partitioned ops documentation.

A Software-defined Asset (SDA) can represent a collection of partitions that can be tracked and materialized independently. In many ways, each partition functions like its own mini-asset, but they all share a common materialization function and dependencies. Typically, each partition will correspond to a separate file or a slice of a table in a database.

A common use is for each partition to represent all the records in a data set that fall within a particular time window, e.g. hourly, daily or monthly. Alternatively, each partition can represent a region, a customer, an experiment - any dimension along which you want to be able to materialize and monitor independently. An asset can also be partitioned along multiple dimensions, e.g. by region and by hour.

A graph of assets with the same partitions implicitly forms a partitioned data pipeline, and you can launch a run that selects multiple assets and materializes the same partition in each asset.

Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in the Dagster UI.


Prerequisites#

Before continuing, you should be familiar with:


Defining partitioned assets#

A Software-defined Asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.

The following example demonstrates creating an asset that has a partition for each day since October 1st, 2023. Materializing partition 2023-11-13 of this asset would result in fetching data from the URL https://api.nasa.gov/planetary/apod?date=2023-11-13 and storing it at the path nasa/2023-11-13.csv. Note that api_key=DEMO_KEY is used but has a limited number of calls:

import os
import urllib.request

# Create a new 'nasa' directory if needed
dir_name = "nasa"
if not os.path.exists(dir_name):
    os.makedirs(dir_name)

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
    partition_date_str = context.partition_key

    url = f"https://api.nasa.gov/planetary/apod?api_key=DEMO_KEY&date={partition_date_str}"
    target_location = f"nasa/{partition_date_str}.csv"

    urllib.request.urlretrieve(url, target_location)

In the following sections, we'll demonstrate a few additional ways to partition assets:

Multi-dimensionally partitioned assets#

Heads up! Multipartitions definitions are currently limited to two dimensions.

The MultiPartitionsDefinition class accepts a mapping of dimension names to a PartitionsDefinition, creating a partition for each unique combination of dimension partitions.

Consider the following asset:

from dagster import (
    AssetExecutionContext,
    DailyPartitionsDefinition,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2022-01-01"),
            "color": StaticPartitionsDefinition(["red", "yellow", "blue"]),
        }
    )
)
def multi_partitions_asset(context: AssetExecutionContext):
    if isinstance(context.partition_key, MultiPartitionKey):
        context.log.info(context.partition_key.keys_by_dimension)

In this example, the asset would contain a partition for each combination of color and date:

  • red|2022-01-01
  • yellow|2022-01-01
  • blue|2022-01-01
  • red|2022-01-02
  • ... and so on

Additionally, notice the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, which contain a MultiPartitionKey.keys_by_dimension method that returns the key per dimension. This object can also be passed into partition key execution parameters:

from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)

Dynamically partitioned assets#

Looking for example projects? Check out the Dynamic Partitions example for a look at a full project that uses dynamic asset partitions.

Sometimes you don't know the set of partitions ahead of time when defining assets. For example, maybe you want to add a new partition every time a new data file lands in a directory or every time you want to experiment with a new set of hyperparameters. In these cases, you can use a DynamicPartitionsDefinition.

The DynamicPartitionsDefinition class accepts a name argument, representing the name of the partition set:

images_partitions_def = DynamicPartitionsDefinition(name="images")


@asset(partitions_def=images_partitions_def)
def images(context: AssetExecutionContext): ...

Partition keys can be added and removed for a given dynamic partition set. For example, the following code snippet demonstrates the usage of a sensor to detect the presence of a new partition and then trigger a run for that partition:

images_job = define_asset_job(
    "images_job", AssetSelection.assets("images"), partitions_def=images_partitions_def
)


@sensor(job=images_job)
def image_sensor(context: SensorEvaluationContext):
    new_images = [
        img_filename
        for img_filename in os.listdir(os.getenv("MY_DIRECTORY"))
        if not images_partitions_def.has_partition_key(
            img_filename, dynamic_partitions_store=context.instance
        )
    ]

    return SensorResult(
        run_requests=[
            RunRequest(partition_key=img_filename) for img_filename in new_images
        ],
        dynamic_partitions_requests=[
            images_partitions_def.build_add_request(new_images)
        ],
    )

We recommend limiting the number of partitions for each asset to 25,000 or fewer. Assets with partition counts exceeding this limit will likely have slower load times in the UI.


Defining partition dependencies#

Partitioned assets can depend on other partitioned assets. In this case, each partition in the downstream asset will depend on a partition or multiple partitions in the upstream asset.

Default partition dependency rules#

A few rules govern default partition-to-partition dependencies:

  • When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset will depend on the same partition in the upstream asset.

  • When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset will depend on all partitions in the upstream asset that intersect its time window.

    For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00.

Overriding default dependency rules#

Default partition dependency rules can be overridden by providing a PartitionMapping when specifying a dependency on an asset. How this is accomplished depends on the type of dependency the asset has - refer to the following tabs for more info.

Basic dependencies#

To override partition dependency rules for basic asset dependencies, you can use AssetDep to specify the partition dependency on an upstream asset:

from dagster import (
    AssetDep,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    asset,
)

partitions_def = DailyPartitionsDefinition(start_date="2023-01-21")


@asset(partitions_def=partitions_def)
def events(): ...


@asset(
    partitions_def=partitions_def,
    deps=[
        AssetDep(
            events,
            partition_mapping=TimeWindowPartitionMapping(
                start_offset=-1, end_offset=-1
            ),
        )
    ],
)
def yesterday_event_stats(): ...

Refer to the API docs for a list of available PartitionMappings.


Partitioned asset jobs#

A partitioned asset job is a job that materializes a particular set of partitioned assets every time it runs.

In the following code snippet, the partitioned_asset_job will materialize two hourly-materialized assets (asset1 and asset2) every time it runs:

from dagster import (
    AssetSelection,
    Definitions,
    HourlyPartitionsDefinition,
    asset,
    define_asset_job,
)

hourly_partitions_def = HourlyPartitionsDefinition(start_date="2022-05-31-00:00")


@asset(partitions_def=hourly_partitions_def)
def asset1(): ...


@asset(partitions_def=hourly_partitions_def)
def asset2(): ...


partitioned_asset_job = define_asset_job(
    name="asset_1_and_2_job",
    selection=AssetSelection.assets(asset1, asset2),
    partitions_def=hourly_partitions_def,
)


defs = Definitions(
    assets=[asset1, asset2],
    jobs=[partitioned_asset_job],
)

Asset partitions in the Dagster UI#

Viewing the status of asset partitions#

To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.

In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:

Materializing partitioned assets#

When you materialize a partitioned asset, you choose which partitions to materialize and Dagster will launch a run for each partition. Note: If you choose more than one partition, the Dagster daemon needs to be running to queue the multiple runs.

The following image shows the Launch runs dialog on an asset's Details page, where you'll be prompted to select a partition to materialize:

After a partition has been successfully materialized, it will display as green in the partitions bar:

Viewing materializations by partition#

To view materializations by partition for a specific asset, navigate to the Activity tab of the asset's Details page:


Examples#

For more examples of partitions, check out the following in our Hacker News example:

Partitioned assets with partitioned I/O managers#

Heads up! Familiarity with I/O managers is required for this section.

Asset functions can write data out to files, but they can also delegate the writing operation to an I/O manager. Dagster's built-in I/O managers support handling partitioned assets, but you can also write your own I/O manager if you want additional customization.

For example, the following demonstrates how to define an asset that relies on an I/O manager to store its output:

import pandas as pd

from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> pd.DataFrame:
    partition_date_str = context.partition_key
    return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")

If using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.


Relevant APIs#

NameDescription
PartitionsDefinitionSuperclass - defines the set of partitions that can be materialized for an asset.
HourlyPartitionsDefinitionA partitions definition with a partition for each hour.
DailyPartitionsDefinitionA partitions definition with a partition for each day.
WeeklyPartitionsDefinitionA partitions definition with a partition for each week.
MonthlyPartitionsDefinitionA partitions definition with a partition for each month.
StaticPartitionsDefinitionA partitions definition with a fixed set of partitions.
MultiPartitionsDefinitionA partitions definition with multiple dimensions.
MultiPartitionKeyA multi-dimensional partition key.
DynamicPartitionsDefinitionA partitions definition whose partitions can be dynamically added and removed.
AssetExecutionContext.partition_keyThe partition key for the current run, which can be accessed in the computation.