Ask AI

Source code for dagster_duckdb.resource

from contextlib import contextmanager
from typing import Any, Dict

import duckdb
from dagster import ConfigurableResource
from dagster._utils.backoff import backoff
from packaging.version import Version
from pydantic import Field


class DuckDBResource(ConfigurableResource):
    """Resource for interacting with a DuckDB database.

    Examples:
        .. code-block:: python

            from dagster import Definitions, asset
            from dagster_duckdb import DuckDBResource

            @asset
            def my_table(duckdb: DuckDBResource):
                with duckdb.get_connection() as conn:
                    conn.execute("SELECT * from MY_SCHEMA.MY_TABLE")

            defs = Definitions(
                assets=[my_table],
                resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")}
            )

    """

    database: str = Field(
        description=(
            "Path to the DuckDB database. Setting database=':memory:' will use an in-memory"
            " database "
        )
    )
    connection_config: Dict[str, Any] = Field(
        description=(
            "DuckDB connection configuration options. See"
            " https://duckdb.org/docs/sql/configuration.html"
        ),
        default={},
    )

    @classmethod
    def _is_dagster_maintained(cls) -> bool:
        """Report that this resource is maintained by Dagster (always ``True``)."""
        return True

    @contextmanager
    def get_connection(self):
        """Open a read-write DuckDB connection and close it when the ``with`` block exits.

        The connection is opened against ``self.database`` using
        ``self.connection_config``. On DuckDB >= 1.0.0 a ``custom_user_agent``
        of ``"dagster"`` is added to the config unless the user-supplied
        config already sets that key.

        Connecting is attempted through ``backoff``, retrying on
        ``RuntimeError`` and ``duckdb.IOException`` with ``max_retries=10``.

        Yields:
            The connection object returned by ``duckdb.connect``.
        """
        config = self.connection_config

        # support for `custom_user_agent` was added in v1.0.0
        # https://github.com/duckdb/duckdb/commit/0c66b6007b736ed2197bca54d20c9ad9a5eeef46
        if Version(duckdb.__version__) >= Version("1.0.0"):
            # User-supplied keys are unpacked last so they override the default agent.
            config = {
                "custom_user_agent": "dagster",
                **config,
            }

        # NOTE(review): the retry presumably covers transient failures such as the
        # database file being locked by another process -- confirm against backoff's
        # and DuckDB's documentation.
        conn = backoff(
            fn=duckdb.connect,
            retry_on=(RuntimeError, duckdb.IOException),
            kwargs={
                "database": self.database,
                "read_only": False,
                "config": config,
            },
            max_retries=10,
        )

        # try/finally guarantees the connection is closed even when the caller's
        # `with` body raises; without it the exception propagates out of the
        # `yield` and the close call is skipped, leaking the connection.
        try:
            yield conn
        finally:
            conn.close()