Optimize the puzzle solver with MIPROv2
While our baseline Chain-of-Thought solver provides a solid foundation, MIPROv2 improves on it through automatic optimization. Rather than relying on a fixed prompt, the algorithm generates and tests different instruction variants, curates effective few-shot examples, and refines the solver's performance through data-driven iteration.
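Before diving into configuration, it helps to see the shape of the API we are building toward. The entire process boils down to two calls: construct a MIPROv2 optimizer around a metric, then compile the program against training examples. The following is a minimal sketch rather than the pipeline code; solver, trainset, and success_metric stand in for objects defined earlier in this tutorial:
from dspy.teleprompt import MIPROv2

# Minimal shape of the optimization loop; solver, trainset, and
# success_metric are assumed to be defined as in earlier sections.
optimizer = MIPROv2(metric=success_metric, auto="light")
optimized_solver = optimizer.compile(solver, trainset=trainset)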
Optimization configuration
The optimization process requires careful tuning to balance performance gains against computational costs. Our configuration system provides granular control over the optimization algorithm's behavior:
def get_optimizer_config(self) -> dict:
    """Get DSPy optimizer configuration."""
    return {
        "optimizer": self.dspy_optimizer,
        "auto_mode": self.dspy_auto_mode,
        "min_samples": self.dspy_min_samples,
        "performance_threshold": self.dspy_performance_threshold,
        "improvement_threshold": self.dspy_improvement_threshold,
    }
These settings work together to form a robust optimization framework. MIPROv2 serves as our primary algorithm because of its strong performance on reasoning tasks, and the auto-mode setting controls optimization intensity: "light" mode provides quick improvements suitable for development, while "heavy" mode runs exhaustive searches for production deployments.
The threshold settings prevent wasted computation. We only optimize models that show baseline competency (performance_threshold of 0.3), and we only accept changes that provide meaningful improvements (improvement_threshold of 0.05). This ensures our optimization cycles focus on models worth improving and changes worth deploying.
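To make the mapping concrete, here is a hypothetical helper (not part of the component) showing how the configuration dictionary could be turned into an optimizer instance, with auto_mode flowing directly into MIPROv2's auto parameter:
from dspy.teleprompt import MIPROv2

def build_optimizer(config: dict, metric) -> MIPROv2:
    # Hypothetical helper: translate get_optimizer_config() output into an
    # optimizer. auto_mode selects "light" (fast, development-friendly) or
    # "heavy" (exhaustive, production-grade) search.
    return MIPROv2(metric=metric, auto=config["auto_mode"])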
Running optimization
The optimization process transforms our baseline solver into a highly tuned puzzle-solving specialist. This transformation happens automatically through MIPROv2's sophisticated search and refinement algorithms:
@dg.asset(
    name=f"{self.model_name}_baseline_performance",
    compute_kind="dspy",
    group_name="evaluation",
    description=f"Performance evaluation of baseline {self.model_name} model",
    deps=[
        f"{self.model_name}_baseline_model",
        "connections_puzzle_data",
    ],
)
def baseline_performance_asset(
    context: dg.AssetExecutionContext,
    dspy_resource: DSPyResource,
) -> Dict[str, Any]:
    """Evaluate baseline model performance on Connections puzzles."""
    # Configure DSPy
    dspy_resource.configure_dspy()

    # Load baseline model
    model = ConnectionsSolver()
    dspy_resource.load_model(model, f"{self.model_name}_baseline", "latest")

    # Load evaluation puzzles (simulate loading from previous asset)
    data_path = Path(self.connections_data_path)
    if not data_path.is_absolute():
        data_path = Path.cwd() / data_path

    puzzles = load_puzzles_from_csv(str(data_path))
    _, test_puzzles = shuffle_and_split_puzzles(puzzles, self.train_test_split)
    eval_puzzles = test_puzzles[: self.eval_subset_size]

    context.log.info(
        f"Evaluating baseline model on {len(eval_puzzles)} puzzles"
    )

    # Create dataset for dspy.Evaluate
    dataset = create_dataset(eval_puzzles)

    # Use dspy.Evaluate with threading for faster evaluation
    main_lm = _get_main_model()
    with dspy.context(lm=main_lm):
        evaluate = dspy.Evaluate(
            devset=dataset,
            metric=success_metric,
            display_progress=True,
            num_threads=self.num_threads,
        )
        result = evaluate(model)

    # Extract score from EvaluationResult
    score = result.score if hasattr(result, "score") else result
    success_rate = float(score) if score is not None else 0.0

    # Calculate additional metrics from predictions if available
    num_evaluated = len(eval_puzzles)

    performance = {
        "success_rate": success_rate,
        "num_evaluated": num_evaluated,
        "needs_optimization": success_rate < self.performance_threshold,
        "score": success_rate,  # For compatibility
    }

    context.log.info(
        f"Baseline Performance - Success Rate: {success_rate:.3f}, "
        f"Puzzles Evaluated: {num_evaluated}"
    )

    context.add_output_metadata(
        {
            **performance,
            "performance_threshold": self.performance_threshold,
            "model_name": f"{self.model_name}_baseline",
        }
    )

    return performance
# Only create optimization asset if enabled
assets = [
    puzzle_data_asset,
    baseline_model_asset,
    baseline_performance_asset,
]
if self.optimization_enabled:

    @dg.asset(
        name=f"{self.model_name}_optimized_model",
        compute_kind="dspy",
        group_name="models",
        description=f"MIPROv2 optimized {self.model_name} DSPy model",
        deps=[f"{self.model_name}_baseline_performance"],
    )
    def optimized_model_asset(
        context: dg.AssetExecutionContext,
        dspy_resource: DSPyResource,
    ) -> Dict[str, Any]:
        """Run DSPy optimization if performance is below threshold."""
        # Configure DSPy
        dspy_resource.configure_dspy()

        # Load baseline model
        baseline_model = ConnectionsSolver()
        dspy_resource.load_model(
            baseline_model, f"{self.model_name}_baseline", "latest"
        )

        # Load puzzle data for optimization
        data_path = Path(self.connections_data_path)
        if not data_path.is_absolute():
            data_path = Path.cwd() / data_path

        puzzles = load_puzzles_from_csv(str(data_path))
        train_puzzles, _ = shuffle_and_split_puzzles(
            puzzles, self.train_test_split
        )

        # Take subset for optimization (smaller for speed)
        opt_puzzles = train_puzzles[: min(20, len(train_puzzles))]

        # Create datasets for optimization
        train_dataset = create_dataset(opt_puzzles)

        # Split into train/val (60/40 split)
        train_size = int(0.6 * len(train_dataset))
        trainset = train_dataset[:train_size]
        valset = train_dataset[train_size:]

        context.log.info(
            f"Starting MIPROv2 optimization with {len(trainset)} train, {len(valset)} val examples"
        )

        # Define optimization metric - use the simple success metric
        def optimization_metric(example, pred, trace=None):
            try:
                # Use the simple success metric that matches your solver
                return success_metric(example, pred, trace)
            except Exception as e:
                context.log.warning(f"Metric calculation failed: {e}")
                return False

        # Get model parameters for Gemini
        main_model = os.getenv("GEMINI_MODEL", "gemini-2.0-flash-exp")
        api_key = os.getenv("GEMINI_API_KEY", "")
        params = {
            "api_key": api_key,
            "temperature": 0.7,
            "max_tokens": 8192,
        }

        optimizer = MIPROv2(
            auto="light",  # Use light mode for faster optimization
            metric=optimization_metric,
            teacher_settings=dict(lm=dspy.LM(f"gemini/{main_model}", **params)),
            prompt_model=dspy.LM(f"gemini/{main_model}", **params),
            num_threads=4,
        )
        optimized_model = optimizer.compile(
            baseline_model,
            trainset=trainset,
            valset=valset,
        )

        # Save optimized model
        model_path = dspy_resource.save_model(
            optimized_model, f"{self.model_name}_optimized", "latest"
        )
        context.log.info(f"Optimization complete. Model saved to: {model_path}")

        context.add_output_metadata(
            {
                "model_name": f"{self.model_name}_optimized",
                "model_path": str(model_path),
                "optimization_method": "MIPROv2",
                "base_model": f"{self.model_name}_baseline",
            }
        )

        return {
            "model_name": f"{self.model_name}_optimized",
            "model_path": str(model_path),
            "version": "latest",
            "optimized": True,
        }
    @dg.asset(
        name=f"{self.model_name}_optimized_performance",
        compute_kind="dspy",
        group_name="evaluation",
        description=f"Performance evaluation of optimized {self.model_name} model",
        deps=[
            f"{self.model_name}_optimized_model",
            "connections_puzzle_data",
        ],
    )
    def optimized_performance_asset(
        context: dg.AssetExecutionContext,
        dspy_resource: DSPyResource,
    ) -> Dict[str, Any]:
        """Evaluate optimized model performance."""
        # Configure DSPy
        dspy_resource.configure_dspy()

        # Load optimized model
        model = ConnectionsSolver()
        dspy_resource.load_model(
            model, f"{self.model_name}_optimized", "latest"
        )

        # Load evaluation puzzles
        data_path = Path(self.connections_data_path)
        if not data_path.is_absolute():
            data_path = Path.cwd() / data_path

        puzzles = load_puzzles_from_csv(str(data_path))
        _, test_puzzles = shuffle_and_split_puzzles(
            puzzles, self.train_test_split
        )
        eval_puzzles = test_puzzles[: self.eval_subset_size]

        context.log.info(
            f"Evaluating optimized model on {len(eval_puzzles)} puzzles"
        )

        # Create dataset for dspy.Evaluate
        dataset = create_dataset(eval_puzzles)

        # Use dspy.Evaluate with threading for faster evaluation
        main_lm = _get_main_model()
        with dspy.context(lm=main_lm):
            evaluate = dspy.Evaluate(
                devset=dataset,
                metric=success_metric,
                display_progress=True,
                num_threads=self.num_threads,
            )
            result = evaluate(model)

        # Extract score from EvaluationResult
        score = result.score if hasattr(result, "score") else result
        success_rate = float(score) if score is not None else 0.0

        # Calculate additional metrics
        num_evaluated = len(eval_puzzles)

        performance = {
            "success_rate": success_rate,
            "num_evaluated": num_evaluated,
            "model_type": "optimized",
            "score": success_rate,  # For compatibility
        }

        context.log.info(
            f"Optimized Performance - Success Rate: {success_rate:.3f}, "
            f"Puzzles Evaluated: {num_evaluated}"
        )

        context.add_output_metadata(
            {
                **performance,
                "performance_threshold": self.performance_threshold,
                "model_name": f"{self.model_name}_optimized",
            }
        )

        return performance
The magic happens in the optimizer.compile() call, where MIPROv2 takes our baseline solver and training puzzles, then systematically experiments with different approaches. It generates multiple instruction variants ("Find four words that share a common theme" vs. "Identify the strongest semantic connection among these words"), tests various few-shot example combinations, and measures performance against our success metric.
The optimizer uses our training puzzles to understand what constitutes good puzzle-solving behavior, while our custom success metric ensures optimization focuses on actual puzzle completion rather than generic language model metrics. The result is a solver that has learned not just to reason about word relationships, but to do so in ways that consistently lead to successful puzzle solutions.
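The success metric itself is defined earlier in the tutorial, but its shape is worth recalling here, since it is what MIPROv2 actually maximizes. A DSPy metric receives an example, a prediction, and an optional trace, and returns a score. A puzzle-level version might look roughly like the sketch below; the groups attribute names are assumptions for illustration, not necessarily the tutorial's actual field names:
def success_metric(example, pred, trace=None) -> bool:
    # Illustrative sketch only: count a prediction as a success when every
    # predicted group of four words exactly matches one of the puzzle's
    # answer groups, regardless of ordering.
    predicted = {frozenset(group) for group in pred.groups}
    expected = {frozenset(group) for group in example.groups}
    return predicted == expected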
Performance monitoring
Effective optimization requires intelligent guardrails to prevent wasted computational resources and ensure meaningful improvements. Our monitoring system implements several layers of quality control:
dspy_performance_threshold: float = Field(
    default=0.3, env="DSPY_PERFORMANCE_THRESHOLD"
)
dspy_improvement_threshold: float = Field(
    default=0.05, env="DSPY_IMPROVEMENT_THRESHOLD"
)
The performance threshold (0.3) acts as a quality gate. We only invest optimization resources in models that show basic puzzle-solving competency. This prevents the optimizer from trying to polish fundamentally flawed approaches and ensures our computational budget focuses on promising candidates.
The improvement threshold (0.05) guards against accepting marginal changes that could reflect random variation rather than genuine optimization. In puzzle-solving tasks, a five-percentage-point gain in success rate represents meaningful progress: the difference between solving 3 out of 10 puzzles and 3.5 out of 10 on average. This threshold ensures our optimization cycles result in deployable improvements rather than statistical noise.
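The code in this section does not show where the improvement threshold is applied, so here is a hypothetical promotion gate (not part of the component) illustrating how the baseline and optimized performance dictionaries might be compared before deploying the optimized model:
def should_promote(
    baseline: dict, optimized: dict, improvement_threshold: float = 0.05
) -> bool:
    # Hypothetical gate: promote the optimized model only if its success
    # rate beats the baseline by at least the improvement threshold.
    gain = optimized["success_rate"] - baseline["success_rate"]
    return gain >= improvement_threshold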
Component integration
The optimization process is designed for real-world deployment scenarios where different environments may require different optimization strategies. The Dagster Component architecture makes this flexibility possible through configuration inheritance and override patterns:
def __init__(self, **kwargs):
    # Set Connections-specific defaults
    defaults = {
        "model_name": "connections",
        "connections_data_path": "data/Connections_Data.csv",
        "performance_threshold": 0.3,
        "optimization_enabled": True,
        "train_test_split": 0.25,
        "eval_subset_size": 30,
    }
    # Fill in defaults without overriding user-provided values
    for key, value in defaults.items():
        if key not in kwargs:
            kwargs[key] = value
    super().__init__(**kwargs)
This component pattern provides powerful flexibility: the base DSPyModelBuilder contains all the optimization logic and can be used for any DSPy project, while ConnectionsModelBuilder specializes it with puzzle-specific defaults. Development environments might use smaller evaluation subsets (30 puzzles) and lower performance thresholds for rapid iteration, while production deployments could use larger evaluation sets and stricter thresholds for robust models.
The configuration-driven approach means teams can maintain different optimization strategies for different use cases without duplicating code. A research team might prioritize optimization_enabled=True with heavy auto-mode for maximum performance, while a cost-conscious deployment might disable optimization and rely on carefully tuned baseline models.
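As an illustrative example, the same builder class could be instantiated differently per environment. The keyword names mirror the defaults dictionary above, while the production values shown here are placeholders rather than recommendations:
# Development: inherit the puzzle-specific defaults as-is.
dev_builder = ConnectionsModelBuilder()

# Production-style build (illustrative values): evaluate more puzzles and
# demand a stronger baseline before accepting a model.
prod_builder = ConnectionsModelBuilder(
    eval_subset_size=100,
    performance_threshold=0.5,
)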
Next steps
With our optimized solver in hand, we need comprehensive evaluation to understand how much the optimization improved our puzzle-solving capabilities and whether the model is ready for production deployment.
- Continue this tutorial with performance evaluation