Enable Optuna to Learn from Full Evaluations in Minibatch Mode #2237

Open. Wants to merge 5 commits into main.
95 changes: 70 additions & 25 deletions dspy/teleprompt/mipro_optimizer_v2.py
@@ -6,6 +6,7 @@

import numpy as np
import optuna
from optuna.distributions import CategoricalDistribution

import dspy
from dspy.evaluate.evaluate import Evaluate
@@ -477,8 +478,19 @@ def _optimize_prompt_parameters(
minibatch_full_eval_steps: int,
seed: int,
) -> Optional[Any]:
logger.info("Evaluating the default program...\n")
default_score = eval_candidate_program(len(valset), valset, program, evaluate, self.rng)

# Run optimization
optuna.logging.set_verbosity(optuna.logging.WARNING)
logger.info("==> STEP 3: FINDING OPTIMAL PROMPT PARAMETERS <==")
logger.info(
"We will evaluate the program over a series of trials with different combinations of instructions and few-shot examples to find the optimal combination using Bayesian Optimization.\n"
)

# Compute the adjusted total trials that we will run (including full evals)
adjusted_num_trials = (num_trials + num_trials // minibatch_full_eval_steps + 1) if minibatch else num_trials
logger.info(f"== Trial {1} / {adjusted_num_trials} - Full Evaluation of Default Program ==")

default_score, baseline_results = eval_candidate_program(len(valset), valset, program, evaluate, self.rng, return_all_scores=True)
logger.info(f"Default program score: {default_score}\n")

trial_logs = {}
@@ -504,7 +516,7 @@ def objective(trial):

trial_num = trial.number + 1
if minibatch:
logger.info(f"== Minibatch Trial {trial_num} / {num_trials} ==")
logger.info(f"== Trial {trial_num} / {adjusted_num_trials} - Minibatch ==")
else:
logger.info(f"===== Trial {trial_num} / {num_trials} =====")

@@ -514,7 +526,7 @@
candidate_program = program.deepcopy()

# Choose instructions and demos, insert them into the program
chosen_params = self._select_and_insert_instructions_and_demos(
chosen_params, raw_chosen_params = self._select_and_insert_instructions_and_demos(
candidate_program,
instruction_candidates,
demo_candidates,
@@ -531,7 +543,7 @@ def objective(trial):
# Evaluate the candidate program (on minibatch if minibatch=True)
batch_size = minibatch_size if minibatch else len(valset)
score = eval_candidate_program(
batch_size, valset, candidate_program, evaluate, self.rng
batch_size, valset, candidate_program, evaluate, self.rng
)
total_eval_calls += batch_size

@@ -551,7 +563,7 @@ def objective(trial):
chosen_params,
score_data,
trial,
num_trials,
adjusted_num_trials,
trial_logs,
trial_num,
candidate_program,
@@ -563,16 +575,17 @@ def objective(trial):
)
categorical_key = ",".join(map(str, chosen_params))
param_score_dict[categorical_key].append(
(score, candidate_program),
(score, candidate_program, raw_chosen_params),
)

# If minibatch, perform full evaluation at intervals
# If minibatch, perform full evaluation at intervals (and at the very end)
if minibatch and (
(trial_num % minibatch_full_eval_steps == 0)
or (trial_num == num_trials)
or (trial_num == (adjusted_num_trials -1))
):
best_score, best_program, total_eval_calls = self._perform_full_evaluation(
trial_num,
adjusted_num_trials,
param_score_dict,
fully_evaled_param_combos,
evaluate,
@@ -582,20 +595,28 @@ def objective(trial):
score_data,
best_score,
best_program,
study,
instruction_candidates,
demo_candidates
)

return score

# Run optimization
optuna.logging.set_verbosity(optuna.logging.WARNING)
logger.info("==> STEP 3: FINDING OPTIMAL PROMPT PARAMETERS <==")
logger.info(
"We will evaluate the program over a series of trials with different combinations of instructions and few-shot examples to find the optimal combination using Bayesian Optimization.\n"
)

sampler = optuna.samplers.TPESampler(seed=seed, multivariate=True)
study = optuna.create_study(direction="maximize", sampler=sampler)
study.optimize(objective, n_trials=num_trials)

default_params = {f"{i}_predictor_instruction": 0 for i in range(len(program.predictors()))}
if demo_candidates:
default_params.update({f"{i}_predictor_demos": 0 for i in range(len(program.predictors()))})

# Add default run as a baseline in optuna (TODO: figure out how to weight this by # of samples evaluated on)
trial = optuna.trial.create_trial(
params=default_params,
distributions= self._get_param_distributions(program, instruction_candidates, demo_candidates),
value=default_score,
)
study.add_trial(trial)
study.optimize(objective, n_trials=num_trials-1)

# Attach logs to best program
if best_program is not None and self.track_stats:
@@ -621,7 +642,7 @@ def _log_minibatch_eval(
chosen_params,
score_data,
trial,
num_trials,
adjusted_num_trials,
trial_logs,
trial_num,
candidate_program,
@@ -633,7 +654,7 @@ def _log_minibatch_eval(
trial_logs[trial_num]["mb_score"] = score
trial_logs[trial_num]["total_eval_calls_so_far"] = total_eval_calls
trial_logs[trial_num]["mb_program"] = candidate_program.deepcopy()

logger.info(
f"Score: {score} on minibatch of size {batch_size} with parameters {chosen_params}."
)
@@ -642,7 +663,7 @@ def _log_minibatch_eval(
logger.info(f"Full eval scores so far: {trajectory}")
logger.info(f"Best full score so far: {best_score}")
logger.info(
f'{"="*len(f"== Minibatch Trial {trial.number+1} / {num_trials} ==")}\n\n'
f'{"="*len(f"== Trial {trial.number+1} / {adjusted_num_trials} - Minibatch Evaluation ==")}\n\n'
)

def _log_normal_eval(
@@ -670,6 +691,7 @@ def _select_and_insert_instructions_and_demos(
trial_num: int,
) -> List[str]:
chosen_params = []
raw_chosen_params = {}

for i, predictor in enumerate(candidate_program.predictors()):
# Select instruction
@@ -683,7 +705,7 @@ def _select_and_insert_instructions_and_demos(
set_signature(predictor, updated_signature)
trial_logs[trial_num][f"{i}_predictor_instruction"] = instruction_idx
chosen_params.append(f"Predictor {i}: Instruction {instruction_idx}")

raw_chosen_params[f"{i}_predictor_instruction"] = instruction_idx
# Select demos if available
if demo_candidates:
demos_idx = trial.suggest_categorical(
@@ -692,12 +714,24 @@
predictor.demos = demo_candidates[i][demos_idx]
trial_logs[trial_num][f"{i}_predictor_demos"] = demos_idx
chosen_params.append(f"Predictor {i}: Few-Shot Set {demos_idx}")
raw_chosen_params[f"{i}_predictor_demos"] = instruction_idx

return chosen_params, raw_chosen_params

return chosen_params
def _get_param_distributions(self, program, instruction_candidates, demo_candidates):
param_distributions = {}

for i in range(len(instruction_candidates)):
param_distributions[f"{i}_predictor_instruction"] = CategoricalDistribution(range(len(instruction_candidates[i])))
if demo_candidates:
param_distributions[f"{i}_predictor_demos"] = CategoricalDistribution(range(len(demo_candidates[i])))

return param_distributions

def _perform_full_evaluation(
self,
trial_num: int,
adjusted_num_trials: int,
param_score_dict: Dict,
fully_evaled_param_combos: Dict,
evaluate: Evaluate,
@@ -707,11 +741,14 @@ def _perform_full_evaluation(
score_data,
best_score: float,
best_program: Any,
study: optuna.Study,
instruction_candidates: List,
demo_candidates: List,
):
logger.info(f"===== Full Eval {len(fully_evaled_param_combos)+1} =====")
logger.info(f"===== Trial {trial_num+1} /{adjusted_num_trials} - Full Evaluation =====")

# Identify best program to evaluate fully
highest_mean_program, mean_score, combo_key = (
highest_mean_program, mean_score, combo_key, params = (
get_program_with_highest_avg_score(
param_score_dict, fully_evaled_param_combos
)
@@ -724,6 +761,14 @@ def _perform_full_evaluation(
)
score_data.append((full_eval_score, highest_mean_program, True))

# Log full eval as a trial so that optuna can learn from the new results
trial = optuna.trial.create_trial(
params=params,
distributions= self._get_param_distributions(best_program, instruction_candidates, demo_candidates),
value=full_eval_score,
)
study.add_trial(trial)

# Log full evaluation results
fully_evaled_param_combos[combo_key] = {
"program": highest_mean_program,
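The central idea of this change is that minibatch optimization no longer hides full-evaluation results from the sampler: the default program's full evaluation is registered as a completed Optuna trial before study.optimize() runs, and each periodic full evaluation in _perform_full_evaluation is registered the same way, so the TPE sampler can condition on full-eval scores rather than only on minibatch scores. The search space handed to Optuna is described by _get_param_distributions as a CategoricalDistribution over instruction and few-shot-set indices per predictor, and the trial counter is widened to cover these extra evaluations (per the formula in the diff, num_trials = 18 with minibatch_full_eval_steps = 5 gives adjusted_num_trials = 18 + 18 // 5 + 1 = 22). Below is a minimal, self-contained sketch of the registration mechanism; the single-predictor search space, the candidate counts, and the scores are illustrative stand-ins, not values from the PR.

import optuna
from optuna.distributions import CategoricalDistribution

# Hypothetical search space: one predictor with 3 instruction candidates and 2 demo sets.
distributions = {
    "0_predictor_instruction": CategoricalDistribution(list(range(3))),
    "0_predictor_demos": CategoricalDistribution(list(range(2))),
}

sampler = optuna.samplers.TPESampler(seed=9, multivariate=True)
study = optuna.create_study(direction="maximize", sampler=sampler)

# Seed the study with the default program's full-eval score (all indices 0),
# mirroring what _optimize_prompt_parameters now does before calling study.optimize().
baseline = optuna.trial.create_trial(
    params={"0_predictor_instruction": 0, "0_predictor_demos": 0},
    distributions=distributions,
    value=0.42,  # illustrative full-evaluation score
)
study.add_trial(baseline)

# A later periodic full evaluation is fed back the same way, so the sampler
# sees full-eval results alongside the minibatch trials it proposed itself.
full_eval = optuna.trial.create_trial(
    params={"0_predictor_instruction": 2, "0_predictor_demos": 1},
    distributions=distributions,
    value=0.57,  # illustrative full-evaluation score
)
study.add_trial(full_eval)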
21 changes: 10 additions & 11 deletions dspy/teleprompt/utils.py
@@ -42,25 +42,23 @@ def create_minibatch(trainset, batch_size=50, rng=None):
return minibatch


def eval_candidate_program(batch_size, trainset, candidate_program, evaluate, rng=None):
def eval_candidate_program(batch_size, trainset, candidate_program, evaluate, rng=None, return_all_scores=False):
"""Evaluate a candidate program on the trainset, using the specified batch size."""

try:
# Evaluate on the full trainset
if batch_size >= len(trainset):
score = evaluate(candidate_program, devset=trainset)
return evaluate(candidate_program, devset=trainset, return_all_scores=return_all_scores)
# Or evaluate on a minibatch
else:
score = evaluate(
return evaluate(
candidate_program,
devset=create_minibatch(trainset, batch_size, rng),
return_all_scores=return_all_scores
)
except Exception as e:
print(f"Exception occurred: {e}")
score = 0.0 # TODO: Handle this better, as -ve scores are possible

return score

return (0.0, []) if return_all_scores else 0.0  # TODO: Handle this better, as negative scores are possible

def eval_candidate_program_with_pruning(
trial, trial_logs, trainset, candidate_program, evaluate, trial_num, batch_size=100,
@@ -114,22 +112,23 @@ def get_program_with_highest_avg_score(param_score_dict, fully_evaled_param_comb
scores = np.array([v[0] for v in values])
mean = np.average(scores)
program = values[0][1]
results.append((key, mean, program))
params = values[0][2]
results.append((key, mean, program, params))

# Sort results by the mean
sorted_results = sorted(results, key=lambda x: x[1], reverse=True)

# Find the combination with the highest mean, skip fully evaluated ones
for combination in sorted_results:
key, mean, program = combination
key, mean, program, params = combination

if key in fully_evaled_param_combos:
continue

return program, mean, key
return program, mean, key, params

# If no valid program is found, we return the last valid one that we found
return program, mean, key
return program, mean, key, params


def calculate_last_n_proposed_quality(
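On the utils side, eval_candidate_program now simply forwards return_all_scores to the evaluator and returns whatever it yields, which the optimizer unpacks as (default_score, baseline_results) for the baseline run. The stand-in below illustrates the expected contract; evaluate_stub, toy_program, and the toy devset are hypothetical and only mimic the (average score, per-example scores) shape that dspy's Evaluate is expected to produce when return_all_scores=True.

from typing import Callable, List, Sequence, Tuple, Union

def evaluate_stub(program: Callable[[str], str], devset: Sequence[Tuple[str, str]],
                  return_all_scores: bool = False) -> Union[float, Tuple[float, List[float]]]:
    # Stand-in for dspy.Evaluate: exact-match score per example, then the average.
    scores = [1.0 if program(question) == answer else 0.0 for question, answer in devset]
    average = sum(scores) / len(scores)
    return (average, scores) if return_all_scores else average

def toy_program(question: str) -> str:
    # Hypothetical "program": a lookup table standing in for a dspy module.
    return {"2+2": "4", "3+3": "6", "5+5": "10"}.get(question, "")

toy_devset = [("2+2", "4"), ("3+3", "6"), ("5+5", "12")]

# Mirrors: default_score, baseline_results = eval_candidate_program(..., return_all_scores=True)
default_score, baseline_results = evaluate_stub(toy_program, toy_devset, return_all_scores=True)
print(default_score, baseline_results)  # 0.666..., [1.0, 1.0, 0.0]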