Using ClickHouse with Dagster
This guide shows how to use ClickHouse with Dagster assets. The dagster-clickhouse library provides:
- Resource: Run SQL directly inside an asset using ClickhouseResource and a clickhouse_driver.Client.
- I/O manager: Let Dagster materialize and load Pandas or Polars DataFrames as ClickHouse tables using ClickhousePandasIOManager or ClickhousePolarsIOManager.
You can use either approach, or both, depending on whether you want full control over SQL or you prefer Dagster to manage table storage and loads.
Dagster’s schema (on assets and I/O managers) maps to a ClickHouse database name. The connection’s database setting is the default database on the server session, which is separate from the database used in fully qualified table names for assets.
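To make the distinction concrete, the sketch below builds the fully qualified table name an asset with a given schema would target. Both functions are hypothetical helpers written for illustration, not part of dagster-clickhouse. The sketch assumes the asset's schema becomes the database part of the name and that the connection's database setting plays no role in it.

```python
def quote_identifier(name: str) -> str:
    """Wrap an identifier in backticks, escaping backslashes and backticks."""
    return "`" + name.replace("\\", "\\\\").replace("`", "\\`") + "`"


def qualified_table_name(schema: str, table: str) -> str:
    """Return `<database>`.`<table>`, taking the database from the Dagster schema."""
    return f"{quote_identifier(schema)}.{quote_identifier(table)}"


# Hypothetical values: the connection's default database is "default",
# while the asset's schema is "iris".
connection_database = "default"
asset_schema = "iris"

# Only the schema appears in the qualified name; connection_database is unused.
print(qualified_table_name(asset_schema, "iris_dataset"))
```

Under these assumptions the asset's table lives in the iris database even though the session's default database is default.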
Prerequisites
- Install the libraries you need:
  - uv: uv add dagster-clickhouse dagster-clickhouse-pandas
  - pip: pip install dagster-clickhouse dagster-clickhouse-pandas
  For Polars, add dagster-clickhouse-polars to the list.
- A ClickHouse instance reachable from your Dagster code (for example localhost:9000 for the native protocol).
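Before wiring up Dagster, it can help to confirm that the ClickHouse port is reachable from the machine running your code. The following is a minimal sketch using only the Python standard library. `is_port_open` is a hypothetical helper, and it checks TCP reachability only; it does not verify that the server actually speaks the ClickHouse native protocol.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    # localhost:9000 is the example address used in this guide; adjust as needed.
    print(is_port_open("localhost", 9000))
```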
Option 1: Using the ClickHouse resource
Step 1: Configure the resource
Add ClickhouseResource to your Definitions. You must set host; port defaults to 9000 (native protocol).
from dagster_clickhouse import ClickhouseResource
from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "clickhouse": ClickhouseResource(
            host="localhost",
            port=9000,
        )
    },
)
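In deployed environments you may prefer not to hard-code the host. The sketch below reads the connection settings from environment variables and applies the 9000 default for the port. The variable names CLICKHOUSE_HOST and CLICKHOUSE_PORT and the helper `clickhouse_connection_kwargs` are assumptions made for this example, not something dagster-clickhouse defines.

```python
import os
from typing import Mapping, Optional

DEFAULT_NATIVE_PORT = 9000  # native-protocol default stated in this guide


def clickhouse_connection_kwargs(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build host/port keyword arguments from environment variables.

    CLICKHOUSE_HOST and CLICKHOUSE_PORT are hypothetical variable names.
    """
    env = os.environ if env is None else env
    host = env.get("CLICKHOUSE_HOST")
    if not host:
        # host is required; there is no sensible default to fall back to.
        raise ValueError("CLICKHOUSE_HOST must be set; host has no default")
    port = int(env.get("CLICKHOUSE_PORT", DEFAULT_NATIVE_PORT))
    return {"host": host, "port": port}


print(clickhouse_connection_kwargs({"CLICKHOUSE_HOST": "localhost"}))
```

The resulting dictionary can then be unpacked into the resource, for example ClickhouseResource(**clickhouse_connection_kwargs()).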
Step 2: Create tables with SQL
Use get_connection() to obtain a clickhouse_driver.Client. The following asset downloads the Iris sample CSV and loads it into the iris.iris_dataset table:
import pandas as pd
from dagster_clickhouse import ClickhouseResource
from dagster import asset


@asset
def iris_dataset(clickhouse: ClickhouseResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
    with clickhouse.get_connection() as client:
        client.execute("CREATE DATABASE IF NOT EXISTS iris")
        client.execute(
            "CREATE TABLE IF NOT EXISTS iris.iris_dataset ("
            " `sepal_length_cm` Float64,"
            " `sepal_width_cm` Float64,"
            " `petal_length_cm` Float64,"
            " `petal_width_cm` Float64,"
            " `species` String"
            ") ENGINE = MergeTree() ORDER BY tuple()"
        )
        if not iris_df.empty:
            client.insert_dataframe(
                "INSERT INTO iris.iris_dataset VALUES",
                iris_df,
                settings={"use_numpy": True},
            )
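If you create several tables this way, the CREATE TABLE statement can be generated from a column specification instead of being written out by hand. The sketch below is one possible approach. `build_create_table` is a hypothetical helper, not part of dagster-clickhouse or clickhouse_driver, and it interpolates names directly, so it should only be used with trusted, hard-coded identifiers.

```python
from typing import Mapping


def build_create_table(database: str, table: str, columns: Mapping[str, str]) -> str:
    """Render a CREATE TABLE IF NOT EXISTS statement for a MergeTree table."""
    if not columns:
        raise ValueError("at least one column is required")
    # Each entry maps a column name to its ClickHouse type, in declaration order.
    column_sql = ", ".join(f"`{name}` {ch_type}" for name, ch_type in columns.items())
    return (
        f"CREATE TABLE IF NOT EXISTS {database}.{table} ({column_sql}) "
        "ENGINE = MergeTree() ORDER BY tuple()"
    )


IRIS_COLUMNS = {
    "sepal_length_cm": "Float64",
    "sepal_width_cm": "Float64",
    "petal_length_cm": "Float64",
    "petal_width_cm": "Float64",
    "species": "String",
}

ddl = build_create_table("iris", "iris_dataset", IRIS_COLUMNS)
print(ddl)
```

The generated string can be passed to client.execute() in place of the literal statement in the asset.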
Step 3: Downstream assets
You can run arbitrary SQL (for example CREATE TABLE … AS SELECT) in a downstream asset that depends on iris_dataset:
from dagster_clickhouse import ClickhouseResource
from dagster import asset

# Placeholder for the iris_dataset asset from Step 2, so this snippet stands alone.
# The query below reads the iris.iris_dataset table that asset creates.
iris_dataset = asset(name="iris_dataset")(lambda: None)


@asset(deps=[iris_dataset])
def iris_setosa(clickhouse: ClickhouseResource) -> None:
    with clickhouse.get_connection() as client:
        client.execute("DROP TABLE IF EXISTS iris.iris_setosa")
        client.execute(
            "CREATE TABLE iris.iris_setosa ENGINE = MergeTree() ORDER BY tuple() AS"
            " SELECT * FROM iris.iris_dataset WHERE species = 'Iris-setosa'"
        )
Completed example (resource)
import pandas as pd
from dagster_clickhouse import ClickhouseResource
from dagster import Definitions, asset


@asset
def iris_dataset(clickhouse: ClickhouseResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
    with clickhouse.get_connection() as client:
        client.execute("CREATE DATABASE IF NOT EXISTS iris")
        client.execute(
            "CREATE TABLE IF NOT EXISTS iris.iris_dataset ("
            " `sepal_length_cm` Float64,"
            " `sepal_width_cm` Float64,"
            " `petal_length_cm` Float64,"
            " `petal_width_cm` Float64,"
            " `species` String"
            ") ENGINE = MergeTree() ORDER BY tuple()"
        )
        if not iris_df.empty:
            client.insert_dataframe(
                "INSERT INTO iris.iris_dataset VALUES",
                iris_df,
                settings={"use_numpy": True},
            )


@asset(deps=[iris_dataset])
def iris_setosa(clickhouse: ClickhouseResource) -> None:
    with clickhouse.get_connection() as client:
        client.execute("DROP TABLE IF EXISTS iris.iris_setosa")
        client.execute(
            "CREATE TABLE iris.iris_setosa ENGINE = MergeTree() ORDER BY tuple() AS"
            " SELECT * FROM iris.iris_dataset WHERE species = 'Iris-setosa'"
        )


defs = Definitions(
    assets=[iris_dataset, iris_setosa],
    resources={
        "clickhouse": ClickhouseResource(
            host="localhost",
            port=9000,
        )
    },
)
Option 2: Using the ClickHouse I/O manager (Pandas)
Step 1: Configure the I/O manager
Configure ClickhousePandasIOManager with connection fields and an optional default schema (ClickHouse database) for table assets.
from dagster_clickhouse_pandas import ClickhousePandasIOManager
from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": ClickhousePandasIOManager(
            host="localhost",
            port=9000,
            database="default",
            schema="iris",
        )
    },
)
Step 2: Materialize a DataFrame as a table
Return a Pandas DataFrame from an asset. Dagster creates a MergeTree table if needed and replaces data on materialization.
import pandas as pd
from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
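To create the table, the I/O manager has to derive ClickHouse column types from the DataFrame. This guide does not document the mapping it uses, so the sketch below is purely illustrative. `PANDAS_TO_CLICKHOUSE` and `infer_clickhouse_columns` are hypothetical names, and the type choices are assumptions that may differ from what dagster-clickhouse-pandas actually does.

```python
# Hypothetical mapping from pandas dtype names to ClickHouse column types.
# An illustrative assumption, not the mapping dagster-clickhouse-pandas uses.
PANDAS_TO_CLICKHOUSE = {
    "float64": "Float64",
    "float32": "Float32",
    "int64": "Int64",
    "int32": "Int32",
    "bool": "Bool",
    "object": "String",
}


def infer_clickhouse_columns(dtypes: dict) -> dict:
    """Translate {column: pandas dtype name} into {column: ClickHouse type}."""
    columns = {}
    for name, dtype_name in dtypes.items():
        if dtype_name not in PANDAS_TO_CLICKHOUSE:
            raise ValueError(f"no ClickHouse type known for dtype {dtype_name!r}")
        columns[name] = PANDAS_TO_CLICKHOUSE[dtype_name]
    return columns


# Dtype names assumed for the Iris columns (four floats and one text column).
iris_dtypes = {
    "sepal_length_cm": "float64",
    "sepal_width_cm": "float64",
    "petal_length_cm": "float64",
    "petal_width_cm": "float64",
    "species": "object",
}

print(infer_clickhouse_columns(iris_dtypes))
```

With a real DataFrame, the input dictionary could be produced with {col: str(dtype) for col, dtype in df.dtypes.items()}.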
Step 3: Load upstream tables in downstream assets
Pass the upstream asset as a typed input; Dagster loads the table as a DataFrame.
import pandas as pd
from dagster import asset


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]
Completed example (I/O manager)
import pandas as pd
from dagster_clickhouse_pandas import ClickhousePandasIOManager
from dagster import Definitions, asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]


defs = Definitions(
    assets=[iris_dataset, iris_setosa],
    resources={
        "io_manager": ClickhousePandasIOManager(
            host="localhost",
            port=9000,
            database="default",
            schema="iris",
        )
    },
)
Polars
To use Polars instead, install dagster-clickhouse-polars, configure ClickhousePolarsIOManager the same way as the Pandas I/O manager, and annotate asset inputs and outputs with pl.DataFrame.
Templated SQL with dg and ClickhouseQueryComponent
To define templated SQL assets alongside a reusable ClickHouse connection in a components project, use ClickhouseQueryComponent with TemplatedSqlComponent. See ClickHouse SQL component.