From 3eac514ebb9cd2fff562852fe24a14135db0c7ea Mon Sep 17 00:00:00 2001 From: Ching-An Cheng Date: Tue, 2 Dec 2025 19:30:46 -0800 Subject: [PATCH 01/13] Fix incorrect sample count update in training logging Fix missed updating of the number of samples processed. --- opto/features/priority_search/search_template.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opto/features/priority_search/search_template.py b/opto/features/priority_search/search_template.py index ec244f74..616dd1ff 100644 --- a/opto/features/priority_search/search_template.py +++ b/opto/features/priority_search/search_template.py @@ -230,13 +230,13 @@ def train(self, train_scores.append(info_sample['mean_score']) # so that mean can be computed train_num_samples.append(info_sample['num_samples']) + self.n_samples += len(samples) # update the number of samples processed if self.n_iters % log_frequency == 0: avg_train_score = np.sum(np.array(train_scores) * np.array(train_num_samples)) / np.sum(train_num_samples) self.logger.log('Algo/Average train score', avg_train_score, self.n_iters, color='blue') self.log(info_update, prefix="Update/") self.log(info_sample, prefix="Sample/") - self.n_samples += len(samples) # update the number of samples processed self.logger.log('Algo/Number of training samples', self.n_samples, self.n_iters, color='blue') # Log parameters for p in self.agent.parameters(): From 3004f949f95f4efce3774544d538e8a2b58e2963 Mon Sep 17 00:00:00 2001 From: Allen Nie Date: Wed, 3 Dec 2025 15:18:55 -0800 Subject: [PATCH 02/13] Update Python version in CI workflow to 3.13 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7889b69d..622c9626 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,7 +49,7 @@ jobs: # 6) Set up Python & install dependencies - uses: actions/setup-python@v5 - with: { python-version: "3.10" } + with: { python-version: "3.13" 
} - name: Install Python deps run: | pip install -e . From c0af1d8c06a2891dc410920fab25171470b71c00 Mon Sep 17 00:00:00 2001 From: Allen Nie Date: Wed, 3 Dec 2025 15:19:10 -0800 Subject: [PATCH 03/13] Update python-app.yml --- .github/workflows/python-app.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 8074be85..a111e34f 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -19,10 +19,10 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Set up Python 3.10 + - name: Set up Python 3.13 uses: actions/setup-python@v3 with: - python-version: "3.10" + python-version: "3.13" - name: Install dependencies run: | python -m pip install --upgrade pip From c06e265867dc27a464120bc3789fbbe753d5fdf2 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 14:32:06 -0600 Subject: [PATCH 04/13] add a summarizer --- opto/features/priority_search/summarizer.py | 148 ++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 opto/features/priority_search/summarizer.py diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py new file mode 100644 index 00000000..f18eb1ce --- /dev/null +++ b/opto/features/priority_search/summarizer.py @@ -0,0 +1,148 @@ +from opto.optimizers.utils import print_color +from opto.utils.llm import LLM # For the selector LLM +import random +import re + + +def get_trajectory_of_one_rollout(rollout): + """ + Convert a rollout into a structured markdown trajectory for optimization. + + This function extracts the trainable parameters and formats the trajectory + to guide the optimizer in improving the module's performance. 
+ + Parameters + ---------- + rollout : dict + A rollout dictionary containing: + - 'module': trace.Module - the agent module with trainable parameters + - 'x': Any - the input data + - 'info': Any - additional information about the input + - 'target': Any - the generated output + - 'score': float - evaluation score (0 = failed, 1 = success) + - 'feedback': Any - detailed feedback from the evaluation + + Returns + ------- + str + A markdown-formatted trajectory string for optimizer guidance. + """ + assert rollout['module'] is not None, "rollout['module'] is None." + assert rollout['x'] is not None, "rollout['x'] is None." + assert rollout['target'] is not None, "rollout['target'] is None." + assert rollout['score'] is not None, "rollout['score'] is None." + assert rollout['feedback'] is not None, "rollout['feedback'] is None." + + # Extract trainable parameters + parameters = rollout['module'].parameters() + parameters_dict = {p.py_name: p.data for p in parameters} + + # Construct structured markdown trajectory + trajectory = f"""## Task Trajectory + +## Module Parameters +{parameters_dict} + +## Input +{rollout['x']} + +## Output +{rollout['target']} + +## Result +- **Score:** {rollout['score']} +- **Feedback:** {rollout['feedback']} + +## Optimization Note +Analyze what parameter patterns lead to successful vs. failed outputs. +""" + return trajectory + + + + +class Summarizer: + """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates. + """ + def __init__(self, model_name: str = "gemini/gemini-2.0-flash"): + self.llm = LLM() # use the default model + self.max_candidates_in_prompt = 50 + + def _get_trajecories_for_memory(self, memory): + """ + Get trajectories for the memory. Memory is a list of (neg_score, candidate) tuples. 
+ We first collect rollouts from the each candidate, and then get the trajectories for each rollout. + + Return one single string of all trajectories. + """ + trajectories = [] + print_color(f"Getting trajectories from {len(memory)} candidates.", "blue") + # copy a random shuffle of the memory + memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0] + temporary_memory = random.sample(memory_with_rollouts, k=min(self.max_candidates_in_prompt, len(memory_with_rollouts))) + for _, candidate in temporary_memory: + rollouts = [rollout for rollout in candidate.rollouts if rollout['score'] is not None] + if len(rollouts) == 0: + continue + # For each candidate, add one (if exists) successful_rollout and one (if exists) failed_rollout. + candidate_update_dict = candidate.update_dict.values() + # print_color(f"Candidate pamameters: {candidate_update_dict}", "blue")# For debugging + prompt = f"Candidate pamameters: {candidate_update_dict}." + successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > 0] + failed_rollouts = [rollout for rollout in rollouts if rollout['score'] == 0] + if len(successful_rollouts) > 0: + random_successful_rollout = random.choice(successful_rollouts) + prompt += f"\nSuccessful trajectory: {get_trajectory_of_one_rollout(random_successful_rollout)}." + if len(failed_rollouts) > 0: + random_failed_rollout = random.choice(failed_rollouts) + prompt += f"\nFailed trajectory: {get_trajectory_of_one_rollout(random_failed_rollout)}." + + trajectories.append(prompt) + + print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green") + return '\n'.join(trajectories) + + def summarize(self, memory) -> str: + """Summarize the trajectories using the LLM. + Args: + memory: The memory containing trajectories to summarize. + Returns: + str: The summary. 
+ """ + + history_trajectories = self._get_trajecories_for_memory(memory) + + # print_color(f"History trajectories: {history_trajectories}", "green") + + if len(history_trajectories) == 0: + return "No successful trajectories found for the memory." + + system_prompt = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization." + + user_prompt = f"""Analyze the following agent rollout trajectories and extract insights for optimization. + + Trajectories: + {history_trajectories} + + Provide your analysis in XML format: + + Analyze the key patterns and strategies that led to success or failure in these trajectories. + + + Concrete recommendations for improving output quality based on successful or failed patterns observed in the trajectories. + """ + + prompt_messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + + response = self.llm(messages=prompt_messages) + response = response.choices[0].message.content + + # Extract summary using XML regex + summary_match = re.search(r'(.*?)', response, re.DOTALL) + + return summary_match.group(1).strip() + + From 2cc6cd6bde16166e7955027ba571ad1b34065588 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 14:34:29 -0600 Subject: [PATCH 05/13] add a regressor template --- .../priority_search/module_regressor.py | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py index a92793c5..4b9b48d9 100644 --- a/opto/features/priority_search/module_regressor.py +++ b/opto/features/priority_search/module_regressor.py @@ -17,6 +17,147 @@ from opto.utils.auto_retry import retry_with_exponential_backoff import litellm import time +from opto.features.priority_search.priority_search import ModuleCandidate + +class RegressorTemplate: + """Base class template for regression-based predictors for 
ModuleCandidate objects. + + Provides common functionality for embedding generation and candidate processing. + Subclasses should implement update() and predict_scores() methods. + + Regressors can be built on this template by implementing the update() and predict_scores() methods. + This class itself is enough for getting embeddings for candidates. + """ + + def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True): + # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once. + self.max_candidates_to_predict = 500 + self.embedding_model = embedding_model + self.num_threads = num_threads + self.regularization_strength = regularization_strength # L2 regularization strength (lambda) + self.rich_text = rich_text + + # Default original embedding dimension (from text-embedding-004) + self.original_embedding_dim = 768 + + # if linear_dim is not None: + # # Use random projection from 768D to linear_dim + # self.linear_dim = linear_dim + # print_color(f"Using random projection: {self.original_embedding_dim}D → {linear_dim}D", "blue") + # self.random_projector = GaussianRandomProjection( + # input_dim=self.original_embedding_dim, + # output_dim=linear_dim, + # random_seed=42 + # ) + # else: + # # Use default 768D without projection + # self.linear_dim = self.original_embedding_dim + # self.random_projector = None + self.linear_dim = self.original_embedding_dim + self.random_projector = None + + # Initialize weights with larger values for more aggressive learning + self.weights = np.random.normal(0, 0.1, self.linear_dim) + self.bias = 0.0 + + def _get_parameter_text(self, candidate): + """Get the parameter text for a ModuleCandidate.""" + if not hasattr(candidate, 'update_dict'): + print(candidate) + assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict" + # Convert parameter nodes to readable names for 
deterministic embedding + params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()} + + # if self.rich_text: + # # Create rich text representation with problem definition and rating question + # rich_text_parts = [] + + # # Add problem definition + # rich_text_parts.append(f"Problem Definition: {DOMAIN_CONTEXT_VERIBENCH.strip()}") + # rich_text_parts.append("") # Empty line for separation + + # # Add parameter configuration + # rich_text_parts.append("Parameter Configuration:") + # for param_name, param_value in params_with_names.items(): + # rich_text_parts.append(f"{param_name}: {param_value}") + # rich_text_parts.append("") # Empty line for separation + + # # Add rating question + # rich_text_parts.append("Question: Based on the problem context above and this parameter configuration, how do you rate this parameter?") + + # return "\n".join(rich_text_parts) + # else: + return str(params_with_names) + + + def _get_embedding(self, candidate): + """Get the embedding for a ModuleCandidate.""" + parameter_text = self._get_parameter_text(candidate) + + def single_embedding_call(): + return litellm.embedding( + model=self.embedding_model, + input=parameter_text + ) + + try: + response = retry_with_exponential_backoff( + single_embedding_call, + max_retries=10, + base_delay=1.0, + operation_name="Embedding API call" + ) + embedding = response.data[0].embedding + if self.random_projector is not None: + # Convert to numpy array and reshape for transform (expects 2D: n_samples x n_features) + embedding_array = np.array(embedding).reshape(1, -1) + projected = self.random_projector.transform(embedding_array) + # Convert back to list and flatten + embedding = projected.flatten().tolist() + return embedding + except Exception as e: + print_color(f"ERROR: Embedding API call failed after retries: {e}", "red") + return None + + def add_embeddings_to_candidates(self, candidates: List[ModuleCandidate]): + """Add embeddings to a list of candidates. 
This function could be used outside.""" + self._update_memory_embeddings_for_batch(candidates) + + def _update_memory_embeddings_for_batch(self, batch): + """Update the embeddings for a batch of candidates.""" + # Separate candidates that need embeddings from those that already have them + candidates_needing_embeddings = [] + for candidate in batch: + if not hasattr(candidate, "embedding"): + candidates_needing_embeddings.append(candidate) + + # Generate embeddings in parallel for candidates that need them + if candidates_needing_embeddings: + def get_embedding_for_candidate(candidate): + return self._get_embedding(candidate) + + # Create function list for async_run + embedding_functions = [lambda c=candidate: get_embedding_for_candidate(c) + for candidate in candidates_needing_embeddings] + + # Run embedding generation in parallel + new_embeddings = async_run( + embedding_functions, + max_workers=50, + description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates" + ) + + # Assign embeddings back to candidates + for candidate, embedding in zip(candidates_needing_embeddings, new_embeddings): + candidate.embedding = embedding + + def update(self, memory: List[Tuple[float, ModuleCandidate]]): + """Update the regression model parameters. Should be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the update method") + + def predict_scores(self, memory: List[Tuple[float, ModuleCandidate]]): + """Predict scores for candidates. Should be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the predict_scores method") class ModuleCandidateRegressor: """ From edbc0d7195123e4b3242d81a6b380bada0357b46 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 15:05:50 -0600 Subject: [PATCH 06/13] add a method to filter new candidates, move "adding exploration samples" before proposing. 
--- .../priority_search/priority_search.py | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index 86bcbd60..ceaf518e 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -424,7 +424,14 @@ def update(self, # samples is None in the first iteration if samples is not None: # 1. Propose new parameters based on running LLM optimizers on the collected samples + + # We add the exploration rollouts to the exploration candidates, before proposing. Then these samples will not be added in the validate step. + self.add_exploration_rollouts_to_candidates(self._exploration_candidates, samples) candidates = self.propose(samples, verbose=verbose, **kwargs) # List of ModuleCandidates + + # Filter the new candidates, only some of them will be added to the memory. Default is no filtering. + candidates = self.filter_candidates(candidates) + # 2. Validate the proposed parameters validate_results = self.validate(candidates, samples, verbose=verbose, **kwargs) # this updates the priority queue # 3. Update the priority queue with the validation results @@ -612,7 +619,9 @@ def validate(self, assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate." # The current batch of samples can be used to validate the exploration candidates - validate_samples = copy.copy(samples) + # validate_samples = copy.copy(samples) + # Exploration samples are added before proposing, so we don't need to add them again here. 
+ validate_samples = Samples([], {'inputs': [], 'infos': []}) # Validate newly proposed candidates use_prev_batch = self.use_prev_batch # when True, self.validate_sampler == self.train_sampler, and the current batch is used for validation @@ -629,8 +638,8 @@ def validate(self, description_prefix='Validating exploration candidates: ')) # sample the exploration agents validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples - - matched_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates + candidates, validate_samples.samples) + candidates_to_be_matched = exploration_candidates + candidates if self.validate_exploration_candidates else candidates + matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples) results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts @@ -802,3 +811,25 @@ def _process_rollout(rollout): for rollout in candidate.rollouts: _process_rollout(rollout) return candidate + + # For the further usage. + def filter_candidates(self, candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: + """ Filter candidates. + This function can be overridden by subclasses to filter candidates by other criteria. + Args: + candidates (List[ModuleCandidate]): A list of candidates to filter. + Returns: + List[ModuleCandidate]: A list of filtered candidates. + """ + return candidates + + # For the further usage, we decide to add the exploration rollouts to the exploration candidates, before proposing. + def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[ModuleCandidate], samples: Samples): + """ Add the exploration rollouts to the exploration candidates. 
+ """ + matched_exploration_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates, samples.samples) + exploration_results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) + for c, rollouts in matched_exploration_candidates_and_samples.items(): # rollouts is a list of BatchRollouts + exploration_results[c] = [ r for rr in rollouts for r in rr.to_list()] + for candidate, rollouts in exploration_results.items(): + candidate.add_rollouts(rollouts) From 3dde2fab52fde310c2389e17e6f044da96dfb316 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 15:08:58 -0600 Subject: [PATCH 07/13] add PS with epsNet and summarizer features --- .../epsNetPS_plus_summarizer.py | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 opto/features/priority_search/epsNetPS_plus_summarizer.py diff --git a/opto/features/priority_search/epsNetPS_plus_summarizer.py b/opto/features/priority_search/epsNetPS_plus_summarizer.py new file mode 100644 index 00000000..bae3cfcf --- /dev/null +++ b/opto/features/priority_search/epsNetPS_plus_summarizer.py @@ -0,0 +1,125 @@ +from opto.features.priority_search.priority_search import PrioritySearch, ModuleCandidate +from opto.features.priority_search.module_regressor import RegressorTemplate +from opto.features.priority_search.summarizer import Summarizer +from typing import Union, List, Tuple, Dict, Any, Optional, Callable +from opto.optimizers.utils import print_color +import numpy as np +from opto.features.priority_search.search_template import Samples + + +def calculate_distance_to_memory(memory, new_candidate): + """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory. + + To use this funciton in PrioritySearch, set memory to be self.memory.memory. 
+ """ + assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute." + min_distance = float('inf') + for _, candidate in memory: + distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding)) + if distance < min_distance: + min_distance = distance + return min_distance + +class EpsilonNetPS_plus_Summarizer(PrioritySearch): + """ + A subclass of PrioritySearch, which keeps an epsilon-net as the memory. Reject new candidates that are in the epsilon-net of the memory. + + This class uses a summarizer to summarize the memory and the exploration candidates. It then sets the context for the optimizer to use the summary to guide the exploration. + + Args: + epsilon: The epsilon value for the epsilon-net. 0 means no filtering, the same as vanilla PrioritySearch. + use_summarizer: Whether to use a summarizer to summarize the memory and the exploration candidates. + summarizer_model_name: The model name for the summarizer. + *args: Additional arguments for the parent class. + **kwargs: Additional keyword arguments for the parent class. + """ + def __init__(self, + epsilon: float = 0.1, + use_summarizer: bool = False, + summarizer_model_name: str = "gemini/gemini-2.0-flash", + *args, + **kwargs): + super().__init__(*args, **kwargs) + self.epsilon = epsilon + self.use_summarizer = use_summarizer + self.regressor = RegressorTemplate() + self.summarizer = Summarizer(model_name=summarizer_model_name) + self.context = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: " + + def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: + """ Filter candidates by their embeddings. 
+ """ + if self.epsilon == 0: # no filtering + print_color(f"No filtering of candidates.", "green") + return new_candidates + exploration_memory = [(0, candidate) for candidate in self._exploration_candidates] + current_memory = self.memory.memory + exploration_memory + # Add embeddings to all the candidates. The regressor will check if the candidates have embeddings, and if not, it will add them in parallel. + current_candidates = [candidate for _, candidate in current_memory] + self.regressor.add_embeddings_to_candidates(current_candidates+new_candidates) + + # filter new candidates based on the distance to the current memory. + num_new_candidates = len(new_candidates) + + added_candidates = [] + success_distances = [] + + while len(new_candidates) > 0: + # calculate the distance to the memory for each new candidate + distances = [calculate_distance_to_memory(current_memory, new_candidate) for new_candidate in new_candidates] + + # filter candidates: keep only those with distance > epsilon + filtered_candidates = [] + filtered_distances = [] + for i, (candidate, distance) in enumerate(zip(new_candidates, distances)): + if distance > self.epsilon: + filtered_candidates.append(candidate) + filtered_distances.append(distance) + + # if no candidates remain, exit the loop + if len(filtered_candidates) == 0: + break + + # add the candidate with the largest distance to the memory + max_distance_idx = np.argmax(filtered_distances) + new_node = filtered_candidates[max_distance_idx] + current_memory.append((0, new_node)) + added_candidates.append(new_node) + success_distances.append(float(filtered_distances[max_distance_idx])) + + # remove the added candidate from new_candidates list + new_candidates = [c for c in filtered_candidates if c is not new_node] + + print_color(f"Proposed {num_new_candidates} new candidates, {len(added_candidates)} of them are added to the memory.", "green") + # print the distances between the added candidates and the memory before adding them. 
+ print_color(f"Distances between the added candidates and the memory before adding them: {success_distances}", "green") + return added_candidates + + def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate: + """ For the summarizer usage, we keep the entire rollout. """ + return candidate + + def propose(self, + samples : Samples, + verbose : bool = False, + **kwargs): + """ + Override the propose method to include a summary into the context of the optimizer. + """ + + # Use the summarizer to summarize the memory and the exploration candidates. + if self.use_summarizer: + # Summarize the memory and the exploration candidates. + exploration_memory = [(0, candidate) for candidate in self._exploration_candidates] + print_color(f"Summarizing the history...", "green") + try: + summary = self.summarizer.summarize(self.memory.memory+exploration_memory) + print_color(f"Summary: {summary}", "green") + self.context = f"Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: {summary}" + except Exception as e: + print_color(f"Error: {e}", "red") + print_color(f"Using fallback context: {self.context}", "red") + # Set the context for the optimizer. 
+ for candidate in self._exploration_candidates: + candidate.optimizer.set_context(self.context) + return super().propose(samples, verbose, **kwargs) From 24d94369e15a41ad5f3d85ece7552171ae694848 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 15:12:32 -0600 Subject: [PATCH 08/13] do not compress memory if using summarizer, since we need more details to summarize --- opto/features/priority_search/epsNetPS_plus_summarizer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/opto/features/priority_search/epsNetPS_plus_summarizer.py b/opto/features/priority_search/epsNetPS_plus_summarizer.py index bae3cfcf..e41c948e 100644 --- a/opto/features/priority_search/epsNetPS_plus_summarizer.py +++ b/opto/features/priority_search/epsNetPS_plus_summarizer.py @@ -97,7 +97,10 @@ def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[Modul def compress_candidate_memory(self, candidate: ModuleCandidate) -> ModuleCandidate: """ For the summarizer usage, we keep the entire rollout. 
""" - return candidate + if self.use_summarizer: + return candidate + else: + return super().compress_candidate_memory(candidate) def propose(self, samples : Samples, From 02007021b38555c2465cfca292faa1d6ccf99c9a Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Thu, 11 Dec 2025 17:00:47 -0600 Subject: [PATCH 09/13] fix a bug --- opto/features/priority_search/priority_search.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index ceaf518e..04dcbd70 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -638,7 +638,8 @@ def validate(self, description_prefix='Validating exploration candidates: ')) # sample the exploration agents validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples - candidates_to_be_matched = exploration_candidates + candidates if self.validate_exploration_candidates else candidates + # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False) + candidates_to_be_matched = exploration_candidates + candidates if (self.validate_exploration_candidates and not use_prev_batch) else candidates matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples) results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts From c7177fdef920e50532f6387248e1f799df843a99 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Wed, 17 Dec 2025 12:26:00 -0600 Subject: [PATCH 10/13] fix a bug --- opto/features/priority_search/priority_search.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index 
04dcbd70..a98b370e 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -644,6 +644,11 @@ def validate(self, results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts results[c] = [ r for rr in rollouts for r in rr.to_list()] # we only need the list of dicts + # Add exploration candidates that weren't included in match_candidates_and_samples + # This ensures they get re-added to memory even if they weren't validated again + for candidate in exploration_candidates: + if candidate not in results: + results[candidate] = [] # Add with empty rollouts list return results From 94fd78d8edd8a609c7132a03411df4a5859aeb75 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Fri, 6 Feb 2026 10:43:48 -0600 Subject: [PATCH 11/13] update --- .../epsNetPS_plus_summarizer.py | 3 +- .../priority_search/module_regressor.py | 15 ++-- opto/features/priority_search/summarizer.py | 72 ++++++++++++------- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/opto/features/priority_search/epsNetPS_plus_summarizer.py b/opto/features/priority_search/epsNetPS_plus_summarizer.py index e41c948e..3500072b 100644 --- a/opto/features/priority_search/epsNetPS_plus_summarizer.py +++ b/opto/features/priority_search/epsNetPS_plus_summarizer.py @@ -36,14 +36,13 @@ class EpsilonNetPS_plus_Summarizer(PrioritySearch): def __init__(self, epsilon: float = 0.1, use_summarizer: bool = False, - summarizer_model_name: str = "gemini/gemini-2.0-flash", *args, **kwargs): super().__init__(*args, **kwargs) self.epsilon = epsilon self.use_summarizer = use_summarizer self.regressor = RegressorTemplate() - self.summarizer = Summarizer(model_name=summarizer_model_name) + self.summarizer = Summarizer() self.context = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the 
trajectories: " def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py index 4b9b48d9..ddbe8eac 100644 --- a/opto/features/priority_search/module_regressor.py +++ b/opto/features/priority_search/module_regressor.py @@ -29,7 +29,7 @@ class RegressorTemplate: This class itself is enough for getting embeddings for candidates. """ - def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True): + def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True,verbose: bool = False): # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once. self.max_candidates_to_predict = 500 self.embedding_model = embedding_model @@ -59,6 +59,7 @@ def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None # Initialize weights with larger values for more aggressive learning self.weights = np.random.normal(0, 0.1, self.linear_dim) self.bias = 0.0 + self.verbose = verbose def _get_parameter_text(self, candidate): """Get the parameter text for a ModuleCandidate.""" @@ -257,7 +258,8 @@ def get_embedding_for_candidate(candidate): def update(self): """Update the regression model parameters using the current memory with logistic regression.""" start_time = time.time() - print_color("Updating regression model using the current memory with logistic regression...", "blue") + if self.verbose: + print_color("Updating regression model using the current memory with logistic regression...", "blue") # Extract candidates from memory (memory contains (neg_score, candidate) tuples) batch = [candidate for _, candidate in self.memory] # Ensure all candidates have embeddings @@ -267,10 +269,12 @@ def update(self): 
training_candidates = [candidate for neg_score, candidate in self.memory if candidate.num_rollouts > 0 and candidate.mean_score() is not None] if len(training_candidates) == 0: - print_color("Warning: No training data available for regression model.", "yellow") + if self.verbose: + print_color("Warning: No training data available for regression model.", "yellow") end_time = time.time() elapsed_time = end_time - start_time - print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan") + if self.verbose: + print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no training data)", "cyan") return # Extract raw binary training data from each candidate @@ -310,7 +314,8 @@ def update(self): print_color("Warning: No binary training samples generated.", "yellow") end_time = time.time() elapsed_time = end_time - start_time - print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan") + if self.verbose: + print_color(f"Regressor update completed in {elapsed_time:.4f} seconds (no binary samples)", "cyan") return # Convert to numpy arrays diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py index f18eb1ce..4a47c348 100644 --- a/opto/features/priority_search/summarizer.py +++ b/opto/features/priority_search/summarizer.py @@ -51,24 +51,20 @@ def get_trajectory_of_one_rollout(rollout): ## Result - **Score:** {rollout['score']} -- **Feedback:** {rollout['feedback']} - -## Optimization Note -Analyze what parameter patterns lead to successful vs. failed outputs. -""" +- **Feedback:** {rollout['feedback']}""" return trajectory - - - class Summarizer: """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates. 
""" - def __init__(self, model_name: str = "gemini/gemini-2.0-flash"): + def __init__(self,verbose: bool = False): self.llm = LLM() # use the default model - self.max_candidates_in_prompt = 50 + self.max_candidates_in_prompt = 5 + self.current_summary = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: " + self.used_candidates = set() # Track candidates that have been summarized + self.verbose = verbose - def _get_trajecories_for_memory(self, memory): + def _get_trajectories_for_memory(self, memory): """ Get trajectories for the memory. Memory is a list of (neg_score, candidate) tuples. We first collect rollouts from the each candidate, and then get the trajectories for each rollout. @@ -76,10 +72,20 @@ def _get_trajecories_for_memory(self, memory): Return one single string of all trajectories. """ trajectories = [] - print_color(f"Getting trajectories from {len(memory)} candidates.", "blue") - # copy a random shuffle of the memory - memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0] - temporary_memory = random.sample(memory_with_rollouts, k=min(self.max_candidates_in_prompt, len(memory_with_rollouts))) + if self.verbose: + print_color(f"Getting trajectories from {len(memory)} candidates.", "blue") + # Filter out candidates that have already been used and have rollouts + memory_with_rollouts = [(neg_score, candidate) for neg_score, candidate in memory + if len([rollout for rollout in candidate.rollouts if rollout['score'] is not None]) > 0 + and id(candidate) not in self.used_candidates] + if self.verbose: + print_color(f"Memory (unseen candidates) with rollouts: {len(memory_with_rollouts)}", "blue") + # Sample 5 candidates (or fewer if not enough available) + num_to_sample = min(5, len(memory_with_rollouts)) + temporary_memory = random.sample(memory_with_rollouts, 
k=num_to_sample) + # Mark sampled candidates as used + for _, candidate in temporary_memory: + self.used_candidates.add(id(candidate)) for _, candidate in temporary_memory: rollouts = [rollout for rollout in candidate.rollouts if rollout['score'] is not None] if len(rollouts) == 0: @@ -98,8 +104,9 @@ def _get_trajecories_for_memory(self, memory): prompt += f"\nFailed trajectory: {get_trajectory_of_one_rollout(random_failed_rollout)}." trajectories.append(prompt) + if self.verbose: + print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green") - print_color(f"Generated trajectories from {len(trajectories)} candidates.", "green") return '\n'.join(trajectories) def summarize(self, memory) -> str: @@ -110,39 +117,56 @@ def summarize(self, memory) -> str: str: The summary. """ - history_trajectories = self._get_trajecories_for_memory(memory) + history_trajectories = self._get_trajectories_for_memory(memory) # print_color(f"History trajectories: {history_trajectories}", "green") if len(history_trajectories) == 0: - return "No successful trajectories found for the memory." + return "No trajectories found for the memory." system_prompt = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization." - user_prompt = f"""Analyze the following agent rollout trajectories and extract insights for optimization. + user_prompt = f"""Analyze the following agent conversation trajectories and extract insights for optimization. 
- Trajectories: + Current Summary (from previous analysis): + {self.current_summary} + + New Trajectories to Analyze: {history_trajectories} + Instructions: + - Review both the Current Summary and the New Trajectories + - Synthesize ALL insights into a single, cohesive summary + - Integrate new patterns with existing knowledge + - Reorganize and consolidate information as needed for clarity + - DO NOT use incremental language like "[Previous points remain valid, plus:]" + - Generate a complete, standalone summary that incorporates everything + Provide your analysis in XML format: - Analyze the key patterns and strategies that led to success or failure in these trajectories. + Analyze the key patterns and strategies that led to success or failure in these trajectories. Consider both the current summary and new trajectories. - Concrete recommendations for improving output quality based on successful or failed patterns observed in the trajectories. + A complete, consolidated summary with concrete recommendations for generating better Lean 4 code. This should be a standalone summary that integrates insights from both the current summary and new trajectories, without using incremental modification language. 
""" prompt_messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] + + # print_color(f"User prompt: {user_prompt}", "blue") + + # print_color(f"System prompt: {system_prompt}", "blue") + # print_color(f"User prompt: {user_prompt}", "blue") response = self.llm(messages=prompt_messages) response = response.choices[0].message.content + # print_color(f"Response: {response}", "yellow") # Extract summary using XML regex summary_match = re.search(r'(.*?)', response, re.DOTALL) - return summary_match.group(1).strip() - + self.current_summary = summary_match.group(1).strip() + return self.current_summary \ No newline at end of file From df82e28de1bc8e34fab302b2e51b8690006f8127 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Fri, 20 Mar 2026 11:57:29 -0500 Subject: [PATCH 12/13] update polca name, make the summarizer support multi-objective --- .../{epsNetPS_plus_summarizer.py => polca.py} | 2 +- .../priority_search/priority_search.py | 7 +++- opto/features/priority_search/summarizer.py | 34 +++++++++++++++---- 3 files changed, 35 insertions(+), 8 deletions(-) rename opto/features/priority_search/{epsNetPS_plus_summarizer.py => polca.py} (99%) diff --git a/opto/features/priority_search/epsNetPS_plus_summarizer.py b/opto/features/priority_search/polca.py similarity index 99% rename from opto/features/priority_search/epsNetPS_plus_summarizer.py rename to opto/features/priority_search/polca.py index 3500072b..b0ed29bc 100644 --- a/opto/features/priority_search/epsNetPS_plus_summarizer.py +++ b/opto/features/priority_search/polca.py @@ -20,7 +20,7 @@ def calculate_distance_to_memory(memory, new_candidate): min_distance = distance return min_distance -class EpsilonNetPS_plus_Summarizer(PrioritySearch): +class POLCA(PrioritySearch): """ A subclass of PrioritySearch, which keeps an epsilon-net as the memory. Reject new candidates that are in the epsilon-net of the memory. 
diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index 802628f2..c164889f 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -761,8 +761,13 @@ def validate(self, results[candidate] = [] # Add with empty rollouts list # Populate score_dict in each rollout when multi-objective is active + # Populate per-metric score_dict for all multi-objective modes (including "scalar"). + # This allows mode="scalar" with HeapMemory to leverage weighted scalarization + # in compute_exploration_priority, rather than falling back to the single scalar score. + # Without this, mode="scalar" would skip score_dict collection and behave + # identically to objective_config=None, making the "scalar" mode a no-op. cfg = getattr(self, 'objective_config', None) - if cfg is not None and cfg.mode != "scalar": + if cfg is not None: guide = self.validate_sampler.guide for c, rollout_list in results.items(): for rollout in rollout_list: diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py index 4a47c348..6e802769 100644 --- a/opto/features/priority_search/summarizer.py +++ b/opto/features/priority_search/summarizer.py @@ -37,7 +37,25 @@ def get_trajectory_of_one_rollout(rollout): parameters = rollout['module'].parameters() parameters_dict = {p.py_name: p.data for p in parameters} - # Construct structured markdown trajectory + # In multi-objective mode, rollouts carry a 'score_dict' with per-metric scores + # (e.g. {"accuracy": 0.9, "latency": 0.2}) populated by validate() in PrioritySearch. + # We render the full breakdown so the summarizer LLM can analyze trade-offs across + # objectives, rather than seeing only the aggregate scalar score. + # When score_dict is absent (single-objective mode), we fall back to scalar-only display. 
+ score_dict = rollout.get('score_dict') + if isinstance(score_dict, dict) and score_dict: + breakdown = "\n".join(f" - {k}: {v}" for k, v in score_dict.items()) + result_section = ( + f"- **Overall Score:** {rollout['score']}\n" + f"- **Score Breakdown:**\n{breakdown}\n" + f"- **Feedback:** {rollout['feedback']}" + ) + else: + result_section = ( + f"- **Score:** {rollout['score']}\n" + f"- **Feedback:** {rollout['feedback']}" + ) + trajectory = f"""## Task Trajectory ## Module Parameters @@ -50,19 +68,23 @@ def get_trajectory_of_one_rollout(rollout): {rollout['target']} ## Result -- **Score:** {rollout['score']} -- **Feedback:** {rollout['feedback']}""" +{result_section}""" return trajectory class Summarizer: """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates. """ - def __init__(self,verbose: bool = False): + def __init__(self, verbose: bool = False, success_threshold: float = 0): self.llm = LLM() # use the default model self.max_candidates_in_prompt = 5 self.current_summary = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: " self.used_candidates = set() # Track candidates that have been summarized self.verbose = verbose + # Configurable threshold for classifying rollouts as successful (score > threshold) + # or failed (score <= threshold). Defaults to 0 for backward compatibility. + # Previously hardcoded as 0, which also caused rollouts with negative scores to be + # missed by both the success and failure lists. 
+ self.success_threshold = success_threshold def _get_trajectories_for_memory(self, memory): """ @@ -94,8 +116,8 @@ def _get_trajectories_for_memory(self, memory): candidate_update_dict = candidate.update_dict.values() # print_color(f"Candidate pamameters: {candidate_update_dict}", "blue")# For debugging prompt = f"Candidate pamameters: {candidate_update_dict}." - successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > 0] - failed_rollouts = [rollout for rollout in rollouts if rollout['score'] == 0] + successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > self.success_threshold] + failed_rollouts = [rollout for rollout in rollouts if rollout['score'] <= self.success_threshold] if len(successful_rollouts) > 0: random_successful_rollout = random.choice(successful_rollouts) prompt += f"\nSuccessful trajectory: {get_trajectory_of_one_rollout(random_successful_rollout)}." From 18c1bc95a3fd6b9ec4b2a11f3f16b3a6c6ebd986 Mon Sep 17 00:00:00 2001 From: xuanfeiren Date: Sat, 21 Mar 2026 13:20:09 -0500 Subject: [PATCH 13/13] update based on comments --- .../priority_search/module_regressor.py | 177 +++++------------- opto/features/priority_search/polca.py | 25 +-- .../priority_search/priority_search.py | 20 +- opto/features/priority_search/summarizer.py | 81 ++++---- 4 files changed, 106 insertions(+), 197 deletions(-) diff --git a/opto/features/priority_search/module_regressor.py b/opto/features/priority_search/module_regressor.py index ddbe8eac..818df096 100644 --- a/opto/features/priority_search/module_regressor.py +++ b/opto/features/priority_search/module_regressor.py @@ -1,24 +1,23 @@ import numpy as np -import copy -from typing import Union -from opto.trainer.loader import DataLoader from opto.trainer.utils import batch_run, async_run from opto.optimizers.utils import print_color -# from opto.trainer.evaluators import evaluate from typing import Union, List, Tuple, Dict, Any, Optional -from collections import deque -from 
opto.utils.llm import LLM # For the selector LLM -# from opto.trace.nodes import ParameterNode -import json -# import warnings -# from black import format_str, FileMode -import random -# import mathX from opto.utils.auto_retry import retry_with_exponential_backoff import litellm import time from opto.features.priority_search.priority_search import ModuleCandidate + +def embed_text(model, text): + """Call the embedding API for a given model and text string. + + This is a standalone function so users can easily replace it with a custom + embedding provider (e.g. local model, different API) without subclassing. + Must return a litellm-compatible response with response.data[0].embedding. + """ + return litellm.embedding(model=model, input=text) + + class RegressorTemplate: """Base class template for regression-based predictors for ModuleCandidate objects. @@ -29,38 +28,18 @@ class RegressorTemplate: This class itself is enough for getting embeddings for candidates. """ - def __init__(self, embedding_model="gemini/text-embedding-004", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True,verbose: bool = False): - # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once. 
- self.max_candidates_to_predict = 500 - self.embedding_model = embedding_model - self.num_threads = num_threads - self.regularization_strength = regularization_strength # L2 regularization strength (lambda) - self.rich_text = rich_text - - # Default original embedding dimension (from text-embedding-004) - self.original_embedding_dim = 768 - - # if linear_dim is not None: - # # Use random projection from 768D to linear_dim - # self.linear_dim = linear_dim - # print_color(f"Using random projection: {self.original_embedding_dim}D → {linear_dim}D", "blue") - # self.random_projector = GaussianRandomProjection( - # input_dim=self.original_embedding_dim, - # output_dim=linear_dim, - # random_seed=42 - # ) - # else: - # # Use default 768D without projection - # self.linear_dim = self.original_embedding_dim - # self.random_projector = None - self.linear_dim = self.original_embedding_dim - self.random_projector = None - - # Initialize weights with larger values for more aggressive learning - self.weights = np.random.normal(0, 0.1, self.linear_dim) - self.bias = 0.0 - self.verbose = verbose - + def __init__(self, embedding_model="gemini/gemini-embedding-001", num_threads=None, regularization_strength=1, linear_dim=None, rich_text=True,verbose: bool = False, max_candidates_to_predict=500,original_embedding_dim=768): + ''' + Args: + embedding_model: The embedding model to use. + num_threads: The number of threads to use for the embedding generation. + regularization_strength: The regularization strength for the logistic regression. + linear_dim: The dimension of the linear space. + rich_text: Whether to use rich text for the parameter text. + verbose: Whether to print the verbose output. + max_candidates_to_predict: The maximum number of candidates to predict. + original_embedding_dim: The original dimension of the embedding. 
+ ''' def _get_parameter_text(self, candidate): """Get the parameter text for a ModuleCandidate.""" if not hasattr(candidate, 'update_dict'): @@ -68,52 +47,24 @@ def _get_parameter_text(self, candidate): assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict" # Convert parameter nodes to readable names for deterministic embedding params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()} - - # if self.rich_text: - # # Create rich text representation with problem definition and rating question - # rich_text_parts = [] - - # # Add problem definition - # rich_text_parts.append(f"Problem Definition: {DOMAIN_CONTEXT_VERIBENCH.strip()}") - # rich_text_parts.append("") # Empty line for separation - - # # Add parameter configuration - # rich_text_parts.append("Parameter Configuration:") - # for param_name, param_value in params_with_names.items(): - # rich_text_parts.append(f"{param_name}: {param_value}") - # rich_text_parts.append("") # Empty line for separation - - # # Add rating question - # rich_text_parts.append("Question: Based on the problem context above and this parameter configuration, how do you rate this parameter?") - - # return "\n".join(rich_text_parts) - # else: return str(params_with_names) - def _get_embedding(self, candidate): + def _get_embedding(self, candidate,max_retries=10,base_delay=1.0): """Get the embedding for a ModuleCandidate.""" parameter_text = self._get_parameter_text(candidate) - def single_embedding_call(): - return litellm.embedding( - model=self.embedding_model, - input=parameter_text - ) - try: response = retry_with_exponential_backoff( - single_embedding_call, - max_retries=10, - base_delay=1.0, + lambda: embed_text(self.embedding_model, parameter_text), + max_retries=max_retries, + base_delay=base_delay, operation_name="Embedding API call" ) embedding = response.data[0].embedding if self.random_projector is not None: - # Convert to numpy array and reshape for transform (expects 2D: 
n_samples x n_features) embedding_array = np.array(embedding).reshape(1, -1) projected = self.random_projector.transform(embedding_array) - # Convert back to list and flatten embedding = projected.flatten().tolist() return embedding except Exception as e: @@ -124,7 +75,7 @@ def add_embeddings_to_candidates(self, candidates: List[ModuleCandidate]): """Add embeddings to a list of candidates. This function could be used outside.""" self._update_memory_embeddings_for_batch(candidates) - def _update_memory_embeddings_for_batch(self, batch): + def _update_memory_embeddings_for_batch(self, batch,max_workers=50,max_retries=10,base_delay=1.0): """Update the embeddings for a batch of candidates.""" # Separate candidates that need embeddings from those that already have them candidates_needing_embeddings = [] @@ -144,7 +95,7 @@ def get_embedding_for_candidate(candidate): # Run embedding generation in parallel new_embeddings = async_run( embedding_functions, - max_workers=50, + max_workers=max_workers, description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates" ) @@ -167,9 +118,8 @@ class ModuleCandidateRegressor: predict_scores has no parameters, it could return predicted scores for all candidates in the memory. predict_scores_for_batch has one parameter, a batch of candidates, it could return predicted scores for the batch of candidates.""" - def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3): - # In the regressor, no need for calling LLM to make the prediction. So we could predict the entire memory at once. 
- self.max_candidates_to_predict = 500 + def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num_threads=None, learning_rate=0.2, regularization_strength=1e-4, max_iterations=20000, tolerance=5e-3, max_candidates_to_predict=500, original_embedding_dim=768,patience=20,lr_decay_factor=0.8): + self.max_candidates_to_predict = max_candidates_to_predict self.memory = memory self.embedding_model = embedding_model self.num_threads = num_threads @@ -178,10 +128,9 @@ def __init__(self, memory=None, embedding_model="gemini/text-embedding-004", num self.regularization_strength = regularization_strength # L2 regularization strength (lambda) self.max_iterations = max_iterations self.tolerance = tolerance - self.patience = 20 # Early stopping patience - self.lr_decay_factor = 0.8 # Learning rate decay factor - # default linear dimension is 768 - self.linear_dim = 768 + self.patience = patience # Early stopping patience + self.lr_decay_factor = lr_decay_factor # Learning rate decay factor + self.linear_dim = original_embedding_dim # Initialize weights with larger values for more aggressive learning self.weights = np.random.normal(0, 0.1, self.linear_dim) self.bias = 0.0 @@ -192,42 +141,33 @@ def _sigmoid(self, z): def _get_parameter_text(self, candidate): """Get the parameter text for a ModuleCandidate.""" - if not candidate.update_dict: - # If update_dict is empty, use a default text or base module info - return "base_module_parameters" - - # Get the first value from update_dict (similar to additional_instructions) - # TODO: support for multiple parameters - parameter_text = list(candidate.update_dict.values())[0] - return str(parameter_text) + if not hasattr(candidate, 'update_dict'): + print(candidate) + assert hasattr(candidate, 'update_dict'), "ModuleCandidate must have an update_dict" + # Convert parameter nodes to readable names for deterministic embedding + params_with_names = {k.py_name: v for k, v in candidate.update_dict.items()} + return 
str(params_with_names) - def _get_embedding(self, candidate): + def _get_embedding(self, candidate,max_retries=10,base_delay=1.0): """Get the embedding for a ModuleCandidate.""" parameter_text = self._get_parameter_text(candidate) - def single_embedding_call(): - return litellm.embedding( - model=self.embedding_model, - input=parameter_text - ) - try: response = retry_with_exponential_backoff( - single_embedding_call, - max_retries=10, - base_delay=1.0, + lambda: embed_text(self.embedding_model, parameter_text), + max_retries=max_retries, + base_delay=base_delay, operation_name="Embedding API call" ) embedding = response.data[0].embedding return embedding except Exception as e: print_color(f"ERROR: Embedding API call failed after retries: {e}", "red") - # Return a random embedding as fallback to prevent complete failure print_color("Using random embedding as fallback", "yellow") fallback_embedding = np.random.normal(0, 0.01, self.linear_dim) return fallback_embedding / np.linalg.norm(fallback_embedding) - def _update_memory_embeddings_for_batch(self, batch): + def _update_memory_embeddings_for_batch(self, batch,max_workers=1000,max_retries=10,base_delay=1.0): """Update the embeddings for a batch of candidates.""" # Separate candidates that need embeddings from those that already have them candidates_needing_embeddings = [] @@ -247,7 +187,7 @@ def get_embedding_for_candidate(candidate): # Run embedding generation in parallel new_embeddings = async_run( embedding_functions, - max_workers=1000, + max_workers=max_workers, description=f"Generating embeddings for {len(candidates_needing_embeddings)} candidates" ) @@ -329,21 +269,7 @@ def update(self): self.weights = np.random.normal(0, 0.1, self.linear_dim) # Convergence-based regularized logistic regression training using all raw binary data - m = len(X_list) - # print_color(f"Training regularized logistic regression with {m} binary samples from {len(training_candidates)} candidates until convergence.", "blue") - # 
print_color(f"Using L2 regularization strength: {self.regularization_strength}, learning rate: {self.learning_rate}", "blue") - # print_color(f"Max iterations: {self.max_iterations}, tolerance: {self.tolerance}", "blue") - - # Debug: Print initial weight statistics - initial_weight_norm = np.linalg.norm(self.weights) - # print_color(f"Initial weight norm: {initial_weight_norm:.6f}", "yellow") - - # Debug: Print embedding statistics - embedding_mean = np.mean(X) - embedding_std = np.std(X) - embedding_norm_mean = np.mean([np.linalg.norm(row) for row in X]) - # print_color(f"Embedding stats - mean: {embedding_mean:.6f}, std: {embedding_std:.6f}, avg norm: {embedding_norm_mean:.6f}", "yellow") - + m = len(X_list) # Training loop until convergence with adaptive learning rate and early stopping prev_cost = float('inf') best_cost = float('inf') @@ -399,15 +325,6 @@ def update(self): self.weights -= self.learning_rate * dw self.bias -= self.learning_rate * db - # Print progress periodically - # if iteration == 0 or (iteration + 1) % max(1, min(50, self.max_iterations // 20)) == 0: - # z_mean, z_std = np.mean(z), np.std(z) - # weight_norm = np.linalg.norm(self.weights) - # print_color(f"Iteration {iteration + 1}: Cost: {total_cost:.6f} (change: {cost_change:.8f}), LR: {self.learning_rate:.6f}, Weight norm: {weight_norm:.6f}, Gradient norm: {gradient_norm:.8f}", "cyan") - # print_color(f" Logits - mean: {z_mean:.6f}, std: {z_std:.6f}, range: [{np.min(z):.6f}, {np.max(z):.6f}]", "cyan") - # print_color(f" Predictions - range: [{np.min(predictions):.6f}, {np.max(predictions):.6f}], mean: {np.mean(predictions):.6f}", "cyan") - # print_color(f" Patience: {patience_counter}/{self.patience}", "cyan") - prev_cost = total_cost # Final status diff --git a/opto/features/priority_search/polca.py b/opto/features/priority_search/polca.py index b0ed29bc..2850a2a6 100644 --- a/opto/features/priority_search/polca.py +++ b/opto/features/priority_search/polca.py @@ -8,17 +8,17 @@ def 
calculate_distance_to_memory(memory, new_candidate): - """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory. - - To use this funciton in PrioritySearch, set memory to be self.memory.memory. - """ - assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute." - min_distance = float('inf') - for _, candidate in memory: - distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding)) - if distance < min_distance: - min_distance = distance - return min_distance + """For a new candidate, calculate the distance to the current memory. That's the least L2 distance to any candidate in the memory. + + To use this function in PrioritySearch, set memory to be self.memory.memory. + """ + assert hasattr(new_candidate, 'embedding') and all(hasattr(candidate, 'embedding') for _, candidate in memory), "All candidates should have the embedding attribute."
+ min_distance = float('inf') + for _, candidate in memory: + distance = np.linalg.norm(np.array(new_candidate.embedding) - np.array(candidate.embedding)) + if distance < min_distance: + min_distance = distance + return min_distance class POLCA(PrioritySearch): """ @@ -36,6 +36,7 @@ class POLCA(PrioritySearch): def __init__(self, epsilon: float = 0.1, use_summarizer: bool = False, + context: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ", *args, **kwargs): super().__init__(*args, **kwargs) @@ -43,7 +44,7 @@ def __init__(self, self.use_summarizer = use_summarizer self.regressor = RegressorTemplate() self.summarizer = Summarizer() - self.context = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: " + self.context = context def filter_candidates(self, new_candidates: List[ModuleCandidate]) -> List[ModuleCandidate]: """ Filter candidates by their embeddings. diff --git a/opto/features/priority_search/priority_search.py b/opto/features/priority_search/priority_search.py index c164889f..d35b114f 100644 --- a/opto/features/priority_search/priority_search.py +++ b/opto/features/priority_search/priority_search.py @@ -728,8 +728,6 @@ def validate(self, exploration_candidates = self._exploration_candidates # exploration candidates from the previous iteration assert self._exploration_candidates is not None, "exploration_candidates must be set before calling validate." - # The current batch of samples can be used to validate the exploration candidates - # validate_samples = copy.copy(samples) # Exploration samples are added before proposing, so we don't need to add them again here. 
validate_samples = Samples([], {'inputs': [], 'infos': []}) @@ -739,7 +737,7 @@ def validate(self, validate_samples.add_samples(Samples(*self.validate_sampler.sample(candidate_agents, use_prev_batch=use_prev_batch, description_prefix='Validating newly proposed candidates: '))) # list of BatchRollout objects - + candidates_to_be_matched = candidates if self.validate_exploration_candidates: if not use_prev_batch: # validate the exploration candidates that collected the samples as well # validate the agents in the validate_dataset @@ -747,9 +745,9 @@ def validate(self, exploration_samples = Samples(*self.validate_sampler.sample(exploration_agents, description_prefix='Validating exploration candidates: ')) # sample the exploration agents validate_samples.add_samples(exploration_samples) # append the exploration samples to the validate_samples + # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False) + candidates_to_be_matched = exploration_candidates + candidates - # Only match exploration candidates if they were actually sampled (i.e., validate_exploration_candidates=True and use_prev_batch=False) - candidates_to_be_matched = exploration_candidates + candidates if (self.validate_exploration_candidates and not use_prev_batch) else candidates matched_candidates_and_samples = self.match_candidates_and_samples(candidates_to_be_matched, validate_samples.samples) results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) for c, rollouts in matched_candidates_and_samples.items(): # rollouts is a list of BatchRollouts @@ -761,11 +759,6 @@ def validate(self, results[candidate] = [] # Add with empty rollouts list # Populate score_dict in each rollout when multi-objective is active - # Populate per-metric score_dict for all multi-objective modes (including "scalar"). 
- # This allows mode="scalar" with HeapMemory to leverage weighted scalarization - # in compute_exploration_priority, rather than falling back to the single scalar score. - # Without this, mode="scalar" would skip score_dict collection and behave - # identically to objective_config=None, making the "scalar" mode a no-op. cfg = getattr(self, 'objective_config', None) if cfg is not None: guide = self.validate_sampler.guide @@ -973,8 +966,5 @@ def add_exploration_rollouts_to_candidates(self, exploration_candidates: List[Mo """ Add the exploration rollouts to the exploration candidates. """ matched_exploration_candidates_and_samples = self.match_candidates_and_samples(exploration_candidates, samples.samples) - exploration_results = {} # dict of ModuleCandidate id: (ModuleCandidate, list of rollouts) - for c, rollouts in matched_exploration_candidates_and_samples.items(): # rollouts is a list of BatchRollouts - exploration_results[c] = [ r for rr in rollouts for r in rr.to_list()] - for candidate, rollouts in exploration_results.items(): - candidate.add_rollouts(rollouts) + for c, rollouts in matched_exploration_candidates_and_samples.items(): + c.add_rollouts([r for rr in rollouts for r in rr.to_list()]) \ No newline at end of file diff --git a/opto/features/priority_search/summarizer.py b/opto/features/priority_search/summarizer.py index 6e802769..4530ae09 100644 --- a/opto/features/priority_search/summarizer.py +++ b/opto/features/priority_search/summarizer.py @@ -74,12 +74,44 @@ def get_trajectory_of_one_rollout(rollout): class Summarizer: """A class which use LLM to summarize the trajectories of the memory. It should be able to learn the patterns of the trajectories. Generate a summary to guide the optimizer to generate better candidates. """ - def __init__(self, verbose: bool = False, success_threshold: float = 0): + DEFAULT_SYSTEM_PROMPT = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization." 
+ + DEFAULT_USER_PROMPT_TEMPLATE = """Analyze the following agent conversation trajectories and extract insights for optimization. + + Current Summary (from previous analysis): + {current_summary} + + New Trajectories to Analyze: + {history_trajectories} + + Instructions: + - Review both the Current Summary and the New Trajectories + - Synthesize ALL insights into a single, cohesive summary + - Integrate new patterns with existing knowledge + - Reorganize and consolidate information as needed for clarity + - DO NOT use incremental language like "[Previous points remain valid, plus:]" + - Generate a complete, standalone summary that incorporates everything + + Provide your analysis in XML format: + + Analyze the key patterns and strategies that led to success or failure in these trajectories. Consider both the current summary and new trajectories. + + + A complete, consolidated summary with concrete recommendations for generating better results. This should be a standalone summary that integrates insights from both the current summary and new trajectories, without using incremental modification language. 
+ """ + + def __init__(self, verbose: bool = False, success_threshold: float = 0, + max_candidates_in_prompt: int = 5, + current_summary: str = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: ", + system_prompt: str = None, + user_prompt_template: str = None): self.llm = LLM() # use the default model - self.max_candidates_in_prompt = 5 - self.current_summary = "Concrete recommendations for generating better agent parameters based on successful patterns observed in the trajectories: " + self.max_candidates_in_prompt = max_candidates_in_prompt + self.current_summary = current_summary self.used_candidates = set() # Track candidates that have been summarized self.verbose = verbose + self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT + self.user_prompt_template = user_prompt_template or self.DEFAULT_USER_PROMPT_TEMPLATE # Configurable threshold for classifying rollouts as successful (score > threshold) # or failed (score <= threshold). Defaults to 0 for backward compatibility. # Previously hardcoded as 0, which also caused rollouts with negative scores to be @@ -114,7 +146,6 @@ def _get_trajectories_for_memory(self, memory): continue # For each candidate, add one (if exists) successful_rollout and one (if exists) failed_rollout. candidate_update_dict = candidate.update_dict.values() - # print_color(f"Candidate pamameters: {candidate_update_dict}", "blue")# For debugging prompt = f"Candidate pamameters: {candidate_update_dict}." 
successful_rollouts = [rollout for rollout in rollouts if rollout['score'] > self.success_threshold] failed_rollouts = [rollout for rollout in rollouts if rollout['score'] <= self.success_threshold] @@ -140,52 +171,22 @@ def summarize(self, memory) -> str: """ history_trajectories = self._get_trajectories_for_memory(memory) - - # print_color(f"History trajectories: {history_trajectories}", "green") - if len(history_trajectories) == 0: return "No trajectories found for the memory." - system_prompt = "You are an expert at analyzing agent behavior patterns and providing actionable guidance for parameter optimization." - - user_prompt = f"""Analyze the following agent conversation trajectories and extract insights for optimization. - - Current Summary (from previous analysis): - {self.current_summary} - - New Trajectories to Analyze: - {history_trajectories} - - Instructions: - - Review both the Current Summary and the New Trajectories - - Synthesize ALL insights into a single, cohesive summary - - Integrate new patterns with existing knowledge - - Reorganize and consolidate information as needed for clarity - - DO NOT use incremental language like "[Previous points remain valid, plus:]" - - Generate a complete, standalone summary that incorporates everything - - Provide your analysis in XML format: - - Analyze the key patterns and strategies that led to success or failure in these trajectories. Consider both the current summary and new trajectories. - - - A complete, consolidated summary with concrete recommendations for generating better Lean 4 code. This should be a standalone summary that integrates insights from both the current summary and new trajectories, without using incremental modification language. 
- """ + user_prompt = self.user_prompt_template.format( + current_summary=self.current_summary, + history_trajectories=history_trajectories, + ) prompt_messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": user_prompt}, ] - # print_color(f"User prompt: {user_prompt}", "blue") - - # print_color(f"System prompt: {system_prompt}", "blue") - # print_color(f"User prompt: {user_prompt}", "blue") response = self.llm(messages=prompt_messages) response = response.choices[0].message.content - # print_color(f"Response: {response}", "yellow") - # Extract summary using XML regex summary_match = re.search(r'(.*?)', response, re.DOTALL)