Ingest and preprocess data
The foundation of reliable ML systems starts with clean, well-structured data pipelines. Our data ingestion system automates MNIST dataset downloading, applies proper preprocessing transforms, and creates stratified train/validation splits that ensure honest model evaluation.
Automated data downloading with normalization
The raw_mnist_data
asset handles the critical first step of our pipeline—downloading and normalizing the MNIST dataset automatically. Rather than requiring manual data management, this asset leverages PyTorch's built-in dataset utilities to handle downloading, caching, and initial preprocessing:
@dg.asset(
description="Download and load raw MNIST dataset",
group_name="data_processing",
)
def raw_mnist_data(context) -> dict[str, Any]:
"""Download the raw MNIST dataset."""
transform = transforms.Compose(
[
transforms.ToTensor(),
transforms.Normalize((MNIST_MEAN,), (MNIST_STD,)), # MNIST mean and std
]
)
# Download training data
train_dataset = datasets.MNIST(
root=str(DATA_DIR), train=True, download=True, transform=transform
)
# Download test data
test_dataset = datasets.MNIST(
root=str(DATA_DIR), train=False, download=True, transform=transform
)
# Convert to tensors
train_data = torch.stack([train_dataset[i][0] for i in range(len(train_dataset))])
train_labels = torch.tensor([train_dataset[i][1] for i in range(len(train_dataset))])
test_data = torch.stack([test_dataset[i][0] for i in range(len(test_dataset))])
test_labels = torch.tensor([test_dataset[i][1] for i in range(len(test_dataset))])
context.log.info(f"Loaded {len(train_data)} training samples and {len(test_data)} test samples")
return {
"train_data": train_data,
"train_labels": train_labels,
"test_data": test_data,
"test_labels": test_labels,
}
The normalization transforms applied during data loading use MNIST's precomputed statistics (mean=0.1307, std=0.3081) to standardize pixel values across the entire dataset. This preprocessing is crucial for neural network training stability, it prevents certain neurons from dominating due to input scale differences and enables faster convergence. The asset returns structured tensors that downstream assets can depend on, creating clear data lineage in Dagster's dependency graph.
Strategic training and validation splitting
The processed_mnist_data
asset implements stratified validation splitting that maintains class balance while creating proper boundaries between training and validation data. This separation is essential for honest performance estimation during model development:
@dg.asset(
description="Preprocess MNIST images for training",
group_name="data_processing",
)
def processed_mnist_data(context, raw_mnist_data: dict[str, Any]) -> dict[str, torch.Tensor]:
"""Process MNIST data and create train/validation split."""
train_data = raw_mnist_data["train_data"]
train_labels = raw_mnist_data["train_labels"]
test_data = raw_mnist_data["test_data"]
test_labels = raw_mnist_data["test_labels"]
# Create validation split from training data
train_data, val_data, train_labels, val_labels = train_test_split(
train_data,
train_labels,
test_size=VALIDATION_SPLIT,
random_state=RANDOM_SEED,
stratify=train_labels,
)
# Convert back to tensors
train_data = torch.tensor(train_data)
val_data = torch.tensor(val_data)
train_labels = torch.tensor(train_labels)
val_labels = torch.tensor(val_labels)
context.add_output_metadata(
{
"train_samples": len(train_data),
"val_samples": len(val_data),
"test_samples": len(test_data),
"image_shape": str(train_data.shape[1:]),
"num_classes": len(torch.unique(train_labels)),
}
)
context.log.info(
f"Processed data - Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}"
)
return {
"train_data": train_data,
"val_data": val_data,
"train_labels": train_labels,
"val_labels": val_labels,
"test_data": test_data,
"test_labels": test_labels,
}
Stratified splitting ensures each digit class (0-9) appears proportionally in both training and validation sets, preventing validation bias that could occur with random splits. The 20% validation split provides sufficient data for reliable performance monitoring while preserving most training data. The asset generates comprehensive metadata about dataset sizes, image dimensions, and class distributions that appears in Dagster's UI for immediate data quality visibility.
Configuration and reproducibility management
The data pipeline uses centralized constants to control critical parameters like validation split ratios and random seeds. This approach ensures reproducible experiments across different environments while enabling easy adjustment of preprocessing strategies:
VALIDATION_SPLIT = 0.2
RANDOM_SEED = 42
These constants integrate seamlessly with Dagster's configuration system, allowing different environments to use different preprocessing strategies without code changes. The random seed ensures reproducible data splits for consistent experimentation, while the validation split ratio can be adjusted based on dataset size and modeling requirements.
Asset dependency and metadata tracking
Dagster automatically tracks the relationship between raw and processed data assets, creating clear lineage from downloads through final processed tensors. Each asset generates rich metadata that enables data quality monitoring and pipeline debugging:
The asset system provides automatic lineage tracking from raw downloads through processed data, enabling full reproducibility of any downstream model results. Metadata including dataset statistics, processing parameters, and data quality metrics appears in Dagster's UI, providing immediate visibility into pipeline health and enabling rapid debugging when issues arise.
This asset-based approach scales naturally to production scenarios where data processing might involve multiple transformation steps, external data sources, or complex validation requirements while maintaining the same clear dependency structure and comprehensive observability.
Next steps
With clean, preprocessed data available through our asset pipeline, we can move to model training where we'll build and optimize a CNN classifier using this structured data foundation.
- Continue this tutorial with model training