Using ClickHouse with Dagster

This guide shows how to use ClickHouse with Dagster assets. The dagster-clickhouse library provides a ClickhouseResource for running SQL directly against ClickHouse, plus companion I/O managers (ClickhousePandasIOManager, and ClickhousePolarsIOManager from dagster-clickhouse-polars) that store and load DataFrames as ClickHouse tables.

You can use either approach, or both, depending on whether you want full control over SQL or prefer to let Dagster manage table storage and loads.

tip

Dagster’s schema (on assets and I/O managers) maps to a ClickHouse database name. The connection’s database setting is the default database on the server session, which is separate from the database used in fully qualified table names for assets.
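For example, with schema="iris" configured on an I/O manager, an asset named iris_dataset is stored as the table iris.iris_dataset even when the connection's database setting is "default". The mapping can be sketched with an illustrative helper (not part of dagster-clickhouse):

```python
# Illustrative helper (not part of dagster-clickhouse): shows how the Dagster
# schema and asset name combine into a fully qualified ClickHouse table name.
def qualified_table_name(schema: str, asset_name: str) -> str:
    return f"{schema}.{asset_name}"


# With schema="iris", the iris_dataset asset maps to iris.iris_dataset,
# regardless of the connection's default `database` setting.
print(qualified_table_name("iris", "iris_dataset"))  # iris.iris_dataset
```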

Prerequisites

  • Install the libraries you need:

    uv add dagster-clickhouse dagster-clickhouse-pandas

    For Polars, add dagster-clickhouse-polars to the list.

  • A ClickHouse instance reachable from your Dagster code (for example localhost:9000 for the native protocol).

Option 1: Using the ClickHouse resource

Step 1: Configure the resource

Add ClickhouseResource to your Definitions. You must set host; port defaults to 9000 (native protocol).


from dagster_clickhouse import ClickhouseResource

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "clickhouse": ClickhouseResource(
            host="localhost",
            port=9000,
        )
    },
)

Step 2: Create tables with SQL

Use get_connection() to obtain a clickhouse_driver.Client. The following asset downloads the Iris sample CSV and loads it into the iris.iris_dataset table:

import pandas as pd
from dagster_clickhouse import ClickhouseResource

from dagster import asset


@asset
def iris_dataset(clickhouse: ClickhouseResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with clickhouse.get_connection() as client:
        client.execute("CREATE DATABASE IF NOT EXISTS iris")
        client.execute(
            "CREATE TABLE IF NOT EXISTS iris.iris_dataset ("
            " `sepal_length_cm` Float64,"
            " `sepal_width_cm` Float64,"
            " `petal_length_cm` Float64,"
            " `petal_width_cm` Float64,"
            " `species` String"
            ") ENGINE = MergeTree() ORDER BY tuple()"
        )
        if not iris_df.empty:
            client.insert_dataframe(
                "INSERT INTO iris.iris_dataset VALUES",
                iris_df,
                settings={"use_numpy": True},
            )

Step 3: Downstream assets

You can run arbitrary SQL (for example CREATE TABLE … AS SELECT) in a downstream asset that depends on iris_dataset:

from dagster_clickhouse import ClickhouseResource

from dagster import asset

# This example executes a query against the `iris.iris_dataset` table created in the
# Using ClickHouse with Dagster guide.
iris_dataset = asset(name="iris_dataset")(lambda: None)


@asset(deps=[iris_dataset])
def iris_setosa(clickhouse: ClickhouseResource) -> None:
    with clickhouse.get_connection() as client:
        client.execute("DROP TABLE IF EXISTS iris.iris_setosa")
        client.execute(
            "CREATE TABLE iris.iris_setosa ENGINE = MergeTree() ORDER BY tuple() AS"
            " SELECT * FROM iris.iris_dataset WHERE species = 'Iris-setosa'"
        )
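To spot-check the result outside Dagster, you could run a query like the following in clickhouse-client (illustrative):

```sql
-- Every row copied into the filtered table should be Iris-setosa,
-- so the exact distinct count of species should be 1.
SELECT count(), uniqExact(species)
FROM iris.iris_setosa;
```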

Completed example (resource)

import pandas as pd
from dagster_clickhouse import ClickhouseResource

from dagster import Definitions, asset


@asset
def iris_dataset(clickhouse: ClickhouseResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with clickhouse.get_connection() as client:
        client.execute("CREATE DATABASE IF NOT EXISTS iris")
        client.execute(
            "CREATE TABLE IF NOT EXISTS iris.iris_dataset ("
            " `sepal_length_cm` Float64,"
            " `sepal_width_cm` Float64,"
            " `petal_length_cm` Float64,"
            " `petal_width_cm` Float64,"
            " `species` String"
            ") ENGINE = MergeTree() ORDER BY tuple()"
        )
        if not iris_df.empty:
            client.insert_dataframe(
                "INSERT INTO iris.iris_dataset VALUES",
                iris_df,
                settings={"use_numpy": True},
            )


@asset(deps=[iris_dataset])
def iris_setosa(clickhouse: ClickhouseResource) -> None:
    with clickhouse.get_connection() as client:
        client.execute("DROP TABLE IF EXISTS iris.iris_setosa")
        client.execute(
            "CREATE TABLE iris.iris_setosa ENGINE = MergeTree() ORDER BY tuple() AS"
            " SELECT * FROM iris.iris_dataset WHERE species = 'Iris-setosa'"
        )


defs = Definitions(
    assets=[iris_dataset, iris_setosa],
    resources={
        "clickhouse": ClickhouseResource(
            host="localhost",
            port=9000,
        )
    },
)

Option 2: Using the ClickHouse I/O manager (Pandas)

Step 1: Configure the I/O manager

Configure ClickhousePandasIOManager with connection fields and an optional default schema (ClickHouse database) for table assets.


from dagster_clickhouse_pandas import ClickhousePandasIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": ClickhousePandasIOManager(
            host="localhost",
            port=9000,
            database="default",
            schema="iris",
        )
    },
)

Step 2: Materialize a DataFrame as a table

Return a Pandas DataFrame from an asset. Dagster creates a MergeTree table if needed and replaces data on materialization.
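Conceptually, materializing the asset below results in DDL and an insert roughly along these lines (illustrative only, not the library's exact statements; the iris database comes from the I/O manager's schema setting):

```sql
-- Sketch of what the I/O manager does on materialization: create the target
-- table if needed, then replace its contents with the returned DataFrame.
CREATE TABLE IF NOT EXISTS iris.iris_dataset (
    `sepal_length_cm` Float64,
    `sepal_width_cm` Float64,
    `petal_length_cm` Float64,
    `petal_width_cm` Float64,
    `species` String
) ENGINE = MergeTree() ORDER BY tuple();

INSERT INTO iris.iris_dataset VALUES;  -- rows streamed from the DataFrame
```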

import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

Step 3: Load upstream tables in downstream assets

Pass the upstream asset as a typed input; Dagster loads the table as a DataFrame.


import pandas as pd

from dagster import asset


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]
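The same selection can be exercised outside Dagster on a small stand-in DataFrame (assuming pandas is installed):

```python
import pandas as pd

# Tiny stand-in for the iris_dataset table loaded by the I/O manager.
df = pd.DataFrame(
    {
        "species": ["Iris-setosa", "Iris-versicolor", "Iris-setosa"],
        "sepal_length_cm": [5.1, 7.0, 4.9],
    }
)

# Boolean-mask selection, identical to the asset body above.
setosa = df[df["species"] == "Iris-setosa"]
assert len(setosa) == 2
assert set(setosa["species"]) == {"Iris-setosa"}
```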

Completed example (I/O manager)

import pandas as pd
from dagster_clickhouse_pandas import ClickhousePandasIOManager

from dagster import Definitions, asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]


defs = Definitions(
    assets=[iris_dataset, iris_setosa],
    resources={
        "io_manager": ClickhousePandasIOManager(
            host="localhost",
            port=9000,
            database="default",
            schema="iris",
        )
    },
)

Polars

Install dagster-clickhouse-polars, configure ClickhousePolarsIOManager the same way as the Pandas I/O manager, and annotate asset inputs and outputs with pl.DataFrame.

Templated SQL with dg and ClickhouseQueryComponent

To define templated SQL assets alongside a reusable ClickHouse connection in a components project, use ClickhouseQueryComponent with TemplatedSqlComponent. See ClickHouse SQL component.