Evaluate model performance and deploy to production

Reliable ML systems require comprehensive model evaluation and flexible deployment strategies. Our evaluation system generates detailed performance metrics and confusion matrix analysis, while the deployment pipeline supports multiple strategies, from automatic quality-based deployment to manual model selection.

Comprehensive model evaluation

The model_evaluation asset provides multi-metric assessment using the held-out test set that was never seen during training. This evaluation goes beyond simple accuracy to provide detailed insights into model behavior across all digit classes:

src/project_ml/defs/assets/model_assets.py
@dg.asset(
    description="Evaluate model performance on test set",
    group_name="model_pipeline",
    required_resource_keys={"model_storage"},
    deps=["digit_classifier"],
)
def model_evaluation(
    context,
    raw_mnist_data: dict[str, Any],
    config: ModelEvaluationConfig,
) -> dict[str, Any]:
    """Evaluate the trained model on the test set."""
    model_store = context.resources.model_storage

    # Get the latest trained model
    try:
        saved_models = model_store.list_models()
        if not saved_models:
            context.log.error("No saved models found for evaluation")
            return {
                "test_accuracy": 0.0,
                "predictions": [],
                "labels": [],
                "classification_report": {},
            }

        # Get the latest model name (first one is newest due to sorting)
        latest_model_name = saved_models[0]  # Already just the model name
        context.log.info(f"Loading model for evaluation: {latest_model_name}")

        model_data = model_store.load_model(latest_model_name)

        if isinstance(model_data, dict) and "model" in model_data:
            model_to_evaluate = model_data["model"]
        else:
            model_to_evaluate = model_data  # Direct model object

        # Log model metadata
        context.log.info("Model loaded successfully")
        context.log.info(f"Model architecture:\n{model_to_evaluate!s}")

    except Exception as e:
        context.log.error(f"Failed to load model for evaluation: {e!s}")
        context.log.error(f"Exception details: {e.__class__.__name__!s}")
        import traceback

        context.log.error(f"Traceback: {traceback.format_exc()}")
        return {
            "test_accuracy": 0.0,
            "predictions": [],
            "labels": [],
            "classification_report": {},
        }

    test_data = raw_mnist_data["test_data"]
    test_labels = raw_mnist_data["test_labels"]

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_to_evaluate.to(device)
    model_to_evaluate.eval()

    # Make predictions
    all_predictions = []
    all_labels = []

    test_dataset = TensorDataset(test_data, test_labels)
    test_loader = DataLoader(test_dataset, batch_size=config.batch_size, shuffle=False)

    with torch.no_grad():
        for data, target in test_loader:
            _data, _target = data.to(device), target.to(device)
            outputs = model_to_evaluate(_data)
            _, predicted = torch.max(outputs.data, 1)
            all_predictions.extend(predicted.cpu().numpy())
            all_labels.extend(_target.cpu().numpy())

    all_predictions = np.array(all_predictions)
    all_labels = np.array(all_labels)

    # Calculate metrics
    test_accuracy = float(np.mean(all_predictions == all_labels))

    # Create confusion matrix plot
    cm = confusion_matrix(all_labels, all_predictions)
    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=ax)
    ax.set_title("Confusion Matrix - Test Set")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")

    # Generate classification report
    class_report = classification_report(all_labels, all_predictions, output_dict=True)

    context.add_output_metadata(
        {
            "test_accuracy": test_accuracy,
            "test_samples": len(all_labels),
            "precision_macro": float(class_report["macro avg"]["precision"]),
            "recall_macro": float(class_report["macro avg"]["recall"]),
            "f1_macro": float(class_report["macro avg"]["f1-score"]),
            "evaluated_model_path": latest_model_name,
        },
        output_name="result",
    )

    context.log.info(f"Model evaluation completed. Test accuracy: {test_accuracy:.4f}")

    plt.close(fig)
    return {
        "test_accuracy": test_accuracy,
        "predictions": all_predictions.tolist(),
        "labels": all_labels.tolist(),
        "classification_report": class_report,
        "model_info": {"path": latest_model_name},
    }

The evaluation asset automatically loads the most recently trained model and generates comprehensive metrics, including per-class precision, recall, and F1 scores, alongside a confusion matrix that reveals systematic error patterns. The asset includes robust error handling for model-loading failures and emits rich metadata that enables performance tracking across training runs and model versions.
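
Because the asset receives its model store through a resource, it can be exercised in isolation. The sketch below shows one way that might look, assuming ModelEvaluationConfig lives in the same module as the asset; the stub storage class, dummy model, and tensor shapes are illustrative assumptions, not part of the tutorial project.

# Sketch: exercising model_evaluation directly with an in-memory stub for the
# model_storage resource. Import path, stub class, and dummy data are assumptions.
import dagster as dg
import torch

from project_ml.defs.assets.model_assets import ModelEvaluationConfig, model_evaluation


class _StubModelStore:
    """Minimal stand-in implementing the storage methods the asset calls."""

    def __init__(self, model: torch.nn.Module):
        self._model = model

    def list_models(self) -> list[str]:
        return ["stub_model"]

    def load_model(self, model_name: str) -> dict:
        return {"model": self._model}


def test_model_evaluation_returns_metrics():
    # Untrained dummy classifier with the right input/output shape for MNIST
    dummy_model = torch.nn.Sequential(torch.nn.Flatten(), torch.nn.Linear(28 * 28, 10))
    context = dg.build_asset_context(resources={"model_storage": _StubModelStore(dummy_model)})
    raw_mnist_data = {
        "test_data": torch.randn(64, 1, 28, 28),     # MNIST-shaped dummy images
        "test_labels": torch.randint(0, 10, (64,)),  # dummy digit labels
    }

    result = model_evaluation(context, raw_mnist_data, ModelEvaluationConfig(batch_size=32))

    assert 0.0 <= result["test_accuracy"] <= 1.0
    assert len(result["predictions"]) == 64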

Flexible deployment configuration

Production deployment requires balancing model quality, deployment speed, and operational safety. Our deployment system uses configurable strategies that adapt to different organizational needs and risk tolerances:

src/project_ml/defs/assets/model_assets.py
class DeploymentConfig(dg.Config):
    """Configuration for model deployment."""

    accuracy_threshold: float = ACCURACY_THRESHOLD
    model_path: str = str(MODELS_DIR)
    custom_model_name: Optional[str] = None  # Allow users to specify a specific model to deploy
    force_deploy: bool = False  # Allow users to bypass accuracy threshold

This configuration enables multiple deployment scenarios: quality-based automatic deployment promotes models that meet the accuracy threshold, manual model selection deploys a specific version by name for expert override, and force deployment bypasses quality gates for development environments. The flexible approach supports different strategies across environments; development might use lower thresholds for rapid iteration, while production employs strict quality gates.
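
To make the decision flow concrete, here is a minimal sketch of how a downstream deployment asset might consume this config together with the evaluation results. The asset name, its inputs, and the promote/skip logic are assumptions for illustration; the tutorial's actual deployment asset may be structured differently.

# Sketch: applying DeploymentConfig in a hypothetical deployment asset.
# Assumes it sits in the same module as DeploymentConfig and model_evaluation.
from typing import Any

import dagster as dg


@dg.asset(
    description="Promote the evaluated model if it passes the quality gate",
    group_name="model_pipeline",
)
def model_deployment(
    context,
    model_evaluation: dict[str, Any],
    config: DeploymentConfig,
) -> dict[str, Any]:
    accuracy = model_evaluation["test_accuracy"]

    if config.custom_model_name:
        # Expert override: deploy a specific, named model version
        model_name = config.custom_model_name
    elif config.force_deploy or accuracy >= config.accuracy_threshold:
        # Quality gate passed (or explicitly bypassed): promote the latest evaluated model
        model_name = model_evaluation["model_info"]["path"]
    else:
        context.log.warning(
            f"Accuracy {accuracy:.4f} is below threshold {config.accuracy_threshold}; skipping deployment"
        )
        return {"deployed": False, "accuracy": accuracy}

    context.log.info(f"Deploying model: {model_name}")
    return {"deployed": True, "model_name": model_name, "accuracy": accuracy}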

Model storage abstraction

Scalable ML systems require storage solutions that work across development and production environments. Our abstract storage interface enables seamless transitions from local development to cloud-based production systems:

src/project_ml/defs/resources.py
class ModelStoreResource(dg.ConfigurableResource, ABC):
    """Abstract base class for model storage resources."""

    @abstractmethod
    def save_model(self, model_data: dict[str, Any], model_name: str):
        pass

    @abstractmethod
    def load_model(self, model_name: str) -> dict[str, Any]:
        pass

    @abstractmethod
    def list_models(self) -> list[str]:
        """List available models, sorted by modification time (newest first)."""
        pass

The storage abstraction supports both local filesystem storage for development experimentation and cloud-based S3 storage for production durability and scalability. This design allows the same deployment logic to work across different environments while providing the persistence characteristics needed for each scenario: immediate access for development, and durability with multi-region support for production.
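
As a rough illustration, a local-filesystem implementation of this interface might look like the sketch below, assuming it lives in the same module as the abstract class. The class name, directory layout, and use of torch.save/torch.load are assumptions for illustration, not the tutorial's actual resource.

# Sketch: a hypothetical local-filesystem implementation of ModelStoreResource.
import os
from typing import Any

import torch


class LocalModelStoreResource(ModelStoreResource):
    """Store models as .pt files under a local directory (illustrative only)."""

    base_dir: str = "models"  # assumed config field

    def save_model(self, model_data: dict[str, Any], model_name: str):
        os.makedirs(self.base_dir, exist_ok=True)
        torch.save(model_data, os.path.join(self.base_dir, f"{model_name}.pt"))

    def load_model(self, model_name: str) -> dict[str, Any]:
        # weights_only=False allows unpickling full model objects on newer PyTorch
        return torch.load(os.path.join(self.base_dir, f"{model_name}.pt"), weights_only=False)

    def list_models(self) -> list[str]:
        if not os.path.isdir(self.base_dir):
            return []
        paths = [
            os.path.join(self.base_dir, f)
            for f in os.listdir(self.base_dir)
            if f.endswith(".pt")
        ]
        # Newest first, matching the contract documented on the abstract method
        paths.sort(key=os.path.getmtime, reverse=True)
        return [os.path.splitext(os.path.basename(p))[0] for p in paths]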

Inference services and prediction endpoints

Production ML systems serve predictions through different patterns depending on requirements. Our system supports both batch processing for high-throughput scenarios and real-time inference for interactive applications:

src/project_ml/defs/assets/prediction_assets.py
class BatchPredictionConfig(dg.Config):
    """Configuration for batch prediction processing."""

    batch_size: int = 64
    num_test_images: int = 100  # Number of dummy images for demo
    confidence_threshold: float = 0.8  # Threshold for low confidence warning
    device: str = "cuda"  # Will fallback to CPU if CUDA not available

src/project_ml/defs/assets/prediction_assets.py
class RealTimePredictionConfig(dg.Config):
    """Configuration for real-time prediction processing."""

    batch_size: int = 10  # Default number of images to process at once
    device: str = "cuda"  # Will fallback to CPU if CUDA not available
    confidence_threshold: float = 0.9  # Higher threshold for real-time predictions
    return_probabilities: bool = False  # Whether to return full probability distribution

Batch inference optimizes for throughput through vectorized operations and GPU utilization, ideal for overnight processing or analytical workloads. Real-time inference prioritizes latency for interactive applications, with configurable confidence thresholds and optional probability distribution returns. Both services include comprehensive error handling and monitoring capabilities essential for production deployment.
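
The device fallback and confidence thresholding referenced in these configs might look roughly like the following helper, which either prediction asset could use. The function name and return shape are assumptions for illustration, not the tutorial's actual prediction code.

# Sketch: device fallback plus confidence thresholding for a batch of images.
import torch
import torch.nn.functional as F


def predict_with_confidence(model, images, config):
    """Run a batch through the model and flag low-confidence predictions."""
    # Fall back to CPU when the configured device (e.g. "cuda") is unavailable
    device = torch.device(config.device if torch.cuda.is_available() else "cpu")
    model = model.to(device).eval()

    with torch.no_grad():
        probabilities = F.softmax(model(images.to(device)), dim=1)
        confidences, predictions = torch.max(probabilities, dim=1)

    low_confidence = confidences < config.confidence_threshold  # candidates for human review
    result = {
        "predictions": predictions.cpu().tolist(),
        "confidences": confidences.cpu().tolist(),
        "low_confidence_count": int(low_confidence.sum()),
    }
    # Only the real-time config defines return_probabilities, so default to False
    if getattr(config, "return_probabilities", False):
        result["probabilities"] = probabilities.cpu().tolist()
    return result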

Production monitoring and quality assurance

The complete system demonstrates several production-ready capabilities: automated quality gates prevent low-performing models from reaching production while providing override mechanisms for expert judgment, comprehensive metadata generation throughout the pipeline enables performance tracking and debugging, and configurable storage backends support seamless transitions from development to production environments.

Model versioning enables rapid rollback when new deployments cause issues, while confidence scoring in inference services allows routing uncertain predictions to human review. The asset-based architecture provides clear dependency tracking and lineage visualization, essential for debugging model performance issues and ensuring reproducible results.

This foundation supports extending to more complex scenarios like multi-model ensembles, advanced architectures, distributed training, and integration with MLOps platforms, while maintaining the same clear structure and comprehensive observability that make the system reliable and maintainable.

Summary

This tutorial demonstrated building a complete production-ready ML pipeline using Dagster and PyTorch, covering data ingestion through model deployment and inference. The asset-based architecture provides clear dependencies, comprehensive metadata, and flexible configuration while supporting both development experimentation and production reliability requirements.