Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 256 additions & 0 deletions autobot-backend/services/autoresearch/knowledge_synthesizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
# AutoBot - AI-Powered Automation Platform
# Copyright (c) 2025 mrveiss
# Author: mrveiss
"""
Knowledge Synthesizer for AutoResearch

Issue #2600: Two-layer ChromaDB intelligence:
1. Enhanced per-experiment indexing (richer documents)
2. Distilled cross-experiment insights (synthesized lessons)

Insights are generated by LLM after each ExperimentSession completes
and stored in a dedicated ChromaDB collection for RAG queries.
"""

from __future__ import annotations

import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from .config import AutoResearchConfig
from .store import ExperimentStore

logger = logging.getLogger(__name__)

# System prompt sent as the "system" message of the LLM chat call made by
# KnowledgeSynthesizer.synthesize_session. It asks the model to return a JSON
# array of insight objects; the four keys it names (statement, confidence,
# supporting_experiments, related_hyperparams) are the same keys that
# synthesize_session reads from each returned item.
_SYNTHESIS_SYSTEM_PROMPT = (
"You are an ML experiment analyst. Analyze the following experiment results "
"and extract reusable insights about what works and what doesn't.\n\n"
"For each insight, provide:\n"
"- statement: A clear, actionable finding\n"
"- confidence: 0.0-1.0 based on how many experiments support it\n"
"- supporting_experiments: List of experiment IDs that support this finding\n"
"- related_hyperparams: List of hyperparameter names involved\n\n"
"Return a JSON array of insight objects."
)


@dataclass
class ExperimentInsight:
    """One synthesized finding that spans several experiments.

    Attributes:
        id: Unique identifier; a fresh UUID4 string unless supplied.
        statement: The finding itself, as free text.
        confidence: Score for the finding (defaults to 0.0; the range is
            not enforced by this class).
        supporting_experiments: IDs of the experiments backing the finding.
        related_hyperparams: Names of the hyperparameters involved.
        synthesized_at: ``time.time()`` value taken when the instance is
            created, unless supplied.
        session_id: Session the insight was derived from, if any.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    statement: str = ""
    confidence: float = 0.0
    supporting_experiments: List[str] = field(default_factory=list)
    related_hyperparams: List[str] = field(default_factory=list)
    synthesized_at: float = field(default_factory=time.time)
    session_id: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the insight as a plain dict keyed by attribute name.

        The list attributes are returned by reference, not copied.
        """
        exported = (
            "id",
            "statement",
            "confidence",
            "supporting_experiments",
            "related_hyperparams",
            "synthesized_at",
            "session_id",
        )
        return {key: getattr(self, key) for key in exported}


class KnowledgeSynthesizer:
    """Synthesize cross-experiment insights and store them in ChromaDB.

    ``synthesize_session`` summarizes the experiments of one session, asks
    the LLM for insights and upserts them into the ``INSIGHTS_COLLECTION``
    collection. ``query_insights`` and ``get_relevant_context`` read them
    back through a text query.

    List-valued fields are written to the collection metadata as
    comma-joined strings and split on "," when read, so values that
    themselves contain a comma do not round-trip.
    """

    INSIGHTS_COLLECTION = "autoresearch_insights"

    def __init__(
        self,
        store: ExperimentStore,
        llm_service: Any,
        config: Optional[AutoResearchConfig] = None,
    ) -> None:
        """Create a synthesizer.

        Args:
            store: Source of experiments; ``list_experiments`` is awaited.
            llm_service: Object whose awaited ``chat(...)`` returns a
                response exposing a ``content`` attribute.
            config: Optional configuration; a default ``AutoResearchConfig``
                is created when omitted. It is stored but not read by any
                method of this class.
        """
        self._store = store
        self._llm = llm_service
        self._config = config or AutoResearchConfig()
        # Resolved lazily on first use by _get_insights_collection().
        self._insights_collection = None

    async def _get_insights_collection(self):
        """Return the insights collection, creating and caching it once."""
        if self._insights_collection is None:
            from utils.chromadb_client import get_async_chromadb_client

            client = await get_async_chromadb_client()
            self._insights_collection = await client.get_or_create_collection(
                name=self.INSIGHTS_COLLECTION,
                metadata={"description": "Distilled AutoResearch experiment insights"},
            )
        return self._insights_collection

    async def synthesize_session(self, session_id: str) -> List[ExperimentInsight]:
        """Synthesize insights from all experiments in a session.

        Malformed LLM output never raises: an unparseable reply yields an
        empty list, and individual entries that are not JSON objects or
        that lack a statement are skipped.

        Args:
            session_id: The experiment session to analyze.

        Returns:
            List of generated ExperimentInsight objects (possibly empty).
        """
        # Query experiments tagged with this session — use a large limit
        # to avoid silently missing experiments beyond the default page size
        all_experiments = await self._store.list_experiments(limit=500)
        session_tag = f"session:{session_id}"
        session_experiments = [
            exp for exp in all_experiments if session_tag in exp.tags
        ]

        if not session_experiments:
            logger.info("No experiments found for session %s", session_id)
            return []

        experiment_summary = self._build_experiment_summary(session_experiments)

        try:
            response = await self._llm.chat(
                messages=[
                    {"role": "system", "content": _SYNTHESIS_SYSTEM_PROMPT},
                    {"role": "user", "content": experiment_summary},
                ],
                temperature=0.3,
                max_tokens=2000,
            )
            raw_insights = self._parse_llm_payload(response.content)
        except json.JSONDecodeError as exc:
            logger.warning(
                "KnowledgeSynthesizer: failed to parse LLM response: %s", exc
            )
            return []
        except Exception as exc:
            logger.exception("KnowledgeSynthesizer: LLM call failed: %s", exc)
            return []

        insights = self._build_insights(raw_insights, session_id)
        await self._index_insights(insights)
        return insights

    async def query_insights(
        self, query: str, limit: int = 5
    ) -> List[ExperimentInsight]:
        """Semantic search over distilled insights.

        Args:
            query: Free-text search query.
            limit: Maximum results to return; values below 1 return an
                empty list without querying the collection.

        Returns:
            List of matching ExperimentInsight objects.
        """
        if limit <= 0:
            return []

        collection = await self._get_insights_collection()
        results = await collection.query(
            query_texts=[query],
            n_results=limit,
        )
        if not results:
            return []

        # Results are nested one level per query text; only one was sent.
        id_batches = results.get("ids") or []
        if not id_batches or not id_batches[0]:
            return []

        meta_batch = (results.get("metadatas") or [None])[0] or []
        doc_batch = (results.get("documents") or [None])[0] or []

        insights = []
        for idx, doc_id in enumerate(id_batches[0]):
            # Guard against absent or None entries so that one sparse
            # record cannot crash the whole query.
            meta = (meta_batch[idx] if idx < len(meta_batch) else None) or {}
            document = (doc_batch[idx] if idx < len(doc_batch) else None) or ""
            insights.append(
                ExperimentInsight(
                    id=doc_id,
                    statement=document,
                    confidence=self._clamp_confidence(meta.get("confidence", 0.0)),
                    supporting_experiments=self._split_csv(
                        meta.get("supporting_experiments")
                    ),
                    related_hyperparams=self._split_csv(
                        meta.get("related_hyperparams")
                    ),
                    session_id=meta.get("session_id"),
                )
            )

        return insights

    async def get_relevant_context(self, topic: str, limit: int = 3) -> str:
        """Build RAG context string for hypothesis generation.

        Args:
            topic: Research topic to find relevant insights for.
            limit: Number of insights to include.

        Returns:
            Formatted context string, empty if no insights found.
        """
        insights = await self.query_insights(topic, limit=limit)
        if not insights:
            return ""

        lines = ["Prior experiment insights:"]
        lines.extend(
            f"- {insight.statement} (confidence: {insight.confidence:.0%})"
            for insight in insights
        )
        return "\n".join(lines)

    def _build_experiment_summary(self, experiments: list) -> str:
        """Build a text summary of experiments for LLM synthesis.

        Each experiment contributes its id, hypothesis, state and
        hyperparameters; ``val_bpb`` and the improvement over the baseline
        are appended only when those values are present.
        """
        parts = []
        for exp in experiments:
            hp_dict = exp.hyperparams.to_dict()
            summary = (
                f"Experiment {exp.id}:\n"
                f" Hypothesis: {exp.hypothesis}\n"
                f" State: {exp.state.value}\n"
                f" Hyperparams: {json.dumps(hp_dict)}\n"
            )
            if exp.result and exp.result.val_bpb is not None:
                summary += f" val_bpb: {exp.result.val_bpb}\n"
                if exp.baseline_val_bpb is not None:
                    # Positive when the experiment's val_bpb is below baseline.
                    improvement = exp.baseline_val_bpb - exp.result.val_bpb
                    summary += (
                        f" Baseline: {exp.baseline_val_bpb}, "
                        f"Improvement: {improvement:.4f}\n"
                    )
            parts.append(summary)
        return "\n".join(parts)

    async def _index_insights(self, insights: List[ExperimentInsight]) -> None:
        """Store insights in ChromaDB on a best-effort basis.

        Any failure, whether resolving the collection or upserting, is
        logged and swallowed so that freshly synthesized insights are still
        returned to the caller.
        """
        if not insights:
            return

        ids = [insight.id for insight in insights]
        documents = [insight.statement for insight in insights]
        metadatas = [
            {
                "confidence": insight.confidence,
                # str() keeps the join safe if a caller supplied non-string IDs.
                "supporting_experiments": ",".join(
                    str(item) for item in insight.supporting_experiments
                ),
                "related_hyperparams": ",".join(
                    str(item) for item in insight.related_hyperparams
                ),
                "session_id": insight.session_id or "",
                "synthesized_at": insight.synthesized_at,
            }
            for insight in insights
        ]

        try:
            collection = await self._get_insights_collection()
            await collection.upsert(ids=ids, documents=documents, metadatas=metadatas)
            logger.info("Indexed %d insights in ChromaDB", len(insights))
        except Exception:
            logger.exception("Failed to index insights in ChromaDB")

    def _build_insights(
        self, raw_insights: List[Any], session_id: str
    ) -> List[ExperimentInsight]:
        """Convert decoded LLM entries into ExperimentInsight objects.

        Entries that are not JSON objects, or whose ``statement`` is missing,
        blank or not a string, are logged and skipped.
        """
        insights = []
        for raw in raw_insights:
            if not isinstance(raw, dict):
                logger.warning(
                    "KnowledgeSynthesizer: skipping non-object insight: %r", raw
                )
                continue
            statement = raw.get("statement")
            if not isinstance(statement, str) or not statement.strip():
                logger.warning(
                    "KnowledgeSynthesizer: skipping insight without a statement"
                )
                continue
            insights.append(
                ExperimentInsight(
                    statement=statement,
                    confidence=self._clamp_confidence(raw.get("confidence", 0.0)),
                    supporting_experiments=self._as_str_list(
                        raw.get("supporting_experiments")
                    ),
                    related_hyperparams=self._as_str_list(
                        raw.get("related_hyperparams")
                    ),
                    session_id=session_id,
                )
            )
        return insights

    @staticmethod
    def _parse_llm_payload(content: Any) -> List[Any]:
        """Decode the LLM reply into a list of raw insight entries.

        A surrounding Markdown code fence is removed before decoding, and a
        single JSON object is treated as a one-element list.

        Raises:
            json.JSONDecodeError: If the text is not valid JSON.
            TypeError: If ``content`` is not a type ``json.loads`` accepts.
        """
        text = content.strip() if isinstance(content, str) else content
        if isinstance(text, str) and text.startswith("```"):
            # Drop the opening fence line ("```" or "```json") ...
            newline_at = text.find("\n")
            text = text[newline_at + 1:] if newline_at != -1 else text[3:]
            # ... and the closing fence, if there is one.
            text = text.rstrip()
            if text.endswith("```"):
                text = text[:-3]

        payload = json.loads(text)
        if isinstance(payload, dict):
            return [payload]
        if not isinstance(payload, list):
            logger.warning(
                "KnowledgeSynthesizer: expected a JSON array of insights, got %s",
                type(payload).__name__,
            )
            return []
        return payload

    @staticmethod
    def _clamp_confidence(value: Any) -> float:
        """Coerce ``value`` to a float in [0.0, 1.0]; invalid or NaN gives 0.0."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        if number != number:
            # NaN compares unequal to itself; min()/max() would otherwise
            # turn it into 1.0.
            return 0.0
        return max(0.0, min(1.0, number))

    @staticmethod
    def _as_str_list(value: Any) -> List[str]:
        """Normalize an LLM-provided value into a list of strings."""
        if value is None:
            return []
        if isinstance(value, str):
            # A bare string is one item, not a sequence of characters.
            return [value] if value else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return [str(value)]

    @staticmethod
    def _split_csv(value: Any) -> List[str]:
        """Split a comma-joined metadata value; empty or missing gives []."""
        if not value:
            return []
        return str(value).split(",")
118 changes: 118 additions & 0 deletions autobot-backend/services/autoresearch/knowledge_synthesizer_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# AutoBot - AI-Powered Automation Platform
# Copyright (c) 2025 mrveiss
# Author: mrveiss
"""Tests for knowledge synthesizer — Issue #2600."""

from __future__ import annotations

import json

import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from services.autoresearch.knowledge_synthesizer import (
ExperimentInsight,
KnowledgeSynthesizer,
)
from services.autoresearch.models import (
Experiment,
ExperimentResult,
ExperimentState,
HyperParams,
)


class TestExperimentInsight:
    """Unit tests for the ExperimentInsight dataclass."""

    def test_to_dict(self):
        """to_dict() exposes constructor values under matching keys."""
        expected_statement = "Dropout < 0.1 degrades val_bpb"
        payload = ExperimentInsight(
            statement=expected_statement,
            confidence=0.85,
            supporting_experiments=["exp1", "exp2"],
            related_hyperparams=["dropout"],
        ).to_dict()

        assert payload["statement"] == expected_statement
        assert payload["confidence"] == 0.85
        assert len(payload["supporting_experiments"]) == 2

class TestKnowledgeSynthesizer:
    """Tests for KnowledgeSynthesizer using mocked store, LLM and collection."""

    STATEMENT = "Warmup steps >= 300 improve convergence"

    @staticmethod
    def _single_hit():
        """Query result holding one stored insight, nested per query text."""
        return {
            "ids": [["i1"]],
            "documents": [["Warmup steps >= 300 improve convergence"]],
            "metadatas": [[{
                "confidence": 0.8,
                "supporting_experiments": "e2",
                "related_hyperparams": "warmup_steps",
                "session_id": "s1",
            }]],
        }

    @pytest.fixture
    def mock_store(self):
        """Store returning two experiments tagged with session-1."""
        store = AsyncMock()
        store.list_experiments.return_value = [
            Experiment(
                id="e1",
                hypothesis="Lower dropout to 0.05",
                state=ExperimentState.DISCARDED,
                hyperparams=HyperParams(dropout=0.05),
                result=ExperimentResult(val_bpb=6.0),
                baseline_val_bpb=5.5,
                tags=["session:session-1"],
            ),
            Experiment(
                id="e2",
                hypothesis="Increase warmup to 300",
                state=ExperimentState.KEPT,
                hyperparams=HyperParams(warmup_steps=300),
                result=ExperimentResult(val_bpb=5.2),
                baseline_val_bpb=5.5,
                tags=["session:session-1"],
            ),
        ]
        return store

    @pytest.fixture
    def mock_llm(self):
        """LLM whose chat() reply is a JSON array with a single insight."""
        llm = AsyncMock()
        mock_response = MagicMock()
        mock_response.content = json.dumps([
            {
                "statement": "Warmup steps >= 300 improve convergence",
                "confidence": 0.8,
                "supporting_experiments": ["e2"],
                "related_hyperparams": ["warmup_steps"],
            }
        ])
        llm.chat.return_value = mock_response
        return llm

    @pytest.fixture
    def mock_chromadb(self):
        """Stand-in for the insights collection."""
        collection = AsyncMock()
        return collection

    @pytest.fixture
    def synthesizer(self, mock_store, mock_llm, mock_chromadb):
        """Synthesizer with the collection pre-injected (no ChromaDB client)."""
        s = KnowledgeSynthesizer(
            store=mock_store,
            llm_service=mock_llm,
        )
        s._insights_collection = mock_chromadb
        return s

    @pytest.mark.asyncio
    async def test_synthesize_session(self, synthesizer, mock_llm, mock_chromadb):
        insights = await synthesizer.synthesize_session("session-1")

        assert len(insights) == 1
        assert insights[0].statement == self.STATEMENT
        assert insights[0].confidence == 0.8
        assert insights[0].session_id == "session-1"
        mock_llm.chat.assert_called_once()
        mock_chromadb.upsert.assert_called_once()

    @pytest.mark.asyncio
    async def test_synthesize_session_without_matching_experiments(
        self, synthesizer, mock_llm, mock_chromadb
    ):
        # No experiment carries this session tag, so the LLM is never asked.
        insights = await synthesizer.synthesize_session("unknown-session")

        assert insights == []
        mock_llm.chat.assert_not_called()
        mock_chromadb.upsert.assert_not_called()

    @pytest.mark.asyncio
    async def test_synthesize_session_unparseable_response(
        self, synthesizer, mock_llm, mock_chromadb
    ):
        mock_llm.chat.return_value.content = "this is not JSON"

        insights = await synthesizer.synthesize_session("session-1")

        assert insights == []
        mock_chromadb.upsert.assert_not_called()

    @pytest.mark.asyncio
    async def test_query_insights(self, synthesizer, mock_chromadb):
        mock_chromadb.query.return_value = self._single_hit()

        results = await synthesizer.query_insights("warmup", limit=5)

        assert len(results) == 1
        assert "Warmup" in results[0].statement
        assert results[0].id == "i1"
        assert results[0].confidence == 0.8
        assert results[0].supporting_experiments == ["e2"]
        assert results[0].related_hyperparams == ["warmup_steps"]
        assert results[0].session_id == "s1"

    @pytest.mark.asyncio
    async def test_get_relevant_context(self, synthesizer, mock_chromadb):
        mock_chromadb.query.return_value = self._single_hit()

        context = await synthesizer.get_relevant_context("warmup")

        assert context == (
            "Prior experiment insights:\n"
            "- Warmup steps >= 300 improve convergence (confidence: 80%)"
        )

    @pytest.mark.asyncio
    async def test_get_relevant_context_without_hits(
        self, synthesizer, mock_chromadb
    ):
        mock_chromadb.query.return_value = {
            "ids": [[]],
            "documents": [[]],
            "metadatas": [[]],
        }

        assert await synthesizer.get_relevant_context("warmup") == ""
Loading
Loading