DuckDB + PySpark (dagster-duckdb-pyspark)

This library provides an integration with the DuckDB database and PySpark data processing library.

dagster_duckdb_pyspark.DuckDBPySparkIOManager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

connection_config (Union[dict, None], optional):

DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html

Default Value:
{}
schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

An I/O manager definition that writes PySpark DataFrames to DuckDB and reads them back as inputs. When using the DuckDBPySparkIOManager, any inputs and outputs without type annotations are treated as PySpark DataFrames.
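
As a rough illustration of the fallback for unannotated inputs, the sketch below resolves the type an input would be loaded as from its annotation. It is plain Python so it runs without Spark or Dagster: resolve_load_type and SparkDataFrame are hypothetical names, and treating typing.Any the same as a missing annotation is an assumption of the sketch, not a description of Dagster's internals.

```python
import inspect
from typing import Any


class SparkDataFrame:
    """Hypothetical stand-in for pyspark.sql.DataFrame so the sketch runs without Spark."""


def resolve_load_type(annotation: Any, default_load_type: type) -> Any:
    """Return the type an input is loaded as (hypothetical helper).

    A missing annotation (inspect.Parameter.empty) or typing.Any falls back to
    the I/O manager's default load type; any other annotation is kept as-is.
    """
    if annotation is inspect.Parameter.empty or annotation is Any:
        return default_load_type
    return annotation


def my_table_a(my_table):  # parameter deliberately left unannotated
    ...


param = inspect.signature(my_table_a).parameters["my_table"]
print(resolve_load_type(param.annotation, SparkDataFrame).__name__)  # SparkDataFrame
```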

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import Definitions, asset
from dagster_duckdb_pyspark import DuckDBPySparkIOManager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")}
)

You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb", schema="my_schema")}
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.
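
The schema rules above can be summarized as a small lookup. The sketch below is a hypothetical helper (resolve_schema is not part of dagster-duckdb-pyspark) that follows the order described in this section: a “schema” metadata entry, then the asset's key_prefix, then the I/O manager's schema configuration, then “public”. Using the last key_prefix component for multi-part prefixes is an assumption, and some dagster releases may resolve an asset that has both a key_prefix and a configured schema differently, so check that case against your installed version.

```python
from typing import Mapping, Optional, Sequence


def resolve_schema(
    metadata: Mapping[str, str],
    key_prefix: Optional[Sequence[str]] = None,
    config_schema: Optional[str] = None,
) -> str:
    """Pick a DuckDB schema following the order described in this section (hypothetical)."""
    # 1. An explicit "schema" entry in asset or op output metadata wins.
    if metadata.get("schema"):
        return metadata["schema"]
    # 2. Otherwise use the asset's key_prefix (assumed: its last component).
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise fall back to the I/O manager's schema configuration.
    if config_schema:
        return config_schema
    # 4. Nothing was specified anywhere.
    return "public"


print(resolve_schema({"schema": "my_schema"}, key_prefix=["other_schema"]))  # my_schema
print(resolve_schema({}, config_schema="my_schema"))  # my_schema
print(resolve_schema({}))  # public
```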

To load only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will contain only column "a"
    ...
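
To make the effect of the “columns” metadata concrete, the sketch below builds the kind of SELECT statement such a load implies. build_select is a hypothetical helper; the SQL that dagster actually issues may differ (for example in identifier quoting or partition filters), and the helper does no quoting or escaping of its own.

```python
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Build an illustrative query for loading a table (hypothetical helper).

    With no columns every column is selected; otherwise only the listed ones.
    Identifiers are inserted verbatim, without quoting.
    """
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {schema}.{table}"


print(build_select("my_schema", "my_table", ["a"]))  # SELECT a FROM my_schema.my_table
```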

class dagster_duckdb_pyspark.DuckDBPySparkTypeHandler

Stores PySpark DataFrames in DuckDB.

To use this type handler, return it from the type_handlers method of an I/O manager that inherits from DuckDBIOManager.

Example

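from typing import Sequence

import pyspark.sql
from dagster import Definitions, asset
from dagster._core.storage.db_io_manager import DbTypeHandler  # internal module path
from dagster_duckdb import DuckDBIOManager
from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler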

class MyDuckDBIOManager(DuckDBIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DuckDBPySparkTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
)
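
Because type_handlers returns a sequence, an I/O manager can be given several handlers and has to pick one per input or output type. The plain-Python sketch below shows one way such a lookup could work; SketchTypeHandler, build_handler_registry and the Fake* classes are hypothetical stand-ins rather than dagster classes, and rejecting two handlers for the same type is a design choice of the sketch.

```python
from typing import Dict, Sequence


class SketchTypeHandler:
    """Hypothetical handler that only records which Python types it can store."""

    def __init__(self, name: str, supported_types: Sequence[type]) -> None:
        self.name = name
        self.supported_types = tuple(supported_types)


def build_handler_registry(
    handlers: Sequence[SketchTypeHandler],
) -> Dict[type, SketchTypeHandler]:
    """Map each supported type to exactly one handler."""
    registry: Dict[type, SketchTypeHandler] = {}
    for handler in handlers:
        for supported in handler.supported_types:
            # Two handlers claiming the same type would make the lookup ambiguous.
            if supported in registry:
                raise ValueError(f"more than one handler for {supported.__name__}")
            registry[supported] = handler
    return registry


class FakeSparkDataFrame:
    """Stand-in for pyspark.sql.DataFrame."""


class FakePandasDataFrame:
    """Stand-in for pandas.DataFrame."""


registry = build_handler_registry(
    [
        SketchTypeHandler("pyspark", [FakeSparkDataFrame]),
        SketchTypeHandler("pandas", [FakePandasDataFrame]),
    ]
)
print(registry[FakeSparkDataFrame].name)  # pyspark
```

Looking up the annotated type of an input or output in such a registry then yields the handler responsible for storing or loading it.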

Legacy

dagster_duckdb_pyspark.duckdb_pyspark_io_manager IOManagerDefinition

Config Schema:
database (dagster.StringSource):

Path to the DuckDB database.

connection_config (Union[dict, None], optional):

DuckDB connection configuration options. See https://duckdb.org/docs/sql/configuration.html

Default Value:
{}
schema (Union[dagster.StringSource, None], optional):

Name of the schema to use.

An I/O manager definition that writes PySpark DataFrames to DuckDB and reads them back as inputs. When using the duckdb_pyspark_io_manager, any inputs and outputs without type annotations are treated as PySpark DataFrames.

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import Definitions, asset
from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in DuckDB
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})}
)
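
When migrating from the legacy duckdb_pyspark_io_manager to DuckDBPySparkIOManager, the dictionary passed to configured can be reused as keyword arguments. The sketch below is a hypothetical helper (legacy_config_to_kwargs is not a dagster function) that checks the keys against the Config Schema listed for this I/O manager and fills in its defaults (connection_config={} and schema=None).

```python
from typing import Any, Dict, Mapping

# Field names taken from the Config Schema of duckdb_pyspark_io_manager.
ALLOWED_KEYS = {"database", "connection_config", "schema"}


def legacy_config_to_kwargs(config: Mapping[str, Any]) -> Dict[str, Any]:
    """Turn a legacy configured() dictionary into constructor kwargs (hypothetical helper)."""
    unknown = set(config) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unknown config keys: {sorted(unknown)}")
    if "database" not in config:
        raise ValueError("'database' is required")
    # Defaults from the Config Schema; a fresh dict is built on every call.
    kwargs: Dict[str, Any] = {"connection_config": {}, "schema": None}
    kwargs.update(config)
    return kwargs


print(legacy_config_to_kwargs({"database": "my_db.duckdb", "schema": "my_schema"}))
```

Assuming the constructor accepts the same field names as the Config Schema, the result could then be passed as DuckDBPySparkIOManager(**legacy_config_to_kwargs({"database": "my_db.duckdb"})).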

You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb", "schema": "my_schema"})}
)

On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_schema"]  # will be used as the schema in duckdb
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...

For ops, the schema can be specified by including a “schema” entry in output metadata.

from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...

If none of these is provided, the schema will default to “public”.

To load only specific columns of a table as input to a downstream op or asset, add a “columns” entry to the metadata of the In or AssetIn.

from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will contain only column "a"
    ...