Ask AI

Polars (dagster-polars)

This library provides Dagster integration with Polars. It allows using Polars eager or lazy DataFrames as inputs and outputs with Dagster’s @asset and @op. Type annotations are used to control whether to load an eager or lazy DataFrame. Lazy DataFrames can be sinked as output. Multiple serialization formats (Parquet, Delta Lake, BigQuery) and filesystems (local, S3, GCS, …) are supported.

Comprehensive list of dagster-polars behavior for supported type annotations can be found in Type Annotations section.

Installation

pip install dagster-polars

Some IOManagers (like PolarsDeltaIOManager) may require additional dependencies, which are provided with extras like dagster-polars[delta]. Please check the documentation for each IOManager for more details.

Quickstart

Common filesystem-based IOManagers features highlights, using PolarsParquetIOManager as an example (see BasePolarsUPathIOManager for the full list of features provided by dagster-polars):

Type annotations are not required. By default an eager pl.DataFrame will be loaded.

from dagster import asset
import polars as pl

@asset(io_manager_key="polars_parquet_io_manager")
def upstream():
    return DataFrame({"foo": [1, 2, 3]})

@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream) -> pl.LazyFrame:
    assert isinstance(upstream, pl.DataFrame)
    return upstream.lazy()  # LazyFrame will be sinked

Lazy pl.LazyFrame can be scanned by annotating the input with pl.LazyFrame, and returning a pl.LazyFrame will sink it:

@asset(io_manager_key="polars_parquet_io_manager")
def downstream(upstream: pl.LazyFrame) -> pl.LazyFrame:
    assert isinstance(upstream, pl.LazyFrame)
    return upstream

The same logic applies to partitioned assets:

@asset
def downstream(partitioned_upstream: Dict[str, pl.LazyFrame]):
    assert isinstance(partitioned_upstream, dict)
    assert isinstance(partitioned_upstream["my_partition"], pl.LazyFrame)

Optional inputs and outputs are supported:

@asset
def upstream() -> Optional[pl.DataFrame]:
    if has_data:
        return DataFrame({"foo": [1, 2, 3]})  # type check will pass
    else:
        return None  # type check will pass and `dagster_polars` will skip writing the output completely

@asset
def downstream(upstream: Optional[pl.LazyFrame]):  # upstream will be None if it doesn't exist in storage
    ...

Some IOManagers support saving/loading custom metadata along with the DataFrame. This is often useful for external systems which would read the asset data outside of Dagster. The DataFrameWithMetadata is a type alias provided by dagster_polars.types.

from dagster_polars import DataFrameWithMetadata

@asset
def downstream(upstream: DataFrameWithMetadata):
    df, metadata = upstream
    assert isinstance(upstream[0], pl.DataFrame)
    assert isinstance(upstream[1], dict)

By default all the IOManagers store separate partitions as physically separated locations, such as:

  • /my/asset/key/partition_0.extension

  • /my/asset/key/partition_1.extension

This mode is useful for e.g. snapshotting.

Some IOManagers (like PolarsDeltaIOManager) support reading and writing partitions in storage-native format in the same location. This mode can be typically enabled by setting “partition_by” metadata value. For example, PolarsDeltaIOManager would store different partitions in the same /my/asset/key.delta directory, which will be properly partitioned.

This mode should be preferred for true partitioning.

Type Annotations

Type aliases like DataFrameWithPartitions are provided by dagster_polars.types for convenience.

In the table below StorageMetadata expands to Dict[str, Any].

Supported type annotations and dagster-polars behavior

Type annotation

Type Alias

Behavior

DataFrame

read/write a DataFrame

LazyFrame

read/sink a LazyFrame

Optional[DataFrame]

read/write a DataFrame. Do nothing if no data is found in storage or the output is None

Optional[LazyFrame]

read a LazyFrame. Do nothing if no data is found in storage

Dict[str, DataFrame]

DataFrameWithPartitions

read multiple DataFrame`s as `Dict[str, DataFrame]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True

Dict[str, LazyFrame]

LazyFramePartitions

read multiple LazyFrame`s as `Dict[str, LazyFrame]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True

Tuple[DataFrame, StorageMetadata]

DataFrameWithMetadata

read/write a DataFrame and a metadata dict (if supported by the IOManager)

Tuple[LazyFrame, StorageMetadata]

LazyFrameWithMetadata

read a LazyFrame and a metadata dict (if supported by the IOManager)

Optional[DataFrameWithMetadata]

read/write a DataFrame and metadata dict (if supported by the IOManager). Do nothing if no data is found in storage or the output is None

Optional[LazyFrameWithMetadata]

read LazyFrame and a metadata dict. Do nothing if no data is found in storage or the output is None

Dict[str, DataFrameWithMetadata]

DataFramePartitionsWithMetadata

read multiple DataFrame`s and metadata dicts as `Dict[str, Tuple[DataFrame, StorageMetadata]]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True

Dict[str, LazyFrameWithMetadata]

LazyFramePartitionsWithMetadata

read multiple LazyFrame`s and metadata dicts as `Dict[str, Tuple[LazyFrame, StorageMetadata]]. Raises an error for missing partitions, unless “allow_missing_partitions” input metadata is set to True

Generic builtins (like tuple[…] instead of Tuple[…]) are supported for Python >= 3.9.

API Documentation

dagster_polars.BasePolarsUPathIOManager IOManagerDefinition[source]

Config Schema:
base_dir (Union[dagster.StringSource, None], optional):

Base directory for storing files.

storage_options (Union[dict, None], optional):

Storage authentication for cloud object store

Base class for dagster-polars IOManagers.

Doesn’t define a specific storage format.

To implement a specific storage format (parquet, csv, etc), inherit from this class and implement the write_df_to_path, sink_df_to_path and scan_df_from_path methods.

Features:
  • All the features of UPathIOManager - works with local and remote filesystems (like S3), supports loading multiple partitions with respect to PartitionMapping, and more

  • loads the correct type - polars.DataFrame, polars.LazyFrame, or other types defined in dagster_polars.types - based on the input type annotation (or dagster.DagsterType’s typing_type)

  • can sink lazy pl.LazyFrame DataFrames

  • handles Nones with Optional types by skipping loading missing inputs or saving None outputs

  • logs various metadata about the DataFrame - size, schema, sample, stats, …

  • the “columns” input metadata value can be used to select a subset of columns to load

dagster_polars.PolarsParquetIOManager IOManagerDefinition[source]

Config Schema:
base_dir (Union[dagster.StringSource, None], optional):

Base directory for storing files.

storage_options (Union[dict, None], optional):

Storage authentication for cloud object store

extension (Union[dagster.StringSource, None], optional):

Default Value: ‘.parquet’

experimental This API may break in future versions, even between dot releases.

Implements reading and writing Polars DataFrames in Apache Parquet format.

Features:
  • All features provided by BasePolarsUPathIOManager.

  • All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).

  • Supports reading partitioned Parquet datasets (for example, often produced by Spark).

  • Supports reading/writing custom metadata in the Parquet file’s schema as json-serialized bytes at “dagster_polars_metadata” key.

Examples

from dagster import asset
from dagster_polars import PolarsParquetIOManager
import polars as pl

@asset(
    io_manager_key="polars_parquet_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.parquet
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "polars_parquet_io_manager": PolarsParquetIOManager(base_dir="s3://my-bucket/my-dir")
    }
)

Reading partitioned Parquet datasets:

from dagster import SourceAsset

my_asset = SourceAsset(
    key=["path", "to", "dataset"],
    io_manager_key="polars_parquet_io_manager",
    metadata={
        "partition_by": ["year", "month", "day"]
    }
)

Storing custom metadata in the Parquet file schema (this metadata can be read outside of Dagster with a helper function dagster_polars.PolarsParquetIOManager.read_parquet_metadata()):

from dagster_polars import DataFrameWithMetadata


@asset(
    io_manager_key="polars_parquet_io_manager",
)
def upstream() -> DataFrameWithMetadata:
    return pl.DataFrame(...), {"my_custom_metadata": "my_custom_value"}


@asset(
    io_manager_key="polars_parquet_io_manager",
)
def downsteam(upstream: DataFrameWithMetadata):
    df, metadata = upstream
    assert metadata["my_custom_metadata"] == "my_custom_value"
dagster_polars.PolarsDeltaIOManager IOManagerDefinition[source]

Config Schema:
base_dir (Union[dagster.StringSource, None], optional):

Base directory for storing files.

storage_options (Union[dict, None], optional):

Storage authentication for cloud object store

extension (Union[dagster.StringSource, None], optional):

Default Value: ‘.delta’

mode (Union[DeltaWriteMode, None], optional):

Default Value: ‘overwrite’

overwrite_schema (Union[dagster.BoolSource, None], optional):

Default Value: False

version (Union[dagster.IntSource, None], optional):

experimental This API may break in future versions, even between dot releases.

Implements writing and reading DeltaLake tables.

Features:
  • All features provided by BasePolarsUPathIOManager.

  • All read/write options can be set via corresponding metadata or config parameters (metadata takes precedence).

  • Supports native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table. To enable this behavior, set the partition_by metadata value or config parameter and use a non-dict type annotation when loading the asset. The partition_by value will be used in delta_write_options of pl.DataFrame.write_delta and pyarrow_options of pl.scan_detla). When using a one-dimensional PartitionsDefinition, it should be a single string like “column`. When using a MultiPartitionsDefinition, it should be a dict with dimension to column names mapping, like {“dimension”: “column”}.

  • Supports writing/reading custom metadata to/from .dagster_polars_metadata/<version>.json file in the DeltaLake table directory.

Install dagster-polars[delta] to use this IOManager.

Examples

from dagster import asset
from dagster_polars import PolarsDeltaIOManager
import polars as pl

@asset(
    io_manager_key="polars_delta_io_manager",
    key_prefix=["my_dataset"]
)
def my_asset() -> pl.DataFrame:  # data will be stored at <base_dir>/my_dataset/my_asset.delta
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "polars_parquet_io_manager": PolarsDeltaIOManager(base_dir="s3://my-bucket/my-dir")
    }
)

Appending to a DeltaLake table:

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "mode": "append"
    },
)
def my_table() -> pl.DataFrame:
    ...

Using native DeltaLake partitioning by storing different asset partitions in the same DeltaLake table:

from dagster import AssetExecutionContext, DailyPartitionedDefinition
from dagster_polars import LazyFramePartitions

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": "partition_col"
    },
    partitions_def=StaticPartitionsDefinition(["a, "b", "c"])
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    # column with the partition_key must match `partition_by` metadata value
    return df.with_columns(pl.lit(context.partition_key).alias("partition_col"))

@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...

When using MuiltiPartitionsDefinition, partition_by metadata value should be a dictionary mapping dimensions to column names.

from dagster import AssetExecutionContext, DailyPartitionedDefinition, MultiPartitionsDefinition, StaticPartitionsDefinition
from dagster_polars import LazyFramePartitions

@asset(
    io_manager_key="polars_delta_io_manager",
    metadata={
        "partition_by": {"time": "date", "clients": "client"}  # dimension->column mapping
    },
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionedDefinition(...),
            "clients": StaticPartitionsDefinition(...)
        }
    )
)
def upstream(context: AssetExecutionContext) -> pl.DataFrame:
    df = ...

    partition_keys_by_dimension = context.partition_key.keys_by_dimension

    return df.with_columns(
        pl.lit(partition_keys_by_dimension["time"]).alias("date"),  # time dimension matches date column
        pl.lit(partition_keys_by_dimension["clients"]).alias("client")  # clients dimension matches client column
    )


@asset
def downstream(upstream: pl.LazyFrame) -> pl.DataFrame:
    ...
dagster_polars.PolarsBigQueryIOManager IOManagerDefinition[source]

Config Schema:
project (dagster.StringSource):

The GCP project to use.

dataset (Union[dagster.StringSource, None], optional):

Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.

location (Union[dagster.StringSource, None], optional):

The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_AUTH_CREDENTIALS | base64

temporary_gcs_bucket (Union[dagster.StringSource, None], optional):

When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.

timeout (Union[Float, None], optional):

When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).

Implements reading and writing Polars DataFrames from/to BigQuery).

Features: - All DBIOManager features - Supports writing partitioned tables (“partition_expr” input metadata key must be specified).

Returns:

IOManagerDefinition

Examples

from dagster import Definitions, EnvVar
from dagster_polars import PolarsBigQueryIOManager

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pl.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": PolarsBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

You can tell Dagster in which dataset to create tables by setting the “dataset” configuration value. If you do not provide a dataset as configuration to the I/O manager, Dagster will determine a dataset based on the assets and ops using the I/O Manager. For assets, the dataset will be determined from the asset key, as shown in the above example. The final prefix before the asset name will be used as the dataset. For example, if the asset “my_table” had the key prefix [“gcp”, “bigquery”, “my_dataset”], the dataset “my_dataset” will be used. For ops, the dataset can be specified by including a “schema” entry in output metadata. If “schema” is not provided via config or on the asset/op, “public” will be used for the dataset.

@op(
    out={"my_table": Out(metadata={"schema": "my_dataset"})}
)
def make_my_table() -> pl.DataFrame:
    # the returned value will be stored at my_dataset.my_table
    ...

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pl.DataFrame) -> pd.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

The “write_disposition” metadata key can be used to set the write_disposition parameter of bigquery.JobConfig. For example, set it to “WRITE_APPEND” to append to an existing table intead of overwriting it.

Install dagster-polars[gcp] to use this IOManager.