Dagster & Elasticsearch
This is a community-maintained integration. To report bugs or leave feedback, open an issue in the Dagster community integrations repo.
The community-supported Elasticsearch integration provides a resource for interacting with Elasticsearch clusters and an IO manager for bulk-indexing Dagster asset outputs as searchable documents.
Installation
- uv: uv add dagster-elasticsearch
- pip: pip install dagster-elasticsearch
Optional extras are available for table-like asset outputs. Install the extras that match the data types your assets return:
pandas extra:
- uv: uv add dagster-elasticsearch[pandas]
- pip: pip install dagster-elasticsearch[pandas]

polars extra:
- uv: uv add dagster-elasticsearch[polars]
- pip: pip install dagster-elasticsearch[polars]

arrow extra:
- uv: uv add dagster-elasticsearch[arrow]
- pip: pip install dagster-elasticsearch[arrow]
The underlying elasticsearch Python client must match the major version of your Elasticsearch cluster. Pin the client major version in your project if needed, for example elasticsearch>=8.10,<9 for Elasticsearch 8.x.
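The major-version rule can be checked before a deployment. The sketch below is illustrative only and is not part of the integration: `major` and `client_matches_cluster` are hypothetical helpers, and the version strings are assumed to come from `elasticsearch.__versionstr__` and from the `version.number` field of the cluster's `client.info()` response.

```python
def major(version: str) -> int:
    """Return the leading (major) component of a dotted version string."""
    return int(version.split(".")[0])


def client_matches_cluster(client_version: str, cluster_version: str) -> bool:
    """Hypothetical check: True when client and cluster share a major version."""
    return major(client_version) == major(cluster_version)


# Example version strings; in a real project they would be read at runtime.
print(client_matches_cluster("8.13.2", "8.15.0"))
print(client_matches_cluster("9.0.1", "8.15.0"))
```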
Example
import dagster as dg
from dagster_elasticsearch import (
    ElasticsearchIOManager,
    ElasticsearchResource,
    HostsConfig,
    build_indexed_asset_check,
)


# Writes one document directly through the Elasticsearch client.
@dg.asset(compute_kind="elasticsearch")
def indexed_doc(es: ElasticsearchResource) -> None:
    with es.get_client() as client:
        client.index(index="docs", id="hello", document={"title": "hello"})
        client.indices.refresh(index="docs")


# Returns documents; the IO manager bulk-indexes them.
@dg.asset(io_manager_key="elasticsearch_io_manager")
def search_docs() -> list[dict[str, str]]:
    return [
        {"_id": "1", "title": "hello"},
        {"_id": "2", "title": "world"},
    ]


@dg.definitions
def defs() -> dg.Definitions:
    return dg.Definitions(
        assets=[indexed_doc, search_docs],
        asset_checks=[build_indexed_asset_check(asset=search_docs, min_indexed=1)],
        resources={
            "es": ElasticsearchResource(
                connection_config=HostsConfig(
                    hosts=["https://es.example.com:9200"],
                    api_key=dg.EnvVar("ELASTICSEARCH_API_KEY"),
                ),
            ),
            "elasticsearch_io_manager": ElasticsearchIOManager(
                connection_config=HostsConfig(
                    hosts=["https://es.example.com:9200"],
                    api_key=dg.EnvVar("ELASTICSEARCH_API_KEY"),
                ),
                index="docs",
                use_alias=True,
                rollover_strategy="auto",
                keep_last=3,
            ),
        },
    )
Using the Elasticsearch resource
Use ElasticsearchResource when an asset or op needs direct access to an Elasticsearch client. Configure it with either:
- HostsConfig for self-hosted or generic Elasticsearch endpoints
- CloudConfig for Elastic Cloud deployments
Both configuration types support API-key authentication or basic authentication with a username and password. HostsConfig also supports bearer auth, certificate authorities, and certificate verification settings.
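To make the connection and authentication choices concrete, the sketch below maps them onto the keyword arguments accepted by the elasticsearch-py 8.x `Elasticsearch` constructor (`hosts`, `cloud_id`, `api_key`, `basic_auth`, `bearer_auth`, `ca_certs`, `verify_certs`). `build_client_kwargs` is a hypothetical helper written for illustration; how the integration translates HostsConfig and CloudConfig internally is not documented here and may differ.

```python
from typing import Any, Optional


def build_client_kwargs(
    *,
    hosts: Optional[list[str]] = None,
    cloud_id: Optional[str] = None,
    api_key: Optional[str] = None,
    basic_auth: Optional[tuple[str, str]] = None,
    bearer_auth: Optional[str] = None,
    ca_certs: Optional[str] = None,
    verify_certs: bool = True,
) -> dict[str, Any]:
    """Hypothetical helper: build kwargs for elasticsearch.Elasticsearch(**kwargs)."""
    # Exactly one endpoint style: explicit hosts or an Elastic Cloud ID.
    if (hosts is None) == (cloud_id is None):
        raise ValueError("set exactly one of hosts or cloud_id")

    # Keep only the authentication method that was actually supplied.
    auth = {"api_key": api_key, "basic_auth": basic_auth, "bearer_auth": bearer_auth}
    chosen = {name: value for name, value in auth.items() if value is not None}
    if len(chosen) > 1:
        raise ValueError("choose a single authentication method")

    if cloud_id is not None:
        # Assumption: bearer auth and certificate options apply to hosts only.
        if bearer_auth is not None or ca_certs is not None:
            raise ValueError("bearer auth and ca_certs apply to hosts only")
        return {"cloud_id": cloud_id, **chosen}

    kwargs: dict[str, Any] = {"hosts": hosts, **chosen, "verify_certs": verify_certs}
    if ca_certs is not None:
        kwargs["ca_certs"] = ca_certs
    return kwargs


print(build_client_kwargs(hosts=["https://es.example.com:9200"], api_key="secret"))
```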
Using the Elasticsearch IO manager
Use ElasticsearchIOManager to bulk-index asset outputs into Elasticsearch. It supports outputs such as dictionaries, lists of dictionaries, pandas DataFrames, Polars DataFrames or LazyFrames, and PyArrow tables. The id_field option, which defaults to _id, names the field whose value is used as the Elasticsearch document ID; it applies only to documents that contain that field.
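The role of id_field can be illustrated with the action format that the `elasticsearch.helpers.bulk` helper accepts (`_index`, `_id`, `_source`). This is a minimal sketch, not the IO manager's code: `to_bulk_actions` is a hypothetical function, and removing the ID field from the document body is an assumption.

```python
from typing import Any, Iterable, Iterator


def to_bulk_actions(
    docs: Iterable[dict[str, Any]], index: str, id_field: str = "_id"
) -> Iterator[dict[str, Any]]:
    """Hypothetical converter from plain dicts to bulk-helper actions."""
    for doc in docs:
        source = dict(doc)  # copy so the caller's dict is not mutated
        action: dict[str, Any] = {"_index": index}
        if id_field in source:
            # Assumption: the ID value moves out of the body into the action.
            action["_id"] = source.pop(id_field)
        action["_source"] = source
        yield action


docs = [{"_id": "1", "title": "hello"}, {"title": "world"}]
for action in to_bulk_actions(docs, index="docs"):
    print(action)
```

A document without the ID field produces an action with no `_id`, in which case Elasticsearch generates an ID itself.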
Common IO manager options include:
| Option | Description |
|---|---|
| index | Target index name. When use_alias=True, this is the stable alias name. |
| bulk_chunk_size | Number of documents per bulk request. |
| max_chunk_bytes | Optional maximum bulk request size in bytes. |
| refresh | Whether to refresh the index after writes. |
| lazy_load | Return an iterator from load_input instead of loading all hits eagerly. |
| scan_size | Page size for scroll-based reads. |
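How bulk_chunk_size and max_chunk_bytes interact can be sketched as a chunking loop: a chunk is flushed when it reaches the document limit or when the next document would push it past the byte limit. `chunk_documents` is a hypothetical helper, and measuring size with `json.dumps` is an assumption; the real work is presumably delegated to the client's bulk helper.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def chunk_documents(
    docs: Iterable[dict[str, Any]],
    bulk_chunk_size: int,
    max_chunk_bytes: Optional[int] = None,
) -> Iterator[list[dict[str, Any]]]:
    """Hypothetical chunker: group documents by count and optional byte size."""
    if bulk_chunk_size < 1:
        raise ValueError("bulk_chunk_size must be at least 1")
    chunk: list[dict[str, Any]] = []
    chunk_bytes = 0
    for doc in docs:
        size = len(json.dumps(doc).encode("utf-8"))
        too_many = len(chunk) >= bulk_chunk_size
        too_big = (
            max_chunk_bytes is not None
            and bool(chunk)
            and chunk_bytes + size > max_chunk_bytes
        )
        if too_many or too_big:
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(doc)
        chunk_bytes += size
    if chunk:
        yield chunk  # flush the final partial chunk


docs = [{"n": i} for i in range(5)]
print([len(c) for c in chunk_documents(docs, bulk_chunk_size=2)])
```

A single document larger than max_chunk_bytes is still emitted in a chunk of its own in this sketch.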
Most IO manager settings can be overridden for individual assets with asset metadata, including index, id_field, bulk_chunk_size, max_chunk_bytes, refresh, rollover_strategy, and index_config.
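The per-asset override can be pictured as a merge in which asset metadata wins over IO manager defaults for the overridable settings only. The sketch below assumes the metadata keys use the same names as the options listed above; `resolve_settings` and that precedence logic are illustrative assumptions, not the integration's code.

```python
from typing import Any

# Settings the text above lists as overridable through asset metadata.
OVERRIDABLE = (
    "index",
    "id_field",
    "bulk_chunk_size",
    "max_chunk_bytes",
    "refresh",
    "rollover_strategy",
    "index_config",
)


def resolve_settings(
    defaults: dict[str, Any], asset_metadata: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical merge: metadata overrides defaults for overridable keys."""
    resolved = dict(defaults)
    for key in OVERRIDABLE:
        if key in asset_metadata:
            resolved[key] = asset_metadata[key]
    return resolved


defaults = {"index": "docs", "bulk_chunk_size": 500, "keep_last": 3}
metadata = {"index": "articles", "keep_last": 1, "owner": "search-team"}
print(resolve_settings(defaults, metadata))
```

In this sketch keep_last keeps its default because it is not in the overridable list, and unrelated metadata such as owner is ignored.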
Alias rollover
Set use_alias=True to write each materialization to a fresh physical index and atomically swap a stable alias to the new index. This lets readers and downstream assets query the alias while avoiding partial updates.
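In Elasticsearch, an alias swap is atomic when the remove and add actions are sent in a single aliases request, for example through `client.indices.update_aliases(actions=...)` in elasticsearch-py 8.x. The sketch below only builds that actions list; `build_alias_swap_actions` is a hypothetical helper and the index names are made up for illustration.

```python
def build_alias_swap_actions(
    alias: str, new_index: str, old_indices: list[str]
) -> list[dict[str, dict[str, str]]]:
    """Hypothetical helper: actions that repoint an alias in one request."""
    # Detach the alias from every index that currently carries it.
    actions = [{"remove": {"index": old, "alias": alias}} for old in old_indices]
    # Attach the alias to the freshly written index.
    actions.append({"add": {"index": new_index, "alias": alias}})
    return actions


actions = build_alias_swap_actions("docs", "docs-20240102", ["docs-20240101"])
print(actions)
# The list would then be passed as: client.indices.update_aliases(actions=actions)
```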
Rollover strategies include:
| Strategy | Behavior |
|---|---|
| auto | Uses the partition key for partitioned assets, otherwise a timestamp. |
| timestamp | Appends a UTC timestamp suffix. |
| run_id | Appends the Dagster run ID. |
| partition | Appends a slugified partition key. |
| none | Uses no suffix. |
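The strategies in the table can be sketched as a naming function. The suffix separator, the timestamp format, and the slug rules below are assumptions chosen for illustration; `physical_index_name` and `slugify` are hypothetical and the integration's actual naming may differ.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def slugify(value: str) -> str:
    """Lowercase and replace runs of other characters with single hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")


def physical_index_name(
    alias: str,
    strategy: str,
    *,
    now: datetime,
    run_id: Optional[str] = None,
    partition_key: Optional[str] = None,
) -> str:
    """Hypothetical naming rule for the physical index behind an alias."""
    if strategy == "auto":
        # Partitioned assets use the partition key, others a timestamp.
        strategy = "partition" if partition_key is not None else "timestamp"
    if strategy == "none":
        return alias
    if strategy == "timestamp":
        suffix = now.astimezone(timezone.utc).strftime("%Y%m%d%H%M%S")
    elif strategy == "run_id":
        if run_id is None:
            raise ValueError("run_id strategy needs a run ID")
        suffix = run_id
    elif strategy == "partition":
        if partition_key is None:
            raise ValueError("partition strategy needs a partition key")
        suffix = slugify(partition_key)
    else:
        raise ValueError(f"unknown rollover strategy: {strategy}")
    return f"{alias}-{suffix}"


now = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(physical_index_name("docs", "auto", now=now))
print(physical_index_name("docs", "auto", now=now, partition_key="US/East Coast"))
```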
Set keep_last to limit how many rollover indices are retained; older rollover indices are deleted after a successful alias swap.
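The retention rule can be sketched as a slice over the rollover indices ordered from oldest to newest. Whether the newly swapped-in index counts toward keep_last is an assumption here (it does in this sketch), and `indices_to_delete` is a hypothetical helper.

```python
def indices_to_delete(indices_oldest_first: list[str], keep_last: int) -> list[str]:
    """Hypothetical pruning rule: everything except the newest keep_last indices."""
    if keep_last < 1:
        raise ValueError("keep_last must be at least 1")
    # A negative slice bound larger than the list length yields an empty list.
    return indices_oldest_first[:-keep_last]


history = ["docs-1", "docs-2", "docs-3", "docs-4", "docs-5"]
print(indices_to_delete(history, keep_last=3))
```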
Asset checks
The IO manager records materialization metadata such as index, indexed, failures, and alias. Use build_indexed_asset_check to assert that a materialization indexed at least a minimum number of documents and did not exceed a maximum failure count.
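The pass condition described above can be written as a plain predicate over the recorded metadata. This is a sketch of the logic only: `indexed_check_passes` is hypothetical, `max_failures` is an assumed parameter name (the example earlier on this page shows only `min_indexed`), and the real check is built with build_indexed_asset_check.

```python
from typing import Any


def indexed_check_passes(
    metadata: dict[str, Any], min_indexed: int = 1, max_failures: int = 0
) -> bool:
    """Hypothetical predicate over materialization metadata."""
    indexed = int(metadata.get("indexed", 0))
    failures = int(metadata.get("failures", 0))
    # Enough documents were indexed and failures stayed within the limit.
    return indexed >= min_indexed and failures <= max_failures


print(indexed_check_passes({"index": "docs", "indexed": 2, "failures": 0}))
print(indexed_check_passes({"index": "docs", "indexed": 2, "failures": 1}))
```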
About Elasticsearch
Elasticsearch is a distributed search and analytics engine for indexing, searching, and analyzing large volumes of data in near real time. Learn more in the Elasticsearch documentation.