From 42bc9198786793f9f4a8c26c0ee996752ba6e526 Mon Sep 17 00:00:00 2001 From: MollyAI Date: Sun, 17 May 2026 16:10:18 -0400 Subject: [PATCH] Add index-versioned query cache --- CHANGELOG.md | 1 + README.md | 2 +- docs/ARCHITECTURE.md | 5 ++ docs/mcp-tools.md | 2 +- .../recallforge-memory-mcp-roadmap.md | 4 +- src/recallforge/cache.py | 59 +++++++++++--- src/recallforge/search.py | 79 +++++++++++++++++-- src/recallforge/storage/base.py | 8 ++ src/recallforge/storage/indexing_ops.py | 2 + src/recallforge/storage/lancedb_backend.py | 65 +++++++++++++++ tests/test_embedding_cache.py | 2 +- tests/test_query_expansion.py | 67 ++++++++++++++++ tests/test_storage.py | 32 ++++++++ 13 files changed, 306 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 225ffb4..39a7bd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to RecallForge will be documented in this file. ## [Unreleased] - Added staged background reindex promotion so document, video, audio, and conversation replacements stay hidden until their parent/child memory batches are complete. +- Added index-version-aware query caching for repeated text/media embeddings and generated expansion branches. - Added deterministic memory graph enrichment with entity/relation side tables and new `memory_graph_entities` / `memory_graph_related` MCP tools. - Replaced the tiny UAT video clips with compact episodic-memory fixtures, richer transcript sidecars, related artifact metadata, and regression coverage for the video corpus. - Added `memory_add_conversation` so conversation threads ingest as canonical parent memories with turn-level child memories and standard memory rollups. diff --git a/README.md b/README.md index e8ae72b..db52195 100644 --- a/README.md +++ b/README.md @@ -274,7 +274,7 @@ src/recallforge/ │ └── torch_backend.py # PyTorch (CUDA/MPS/CPU) ├── storage/ │ └── lancedb_backend.py # LanceDB + Tantivy FTS -├── cache.py # LRU embedding cache +├── cache.py # LRU query cache with index-version invalidation ├── search.py # Hybrid search pipeline (BM25 + vector + RRF) ├── server.py # MCP server (26 tools, stdio + HTTP/SSE) ├── documents.py # PDF/DOCX/PPTX extraction diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 89d36b6..6b209e9 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -94,6 +94,8 @@ class LanceDBBackend(StorageBackend): def search_vec(self, vector, limit, ...) -> List[SearchResult]: ... def get_cached(self, key) -> Optional[str]: ... def set_cached(self, key, value) -> None: ... + def get_index_version(self) -> str: ... + def bump_index_version(self, reason: str = "") -> str: ... ``` #### LanceDB Tables @@ -149,6 +151,8 @@ hash | doc | content_type | created_at key | value | created_at ``` +The cache table stores durable metadata such as the global storage `index_version`. Query embeddings and generated query-expansion variants are cached in memory with keys that include backend model identity plus this index version, so repeat queries avoid redundant model calls without crossing a changed index boundary. + ### 3. HybridSearcher (`src/recallforge/search.py`) Takes injected `ModelBackend` and `StorageBackend`. Runs tiered pipeline: @@ -298,6 +302,7 @@ This keeps audio searchable without introducing an unbounded local transcription - **N+1 Lookup Elimination**: Search result construction prefers `text_body` from the embeddings table row (already fetched via LanceDB query) over calling `get_content()` which requires a separate lookup to the content table. Falls back to `get_content()` only when `text_body` is empty. - **Lazy Content Loading**: Full document body is only fetched when explicitly needed for final output. The reranker input path uses chunk text (`text_body`) for candidate scoring, avoiding unnecessary content lookups during the scoring phase. +- **Versioned Query Cache**: Text, image, and video query embeddings plus generated expansion branches use model-aware, index-version-aware cache keys. Visible storage mutations bump the index version; hidden staged rows do not invalidate caches until promotion. - **Output Contract**: The MCP `search` tool output remains stable - results include all fields (`filepath`, `score`, `body`, etc.) with the same shape as before. Only internal lookup patterns have changed. ## Stage Roadmap diff --git a/docs/mcp-tools.md b/docs/mcp-tools.md index 94c56da..ad915a5 100644 --- a/docs/mcp-tools.md +++ b/docs/mcp-tools.md @@ -215,7 +215,7 @@ Example MCP client config (Claude Desktop): **Notes:** - Reuses the same retrieval pipeline as `search`, so explanations reflect the actual ranking path. -- `expand=true` is still opt-in. It adds extra retrieval branches, so expect a latency/quality tradeoff rather than a free win. +- `expand=true` is still opt-in. Generated expansion branches are cached by model and index version, but extra retrieval branches can still add latency. - `provenance.rrf.sources` maps each contributing RRF list to that result’s rank in the list. - `provenance.reranker.scoring_path` shows whether the reranker used text or VL scoring. - `media_compensation_applied` is `true` for image/video candidates that received RRF compensation because BM25 cannot surface them structurally. diff --git a/docs/research/recallforge-memory-mcp-roadmap.md b/docs/research/recallforge-memory-mcp-roadmap.md index 6f8c8fa..8a5e44f 100644 --- a/docs/research/recallforge-memory-mcp-roadmap.md +++ b/docs/research/recallforge-memory-mcp-roadmap.md @@ -108,10 +108,12 @@ Goal: Current Linear fit: - `REC-130` -- `REC-115` - `REC-147` - `REC-168` +Shipped Linear work: +- `REC-115` + What this phase should look like: - retrieve first - rerank only a strict top-K diff --git a/src/recallforge/cache.py b/src/recallforge/cache.py index f2f392a..c394719 100644 --- a/src/recallforge/cache.py +++ b/src/recallforge/cache.py @@ -1,12 +1,14 @@ """ -cache.py - LRU Embedding Cache for RecallForge. +cache.py - LRU query cache for RecallForge. -Avoids redundant embed_text / embed_image calls for repeated queries. -The cache is deterministic: same input → same vector, so caching is safe. +Avoids redundant query embeddings and generated expansion calls for repeated +queries. Keys can include model identity and storage index version so cached +retrieval inputs never cross model or index boundaries. """ +import json from hashlib import sha256 -import numpy as np +from typing import Any class EmbeddingCache: @@ -14,24 +16,59 @@ class EmbeddingCache: def __init__(self, maxsize: int = 256): self._maxsize = maxsize - self._cache: dict[str, np.ndarray] = {} + self._cache: dict[str, Any] = {} self._order: list[str] = [] + self._hits = 0 + self._misses = 0 - def get(self, key: str) -> "np.ndarray | None": - return self._cache.get(key) + def get(self, key: str) -> Any: + if key not in self._cache: + self._misses += 1 + return None + self._hits += 1 + self._order.remove(key) + self._order.append(key) + return self._cache[key] - def put(self, key: str, vector: np.ndarray) -> None: + def put(self, key: str, value: Any) -> None: if key in self._cache: self._order.remove(key) elif len(self._cache) >= self._maxsize: evict = self._order.pop(0) del self._cache[evict] - self._cache[key] = vector + self._cache[key] = value self._order.append(key) - def make_key(self, input_type: str, input_data: str) -> str: - return sha256(f"{input_type}:{input_data}".encode()).hexdigest() + def make_key( + self, + input_type: str, + input_data: str, + *, + model: str | None = None, + index_version: str | int | None = None, + namespace: str | None = None, + ) -> str: + if model is None and index_version is None and namespace is None: + return sha256(f"{input_type}:{input_data}".encode()).hexdigest() + + payload = { + "type": input_type, + "data": input_data, + "model": model or "", + "index_version": "" if index_version is None else str(index_version), + "namespace": namespace or "", + } + return sha256(json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()).hexdigest() @property def stats(self) -> dict: return {"size": len(self._cache), "maxsize": self._maxsize} + + @property + def metrics(self) -> dict: + return { + "size": len(self._cache), + "maxsize": self._maxsize, + "hits": self._hits, + "misses": self._misses, + } diff --git a/src/recallforge/search.py b/src/recallforge/search.py index c9204b4..37b8697 100644 --- a/src/recallforge/search.py +++ b/src/recallforge/search.py @@ -512,6 +512,50 @@ def __init__( self.expand = expand self.enable_media_query_probe = enable_media_query_probe + def _cache_model_id(self) -> str: + """Return a stable model/backend identity for cache key separation.""" + for attr in ("model_name", "model_id", "model", "_model_name"): + value = getattr(self.backend, attr, None) + if isinstance(value, str) and value: + return value + return type(self.backend).__name__ + + def _cache_index_version(self) -> str: + """Return the storage index version used to invalidate query caches.""" + get_index_version = getattr(self.storage, "get_index_version", None) + if callable(get_index_version): + try: + version = get_index_version() + if isinstance(version, (str, int, float)): + return str(version) + except Exception as exc: + logger.debug("index_version lookup failed; using static cache version: %s", exc) + return "0" + + def _make_query_cache_key(self, input_type: str, input_data: str) -> str: + """Build a model- and index-version-aware query cache key.""" + return self.cache.make_key( + input_type, + input_data, + model=self._cache_model_id(), + index_version=self._cache_index_version(), + ) + + def _expand_query_cached(self, query: str, expand: bool) -> List[str]: + """Expand a text query with an index-version-aware cache.""" + if not expand or not query or not query.strip(): + return [query] if query else [] + + cache_key = self._make_query_cache_key("expansion", query) + cached = self.cache.get(cache_key) + if isinstance(cached, list): + logger.debug("Expansion cache hit for text query (key=%s…)", cache_key[:8]) + return list(cached) + + variants = expand_query(query, self.backend, expand=True) + self.cache.put(cache_key, list(variants)) + return variants + def _vector_results_to_hybrid(self, results: List[SearchResult]) -> List[HybridResult]: """Convert raw vector results into HybridResult objects.""" hybrid_results: List[HybridResult] = [] @@ -558,7 +602,7 @@ def _bm25_probe(self, query: str) -> List[SearchResult]: def _vector_search(self, query: str) -> List[SearchResult]: """Run vector search.""" t0 = time.perf_counter() - cache_key = self.cache.make_key("text", query) + cache_key = self._make_query_cache_key("text", query) vector = self.cache.get(cache_key) if vector is not None: logger.debug("Embedding cache hit for text query (key=%s…)", cache_key[:8]) @@ -590,7 +634,7 @@ def _embed_image_cached(self, image_path: str): except OSError: file_hash = image_path # fall back to path string if unreadable - cache_key = self.cache.make_key("image", file_hash) + cache_key = self._make_query_cache_key("image", file_hash) vector = self.cache.get(cache_key) if vector is not None: logger.debug("Embedding cache hit for image (key=%s…)", cache_key[:8]) @@ -600,6 +644,27 @@ def _embed_image_cached(self, image_path: str): self.cache.put(cache_key, vector) return vector + def _embed_video_cached(self, video_path: str, embed_video): + """Embed a video with cache lookup keyed by content hash.""" + try: + h = sha256() + with open(video_path, "rb") as fh: + for chunk in iter(lambda: fh.read(1024 * 1024), b""): + h.update(chunk) + file_hash = h.hexdigest() + except OSError: + file_hash = video_path + + cache_key = self._make_query_cache_key("video", file_hash) + vector = self.cache.get(cache_key) + if vector is not None: + logger.debug("Embedding cache hit for video (key=%s…)", cache_key[:8]) + return vector + + vector = embed_video(video_path) + self.cache.put(cache_key, vector) + return vector + def _caption_image_query(self, image_path: str) -> str: """Caption an image query for BM25 probing when the backend supports it.""" try: @@ -691,7 +756,7 @@ def _add_text_expansion_branches( if not self.expand or not query_text.strip(): return - query_variants = expand_query(query_text, self.backend, expand=True) + query_variants = self._expand_query_cached(query_text, expand=True) if len(query_variants) <= 1: return @@ -758,10 +823,10 @@ def search_video(self, video_path: str) -> List[HybridResult]: raise NotImplementedError( f"Backend {type(self.backend).__name__} does not support raw video queries. " "Install a backend with video support (e.g. recallforge[mlx] or recallforge[torch])." - ) + ) query_video_path_for_rerank = video_path try: - vector = embed_video(video_path) + vector = self._embed_video_cached(video_path, embed_video) all_results["original_vec"] = self.storage.search_vec( vector.tolist() if hasattr(vector, 'tolist') else list(vector), limit=self.fts_probe_limit, @@ -825,7 +890,7 @@ def _run_parallel_searches( # Original vector - use cache like _vector_search() does try: - cache_key = self.cache.make_key("text", query) + cache_key = self._make_query_cache_key("text", query) vector = self.cache.get(cache_key) if vector is not None: logger.debug("Embedding cache hit for text query in parallel search (key=%s…)", cache_key[:8]) @@ -1487,7 +1552,7 @@ def search(self, query: str) -> List[HybridResult]: self.limit, self.candidate_limit, self.expand) # Step 1: Expand query if enabled - query_variants = expand_query(query, self.backend, expand=self.expand) + query_variants = self._expand_query_cached(query, expand=self.expand) logger.debug("query_expansion enabled=%s variants=%d", self.expand, len(query_variants)) # Step 2: BM25 probe (only on original query to avoid noise) diff --git a/src/recallforge/storage/base.py b/src/recallforge/storage/base.py index 0c5b002..cdfecd0 100644 --- a/src/recallforge/storage/base.py +++ b/src/recallforge/storage/base.py @@ -93,6 +93,14 @@ def initialize(self, store_path: str) -> None: def close(self) -> None: """Close connections and release resources.""" pass + + def get_index_version(self) -> str: + """Return a storage-level version token for query-cache invalidation.""" + return "0" + + def bump_index_version(self, reason: str = "") -> str: + """Advance and return the storage-level version token after visible writes.""" + return self.get_index_version() # ========================================================================= # Document Operations diff --git a/src/recallforge/storage/indexing_ops.py b/src/recallforge/storage/indexing_ops.py index de1db87..6fe6b1e 100644 --- a/src/recallforge/storage/indexing_ops.py +++ b/src/recallforge/storage/indexing_ops.py @@ -642,6 +642,7 @@ def delete_memory( # Schedule debounced FTS rebuild self._backend._fts.schedule_fts_rebuild() + self._backend.bump_index_version("delete_memory") trace_log("delete_memory_done", path=normalized_path, removed_vectors=removed_vectors) return { @@ -706,6 +707,7 @@ def delete_path( if include_children: self._delete_video_frame_artifacts(normalized_path) self._backend._fts.schedule_fts_rebuild() + self._backend.bump_index_version("delete_path") return { "success": True, "path": normalized_path, diff --git a/src/recallforge/storage/lancedb_backend.py b/src/recallforge/storage/lancedb_backend.py index 59c0964..bd4c535 100644 --- a/src/recallforge/storage/lancedb_backend.py +++ b/src/recallforge/storage/lancedb_backend.py @@ -91,6 +91,7 @@ class LanceDBBackend(StorageBackend): FTS_REBUILD_PENDING_THRESHOLD = 10 # Rebuild after this many pending writes BULK_FLUSH_DOCS_THRESHOLD = max(1, int(os.environ.get("RECALLFORGE_BULK_FLUSH_DOCS", "75"))) BULK_FLUSH_EMBEDDINGS_THRESHOLD = max(1, int(os.environ.get("RECALLFORGE_BULK_FLUSH_EMBEDDINGS", "600"))) + INDEX_VERSION_CACHE_KEY = "__recallforge_index_version__" def __init__(self, store_path: Optional[str] = None): """ @@ -108,6 +109,8 @@ def __init__(self, store_path: Optional[str] = None): self._entities_table = None self._relations_table = None self._visibility_lock = threading.RLock() + self._index_version_lock = threading.RLock() + self._index_version: Optional[int] = None # FTS rebuild debouncing state self._fts_rebuild_pending = 0 @@ -133,6 +136,12 @@ def _get_visibility_lock(self): if not hasattr(self, "_visibility_lock") or self._visibility_lock is None: self._visibility_lock = threading.RLock() return self._visibility_lock + + def _get_index_version_lock(self): + """Return the index-version lock, creating it for __new__ tests.""" + if not hasattr(self, "_index_version_lock") or self._index_version_lock is None: + self._index_version_lock = threading.RLock() + return self._index_version_lock def initialize(self, store_path: Optional[str] = None) -> None: """Initialize the LanceDB database.""" @@ -693,6 +702,9 @@ def insert_document( self._documents_table.delete(_safe_filter("id", doc_id)) self._documents_table.add(pa.Table.from_pylist([row], schema=self._build_documents_schema())) + if int(active) == 1 and index_batch_id is None: + self.bump_index_version("insert_document") + return doc_id def find_document(self, collection: str, file_path: str) -> Optional[Document]: @@ -746,6 +758,7 @@ def deactivate_document(self, collection: str, file_path: str) -> None: where=f"{_safe_filter('collection', collection)} AND {_safe_filter('file_path', file_path)} AND active = 1", values={"active": 0, "updated_at": int(time.time() * 1000)} ) + self.bump_index_version("deactivate_document") # ========================================================================= # Content Operations @@ -916,6 +929,8 @@ def insert_embedding( active=int(active), index_batch_id=index_batch_id, ) + if int(active) == 1 and index_batch_id is None: + self.bump_index_version("insert_embedding") def _index_graph_rows_for_embedding( self, @@ -1243,6 +1258,8 @@ def rename_collection( # Schedule FTS rebuild since we modified embeddings self._schedule_fts_rebuild() + if embeddings_updated or documents_updated or entities_updated or relations_updated: + self.bump_index_version("rename_collection") return { "success": True, @@ -1369,6 +1386,8 @@ def delete_collection( # Schedule FTS rebuild self._schedule_fts_rebuild() + if embeddings_deleted or documents_deleted or entities_deleted or relations_deleted: + self.bump_index_version("delete_collection") return { "success": True, @@ -1496,6 +1515,8 @@ def promote_index_batch( activated_embeddings=activated_embeddings, activated_documents=activated_documents, ) + if activated_embeddings or activated_documents or deactivated_embeddings or deactivated_documents: + self.bump_index_version("promote_index_batch") return { "activated_embeddings": activated_embeddings, "activated_documents": activated_documents, @@ -2707,6 +2728,50 @@ def ingest( # ========================================================================= # Cache Operations # ========================================================================= + + def _parse_index_version(self, raw: Optional[str]) -> int: + if not raw: + return 0 + try: + payload = json.loads(raw) + if isinstance(payload, dict): + return max(0, int(payload.get("version", 0))) + except Exception: + pass + try: + return max(0, int(raw)) + except Exception: + return 0 + + def get_index_version(self) -> str: + """Return a durable version token for cache keys.""" + if getattr(self, "_cache_table", None) is None: + return str(getattr(self, "_index_version", 0) or 0) + + with self._get_index_version_lock(): + cached_version = getattr(self, "_index_version", None) + if cached_version is not None: + return str(cached_version) + version = self._parse_index_version(self.get_cached(self.INDEX_VERSION_CACHE_KEY)) + self._index_version = version + return str(version) + + def bump_index_version(self, reason: str = "") -> str: + """Advance the durable index version after visible storage mutations.""" + with self._get_index_version_lock(): + current = self._parse_index_version(self.get_cached(self.INDEX_VERSION_CACHE_KEY)) + next_version = current + 1 + self._index_version = next_version + payload = { + "version": next_version, + "updated_at": int(time.time() * 1000), + "reason": str(reason or "index_update"), + } + try: + self.set_cached(self.INDEX_VERSION_CACHE_KEY, json.dumps(payload, sort_keys=True)) + except Exception as exc: + logger.debug("bump_index_version: failed to persist version: %s", exc) + return str(next_version) def get_cached(self, key: str) -> Optional[str]: """Get a cached value.""" diff --git a/tests/test_embedding_cache.py b/tests/test_embedding_cache.py index fbd3bf0..70c60e8 100644 --- a/tests/test_embedding_cache.py +++ b/tests/test_embedding_cache.py @@ -139,7 +139,7 @@ def test_cache_hit_skips_backend(self): def test_cache_populated_after_miss(self): searcher, backend, storage, cache = _make_searcher() searcher._vector_search("hello") - key = cache.make_key("text", "hello") + key = searcher._make_query_cache_key("text", "hello") assert cache.get(key) is not None np.testing.assert_array_equal(cache.get(key), backend.embed_text.return_value) diff --git a/tests/test_query_expansion.py b/tests/test_query_expansion.py index b6ac347..4a9bc2a 100644 --- a/tests/test_query_expansion.py +++ b/tests/test_query_expansion.py @@ -31,6 +31,43 @@ def generate_text(self, prompt: str, max_tokens: int = 60) -> str: return self.response +class CountingBackend(BackendWithGenerateText): + """Backend that counts embedding and generation calls.""" + + model_name = "counting-test-model" + + def __init__(self, response: str = "alternate wording"): + super().__init__(response) + self.embed_text_calls = 0 + + def embed_text(self, text: str): + self.embed_text_calls += 1 + return [float(len(text) % 7)] * 2048 + + def needs_reranker(self): + return False + + +class VersionedEmptyStorage: + """Storage test double with an explicit index version token.""" + + def __init__(self, version: str = "1"): + self.version = version + self.vec_calls = 0 + self.fts_calls = 0 + + def get_index_version(self) -> str: + return self.version + + def search_fts(self, *_args, **_kwargs): + self.fts_calls += 1 + return [] + + def search_vec(self, *_args, **_kwargs): + self.vec_calls += 1 + return [] + + class TestVisualQueryDetection: """Test visual query detection.""" @@ -242,6 +279,36 @@ def test_searcher_defaults_expand_to_false(self): assert searcher.expand is False assert searcher.enable_media_query_probe is True + def test_text_embedding_cache_uses_index_version(self): + backend = CountingBackend() + storage = VersionedEmptyStorage(version="1") + searcher = HybridSearcher(backend=backend, storage=storage) + + searcher.search("repeatable query") + searcher.search("repeatable query") + + assert backend.embed_text_calls == 1 + + storage.version = "2" + searcher.search("repeatable query") + + assert backend.embed_text_calls == 2 + + def test_generated_expansion_cache_uses_index_version(self): + backend = CountingBackend("cached variant") + storage = VersionedEmptyStorage(version="1") + searcher = HybridSearcher(backend=backend, storage=storage, expand=True) + + searcher.search("how to cache queries") + searcher.search("how to cache queries") + + assert len(backend.calls) == 1 + + storage.version = "2" + searcher.search("how to cache queries") + + assert len(backend.calls) == 2 + class TestQueryExpansionIntegration: """Integration tests for query expansion in search pipeline.""" diff --git a/tests/test_storage.py b/tests/test_storage.py index 53433fb..ae3ca87 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -187,6 +187,38 @@ def test_cache_set_and_get(self): val = self.backend.get_cached("mykey") self.assertEqual(val, '{"foo": "bar"}') + def test_index_version_bumps_only_for_visible_updates(self): + initial = int(self.backend.get_index_version()) + self.backend.upsert_memory( + path="notes/versioned.md", + text="Visible Acme Robotics memory.", + collection="test", + embed_func=mock_embed, + model="mock-embedder", + ) + after_visible = int(self.backend.get_index_version()) + self.assertGreater(after_visible, initial) + + batch_id = self.backend.begin_index_batch() + self.backend.upsert_memory( + path="notes/versioned.md", + text="Hidden Globex Labs replacement.", + collection="test", + embed_func=mock_embed, + model="mock-embedder", + _skip_delete=True, + _active=0, + _index_batch_id=batch_id, + ) + self.assertEqual(int(self.backend.get_index_version()), after_visible) + + self.backend.promote_index_batch( + batch_id=batch_id, + collection="test", + logical_path="notes/versioned.md", + ) + self.assertGreater(int(self.backend.get_index_version()), after_visible) + class TestIndexAndSearch(unittest.TestCase): """Tests for index_document and search_fts / search_vec."""