Source code for dagster_polars.io_managers.base

from abc import abstractmethod
from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional, Tuple, Union

import polars as pl
from dagster import (
    ConfigurableIOManager,
    EnvVar,
    InitResourceContext,
    InputContext,
    MetadataValue,
    OutputContext,
    UPathIOManager,
    _check as check,
)
from pydantic import PrivateAttr
from pydantic.fields import Field

from dagster_polars.io_managers.type_routers import TypeRouter, resolve_type_router
from dagster_polars.io_managers.utils import get_polars_metadata

if TYPE_CHECKING:
    from upath import UPath


def _process_env_vars(config: Mapping[str, Any]) -> Dict[str, Any]:
    out = {}
    for key, value in config.items():
        if isinstance(value, dict) and len(value) == 1 and next(iter(value.keys())) == "env":
            out[key] = EnvVar(next(iter(value.values()))).get_value()
        else:
            out[key] = value
    return out
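

# --- Example (not part of the original module) ---
# A minimal sketch of how `_process_env_vars` resolves config values, using a
# hypothetical EXAMPLE_SECRET environment variable for illustration. Values
# shaped like {"env": "VAR_NAME"} are replaced with the value of that
# environment variable (via `dagster.EnvVar`); everything else passes through
# unchanged.
def _example_process_env_vars() -> None:
    import os

    os.environ["EXAMPLE_SECRET"] = "s3cr3t"  # hypothetical value for illustration

    config = {
        "aws_secret_access_key": {"env": "EXAMPLE_SECRET"},  # resolved via EnvVar
        "region": "us-east-1",  # passed through unchanged
    }

    resolved = _process_env_vars(config)
    assert resolved == {"aws_secret_access_key": "s3cr3t", "region": "us-east-1"}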


class BasePolarsUPathIOManager(ConfigurableIOManager, UPathIOManager):
    """Base class for `dagster-polars` IOManagers.

    Doesn't define a specific storage format.

    To implement a specific storage format (parquet, csv, etc), inherit from this class
    and implement the `write_df_to_path`, `sink_df_to_path` and `scan_df_from_path` methods.

    Features:
     - All the features of :py:class:`~dagster.UPathIOManager` - works with local and remote
       filesystems (like S3), supports loading multiple partitions with respect to
       :py:class:`~dagster.PartitionMapping`, and more
     - loads the correct type - `polars.DataFrame`, `polars.LazyFrame`, or other types defined in
       :py:mod:`dagster_polars.types` - based on the input type annotation
       (or `dagster.DagsterType`'s `typing_type`)
     - can sink lazy `pl.LazyFrame` DataFrames
     - handles `None` values with `Optional` types by skipping loading missing inputs
       or saving `None` outputs
     - logs various metadata about the DataFrame - size, schema, sample, stats, ...
     - the `"columns"` input metadata value can be used to select a subset of columns to load
    """

    # Method calling chain:
    # 1. Non-partitioned: UPathIOManager.load_input -> UPathIOManager._load_single_input -> UPathIOManager.load_from_path -> BasePolarsUPathIOManager.scan_df_from_path
    # 2. Partitioned: UPathIOManager.load_input -> UPathIOManager.load_partitions -> UPathIOManager.load_from_path -> BasePolarsUPathIOManager.scan_df_from_path

    # If a child IOManager supports loading multiple partitions at once, it should override
    # .load_partitions to immediately return a LazyFrame (by using scan_df_from_path)

    base_dir: Optional[str] = Field(default=None, description="Base directory for storing files.")
    cloud_storage_options: Optional[Mapping[str, Any]] = Field(
        default=None,
        description="Storage authentication for cloud object store",
        alias="storage_options",
    )
    _base_path = PrivateAttr()

    def setup_for_execution(self, context: InitResourceContext) -> None:
        from upath import UPath

        sp = (
            _process_env_vars(self.cloud_storage_options)
            if self.cloud_storage_options is not None
            else {}
        )
        self._base_path = (
            UPath(self.base_dir, **sp)
            if self.base_dir is not None
            else UPath(check.not_none(context.instance).storage_directory())
        )

    @abstractmethod
    def write_df_to_path(
        self,
        context: OutputContext,
        df: pl.DataFrame,
        path: "UPath",
    ): ...

    @abstractmethod
    def sink_df_to_path(
        self,
        context: OutputContext,
        df: pl.LazyFrame,
        path: "UPath",
    ): ...

    @abstractmethod
    def scan_df_from_path(
        self,
        path: "UPath",
        context: InputContext,
    ) -> pl.LazyFrame: ...
    def type_router_is_eager(self, type_router: TypeRouter) -> bool:
        if type_router.is_base_type:
            if type_router.typing_type in [Any, type(None), None] or issubclass(
                type_router.typing_type, pl.DataFrame
            ):
                return True
            elif issubclass(type_router.typing_type, pl.LazyFrame):
                return False
            else:
                raise NotImplementedError(
                    f"Can't determine if type annotation {type_router.typing_type} corresponds to an eager or lazy DataFrame"
                )
        else:
            return self.type_router_is_eager(type_router.parent_type_router)

    def dump_to_path(
        self,
        context: OutputContext,
        obj: Union[
            pl.DataFrame,
            Optional[pl.DataFrame],
            Tuple[pl.DataFrame, Dict[str, Any]],
            pl.LazyFrame,
            Optional[pl.LazyFrame],
            Tuple[pl.LazyFrame, Dict[str, Any]],
        ],
        path: "UPath",
    ):
        type_router = resolve_type_router(context, context.dagster_type.typing_type)

        if self.type_router_is_eager(type_router):
            dump_fn = self.write_df_to_path
        else:
            dump_fn = self.sink_df_to_path

        type_router.dump(obj, path, dump_fn)

    def load_from_path(
        self, context: InputContext, path: "UPath"
    ) -> Union[
        pl.DataFrame,
        pl.LazyFrame,
        Tuple[pl.DataFrame, Dict[str, Any]],
        Tuple[pl.LazyFrame, Dict[str, Any]],
        None,
    ]:
        type_router = resolve_type_router(context, context.dagster_type.typing_type)

        ldf = type_router.load(path, self.scan_df_from_path)

        # Missing-file detection in UPathIOManager doesn't work with `LazyFrame`,
        # since the FileNotFoundError is raised only when `.collect()` is called outside
        # of the UPathIOManager. As a workaround, we check whether the file exists and,
        # if not, raise the error here. This is needed for the `allow_missing_partitions`
        # input metadata setting to work correctly.
        if ldf is not None and not self.type_router_is_eager(type_router) and not path.exists():
            raise FileNotFoundError(f"File {path} does not exist")

        if ldf is None:
            return None

        columns = context.definition_metadata.get("columns")
        if columns is not None:
            context.log.debug(f"Loading {columns=}")
            ldf = ldf.select(columns)

        if self.type_router_is_eager(type_router):
            return ldf.collect()
        else:
            return ldf

    def get_metadata(
        self, context: OutputContext, obj: Union[pl.DataFrame, pl.LazyFrame, None]
    ) -> Dict[str, MetadataValue]:
        if obj is None:
            return {"missing": MetadataValue.bool(True)}
        else:
            return get_polars_metadata(context, obj)
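

# --- Example (not part of the original module) ---
# A minimal sketch of a concrete subclass, assuming Parquet as the storage
# format. The class name `SketchPolarsParquetIOManager` is hypothetical;
# dagster-polars ships its own PolarsParquetIOManager with more complete
# handling of cloud storage options and metadata.
class SketchPolarsParquetIOManager(BasePolarsUPathIOManager):
    extension: str = ".parquet"

    def write_df_to_path(self, context: OutputContext, df: pl.DataFrame, path: "UPath"):
        # Eagerly write the materialized DataFrame through the UPath file object.
        with path.open("wb") as f:
            df.write_parquet(f)

    def sink_df_to_path(self, context: OutputContext, df: pl.LazyFrame, path: "UPath"):
        # Stream the LazyFrame to disk without materializing it in memory.
        df.sink_parquet(str(path))

    def scan_df_from_path(self, path: "UPath", context: InputContext) -> pl.LazyFrame:
        # Return a LazyFrame; BasePolarsUPathIOManager.load_from_path decides
        # whether to .collect() it based on the input type annotation.
        return pl.scan_parquet(str(path))


# Usage sketch (asset name and base_dir are placeholders): attach the example
# manager as the default IO manager so asset outputs are stored as Parquet files.
from dagster import Definitions, asset


@asset
def my_asset() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2, 3]})


defs = Definitions(
    assets=[my_asset],
    resources={"io_manager": SketchPolarsParquetIOManager(base_dir="/tmp/polars")},
)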