1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@ All notable changes to RecallForge will be documented in this file.
## [Unreleased]

- Added staged background reindex promotion so document, video, audio, and conversation replacements stay hidden until their parent/child memory batches are complete.
- Added index-version-aware query caching for repeated text/media embeddings and generated expansion branches.
- Added deterministic memory graph enrichment with entity/relation side tables and new `memory_graph_entities` / `memory_graph_related` MCP tools.
- Replaced the tiny UAT video clips with compact episodic-memory fixtures, richer transcript sidecars, related artifact metadata, and regression coverage for the video corpus.
- Added `memory_add_conversation` so conversation threads ingest as canonical parent memories with turn-level child memories and standard memory rollups.
2 changes: 1 addition & 1 deletion README.md
@@ -274,7 +274,7 @@ src/recallforge/
│ └── torch_backend.py # PyTorch (CUDA/MPS/CPU)
├── storage/
│ └── lancedb_backend.py # LanceDB + Tantivy FTS
├── cache.py # LRU embedding cache
├── cache.py # LRU query cache with index-version invalidation
├── search.py # Hybrid search pipeline (BM25 + vector + RRF)
├── server.py # MCP server (26 tools, stdio + HTTP/SSE)
├── documents.py # PDF/DOCX/PPTX extraction
5 changes: 5 additions & 0 deletions docs/ARCHITECTURE.md
@@ -94,6 +94,8 @@ class LanceDBBackend(StorageBackend):
    def search_vec(self, vector, limit, ...) -> List[SearchResult]: ...
    def get_cached(self, key) -> Optional[str]: ...
    def set_cached(self, key, value) -> None: ...
    def get_index_version(self) -> str: ...
    def bump_index_version(self, reason: str = "") -> str: ...
```

#### LanceDB Tables
@@ -149,6 +151,8 @@ hash | doc | content_type | created_at
key | value | created_at
```

The cache table stores durable metadata such as the global storage `index_version`. Query embeddings and generated query-expansion variants are cached in memory with keys that include backend model identity plus this index version, so repeat queries avoid redundant model calls without crossing a changed index boundary.
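
A minimal sketch of the keying idea described above, assuming a JSON payload hashed with SHA-256. The function name `versioned_key`, the model name, and the version tokens are illustrative, not taken from the codebase.

```python
import json
from hashlib import sha256


def versioned_key(input_type: str, data: str, model: str, index_version: str) -> str:
    """Hash the query together with model identity and index version."""
    payload = {
        "type": input_type,
        "data": data,
        "model": model,
        "index_version": index_version,
    }
    # sort_keys plus compact separators give a stable byte encoding of the payload.
    encoded = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return sha256(encoded).hexdigest()


# Hypothetical model name and version tokens, for illustration only.
before = versioned_key("text", "quarterly report", "example-embedder", "41")
same = versioned_key("text", "quarterly report", "example-embedder", "41")
after = versioned_key("text", "quarterly report", "example-embedder", "42")
```

Here `before` and `same` are identical, so a repeated query hits the cache, while `after` differs because the index version changed, so the old entry is simply never looked up again.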

### 3. HybridSearcher (`src/recallforge/search.py`)

Takes injected `ModelBackend` and `StorageBackend`. Runs tiered pipeline:
@@ -298,6 +302,7 @@ This keeps audio searchable without introducing an unbounded local transcription

- **N+1 Lookup Elimination**: Search result construction prefers `text_body` from the embeddings table row (already fetched via LanceDB query) over calling `get_content()` which requires a separate lookup to the content table. Falls back to `get_content()` only when `text_body` is empty.
- **Lazy Content Loading**: Full document body is only fetched when explicitly needed for final output. The reranker input path uses chunk text (`text_body`) for candidate scoring, avoiding unnecessary content lookups during the scoring phase.
- **Versioned Query Cache**: Text, image, and video query embeddings plus generated expansion branches use model-aware, index-version-aware cache keys. Visible storage mutations bump the index version; hidden staged rows do not invalidate caches until promotion.
- **Output Contract**: The MCP `search` tool output remains stable - results include all fields (`filepath`, `score`, `body`, etc.) with the same shape as before. Only internal lookup patterns have changed.
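
The versioned-cache behaviour in the list above (visible mutations bump the version, hidden staged rows do not) can be sketched with a toy store. `ToyStorage`, `stage`, and `promote` are hypothetical names used only for illustration; this is not the LanceDB backend.

```python
class ToyStorage:
    """Hypothetical in-memory store used to illustrate version bumps."""

    def __init__(self) -> None:
        self._version = 0
        self.visible: dict[str, str] = {}
        self.staged: dict[str, str] = {}

    def get_index_version(self) -> str:
        return str(self._version)

    def bump_index_version(self, reason: str = "") -> str:
        # `reason` is accepted for parity with the interface but unused here.
        self._version += 1
        return self.get_index_version()

    def stage(self, path: str, body: str) -> None:
        # Staged rows are hidden from search, so the version is left alone.
        self.staged[path] = body

    def promote(self) -> str:
        # Promotion makes the rows visible, so cached queries must be invalidated.
        self.visible.update(self.staged)
        self.staged.clear()
        return self.bump_index_version("promote")


store = ToyStorage()
v_initial = store.get_index_version()
store.stage("notes/a.md", "draft")
v_staged = store.get_index_version()
v_promoted = store.promote()
```

In this sketch the version token is unchanged after `stage` and advances only on `promote`, which is the point at which version-keyed cache entries stop matching.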

## Stage Roadmap
2 changes: 1 addition & 1 deletion docs/mcp-tools.md
@@ -215,7 +215,7 @@ Example MCP client config (Claude Desktop):

**Notes:**
- Reuses the same retrieval pipeline as `search`, so explanations reflect the actual ranking path.
- `expand=true` is still opt-in. It adds extra retrieval branches, so expect a latency/quality tradeoff rather than a free win.
- `expand=true` is still opt-in. Generated expansion branches are cached by model and index version, but extra retrieval branches can still add latency.
- `provenance.rrf.sources` maps each contributing RRF list to that result’s rank in the list.
- `provenance.reranker.scoring_path` shows whether the reranker used text or VL scoring.
- `media_compensation_applied` is `true` for image/video candidates that received RRF compensation because BM25 cannot surface them structurally.
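
To illustrate the `expand=true` note above: caching removes the repeated generation call, but each returned variant still has to be searched. A minimal sketch, assuming a hypothetical `make_cached_expander` helper and a fake expansion function in place of the real model call:

```python
from typing import Callable


def make_cached_expander(
    expand_fn: Callable[[str], list[str]],
    model: str,
    get_index_version: Callable[[], str],
) -> Callable[[str], list[str]]:
    """Wrap an expansion function with a (model, index version, query) cache."""
    cache: dict[tuple[str, str, str], list[str]] = {}

    def expand(query: str) -> list[str]:
        key = (model, get_index_version(), query)
        if key not in cache:
            cache[key] = list(expand_fn(query))
        # Return a copy so callers cannot mutate the cached list.
        return list(cache[key])

    return expand


calls: list[str] = []


def fake_expand(query: str) -> list[str]:
    # Stand-in for a generated expansion; records each real invocation.
    calls.append(query)
    return [query, f"{query} overview"]


version = {"value": "7"}  # hypothetical index version token
expand = make_cached_expander(fake_expand, "example-model", lambda: version["value"])

first = expand("lancedb fts")
second = expand("lancedb fts")  # served from cache
version["value"] = "8"
third = expand("lancedb fts")  # index changed, so expansion runs again
```

Only two of the three calls reach `fake_expand`. The caller still receives two variants every time and would run a retrieval branch for each, which is where the remaining latency comes from.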
4 changes: 3 additions & 1 deletion docs/research/recallforge-memory-mcp-roadmap.md
@@ -108,10 +108,12 @@ Goal:

Current Linear fit:
- `REC-130`
- `REC-115`
- `REC-147`
- `REC-168`

Shipped Linear work:
- `REC-115`

What this phase should look like:
- retrieve first
- rerank only a strict top-K
59 changes: 48 additions & 11 deletions src/recallforge/cache.py
@@ -1,37 +1,74 @@
"""
cache.py - LRU Embedding Cache for RecallForge.
cache.py - LRU query cache for RecallForge.

Avoids redundant embed_text / embed_image calls for repeated queries.
The cache is deterministic: same input → same vector, so caching is safe.
Avoids redundant query embeddings and generated expansion calls for repeated
queries. Keys can include model identity and storage index version so cached
retrieval inputs never cross model or index boundaries.
"""

import json
from hashlib import sha256
import numpy as np
from typing import Any


class EmbeddingCache:
    """Simple LRU cache backed by a dict + insertion-order list."""

    def __init__(self, maxsize: int = 256):
        self._maxsize = maxsize
        self._cache: dict[str, np.ndarray] = {}
        self._cache: dict[str, Any] = {}
        self._order: list[str] = []
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> "np.ndarray | None":
        return self._cache.get(key)
    def get(self, key: str) -> Any:
        if key not in self._cache:
            self._misses += 1
            return None
        self._hits += 1
        self._order.remove(key)
        self._order.append(key)
        return self._cache[key]

    def put(self, key: str, vector: np.ndarray) -> None:
    def put(self, key: str, value: Any) -> None:
        if key in self._cache:
            self._order.remove(key)
        elif len(self._cache) >= self._maxsize:
            evict = self._order.pop(0)
            del self._cache[evict]
        self._cache[key] = vector
        self._cache[key] = value
        self._order.append(key)

    def make_key(self, input_type: str, input_data: str) -> str:
        return sha256(f"{input_type}:{input_data}".encode()).hexdigest()
    def make_key(
        self,
        input_type: str,
        input_data: str,
        *,
        model: str | None = None,
        index_version: str | int | None = None,
        namespace: str | None = None,
    ) -> str:
        if model is None and index_version is None and namespace is None:
            return sha256(f"{input_type}:{input_data}".encode()).hexdigest()

        payload = {
            "type": input_type,
            "data": input_data,
            "model": model or "",
            "index_version": "" if index_version is None else str(index_version),
            "namespace": namespace or "",
        }
        return sha256(json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()).hexdigest()

    @property
    def stats(self) -> dict:
        return {"size": len(self._cache), "maxsize": self._maxsize}

    @property
    def metrics(self) -> dict:
        return {
            "size": len(self._cache),
            "maxsize": self._maxsize,
            "hits": self._hits,
            "misses": self._misses,
        }
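
The new `get` now refreshes recency with `self._order.remove(key)`, which scans the list on every hit. As a possible alternative (a sketch only, not part of this change), the same LRU behaviour can be expressed with `collections.OrderedDict`, where `move_to_end` and `popitem(last=False)` avoid the scan. The class name `OrderedLRU` is hypothetical.

```python
from collections import OrderedDict
from typing import Any


class OrderedLRU:
    """Illustrative LRU cache; not the EmbeddingCache from this change."""

    def __init__(self, maxsize: int = 256) -> None:
        self._maxsize = maxsize
        self._data: OrderedDict[str, Any] = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get(self, key: str) -> Any:
        if key not in self._data:
            self.misses += 1
            return None
        self.hits += 1
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key: str, value: Any) -> None:
        if key in self._data:
            self._data.move_to_end(key)
        elif len(self._data) >= self._maxsize:
            self._data.popitem(last=False)  # evict least recently used
        self._data[key] = value


cache = OrderedLRU(maxsize=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # "a" becomes most recently used
cache.put("c", 3)   # evicts "b", the least recently used entry
```

With the default `maxsize` of 256 the difference is unlikely to be measurable, so this is a readability option more than a performance fix.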
79 changes: 72 additions & 7 deletions src/recallforge/search.py
@@ -512,6 +512,50 @@ def __init__(
        self.expand = expand
        self.enable_media_query_probe = enable_media_query_probe

    def _cache_model_id(self) -> str:
        """Return a stable model/backend identity for cache key separation."""
        for attr in ("model_name", "model_id", "model", "_model_name"):
            value = getattr(self.backend, attr, None)
            if isinstance(value, str) and value:
                return value
        return type(self.backend).__name__
Comment on lines +517 to +521
P1: Derive cache model ID from active embedder model

`_cache_model_id()` only inspects `model_name`/`model_id`/`model`/`_model_name`, then falls back to the backend class name, but the shipped backends track the active embedding model in fields like `EMBEDDER_MODEL` (and MLX can change it at runtime via `set_model_ids`). In that case the cache key does not change when the embedder model changes, so repeated queries can reuse vectors generated by the previous model, producing retrieval in the wrong embedding space after a model switch.
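
One possible shape for the fix, sketched under the assumption that the active embedder is exposed as a string attribute named `EMBEDDER_MODEL` on the backend instance. `FakeBackend` and the `set_model_ids` signature shown here are hypothetical stand-ins, not the shipped backends.

```python
def cache_model_id(backend: object) -> str:
    """Prefer the active embedder identity, then fall back to generic attributes."""
    for attr in ("EMBEDDER_MODEL", "model_name", "model_id", "model", "_model_name"):
        value = getattr(backend, attr, None)
        if isinstance(value, str) and value:
            return value
    # Last resort: the class name, which cannot distinguish between models.
    return type(backend).__name__


class FakeBackend:
    """Hypothetical backend whose embedder can change at runtime."""

    EMBEDDER_MODEL = "embedder-a"

    def set_model_ids(self, embedder: str) -> None:
        # Assumed signature, for illustration only.
        self.EMBEDDER_MODEL = embedder


backend = FakeBackend()
id_before = cache_model_id(backend)
backend.set_model_ids("embedder-b")
id_after = cache_model_id(backend)
```

Because the identity is read on every call rather than captured once, a runtime model switch yields a different value and therefore a different cache key.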


    def _cache_index_version(self) -> str:
        """Return the storage index version used to invalidate query caches."""
        get_index_version = getattr(self.storage, "get_index_version", None)
        if callable(get_index_version):
            try:
                version = get_index_version()
                if isinstance(version, (str, int, float)):
                    return str(version)
            except Exception as exc:
                logger.debug("index_version lookup failed; using static cache version: %s", exc)
        return "0"

    def _make_query_cache_key(self, input_type: str, input_data: str) -> str:
        """Build a model- and index-version-aware query cache key."""
        return self.cache.make_key(
            input_type,
            input_data,
            model=self._cache_model_id(),
            index_version=self._cache_index_version(),
        )

    def _expand_query_cached(self, query: str, expand: bool) -> List[str]:
        """Expand a text query with an index-version-aware cache."""
        if not expand or not query or not query.strip():
            return [query] if query else []

        cache_key = self._make_query_cache_key("expansion", query)
        cached = self.cache.get(cache_key)
        if isinstance(cached, list):
            logger.debug("Expansion cache hit for text query (key=%s…)", cache_key[:8])
            return list(cached)

        variants = expand_query(query, self.backend, expand=True)
        self.cache.put(cache_key, list(variants))
        return variants

    def _vector_results_to_hybrid(self, results: List[SearchResult]) -> List[HybridResult]:
        """Convert raw vector results into HybridResult objects."""
        hybrid_results: List[HybridResult] = []
@@ -558,7 +602,7 @@ def _bm25_probe(self, query: str) -> List[SearchResult]:
    def _vector_search(self, query: str) -> List[SearchResult]:
        """Run vector search."""
        t0 = time.perf_counter()
        cache_key = self.cache.make_key("text", query)
        cache_key = self._make_query_cache_key("text", query)
        vector = self.cache.get(cache_key)
        if vector is not None:
            logger.debug("Embedding cache hit for text query (key=%s…)", cache_key[:8])
@@ -590,7 +634,7 @@ def _embed_image_cached(self, image_path: str):
        except OSError:
            file_hash = image_path  # fall back to path string if unreadable

        cache_key = self.cache.make_key("image", file_hash)
        cache_key = self._make_query_cache_key("image", file_hash)
        vector = self.cache.get(cache_key)
        if vector is not None:
            logger.debug("Embedding cache hit for image (key=%s…)", cache_key[:8])
@@ -600,6 +644,27 @@ def _embed_image_cached(self, image_path: str):
        self.cache.put(cache_key, vector)
        return vector

    def _embed_video_cached(self, video_path: str, embed_video):
        """Embed a video with cache lookup keyed by content hash."""
        try:
            h = sha256()
            with open(video_path, "rb") as fh:
                for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                    h.update(chunk)
            file_hash = h.hexdigest()
        except OSError:
            file_hash = video_path

        cache_key = self._make_query_cache_key("video", file_hash)
        vector = self.cache.get(cache_key)
        if vector is not None:
            logger.debug("Embedding cache hit for video (key=%s…)", cache_key[:8])
            return vector

        vector = embed_video(video_path)
        self.cache.put(cache_key, vector)
        return vector

    def _caption_image_query(self, image_path: str) -> str:
        """Caption an image query for BM25 probing when the backend supports it."""
        try:
@@ -691,7 +756,7 @@ def _add_text_expansion_branches(
        if not self.expand or not query_text.strip():
            return

        query_variants = expand_query(query_text, self.backend, expand=True)
        query_variants = self._expand_query_cached(query_text, expand=True)
        if len(query_variants) <= 1:
            return

@@ -758,10 +823,10 @@ def search_video(self, video_path: str) -> List[HybridResult]:
raise NotImplementedError(
f"Backend {type(self.backend).__name__} does not support raw video queries. "
"Install a backend with video support (e.g. recallforge[mlx] or recallforge[torch])."
)
)
query_video_path_for_rerank = video_path
try:
vector = embed_video(video_path)
vector = self._embed_video_cached(video_path, embed_video)
all_results["original_vec"] = self.storage.search_vec(
vector.tolist() if hasattr(vector, 'tolist') else list(vector),
limit=self.fts_probe_limit,
@@ -825,7 +890,7 @@ def _run_parallel_searches(

# Original vector - use cache like _vector_search() does
try:
cache_key = self.cache.make_key("text", query)
cache_key = self._make_query_cache_key("text", query)
vector = self.cache.get(cache_key)
if vector is not None:
logger.debug("Embedding cache hit for text query in parallel search (key=%s…)", cache_key[:8])
@@ -1487,7 +1552,7 @@ def search(self, query: str) -> List[HybridResult]:
self.limit, self.candidate_limit, self.expand)

# Step 1: Expand query if enabled
query_variants = expand_query(query, self.backend, expand=self.expand)
query_variants = self._expand_query_cached(query, expand=self.expand)
logger.debug("query_expansion enabled=%s variants=%d", self.expand, len(query_variants))

# Step 2: BM25 probe (only on original query to avoid noise)
8 changes: 8 additions & 0 deletions src/recallforge/storage/base.py
@@ -93,6 +93,14 @@ def initialize(self, store_path: str) -> None:
    def close(self) -> None:
        """Close connections and release resources."""
        pass

    def get_index_version(self) -> str:
        """Return a storage-level version token for query-cache invalidation."""
        return "0"

    def bump_index_version(self, reason: str = "") -> str:
        """Advance and return the storage-level version token after visible writes."""
        return self.get_index_version()

    # =========================================================================
    # Document Operations
2 changes: 2 additions & 0 deletions src/recallforge/storage/indexing_ops.py
@@ -642,6 +642,7 @@ def delete_memory(

# Schedule debounced FTS rebuild
self._backend._fts.schedule_fts_rebuild()
self._backend.bump_index_version("delete_memory")

trace_log("delete_memory_done", path=normalized_path, removed_vectors=removed_vectors)
return {
@@ -706,6 +707,7 @@ def delete_path(
if include_children:
self._delete_video_frame_artifacts(normalized_path)
self._backend._fts.schedule_fts_rebuild()
self._backend.bump_index_version("delete_path")
return {
"success": True,
"path": normalized_path,