Dagster & HF Datasets
Community integration
This is a community-maintained integration. To report bugs or leave feedback, open an issue in the Dagster community integrations repo.
Use Dagster and Hugging Face Datasets to orchestrate data pipelines, materialize dataset assets, track metadata and lineage, and publish datasets to the Hub.
With the HF Datasets integration, Dagster makes it easy to:
- Load a Hugging Face dataset as a Dagster asset with metadata.
- Stream large datasets efficiently in runtime-only mode.
- Capture metadata and lineage for improved observability.
- Build multi-asset pipelines that support dataset splits.
- Publish processed datasets back to the Hugging Face Hub.
Installation
- uv
- pip
uv add dagster-hf-datasets
pip install dagster-hf-datasets
Example
This example shows how a dataset pipeline works end to end, covering transformation, split-aware assets, and publishing to the Hugging Face Hub.
from dagster_hf_datasets import (
HFDatasetPublisher,
HFParquetIOManager,
HuggingFaceResource,
hf_dataset_asset,
)
from datasets import Dataset
from dagster import AssetExecutionContext, Definitions, MaterializeResult, asset
@hf_dataset_asset(
    path="nyu-mll/glue",  # Authenticate first with: hf auth login
    config="qqp",
    split="train",
    group_name="golden_glue_pipeline",
    io_manager_key="hf_parquet_io_manager",
)
def raw_glue_qqp(
    context: AssetExecutionContext,
    dataset: Dataset,
) -> MaterializeResult:
    """Materialize the raw GLUE QQP training split.

    The decorator is configured with the ``nyu-mll/glue`` path, the ``qqp``
    config, and the ``train`` split. The ``dataset`` argument is presumably
    the loaded split supplied by ``hf_dataset_asset`` (confirm against the
    integration's API reference).

    Args:
        context: Execution context; only its logger is used here.
        dataset: The dataset to materialize.

    Returns:
        A ``MaterializeResult`` whose value is ``dataset`` and whose metadata
        records the row count and the source path, split, and config.
    """
    row_count = len(dataset)
    context.log.info("Loaded raw dataset with %s rows", row_count)

    # These values repeat the decorator arguments so they appear as
    # materialization metadata.
    provenance = {
        "rows": row_count,
        "source_dataset": "nyu-mll/glue",
        "split": "train",
        "config": "qqp",
    }
    return MaterializeResult(value=dataset, metadata=provenance)
@asset(
    io_manager_key="hf_parquet_io_manager",
    group_name="golden_glue_pipeline",
)
def deduplicated_glue_qqp(raw_glue_qqp: Dataset) -> Dataset:
    """Drop repeated (question1, question2) pairs, keeping the first one seen.

    Args:
        raw_glue_qqp: Upstream dataset with ``question1`` and ``question2``
            columns.

    Returns:
        The result of filtering ``raw_glue_qqp`` down to first occurrences.
    """
    # NOTE(review): this set is shared state inside the predicate, so it
    # presumably relies on every example passing through the same closure in
    # one process; confirm before enabling parallel filtering.
    observed_pairs = set()

    def is_first_occurrence(row: dict) -> bool:
        pair = (row["question1"], row["question2"])
        if pair not in observed_pairs:
            observed_pairs.add(pair)
            return True
        return False

    return raw_glue_qqp.filter(is_first_occurrence)
@asset(
    io_manager_key="hf_parquet_io_manager",
    group_name="golden_glue_pipeline",
)
def filtered_glue_qqp(deduplicated_glue_qqp: Dataset) -> Dataset:
    """Keep only examples where both questions are present and long enough.

    An example is kept when neither ``question1`` nor ``question2`` is
    ``None`` and each one splits on whitespace into at least five words.

    Args:
        deduplicated_glue_qqp: Upstream deduplicated dataset.

    Returns:
        The result of filtering ``deduplicated_glue_qqp`` with that rule.
    """

    def has_two_full_questions(row: dict) -> bool:
        # Fetch both fields up front so a missing column fails immediately.
        questions = (row["question1"], row["question2"])
        if any(text is None for text in questions):
            return False
        return all(len(text.split()) >= 5 for text in questions)

    return deduplicated_glue_qqp.filter(has_two_full_questions)
@asset(
    io_manager_key="hf_parquet_io_manager",
    group_name="golden_glue_pipeline",
)
def golden_glue_qqp(filtered_glue_qqp: Dataset) -> Dataset:
    """Normalize question text to produce the curated "golden" dataset.

    Both question fields are stripped of surrounding whitespace and
    lower-cased; the label is carried over unchanged.

    Args:
        filtered_glue_qqp: Upstream filtered dataset.

    Returns:
        The result of mapping the normalizer over ``filtered_glue_qqp``.
    """

    def normalize(row: dict) -> dict:
        cleaned = {
            column: row[column].strip().lower()
            for column in ("question1", "question2")
        }
        cleaned["label"] = row["label"]
        return cleaned

    return filtered_glue_qqp.map(normalize)
@asset(group_name="golden_glue_pipeline")
def publish_golden_glue(
    context: AssetExecutionContext,
    golden_glue_qqp: Dataset,
) -> str:
    """Publish the curated dataset to the Hugging Face Hub.

    Args:
        context: Execution context; only its logger is used here.
        golden_glue_qqp: The normalized dataset to publish.

    Returns:
        The value returned by ``HFDatasetPublisher.publish``, which is logged
        as the Hub URL.
    """
    context.log.info("Preparing dataset publication")
    context.log.info("Dataset rows: %s", len(golden_glue_qqp))

    # Descriptive fields passed along with the published dataset.
    summary = (
        "Curated GLUE QQP dataset with deduplication, filtering, "
        "and normalization applied."
    )
    steps = [
        "Removed duplicate question pairs",
        "Removed malformed examples",
        "Filtered short questions",
        "Normalized text formatting",
    ]
    extra_metadata = {
        "task": "duplicate-question-detection",
        "source_config": "qqp",
        "pipeline": "golden_dataset_pipeline",
    }

    # "username" is a placeholder namespace; replace it with your own.
    publisher = HFDatasetPublisher(
        repo_id="username/golden-glue-qqp",
        private=False,
    )
    hub_url = publisher.publish(
        dataset=golden_glue_qqp,
        source_dataset="nyu-mll/glue",
        source_revision="main",
        description=summary,
        processing_steps=steps,
        metadata=extra_metadata,
    )

    context.log.info("Dataset successfully pushed to the Hugging Face Hub")
    context.log.info("Hub URL: %s", hub_url)
    return hub_url
defs = Definitions(
    # Assets are listed in pipeline order: load, deduplicate, filter,
    # normalize, publish.
    assets=[
        raw_glue_qqp,
        deduplicated_glue_qqp,
        filtered_glue_qqp,
        golden_glue_qqp,
        publish_golden_glue,
    ],
    resources={
        "huggingface": HuggingFaceResource(cache_dir=".hf_cache", offline=False),
        # This key matches the io_manager_key requested by the dataset assets.
        "hf_parquet_io_manager": HFParquetIOManager(base_dir=".dagster_hf_storage"),
    },
)
About HF Datasets
HF Datasets is a library for accessing, processing, and sharing AI datasets for Audio, Computer Vision, and Natural Language Processing (NLP) tasks.