Dagster & Elasticsearch

Community integration

This is a community-maintained integration. To report bugs or leave feedback, open an issue in the Dagster community integrations repo.

The community-supported Elasticsearch integration provides a resource for interacting with Elasticsearch clusters and an IO manager for bulk-indexing Dagster asset outputs as searchable documents.

Installation

uv add dagster-elasticsearch

Optional extras are available for table-like asset outputs. Install the extras that match the data types your assets return:

uv add "dagster-elasticsearch[pandas]"
uv add "dagster-elasticsearch[polars]"
uv add "dagster-elasticsearch[arrow]"

The underlying elasticsearch Python client must match the major version of your Elasticsearch cluster. Pin the client major version in your project if needed, for example elasticsearch>=8.10,<9 for Elasticsearch 8.x.
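
One way to catch a version mismatch early is to compare the two major versions before running any assets. The sketch below is illustrative only: majors_match is a hypothetical helper, not part of dagster-elasticsearch. In practice the client version tuple would come from elasticsearch.__version__ and the cluster version string from client.info()["version"]["number"].

```python
def majors_match(client_version: tuple[int, ...], cluster_version: str) -> bool:
    """Return True when the client and the cluster share a major version."""
    # Cluster versions look like "8.13.4"; only the first component matters here.
    cluster_major = int(cluster_version.split(".", 1)[0])
    return client_version[0] == cluster_major


print(majors_match((8, 10, 0), "8.13.4"))  # True
print(majors_match((8, 10, 0), "7.17.9"))  # False
```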

Example

from dagster_elasticsearch import (
    ElasticsearchIOManager,
    ElasticsearchResource,
    HostsConfig,
    build_indexed_asset_check,
)

import dagster as dg


# Writes a single document directly through the Elasticsearch client.
@dg.asset(compute_kind="elasticsearch")
def indexed_doc(es: ElasticsearchResource) -> None:
    with es.get_client() as client:
        client.index(index="docs", id="hello", document={"title": "hello"})
        client.indices.refresh(index="docs")


# The returned records are bulk-indexed by the IO manager.
# The "_id" value is used as the Elasticsearch document ID.
@dg.asset(io_manager_key="elasticsearch_io_manager")
def search_docs() -> list[dict[str, str]]:
    return [
        {"_id": "1", "title": "hello"},
        {"_id": "2", "title": "world"},
    ]


@dg.definitions
def defs() -> dg.Definitions:
    return dg.Definitions(
        assets=[indexed_doc, search_docs],
        # Fail the check if fewer than one document was indexed.
        asset_checks=[build_indexed_asset_check(asset=search_docs, min_indexed=1)],
        resources={
            "es": ElasticsearchResource(
                connection_config=HostsConfig(
                    hosts=["https://es.example.com:9200"],
                    api_key=dg.EnvVar("ELASTICSEARCH_API_KEY"),
                ),
            ),
            "elasticsearch_io_manager": ElasticsearchIOManager(
                connection_config=HostsConfig(
                    hosts=["https://es.example.com:9200"],
                    api_key=dg.EnvVar("ELASTICSEARCH_API_KEY"),
                ),
                # With use_alias=True, "docs" is the stable alias name.
                index="docs",
                use_alias=True,
                rollover_strategy="auto",
                # Keep the three most recent rollover indices.
                keep_last=3,
            ),
        },
    )

Using the Elasticsearch resource

Use ElasticsearchResource when an asset or op needs direct access to an Elasticsearch client. Configure it with either:

  • HostsConfig for self-hosted or generic Elasticsearch endpoints
  • CloudConfig for Elastic Cloud deployments

Both configuration types support API-key authentication or basic authentication with a username and password. HostsConfig also supports bearer auth, certificate authorities, and certificate verification settings.
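
To make the relationship between these options concrete, the sketch below assembles keyword arguments for the underlying elasticsearch.Elasticsearch client (hosts, api_key, basic_auth, bearer_auth, ca_certs, and verify_certs are real client arguments in the 8.x client). The helper build_client_kwargs is hypothetical, and the rule that at most one authentication method may be set is an assumption of this sketch; it is not taken from the integration's source.

```python
from typing import Any, Optional


def build_client_kwargs(
    hosts: list[str],
    api_key: Optional[str] = None,
    basic_auth: Optional[tuple[str, str]] = None,
    bearer_auth: Optional[str] = None,
    ca_certs: Optional[str] = None,
    verify_certs: bool = True,
) -> dict[str, Any]:
    """Build keyword arguments suitable for elasticsearch.Elasticsearch(**kwargs)."""
    auth = {"api_key": api_key, "basic_auth": basic_auth, "bearer_auth": bearer_auth}
    chosen = {name: value for name, value in auth.items() if value is not None}
    # Assumption of this sketch: mixing authentication methods is a config error.
    if len(chosen) > 1:
        raise ValueError("configure at most one authentication method")
    kwargs: dict[str, Any] = {"hosts": hosts, "verify_certs": verify_certs, **chosen}
    if ca_certs is not None:
        kwargs["ca_certs"] = ca_certs
    return kwargs


kwargs = build_client_kwargs(["https://es.example.com:9200"], api_key="example-key")
```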

Using the Elasticsearch IO manager

Use ElasticsearchIOManager to bulk-index asset outputs into Elasticsearch. It supports outputs such as dictionaries, lists of dictionaries, pandas DataFrames, Polars DataFrames or LazyFrames, and PyArrow tables. The id_field option, which defaults to _id, is used as the Elasticsearch document ID when present.
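
As a rough illustration of what bulk-indexing with an id_field involves, the sketch below turns records into action dictionaries in the shape accepted by elasticsearch.helpers.bulk. It is not the IO manager's actual implementation: to_bulk_actions is a hypothetical helper, and removing the ID field from the stored document is an assumption of this sketch.

```python
from typing import Any, Iterable, Iterator


def to_bulk_actions(
    records: Iterable[dict[str, Any]], index: str, id_field: str = "_id"
) -> Iterator[dict[str, Any]]:
    """Yield one bulk "index" action per record."""
    for record in records:
        # Assumption: the ID field is metadata and is not stored in the document body.
        source = {key: value for key, value in record.items() if key != id_field}
        action: dict[str, Any] = {"_op_type": "index", "_index": index, "_source": source}
        if id_field in record:
            # Use the record's own ID; without it, Elasticsearch generates one.
            action["_id"] = record[id_field]
        yield action


records = [{"_id": "1", "title": "hello"}, {"title": "world"}]
actions = list(to_bulk_actions(records, index="docs"))
```

Here the first action carries "_id": "1", while the second has no "_id" key because the record did not include the field.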

Common IO manager options include:

Option           Description
index            Target index name. When use_alias=True, this is the stable alias name.
bulk_chunk_size  Number of documents per bulk request.
max_chunk_bytes  Optional maximum bulk request size in bytes.
refresh          Whether to refresh the index after writes.
lazy_load        Return an iterator from load_input instead of loading all hits eagerly.
scan_size        Page size for scroll-based reads.

Most IO manager settings can be overridden for individual assets with asset metadata, including index, id_field, bulk_chunk_size, max_chunk_bytes, refresh, rollover_strategy, and index_config.
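
The precedence described above can be pictured as a dictionary merge in which per-asset metadata wins over the IO manager's configured values. This is a conceptual sketch, not the integration's code: effective_settings is a hypothetical helper, and it assumes the metadata keys are the plain option names listed above.

```python
from typing import Any

# Settings named above as overridable through asset metadata.
OVERRIDABLE = {
    "index",
    "id_field",
    "bulk_chunk_size",
    "max_chunk_bytes",
    "refresh",
    "rollover_strategy",
    "index_config",
}


def effective_settings(defaults: dict[str, Any], metadata: dict[str, Any]) -> dict[str, Any]:
    """Merge IO manager defaults with per-asset overrides; unrelated metadata is ignored."""
    overrides = {key: value for key, value in metadata.items() if key in OVERRIDABLE}
    return {**defaults, **overrides}


settings = effective_settings(
    {"index": "docs", "bulk_chunk_size": 500, "refresh": False},
    {"index": "products", "refresh": True, "owner": "search-team"},
)
```

In this example the asset writes to "products" with refresh enabled, keeps the default bulk_chunk_size of 500, and the unrelated "owner" entry has no effect.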

Alias rollover

Set use_alias=True to write each materialization to a fresh physical index and atomically swap a stable alias to the new index. This lets readers and downstream assets query the alias while avoiding partial updates.
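
The swap is atomic because Elasticsearch applies all actions in a single update-aliases request together. The sketch below builds such an action list; alias_swap_actions is a hypothetical helper for illustration, and the physical index names are made up.

```python
from typing import Any, Optional


def alias_swap_actions(
    alias: str, new_index: str, old_index: Optional[str] = None
) -> list[dict[str, Any]]:
    """Build the actions for one atomic update-aliases request."""
    actions: list[dict[str, Any]] = []
    if old_index is not None:
        # Detach the alias from the index written by the previous materialization.
        actions.append({"remove": {"index": old_index, "alias": alias}})
    # Attach the alias to the freshly written index.
    actions.append({"add": {"index": new_index, "alias": alias}})
    return actions


actions = alias_swap_actions("docs", new_index="docs-20240102", old_index="docs-20240101")
```

With the 8.x Python client, a list like this would be sent as client.indices.update_aliases(actions=actions), so readers querying the alias see either the old index or the new one, never a mix.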

Rollover strategies include:

Strategy   Behavior
auto       Uses the partition key for partitioned assets, otherwise a timestamp.
timestamp  Appends a UTC timestamp suffix.
run_id     Appends the Dagster run ID.
partition  Appends a slugified partition key.
none       Uses no suffix.
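
The strategies above can be sketched as a single suffix function. Everything in this example is an assumption made for illustration: the helper names, the timestamp format, and the slug rules are not taken from the integration and may differ from what it actually produces.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def slugify(value: str) -> str:
    """Lowercase the value and collapse runs of other characters into hyphens."""
    return re.sub(r"[^a-z0-9]+", "-", value.lower()).strip("-")


def rollover_suffix(
    strategy: str,
    *,
    run_id: str,
    partition_key: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Return the index-name suffix for one materialization."""
    now = now or datetime.now(timezone.utc)
    if strategy == "auto":
        # Partitioned assets use the partition key; everything else uses a timestamp.
        strategy = "partition" if partition_key is not None else "timestamp"
    if strategy == "timestamp":
        return now.strftime("%Y%m%d%H%M%S")
    if strategy == "run_id":
        return run_id
    if strategy == "partition":
        if partition_key is None:
            raise ValueError("the partition strategy requires a partition key")
        return slugify(partition_key)
    if strategy == "none":
        return ""
    raise ValueError(f"unknown rollover strategy: {strategy}")


fixed = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
print(rollover_suffix("auto", run_id="abc123", now=fixed))  # 20240102030405
print(rollover_suffix("auto", run_id="abc123", partition_key="US/East Coast"))  # us-east-coast
```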

Use keep_last to delete older rollover indices after successful alias swaps.
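
Conceptually, pruning keeps the newest keep_last indices and deletes the rest. The sketch below shows that selection step only; indices_to_delete is a hypothetical helper, and it assumes the caller already has the rollover indices sorted from oldest to newest.

```python
def indices_to_delete(indices_oldest_first: list[str], keep_last: int) -> list[str]:
    """Return the indices that fall outside the newest keep_last entries."""
    if keep_last < 1:
        raise ValueError("keep_last must be at least 1")
    # Slicing off the last keep_last items leaves only the older indices.
    return indices_oldest_first[:-keep_last]


history = ["docs-1", "docs-2", "docs-3", "docs-4", "docs-5"]
print(indices_to_delete(history, keep_last=3))  # ['docs-1', 'docs-2']
```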

Asset checks

The IO manager records materialization metadata such as index, indexed, failures, and alias. Use build_indexed_asset_check to assert that a materialization indexed at least a minimum number of documents and did not exceed a maximum failure count.
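
The condition such a check evaluates can be sketched in a few lines. This is not the body of build_indexed_asset_check: indexed_check_passes is a hypothetical helper, the max_failures parameter name is an assumption, and the sketch assumes the indexed and failures metadata values are integer counts.

```python
from typing import Any


def indexed_check_passes(
    metadata: dict[str, Any], min_indexed: int, max_failures: int = 0
) -> bool:
    """Pass when enough documents were indexed and failures stay within the limit."""
    return metadata["indexed"] >= min_indexed and metadata["failures"] <= max_failures


print(indexed_check_passes({"indexed": 2, "failures": 0}, min_indexed=1))  # True
print(indexed_check_passes({"indexed": 2, "failures": 1}, min_indexed=1))  # False
```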

About Elasticsearch

Elasticsearch is a distributed search and analytics engine for indexing, searching, and analyzing large volumes of data in near real time. Learn more in the Elasticsearch documentation.