Ask AI

Source code for dagster_deltalake.resource

from typing import Optional, Union

from dagster import ConfigurableResource
from deltalake import DeltaTable
from pydantic import Field

from dagster_deltalake.config import AzureConfig, ClientConfig, GcsConfig, LocalConfig, S3Config


[docs] class DeltaTableResource(ConfigurableResource): """Resource for interacting with a Delta table. Examples: .. code-block:: python from dagster import Definitions, asset from dagster_deltalake import DeltaTableResource, LocalConfig @asset def my_table(delta_table: DeltaTableResource): df = delta_table.load().to_pandas() defs = Definitions( assets=[my_table], resources={ "delta_table": DeltaTableResource( url="/path/to/table", storage_options=LocalConfig() ) } ) """ url: str storage_options: Union[AzureConfig, S3Config, LocalConfig, GcsConfig] = Field( discriminator="provider" ) client_options: Optional[ClientConfig] = Field( default=None, description="Additional configuration passed to http client." ) version: Optional[int] = Field(default=None, description="Version to load delta table.") def load(self) -> DeltaTable: storage_options = self.storage_options.str_dict() if self.storage_options else {} client_options = self.client_options.str_dict() if self.client_options else {} options = {**storage_options, **client_options} table = DeltaTable(table_uri=self.url, storage_options=options, version=self.version) return table