Ask AI

Source code for dagster_deltalake_pandas.deltalake_pandas_type_handler

from typing import Any, Dict, Optional, Sequence, Tuple, Type

import pandas as pd
import pyarrow as pa
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_deltalake.handler import DeltalakeBaseArrowTypeHandler, DeltaLakePyArrowTypeHandler
from dagster_deltalake.io_manager import DeltaLakeIOManager


class DeltaLakePandasTypeHandler(DeltalakeBaseArrowTypeHandler[pd.DataFrame]):
    """Arrow type handler that converts between pandas DataFrames and Arrow readers."""

    def from_arrow(
        self, obj: pa.RecordBatchReader, target_type: Type[pd.DataFrame]
    ) -> pd.DataFrame:
        """Read the contents of ``obj`` into a pandas DataFrame.

        Args:
            obj: Arrow record batch reader to consume.
            target_type: Requested result type; accepted but not consulted here.

        Returns:
            The DataFrame produced by ``obj.read_pandas()``.
        """
        frame = obj.read_pandas()
        return frame

    def to_arrow(self, obj: pd.DataFrame) -> Tuple[pa.RecordBatchReader, Dict[str, Any]]:
        """Convert a pandas DataFrame into an Arrow record batch reader.

        Args:
            obj: DataFrame to convert.

        Returns:
            A pair of (reader over the Arrow table built from ``obj``, empty dict).
        """
        table = pa.Table.from_pandas(obj)
        # NOTE(review): the second tuple element is always empty here; presumably
        # it carries extra options for the base handler - confirm against
        # DeltalakeBaseArrowTypeHandler.
        extras: Dict[str, Any] = {}
        return table.to_reader(), extras

    @property
    def supported_types(self) -> Sequence[Type[object]]:
        """Python types this handler accepts: only ``pd.DataFrame``."""
        handled = [pd.DataFrame]
        return handled
class DeltaLakePandasIOManager(DeltaLakeIOManager):
    """Delta Lake IO manager whose type handlers cover pandas and PyArrow.

    The default load type is ``pd.DataFrame``.
    """

    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        """Return the handlers this IO manager registers, pandas handler first."""
        pandas_handler = DeltaLakePandasTypeHandler()
        arrow_handler = DeltaLakePyArrowTypeHandler()
        return [pandas_handler, arrow_handler]

    @staticmethod
    def default_load_type() -> Optional[Type]:
        """Return ``pd.DataFrame`` as the default load type."""
        return pd.DataFrame