This library provides an integration with the DuckDB database.
Path to the DuckDB database.
DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html
Default Value: {}
Name of the schema to use.
Default Value: None
Base class for an IO manager definition that reads inputs from and writes outputs to DuckDB.
Examples
from typing import Sequence

import pandas as pd
from dagster import Definitions, asset
# import path as used inside dagster-duckdb itself; it may differ between versions
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

class MyDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DuckDBPandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
)
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb", schema="my_schema")}
)
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pd.DataFrame:
    ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
If none of these is provided, the schema will default to “public”.
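The fallback rules above can be summarised in a small, purely illustrative function. `resolve_schema` is a hypothetical helper, not part of dagster-duckdb, and the relative order of `key_prefix` and the configured default is inferred from the wording on this page; the library's actual resolution logic may differ between versions, so treat this as a sketch only.

```python
from typing import Mapping, Optional, Sequence


def resolve_schema(
    metadata: Optional[Mapping[str, str]] = None,
    key_prefix: Optional[Sequence[str]] = None,
    default_schema: Optional[str] = None,
) -> str:
    """Hypothetical helper: pick a schema following the rules described above."""
    # 1. A "schema" entry in asset/output metadata wins over everything else.
    if metadata and metadata.get("schema"):
        return metadata["schema"]
    # 2. Otherwise use the last component of the asset key prefix, if any.
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise use the schema configured on the I/O manager (assumed order).
    if default_schema:
        return default_schema
    # 4. With nothing specified, fall back to "public".
    return "public"


print(resolve_schema(metadata={"schema": "analytics"}, key_prefix=["raw"]))  # analytics
print(resolve_schema(key_prefix=["raw"], default_schema="my_schema"))        # raw
print(resolve_schema(default_schema="my_schema"))                            # my_schema
print(resolve_schema())                                                      # public
```

Keeping the rules in one place like this makes it easier to predict where a table will land before materializing an asset.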
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
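Conceptually, the "columns" metadata narrows the query that loads the upstream table. The sketch below illustrates that idea with a hypothetical `build_select` helper; the function name and the exact SQL text are assumptions for illustration and are not taken from dagster-duckdb.

```python
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: build the SELECT used to load a table as an input."""
    # With no "columns" metadata, every column is loaded.
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {schema}.{table}"


print(build_select("my_schema", "my_table"))              # SELECT * FROM my_schema.my_table
print(build_select("my_schema", "my_table", ["a"]))       # SELECT a FROM my_schema.my_table
print(build_select("my_schema", "my_table", ["a", "b"]))  # SELECT a, b FROM my_schema.my_table
```

Selecting columns at load time avoids reading data the downstream asset never uses, which matters most for wide tables.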
Set DuckDB configuration options using the connection_config field. See https://duckdb.org/docs/sql/configuration.html for all available settings.
defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb",
                                               connection_config={"arrow_large_buffer_size": True})}
)
Path to the DuckDB database. Setting database=":memory:" will use an in-memory database.
DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html
Default Value: {}
Resource for interacting with a DuckDB database.
Examples
from dagster import Definitions, asset
from dagster_duckdb import DuckDBResource

@asset
def my_table(duckdb: DuckDBResource):
    with duckdb.get_connection() as conn:
        conn.execute("SELECT * FROM MY_SCHEMA.MY_TABLE")

defs = Definitions(
    assets=[my_table],
    resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")}
)
Path to the DuckDB database.
DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html
Default Value: {}
Name of the schema to use.
Default Value: None
Builds an IO manager definition that reads inputs from and writes outputs to DuckDB.
type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between DuckDB tables and an in-memory type, e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.
default_load_type (Type) – When an input has no type annotation, load it as this type.
IOManagerDefinition
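The interaction between type_handlers and default_load_type can be pictured with the sketch below. Everything in it is hypothetical: `ListHandler`, `DictHandler`, and `pick_default_load_type` are stand-ins invented for illustration, and the `supported_types` attribute is an assumed shape for a handler. It only mirrors the rule stated in the parameter descriptions and is not the library's implementation.

```python
from typing import Optional, Sequence, Type


class ListHandler:
    """Hypothetical stand-in for a type handler that loads tables as lists."""
    supported_types = [list]


class DictHandler:
    """Hypothetical stand-in for a type handler that loads tables as dicts."""
    supported_types = [dict]


def pick_default_load_type(
    handlers: Sequence[object], default_load_type: Optional[Type] = None
) -> Optional[Type]:
    """Hypothetical helper: decide how an un-annotated input is loaded."""
    # An explicit default_load_type always wins.
    if default_load_type is not None:
        return default_load_type
    # With exactly one handler, its type becomes the default.
    if len(handlers) == 1:
        return handlers[0].supported_types[0]
    # With several handlers and no explicit default, nothing can be inferred.
    return None


print(pick_default_load_type([ListHandler()]))                       # <class 'list'>
print(pick_default_load_type([ListHandler(), DictHandler()]))        # None
print(pick_default_load_type([ListHandler(), DictHandler()], dict))  # <class 'dict'>
```

In practice this means that an explicit default_load_type is mainly useful when more than one handler is registered and some inputs lack type annotations.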
Examples
import pandas as pd
from dagster import Definitions, asset
from dagster_duckdb import build_duckdb_io_manager
from dagster_duckdb_pandas import DuckDBPandasTypeHandler

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": duckdb_io_manager.configured({"database": "my_db.duckdb"})}
)
You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
defs = Definitions(
    assets=[my_table],
    resources={"io_manager": duckdb_io_manager.configured(
        {"database": "my_db.duckdb", "schema": "my_schema"}  # will be used as the schema
    )}
)
On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pd.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pd.DataFrame:
    ...
For ops, the schema can be specified by including a “schema” entry in output metadata.
@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
If none of these is provided, the schema will default to “public”.
To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...