From ce589e1e658bb59dcf43edddd60f7a7b6fc38e7b Mon Sep 17 00:00:00 2001 From: MollyAI Date: Sun, 17 May 2026 15:54:04 -0400 Subject: [PATCH] Add staged background ingest promotion --- CHANGELOG.md | 1 + README.md | 2 +- docs/ARCHITECTURE.md | 19 +- docs/MEMORY_POLICY.md | 7 + docs/mcp-tools.md | 4 +- .../recallforge-memory-mcp-roadmap.md | 6 +- src/recallforge/storage/base.py | 6 + src/recallforge/storage/indexing_ops.py | 555 ++++++++++-------- src/recallforge/storage/lancedb_backend.py | 343 +++++++++-- src/recallforge/storage/lancedb_shared.py | 11 + src/recallforge/storage/search_ops.py | 12 +- tests/test_storage.py | 105 ++++ 12 files changed, 789 insertions(+), 282 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0c78eb..225ffb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ All notable changes to RecallForge will be documented in this file. ## [Unreleased] +- Added staged background reindex promotion so document, video, audio, and conversation replacements stay hidden until their parent/child memory batches are complete. - Added deterministic memory graph enrichment with entity/relation side tables and new `memory_graph_entities` / `memory_graph_related` MCP tools. - Replaced the tiny UAT video clips with compact episodic-memory fixtures, richer transcript sidecars, related artifact metadata, and regression coverage for the video corpus. - Added `memory_add_conversation` so conversation threads ingest as canonical parent memories with turn-level child memories and standard memory rollups. diff --git a/README.md b/README.md index 18320a5..e8ae72b 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,7 @@ See [docs/mcp-tools.md](docs/mcp-tools.md) for the full tool reference. ## How it works -RecallForge encodes text, images, video frames, documents, conversation turns, and audio transcripts into the same 2048-dimensional vector space using Qwen3-VL. It also extracts lightweight entity and relation metadata so agents can navigate from one memory to other memories that mention the same people, projects, tickets, URLs, and organizations. This means "find notes about this diagram" works whether the diagram is text, an image, a conversation thread, or a frame from a video. A 3-stage pipeline handles the rest: +RecallForge encodes text, images, video frames, documents, conversation turns, and audio transcripts into the same 2048-dimensional vector space using Qwen3-VL. It also extracts lightweight entity and relation metadata so agents can navigate from one memory to other memories that mention the same people, projects, tickets, URLs, and organizations. Reindexes for documents, video, audio, and conversations are staged as hidden batches first, then promoted together so agents keep seeing the previous complete memory until the replacement is ready. This means "find notes about this diagram" works whether the diagram is text, an image, a conversation thread, or a frame from a video. A 3-stage pipeline handles the rest: ```mermaid graph TD diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 8452c39..89d36b6 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -106,24 +106,27 @@ embedded_at | vector[2048] user/session/project/profile namespace fields memory_id | memory_role | memory_root_path importance | ttl_seconds | tags | expires_at +active | index_batch_id ``` **documents** (registry) ``` id | collection | file_path | title | content_hash | content_type | active -created_at | updated_at +created_at | updated_at | index_batch_id ``` **entities** (memory graph mentions) ``` id | collection | entity_key | name | entity_type | memory_id | memory_root_path file_path | content_hash | hash_seq | seq | evidence | namespace fields | created_at +active | index_batch_id ``` **relations** (lightweight graph edges) ``` id | collection | subject_key | subject_name | object_key | object_name | relation_type memory_id | memory_root_path | file_path | content_hash | hash_seq | evidence | namespace fields +active | index_batch_id ``` Conversation memories use the same parent/child layout as media-derived memories: @@ -226,6 +229,8 @@ Key runtime details: │ ├── data/*.parquet │ └── _indices/text_body_fts/ # Tantivy FTS ├── documents/ + ├── entities/ + ├── relations/ ├── content/ └── cache/ ``` @@ -252,6 +257,18 @@ path + text ensure_fts_index() ──> Tantivy index rebuild ``` +Complex file and conversation reindexes use a staging/promotion variant of this flow: + +``` +new root + children + │ + ├──> write hidden rows (`active=0`, `index_batch_id=batch_*`) + ├──> keep old active rows visible to search/list/get/graph reads + └──> promote batch: activate new rows and deactivate old rows for the memory path +``` + +Failed staged ingests delete only the hidden batch, so readers continue to see the last complete memory. + ## Data Flow: Audio Audio is transcript-first in this release. The storage layer accepts common audio extensions only when a sibling `.srt`, `.vtt`, `.txt`, or `.transcript.json` sidecar exists. diff --git a/docs/MEMORY_POLICY.md b/docs/MEMORY_POLICY.md index 5a45855..7ef6a80 100644 --- a/docs/MEMORY_POLICY.md +++ b/docs/MEMORY_POLICY.md @@ -27,10 +27,17 @@ RecallForge stores complex files as a root memory plus child assets: The root memory uses `memory_role="root"` and child assets use `memory_role="child"` with `memory_root_path` pointing back to the root. +## Ingest Consistency + +Complex reindexes are staged before they become visible. Documents, video, audio, and conversation memories write replacement parent/child rows with `active=0` and a private `index_batch_id`; search, memory listing, memory lookup, and graph navigation only read active rows. Once the replacement parent and all child assets are stored, RecallForge promotes the batch in one visibility step and deactivates the old rows for that memory path. + +This keeps agents from seeing half-updated memories during background ingest. If a staged ingest fails before promotion, RecallForge deletes the hidden batch and the previous active memory remains readable. + ## Ranking RecallForge combines BM25 and vector search through RRF, then reranks in `hybrid` mode. Before fusion, storage search applies memory policy: +- Hidden staging rows are filtered out until promotion. - Expired rows are filtered out with `expires_at`. - `importance` can add up to a 15% boost before score normalization. - Fresh rows receive a small recency boost. diff --git a/docs/mcp-tools.md b/docs/mcp-tools.md index 2f5e4cf..94c56da 100644 --- a/docs/mcp-tools.md +++ b/docs/mcp-tools.md @@ -471,7 +471,7 @@ Example MCP client config (Claude Desktop): - `INVALID_INPUT`: returned by storage/validation for invalid combinations/inputs. - `INTERNAL_ERROR`: uncaught exceptions. -**Notes:** This is the recommended ingest entry point. `set_config` can change default `collection`/`max_file_size_mb` used when omitted. +**Notes:** This is the recommended ingest entry point. `set_config` can change default `collection`/`max_file_size_mb` used when omitted. Reindexing complex memories (documents, video, audio) stages replacement rows hidden from reads, then promotes the parent and child assets together when ingest succeeds. --- @@ -710,7 +710,7 @@ Turn objects accept: - `BACKEND_ERROR`: when the storage backend does not support conversation memories. - `INTERNAL_ERROR`: uncaught exceptions. -**Notes:** The root path becomes the stable `memory_id` identity seed. Child turns are stored at `path::turn:0001`, `path::turn:0002`, and so on with the same `memory_id` and `memory_root_path`. +**Notes:** The root path becomes the stable `memory_id` identity seed. Child turns are stored at `path::turn:0001`, `path::turn:0002`, and so on with the same `memory_id` and `memory_root_path`. Replacing a conversation stages the new root and turn children first, so clients keep seeing the previous complete thread until the replacement is promoted. --- diff --git a/docs/research/recallforge-memory-mcp-roadmap.md b/docs/research/recallforge-memory-mcp-roadmap.md index 4da2f18..6f8c8fa 100644 --- a/docs/research/recallforge-memory-mcp-roadmap.md +++ b/docs/research/recallforge-memory-mcp-roadmap.md @@ -67,9 +67,7 @@ Why it comes before advanced ranking: Goal: - Make RecallForge feel like memory retrieval, not generic search. -Current Linear fit: -- `REC-84` -- `REC-83` +Shipped Linear work: - `REC-75` - `REC-78` @@ -87,7 +85,7 @@ Why it comes here: Goal: - Push more intelligence into ingest-time structure instead of query-time cost. -Current Linear fit: +Shipped Linear work: - `REC-76` Likely follow-ons: diff --git a/src/recallforge/storage/base.py b/src/recallforge/storage/base.py index a2a9bf2..0c5b002 100644 --- a/src/recallforge/storage/base.py +++ b/src/recallforge/storage/base.py @@ -115,6 +115,8 @@ def insert_document( memory_id: Optional[str] = None, memory_role: str = "root", memory_root_path: Optional[str] = None, + active: int = 1, + index_batch_id: Optional[str] = None, ) -> str: """ Insert or update a document. @@ -175,6 +177,8 @@ def insert_embedding( importance: Optional[float] = None, ttl_seconds: Optional[int] = None, tags: Optional[List[str]] = None, + active: int = 1, + index_batch_id: Optional[str] = None, ) -> None: """Insert an embedding for a document chunk.""" pass @@ -273,6 +277,8 @@ def upsert_memory( _skip_delete: bool = False, memory_role: str = "root", memory_root_path: Optional[str] = None, + _active: int = 1, + _index_batch_id: Optional[str] = None, ) -> str: """Create or update a memory entry and its embeddings.""" pass diff --git a/src/recallforge/storage/indexing_ops.py b/src/recallforge/storage/indexing_ops.py index 0628770..de1db87 100644 --- a/src/recallforge/storage/indexing_ops.py +++ b/src/recallforge/storage/indexing_ops.py @@ -311,6 +311,8 @@ def upsert_memory( _skip_delete: bool = False, memory_role: str = "root", memory_root_path: Optional[str] = None, + _active: int = 1, + _index_batch_id: Optional[str] = None, ) -> str: """Create or update a text memory, replacing old vectors for this path. @@ -383,6 +385,8 @@ def upsert_memory( profile=profile, memory_role=memory_role, memory_root_path=memory_root_path, + active=_active, + index_batch_id=_index_batch_id, ) chunks = chunk_document(text) @@ -408,6 +412,8 @@ def upsert_memory( importance=importance, ttl_seconds=ttl_seconds, tags=tags, + active=_active, + index_batch_id=_index_batch_id, ) @@ -471,77 +477,87 @@ def index_conversation( profile=profile, ) - self._delete_path_entries( - collection=collection, - logical_path=normalized_path, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - include_children=True, - ) - root_hash = "" child_hashes: List[str] = [] - with self._backend.bulk_mode(): - root_hash = self.upsert_memory( - path=normalized_path, - text=root_text, + batch_id = self._backend.begin_index_batch() + try: + with self._backend.bulk_mode(): + root_hash = self.upsert_memory( + path=normalized_path, + text=root_text, + collection=collection, + embed_func=embed_func, + model=model, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + importance=importance, + ttl_seconds=ttl_seconds, + tags=root_tags, + _skip_delete=True, + memory_role="root", + memory_root_path=normalized_path, + _active=0, + _index_batch_id=batch_id, + ) + + total_turns = len(normalized_turns) + for index, turn in enumerate(normalized_turns, start=1): + turn_tags: List[str] = [] + for raw_tag in root_tags + [ + "conversation_turn", + f"turn:{index:04d}", + f"role:{turn.role}", + ]: + tag = str(raw_tag or "").strip().lower() + if tag and tag not in turn_tags: + turn_tags.append(tag) + if turn.speaker: + speaker_tag = f"participant:{turn.speaker.lower()}" + if speaker_tag not in turn_tags: + turn_tags.append(speaker_tag) + turn_text = build_conversation_turn_text( + title=resolved_title, + turn=turn, + index=index, + total=total_turns, + ) + child_hashes.append( + self.upsert_memory( + path=conversation_turn_path(normalized_path, index), + text=turn_text, + collection=collection, + embed_func=embed_func, + model=model, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + importance=importance, + ttl_seconds=ttl_seconds, + tags=turn_tags, + _skip_delete=True, + memory_role="child", + memory_root_path=normalized_path, + _active=0, + _index_batch_id=batch_id, + ) + ) + self._backend._fts.schedule_fts_rebuild() + self._backend.promote_index_batch( + batch_id=batch_id, collection=collection, - embed_func=embed_func, - model=model, + logical_path=normalized_path, user_id=user_id, session_id=session_id, project_id=project_id, profile=profile, - importance=importance, - ttl_seconds=ttl_seconds, - tags=root_tags, - _skip_delete=True, - memory_role="root", - memory_root_path=normalized_path, + include_children=True, ) - - total_turns = len(normalized_turns) - for index, turn in enumerate(normalized_turns, start=1): - turn_tags: List[str] = [] - for raw_tag in root_tags + [ - "conversation_turn", - f"turn:{index:04d}", - f"role:{turn.role}", - ]: - tag = str(raw_tag or "").strip().lower() - if tag and tag not in turn_tags: - turn_tags.append(tag) - if turn.speaker: - speaker_tag = f"participant:{turn.speaker.lower()}" - if speaker_tag not in turn_tags: - turn_tags.append(speaker_tag) - turn_text = build_conversation_turn_text( - title=resolved_title, - turn=turn, - index=index, - total=total_turns, - ) - child_hashes.append( - self.upsert_memory( - path=conversation_turn_path(normalized_path, index), - text=turn_text, - collection=collection, - embed_func=embed_func, - model=model, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - importance=importance, - ttl_seconds=ttl_seconds, - tags=turn_tags, - _skip_delete=True, - memory_role="child", - memory_root_path=normalized_path, - ) - ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise trace_log( "index_conversation_done", @@ -1292,6 +1308,9 @@ def index_image( memory_role: str = "root", memory_root_path: Optional[str] = None, inherited_tags: Optional[List[str]] = None, + _skip_delete: bool = False, + _active: int = 1, + _index_batch_id: Optional[str] = None, ) -> str: """ Index an image file. @@ -1331,15 +1350,16 @@ def index_image( # Remove previous image vectors for this logical document path. # Deleting only by hash_seq misses changed-content reindex cases. - self._delete_path_entries( - collection=collection, - logical_path=logical_path, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - content_type="image", - ) + if not _skip_delete: + self._delete_path_entries( + collection=collection, + logical_path=logical_path, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + content_type="image", + ) self._backend.insert_content(content_hash, actual_path, content_type="image") self._backend.insert_document( @@ -1356,6 +1376,8 @@ def index_image( profile=profile, memory_role=memory_role, memory_root_path=memory_root_path, + active=_active, + index_batch_id=_index_batch_id, ) vector = embed_func(actual_path) @@ -1383,6 +1405,8 @@ def index_image( memory_role=memory_role, memory_root_path=memory_root_path, tags=image_tags or None, + active=_active, + index_batch_id=_index_batch_id, ) # Schedule debounced FTS rebuild @@ -1413,21 +1437,12 @@ def index_video( logical_path = stored_path or actual_path resolved_title = os.path.splitext(os.path.basename(logical_path))[0] video_embed = embed_video_func or embed_image_func + batch_id = self._backend.begin_index_batch() artifact_root = Path(self._backend._store_path or DEFAULT_INDEX_DIR) / "video_frames" digest = hashlib.sha1(logical_path.encode("utf-8")).hexdigest()[:16] output_dir = artifact_root / digest - self._delete_path_entries( - collection=collection, - logical_path=logical_path, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - include_children=True, - ) - artifacts = extract_video_artifacts( video_path=actual_path, output_dir=output_dir, @@ -1476,22 +1491,28 @@ def index_video( modified_at = int(time.time() * 1000) created_at = modified_at - self._backend.insert_content(content_hash, actual_path, content_type="video") - self._backend.insert_document( - collection=collection, - file_path=logical_path, - title=resolved_title, - content_hash=content_hash, - content_type="video", - created_at=created_at, - modified_at=modified_at, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - memory_role="root", - memory_root_path=logical_path, - ) + try: + self._backend.insert_content(content_hash, actual_path, content_type="video") + self._backend.insert_document( + collection=collection, + file_path=logical_path, + title=resolved_title, + content_hash=content_hash, + content_type="video", + created_at=created_at, + modified_at=modified_at, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + memory_role="root", + memory_root_path=logical_path, + active=0, + index_batch_id=batch_id, + ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise indexed_video_embeddings = 0 try: @@ -1514,6 +1535,8 @@ def index_video( memory_role="root", memory_root_path=logical_path, tags=video_tags or None, + active=0, + index_batch_id=batch_id, ) indexed_video_embeddings = 1 except Exception as e: @@ -1543,6 +1566,8 @@ def index_video( memory_role="root", memory_root_path=logical_path, tags=video_tags or None, + active=0, + index_batch_id=batch_id, ) indexed_video_embeddings = 1 except Exception as summary_exc: @@ -1555,41 +1580,63 @@ def index_video( indexed_frames = 0 indexed_transcripts = 0 - for frame in artifacts.frames: - self.index_image( - path=frame.image_path, - collection=collection, - embed_func=embed_image_func, - model=model, - stored_path=frame.logical_path, - title=frame.title, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - caption_media=caption_media, - memory_role="child", - memory_root_path=logical_path, - inherited_tags=video_tags or None, - ) - indexed_frames += 1 + try: + for frame in artifacts.frames: + self.index_image( + path=frame.image_path, + collection=collection, + embed_func=embed_image_func, + model=model, + stored_path=frame.logical_path, + title=frame.title, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + caption_media=caption_media, + memory_role="child", + memory_root_path=logical_path, + inherited_tags=video_tags or None, + _skip_delete=True, + _active=0, + _index_batch_id=batch_id, + ) + indexed_frames += 1 + + for segment in artifacts.transcripts: + self.upsert_memory( + path=segment.logical_path, + text=segment.text, + collection=collection, + embed_func=embed_text_func, + model=model, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + _skip_delete=True, + memory_role="child", + memory_root_path=logical_path, + tags=video_tags or None, + _active=0, + _index_batch_id=batch_id, + ) + indexed_transcripts += 1 - for segment in artifacts.transcripts: - self.upsert_memory( - path=segment.logical_path, - text=segment.text, + self._backend._fts.schedule_fts_rebuild() + self._backend.promote_index_batch( + batch_id=batch_id, collection=collection, - embed_func=embed_text_func, - model=model, + logical_path=logical_path, user_id=user_id, session_id=session_id, project_id=project_id, profile=profile, - memory_role="child", - memory_root_path=logical_path, - tags=video_tags or None, + include_children=True, ) - indexed_transcripts += 1 + except Exception: + self._backend.delete_index_batch(batch_id) + raise return { "success": True, @@ -1628,15 +1675,7 @@ def index_audio( "or .transcript.json sidecar next to the audio file." ) - self._delete_path_entries( - collection=collection, - logical_path=logical_path, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - include_children=True, - ) + batch_id = self._backend.begin_index_batch() try: content_hash = hash_file_bytes(actual_path) @@ -1650,22 +1689,28 @@ def index_audio( modified_at = int(time.time() * 1000) created_at = modified_at - self._backend.insert_content(content_hash, actual_path, content_type="audio") - self._backend.insert_document( - collection=collection, - file_path=logical_path, - title=resolved_title, - content_hash=content_hash, - content_type="audio", - created_at=created_at, - modified_at=modified_at, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - memory_role="root", - memory_root_path=logical_path, - ) + try: + self._backend.insert_content(content_hash, actual_path, content_type="audio") + self._backend.insert_document( + collection=collection, + file_path=logical_path, + title=resolved_title, + content_hash=content_hash, + content_type="audio", + created_at=created_at, + modified_at=modified_at, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + memory_role="root", + memory_root_path=logical_path, + active=0, + index_batch_id=batch_id, + ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise transcript_summary = self._build_parent_summary( [segment.text for segment in transcripts], @@ -1691,6 +1736,8 @@ def index_audio( profile=profile, memory_role="root", memory_root_path=logical_path, + active=0, + index_batch_id=batch_id, ) except Exception as exc: logger.warning( @@ -1700,22 +1747,40 @@ def index_audio( ) indexed_transcripts = 0 - for segment in transcripts: - self.upsert_memory( - path=segment.logical_path, - text=segment.text, + try: + for segment in transcripts: + self.upsert_memory( + path=segment.logical_path, + text=segment.text, + collection=collection, + embed_func=embed_text_func, + model=model, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + _skip_delete=True, + memory_role="child", + memory_root_path=logical_path, + _active=0, + _index_batch_id=batch_id, + ) + indexed_transcripts += 1 + + self._backend._fts.schedule_fts_rebuild() + self._backend.promote_index_batch( + batch_id=batch_id, collection=collection, - embed_func=embed_text_func, - model=model, + logical_path=logical_path, user_id=user_id, session_id=session_id, project_id=project_id, profile=profile, - _skip_delete=True, - memory_role="child", - memory_root_path=logical_path, + include_children=True, ) - indexed_transcripts += 1 + except Exception: + self._backend.delete_index_batch(batch_id) + raise return { "success": True, @@ -1743,15 +1808,7 @@ def index_document_file( actual_path = str(Path(path).expanduser().resolve()) logical_path = stored_path or actual_path - self._delete_path_entries( - collection=collection, - logical_path=logical_path, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - include_children=True, - ) + batch_id = self._backend.begin_index_batch() artifacts = extract_document_artifacts(actual_path, logical_path) indexed_sections = 0 @@ -1770,22 +1827,28 @@ def index_document_file( modified_at = int(time.time() * 1000) created_at = modified_at - self._backend.insert_content(document_hash, actual_path, content_type=artifacts.document_type) - self._backend.insert_document( - collection=collection, - file_path=logical_path, - title=document_title, - content_hash=document_hash, - content_type=artifacts.document_type, - created_at=created_at, - modified_at=modified_at, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - memory_role="root", - memory_root_path=logical_path, - ) + try: + self._backend.insert_content(document_hash, actual_path, content_type=artifacts.document_type) + self._backend.insert_document( + collection=collection, + file_path=logical_path, + title=document_title, + content_hash=document_hash, + content_type=artifacts.document_type, + created_at=created_at, + modified_at=modified_at, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + memory_role="root", + memory_root_path=logical_path, + active=0, + index_batch_id=batch_id, + ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise document_summary = self._build_parent_summary( [section.text for section in artifacts.sections], @@ -1811,6 +1874,8 @@ def index_document_file( profile=profile, memory_role="root", memory_root_path=logical_path, + active=0, + index_batch_id=batch_id, ) except Exception as exc: logger.warning( @@ -1836,36 +1901,67 @@ def index_document_file( "Pass embed_image_func for proper vision embedding.", section.image_path, ) - self.index_image( - path=section.image_path, - collection=collection, - embed_func=image_embed, - model=model, - stored_path=section.logical_path, - title=section.title, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - memory_role="child", - memory_root_path=logical_path, - ) - # Override content entry to reference source PDF, not temp image. - # Temp images are cleaned up after this loop; the embedding vector - # persists and is the primary retrieval artifact. - from recallforge.storage.lancedb_shared import hash_content - content_hash = hash_content(f"pdf_page_image:{actual_path}:page:{section.index}") - self._backend.insert_content(content_hash, actual_path, content_type="pdf_page_image") - indexed_images += 1 + try: + self.index_image( + path=section.image_path, + collection=collection, + embed_func=image_embed, + model=model, + stored_path=section.logical_path, + title=section.title, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + memory_role="child", + memory_root_path=logical_path, + _skip_delete=True, + _active=0, + _index_batch_id=batch_id, + ) + # Override content entry to reference source PDF, not temp image. + # Temp images are cleaned up after this loop; the embedding vector + # persists and is the primary retrieval artifact. + from recallforge.storage.lancedb_shared import hash_content + content_hash = hash_content(f"pdf_page_image:{actual_path}:page:{section.index}") + self._backend.insert_content(content_hash, actual_path, content_type="pdf_page_image") + indexed_images += 1 + except Exception: + self._backend.delete_index_batch(batch_id) + raise # Preserve OCR text for scanned/image-only pages as a sibling # text child so BM25 and file-as-query can use it without # dropping the visual page representation. ocr_text = (section.text or "").strip() if ocr_text: + try: + self.upsert_memory( + path=f"{section.logical_path}::ocr", + text=ocr_text, + collection=collection, + embed_func=embed_func, + model=model, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + _skip_delete=True, + memory_role="child", + memory_root_path=logical_path, + _active=0, + _index_batch_id=batch_id, + ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise + indexed_sections += 1 + else: + # Use text embedding for text sections + try: self.upsert_memory( - path=f"{section.logical_path}::ocr", - text=ocr_text, + path=section.logical_path, + text=section.text, collection=collection, embed_func=embed_func, model=model, @@ -1876,24 +1972,12 @@ def index_document_file( _skip_delete=True, memory_role="child", memory_root_path=logical_path, + _active=0, + _index_batch_id=batch_id, ) - indexed_sections += 1 - else: - # Use text embedding for text sections - self.upsert_memory( - path=section.logical_path, - text=section.text, - collection=collection, - embed_func=embed_func, - model=model, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile, - _skip_delete=True, - memory_role="child", - memory_root_path=logical_path, - ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise indexed_sections += 1 # Clean up temp dirs from PDF page-to-image rendering @@ -1902,10 +1986,21 @@ def index_document_file( if temp_dir and "recallforge_pdf_" in temp_dir: shutil.rmtree(temp_dir, ignore_errors=True) - # Ensure FTS rebuild is scheduled even when no sections were indexed, - # since _delete_path_entries above may have removed stale entries. - if indexed_sections == 0 and indexed_images == 0: - self._backend._fts.schedule_fts_rebuild() + self._backend._fts.schedule_fts_rebuild() + try: + self._backend.promote_index_batch( + batch_id=batch_id, + collection=collection, + logical_path=logical_path, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + include_children=True, + ) + except Exception: + self._backend.delete_index_batch(batch_id) + raise return { "success": True, diff --git a/src/recallforge/storage/lancedb_backend.py b/src/recallforge/storage/lancedb_backend.py index 78e962e..59c0964 100644 --- a/src/recallforge/storage/lancedb_backend.py +++ b/src/recallforge/storage/lancedb_backend.py @@ -14,6 +14,7 @@ import re import shutil import subprocess +import threading import time import uuid from pathlib import Path @@ -45,6 +46,7 @@ _SQL_METACHARACTERS, _safe_filter, _validate_identifier, + active_row_filter, build_memory_id, escape_sql, extract_title, @@ -105,6 +107,7 @@ def __init__(self, store_path: Optional[str] = None): self._cache_table = None self._entities_table = None self._relations_table = None + self._visibility_lock = threading.RLock() # FTS rebuild debouncing state self._fts_rebuild_pending = 0 @@ -124,6 +127,12 @@ def __init__(self, store_path: Optional[str] = None): self._fts = FTSManager(self) self._search = SearchOps(self) self._indexer = IndexingOps(self) + + def _get_visibility_lock(self): + """Return the promotion/read visibility lock, creating it for __new__ tests.""" + if not hasattr(self, "_visibility_lock") or self._visibility_lock is None: + self._visibility_lock = threading.RLock() + return self._visibility_lock def initialize(self, store_path: Optional[str] = None) -> None: """Initialize the LanceDB database.""" @@ -348,6 +357,8 @@ def _build_embeddings_schema(self) -> pa.Schema: pa.field("ttl_seconds", pa.int32(), nullable=True), pa.field("tags", pa.string(), nullable=True), # JSON-encoded list of strings pa.field("expires_at", pa.int64(), nullable=True), # Timestamp in ms when entry expires + pa.field("active", pa.int8(), nullable=True), + pa.field("index_batch_id", pa.string(), nullable=True), ]) def _build_documents_schema(self) -> pa.Schema: @@ -370,6 +381,7 @@ def _build_documents_schema(self) -> pa.Schema: pa.field("memory_id", pa.string(), nullable=True), pa.field("memory_role", pa.string(), nullable=True), pa.field("memory_root_path", pa.string(), nullable=True), + pa.field("index_batch_id", pa.string(), nullable=True), ]) def _build_content_schema(self) -> pa.Schema: @@ -409,6 +421,8 @@ def _build_entities_schema(self) -> pa.Schema: pa.field("project_id", pa.string(), nullable=True), pa.field("profile", pa.string(), nullable=True), pa.field("created_at", pa.int64(), nullable=False), + pa.field("active", pa.int8(), nullable=True), + pa.field("index_batch_id", pa.string(), nullable=True), ]) def _build_relations_schema(self) -> pa.Schema: @@ -433,6 +447,8 @@ def _build_relations_schema(self) -> pa.Schema: pa.field("project_id", pa.string(), nullable=True), pa.field("profile", pa.string(), nullable=True), pa.field("created_at", pa.int64(), nullable=False), + pa.field("active", pa.int8(), nullable=True), + pa.field("index_batch_id", pa.string(), nullable=True), ]) def _has_scalar_index(self, table, column: str) -> bool: @@ -589,6 +605,8 @@ def insert_document( memory_id: Optional[str] = None, memory_role: str = "root", memory_root_path: Optional[str] = None, + active: int = 1, + index_batch_id: Optional[str] = None, ) -> str: """Insert or update a document.""" now = int(time.time() * 1000) @@ -621,7 +639,7 @@ def insert_document( # Check for existing (including staged bulk rows) existing_row = None - if self._bulk_mode: + if self._bulk_mode and index_batch_id is None: for row in self._pending_documents.values(): if ( row["collection"] == collection @@ -634,7 +652,7 @@ def insert_document( existing_row = row break - if existing_row is None: + if existing_row is None and index_batch_id is None: try: existing = list(self._documents_table.search() .where(ns_filter) @@ -664,7 +682,9 @@ def insert_document( "memory_id": normalized_memory_id, "memory_role": normalized_memory_role, "memory_root_path": normalized_memory_root_path, + "index_batch_id": index_batch_id, } + row["active"] = int(active) if self._bulk_mode: self._pending_documents[doc_id] = row @@ -800,6 +820,8 @@ def insert_embedding( importance: Optional[float] = None, ttl_seconds: Optional[int] = None, tags: Optional[List[str]] = None, + active: int = 1, + index_batch_id: Optional[str] = None, ) -> None: """Insert an embedding with optional metadata.""" hash_seq = f"{content_hash}_{seq}" @@ -829,15 +851,15 @@ def insert_embedding( self._ensure_bulk_buffers() - if self._bulk_mode: + if self._bulk_mode and index_batch_id is None: self._pending_embedding_deletes.add(hash_seq) - else: + elif index_batch_id is None: # Delete existing try: self._embeddings_table.delete(_safe_filter("hash_seq", hash_seq)) except Exception as e: logger.debug(f"insert_embedding: no existing embedding to delete for {hash_seq}: {e}") - self.delete_graph_entries(hash_seq=hash_seq) + self.delete_graph_entries(hash_seq=hash_seq) trace_log("insert_embedding", hash_seq=hash_seq, collection=collection, file_path=file_path, seq=seq, user_id=user_id, session_id=session_id, project_id=project_id, profile=profile, @@ -867,6 +889,8 @@ def insert_embedding( "ttl_seconds": ttl_seconds, "tags": tags_json, "expires_at": expires_at, + "active": int(active), + "index_batch_id": index_batch_id, } if self._bulk_mode: @@ -889,6 +913,8 @@ def insert_embedding( memory_id=normalized_memory_id, memory_root_path=normalized_memory_root_path, created_at=now, + active=int(active), + index_batch_id=index_batch_id, ) def _index_graph_rows_for_embedding( @@ -907,6 +933,8 @@ def _index_graph_rows_for_embedding( memory_id: Optional[str], memory_root_path: Optional[str], created_at: int, + active: int, + index_batch_id: Optional[str], ) -> None: """Persist deterministic entity/relation rows for one indexed evidence unit.""" if getattr(self, "_entities_table", None) is None or not isinstance(text_body, str) or not text_body.strip(): @@ -936,6 +964,8 @@ def _index_graph_rows_for_embedding( "project_id": project_id, "profile": profile, "created_at": created_at, + "active": active, + "index_batch_id": index_batch_id, } for entity in entities ] @@ -980,6 +1010,8 @@ def _index_graph_rows_for_embedding( "project_id": project_id, "profile": profile, "created_at": created_at, + "active": active, + "index_batch_id": index_batch_id, } for relation in relations ] @@ -992,8 +1024,15 @@ def _index_graph_rows_for_embedding( def has_vectors(self) -> bool: """Check if index has any vectors.""" try: - count = self._embeddings_table.count_rows() - return count > 0 + with self._get_visibility_lock(): + rows = ( + self._embeddings_table.search() + .where(active_row_filter()) + .select(["hash_seq"]) + .limit(1) + .to_list() + ) + return bool(rows) except Exception as e: logger.warning(f"has_vectors: failed to count rows: {e}") return False @@ -1014,16 +1053,17 @@ def search_fts( profile: Optional[str] = None ) -> List[Any]: """Full-text search using LanceDB Tantivy.""" - return self._search.search_fts( - query=query, - limit=limit, - collection=collection, - content_type=content_type, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile - ) + with self._get_visibility_lock(): + return self._search.search_fts( + query=query, + limit=limit, + collection=collection, + content_type=content_type, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile + ) def search_vec( self, @@ -1037,16 +1077,17 @@ def search_vec( profile: Optional[str] = None ) -> List[Any]: """Vector similarity search.""" - return self._search.search_vec( - vector=vector, - limit=limit, - collection=collection, - content_type=content_type, - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile - ) + with self._get_visibility_lock(): + return self._search.search_vec( + vector=vector, + limit=limit, + collection=collection, + content_type=content_type, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile + ) def list_collections( self, @@ -1056,19 +1097,21 @@ def list_collections( profile: Optional[str] = None, ) -> List[str]: """Return sorted list of unique collection names, with optional namespace filters.""" - return self._search.list_collections( - user_id=user_id, - session_id=session_id, - project_id=project_id, - profile=profile - ) + with self._get_visibility_lock(): + return self._search.list_collections( + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile + ) def list_namespaces( self, collection: Optional[str] = None, ) -> List[Dict[str, str]]: """Return unique namespace combinations (user_id, session_id, project_id, profile).""" - return self._search.list_namespaces(collection=collection) + with self._get_visibility_lock(): + return self._search.list_namespaces(collection=collection) def rename_collection( self, @@ -1382,6 +1425,104 @@ def _memory_path_clause(self, path: str) -> str: f"OR file_path LIKE '{escaped_path}::%')" ) + def begin_index_batch(self) -> str: + """Create a staging batch ID for hidden background ingest rows.""" + return f"batch_{uuid.uuid4().hex}" + + def _batch_filter(self, batch_id: str) -> str: + return _safe_filter("index_batch_id", batch_id) + + def _not_batch_filter(self, batch_id: str) -> str: + escaped = escape_sql(batch_id) + return f"(index_batch_id IS NULL OR index_batch_id != '{escaped}')" + + def _apply_visibility_update(self, table, where: str, active: int, label: str) -> int: + if table is None: + return 0 + try: + rows = table.search().where(where).select(["id"] if label != "embeddings" else ["hash_seq"]).limit(10_000_000).to_list() + except Exception: + rows = [] + try: + table.update(where=where, values={"active": int(active)}) + except Exception as exc: + logger.warning("promote_index_batch: failed to update %s visibility: %s", label, exc) + raise + return len(rows) + + def promote_index_batch( + self, + *, + batch_id: str, + collection: str, + logical_path: str, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + project_id: Optional[str] = None, + profile: Optional[str] = None, + include_children: bool = False, + ) -> Dict[str, int]: + """Promote a complete hidden ingest batch into the visible memory graph.""" + self._flush_pending_writes(force=True) + path_clause = self._memory_path_clause(logical_path) if include_children else f"file_path = '{escape_sql(logical_path)}'" + filters = self._graph_namespace_filters( + collection=collection, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + ) + filters.append(path_clause) + base_filter = " AND ".join(filters) + staged_filter = f"{base_filter} AND {self._batch_filter(batch_id)}" + old_filter = f"{base_filter} AND {active_row_filter()} AND {self._not_batch_filter(batch_id)}" + + with self._get_visibility_lock(): + activated_embeddings = self._apply_visibility_update(self._embeddings_table, staged_filter, 1, "embeddings") + activated_documents = self._apply_visibility_update(self._documents_table, staged_filter, 1, "documents") + activated_entities = self._apply_visibility_update(self._entities_table, staged_filter, 1, "entities") + activated_relations = self._apply_visibility_update(self._relations_table, staged_filter, 1, "relations") + + deactivated_embeddings = self._apply_visibility_update(self._embeddings_table, old_filter, 0, "embeddings") + deactivated_documents = self._apply_visibility_update(self._documents_table, old_filter, 0, "documents") + deactivated_entities = self._apply_visibility_update(self._entities_table, old_filter, 0, "entities") + deactivated_relations = self._apply_visibility_update(self._relations_table, old_filter, 0, "relations") + + trace_log( + "promote_index_batch", + batch_id=batch_id, + collection=collection, + logical_path=logical_path, + activated_embeddings=activated_embeddings, + activated_documents=activated_documents, + ) + return { + "activated_embeddings": activated_embeddings, + "activated_documents": activated_documents, + "activated_entities": activated_entities, + "activated_relations": activated_relations, + "deactivated_embeddings": deactivated_embeddings, + "deactivated_documents": deactivated_documents, + "deactivated_entities": deactivated_entities, + "deactivated_relations": deactivated_relations, + } + + def delete_index_batch(self, batch_id: str) -> None: + """Remove hidden staging rows after a failed background ingest.""" + batch_filter = self._batch_filter(batch_id) + for table, label in ( + (self._embeddings_table, "embeddings"), + (self._documents_table, "documents"), + (self._entities_table, "entities"), + (self._relations_table, "relations"), + ): + if table is None: + continue + try: + table.delete(batch_filter) + except Exception as exc: + logger.debug("delete_index_batch: failed to cleanup %s rows: %s", label, exc) + def _graph_namespace_filters( self, *, @@ -1466,6 +1607,33 @@ def list_memory_entities( project_id: Optional[str] = None, profile: Optional[str] = None, limit: int = 100, + ) -> List[Dict[str, Any]]: + """List entity mentions with evidence for a memory, path, or entity key.""" + with self._get_visibility_lock(): + return self._list_memory_entities_unlocked( + memory_id=memory_id, + path=path, + entity=entity, + collection=collection, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + limit=limit, + ) + + def _list_memory_entities_unlocked( + self, + *, + memory_id: Optional[str] = None, + path: Optional[str] = None, + entity: Optional[str] = None, + collection: Optional[str] = None, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + project_id: Optional[str] = None, + profile: Optional[str] = None, + limit: int = 100, ) -> List[Dict[str, Any]]: """List entity mentions with evidence for a memory, path, or entity key.""" if getattr(self, "_entities_table", None) is None: @@ -1478,6 +1646,7 @@ def list_memory_entities( project_id=project_id, profile=profile, ) + filters.append(active_row_filter()) if memory_id: filters.append(_safe_filter("memory_id", memory_id)) if path: @@ -1526,6 +1695,33 @@ def find_related_memories( project_id: Optional[str] = None, profile: Optional[str] = None, limit: int = 20, + ) -> List[Dict[str, Any]]: + """Find memories that share graph entities with a seed memory/path/entity.""" + with self._get_visibility_lock(): + return self._find_related_memories_unlocked( + memory_id=memory_id, + path=path, + entity=entity, + collection=collection, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + limit=limit, + ) + + def _find_related_memories_unlocked( + self, + *, + memory_id: Optional[str] = None, + path: Optional[str] = None, + entity: Optional[str] = None, + collection: Optional[str] = None, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + project_id: Optional[str] = None, + profile: Optional[str] = None, + limit: int = 20, ) -> List[Dict[str, Any]]: """Find memories that share graph entities with a seed memory/path/entity.""" if getattr(self, "_entities_table", None) is None: @@ -1569,6 +1765,7 @@ def find_related_memories( project_id=project_id, profile=profile, ) + filters.append(active_row_filter()) filters.append("(" + " OR ".join(_safe_filter("entity_key", key) for key in sorted(seed_keys)) + ")") try: rows = list( @@ -1681,6 +1878,7 @@ def _fetch_memory_summary_rows( profile=profile, active_only=False, ) + embed_filters.append(active_row_filter()) embed_filters.append( "(" + " OR ".join(self._memory_path_clause(path) for path in unique_root_paths) + ")" ) @@ -1890,6 +2088,27 @@ def list_memories( project_id: Optional[str] = None, profile: Optional[str] = None, limit: int = 200, + ) -> List[Dict[str, Any]]: + """List canonical root memories from the documents table.""" + with self._get_visibility_lock(): + return self._list_memories_unlocked( + collection=collection, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + limit=limit, + ) + + def _list_memories_unlocked( + self, + *, + collection: Optional[str] = None, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + project_id: Optional[str] = None, + profile: Optional[str] = None, + limit: int = 200, ) -> List[Dict[str, Any]]: """List canonical root memories from the documents table.""" filters = self._memory_namespace_filters( @@ -1963,6 +2182,29 @@ def get_memory( project_id: Optional[str] = None, profile: Optional[str] = None, path: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + """Return a canonical memory plus child assets and snippets.""" + with self._get_visibility_lock(): + return self._get_memory_unlocked( + memory_id=memory_id, + collection=collection, + user_id=user_id, + session_id=session_id, + project_id=project_id, + profile=profile, + path=path, + ) + + def _get_memory_unlocked( + self, + memory_id: Optional[str] = None, + *, + collection: Optional[str] = None, + user_id: Optional[str] = None, + session_id: Optional[str] = None, + project_id: Optional[str] = None, + profile: Optional[str] = None, + path: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Return a canonical memory plus child assets and snippets.""" if not memory_id and not path: @@ -2025,6 +2267,7 @@ def get_memory( profile=profile, active_only=False, ) + embed_filters.append(active_row_filter()) embed_filters.append(self._memory_path_clause(root_path)) try: @@ -2138,6 +2381,8 @@ def upsert_memory( _skip_delete: bool = False, memory_role: str = "root", memory_root_path: Optional[str] = None, + _active: int = 1, + _index_batch_id: Optional[str] = None, ) -> str: """Create or update a text memory, replacing old vectors for this path.""" return self._indexer.upsert_memory( @@ -2156,6 +2401,8 @@ def upsert_memory( _skip_delete=_skip_delete, memory_role=memory_role, memory_root_path=memory_root_path, + _active=_active, + _index_batch_id=_index_batch_id, ) def delete_memory( @@ -2353,6 +2600,9 @@ def index_image( caption_media: bool = True, memory_role: str = "root", memory_root_path: Optional[str] = None, + _skip_delete: bool = False, + _active: int = 1, + _index_batch_id: Optional[str] = None, ) -> str: """Index an image file.""" return self._indexer.index_image( @@ -2369,6 +2619,9 @@ def index_image( caption_media=caption_media, memory_role=memory_role, memory_root_path=memory_root_path, + _skip_delete=_skip_delete, + _active=_active, + _index_batch_id=_index_batch_id, ) def index_video( @@ -2487,15 +2740,29 @@ def set_cached(self, key: str, value: str) -> None: # ========================================================================= def count_embeddings(self) -> int: - """Count total embeddings.""" + """Count visible embeddings.""" try: - return self._embeddings_table.count_rows() + with self._get_visibility_lock(): + return len( + self._embeddings_table.search() + .where(active_row_filter()) + .select(["hash_seq"]) + .limit(10_000_000) + .to_list() + ) except Exception: return 0 def count_documents(self) -> int: - """Count total documents.""" + """Count visible documents.""" try: - return self._documents_table.count_rows() + with self._get_visibility_lock(): + return len( + self._documents_table.search() + .where("active = 1") + .select(["id"]) + .limit(10_000_000) + .to_list() + ) except Exception: return 0 diff --git a/src/recallforge/storage/lancedb_shared.py b/src/recallforge/storage/lancedb_shared.py index a36e20a..70704f4 100644 --- a/src/recallforge/storage/lancedb_shared.py +++ b/src/recallforge/storage/lancedb_shared.py @@ -51,6 +51,17 @@ def _safe_filter(field: str, value: str) -> str: return f"{field} = '{escaped}'" +def active_row_filter(field: str = "active") -> str: + """Filter rows that are visible to readers. + + ``active IS NULL`` keeps stores created before the active column migration + visible until they are rewritten. + """ + if not re.match(r"^[\w_]+$", field): + raise ValueError(f"Invalid field name: {field}") + return f"({field} IS NULL OR {field} = 1)" + + def escape_sql(s: str) -> str: return _validate_identifier(s, "value").replace("'", "''") diff --git a/src/recallforge/storage/search_ops.py b/src/recallforge/storage/search_ops.py index 7e40bf1..b7ab6b1 100644 --- a/src/recallforge/storage/search_ops.py +++ b/src/recallforge/storage/search_ops.py @@ -8,7 +8,7 @@ from typing import Any, Dict, List, Optional, TYPE_CHECKING from .base import SearchResult -from .lancedb_shared import _safe_filter, get_docid, logger, resolve_memory_identity, trace_log +from .lancedb_shared import active_row_filter, _safe_filter, get_docid, logger, resolve_memory_identity, trace_log if TYPE_CHECKING: from .lancedb_backend import LanceDBBackend @@ -33,7 +33,7 @@ def _bm25_fallback( ) -> List[SearchResult]: """In-memory BM25 fallback when FTS index fails.""" try: - filter_parts = [self._get_ttl_filter()] + filter_parts = [active_row_filter(), self._get_ttl_filter()] if collection: filter_parts.append(_safe_filter("collection", collection)) if content_type: @@ -136,7 +136,7 @@ def search_fts( self._backend._fts.ensure_fts_index() # Build filter including TTL and namespace fields - filter_parts = [self._get_ttl_filter()] + filter_parts = [active_row_filter(), self._get_ttl_filter()] if collection: filter_parts.append(_safe_filter("collection", collection)) @@ -212,7 +212,7 @@ def search_vec( user_id=user_id, session_id=session_id, project_id=project_id, profile=profile) # Build filter including TTL and namespace fields - filter_parts = [self._get_ttl_filter()] + filter_parts = [active_row_filter(), self._get_ttl_filter()] if collection: filter_parts.append(_safe_filter("collection", collection)) @@ -397,7 +397,7 @@ def list_collections( return [] try: - filter_parts: List[str] = [] + filter_parts: List[str] = [active_row_filter()] if user_id is not None: filter_parts.append(_safe_filter("user_id", user_id)) if session_id is not None: @@ -431,7 +431,7 @@ def list_namespaces( return [] try: - filter_parts: List[str] = [] + filter_parts: List[str] = [active_row_filter()] if collection is not None: filter_parts.append(_safe_filter("collection", collection)) diff --git a/tests/test_storage.py b/tests/test_storage.py index 7c28e6f..53433fb 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -453,6 +453,68 @@ def test_memory_graph_rows_are_replaced_on_memory_update(self): self.backend.list_memory_entities(path="notes/company.md", entity="Globex Labs", collection="test") ) + def test_index_batch_rows_are_hidden_until_promoted(self): + path = "notes/batch-promotion.md" + self.backend.upsert_memory( + path=path, + text="Mira from Acme Robotics owns the launch checklist.", + collection="test", + embed_func=mock_embed, + model="mock-embedder", + ) + visible_embeddings_before = self.backend.count_embeddings() + self.assertEqual(self.backend.count_documents(), 1) + + batch_id = self.backend.begin_index_batch() + self.backend.upsert_memory( + path=path, + text="Mira from Globex Labs owns the launch checklist.", + collection="test", + embed_func=mock_embed, + model="mock-embedder", + _skip_delete=True, + _active=0, + _index_batch_id=batch_id, + ) + + self.assertEqual(self.backend.count_documents(), 1) + self.assertEqual(self.backend.count_embeddings(), visible_embeddings_before) + memory_before = self.backend.get_memory(path=path, collection="test") + self.assertIsNotNone(memory_before) + self.assertIn("Acme Robotics", memory_before["summary"]) + self.assertNotIn("Globex Labs", memory_before["summary"]) + self.assertTrue( + self.backend.list_memory_entities(path=path, entity="Acme Robotics", collection="test") + ) + self.assertFalse( + self.backend.list_memory_entities(path=path, entity="Globex Labs", collection="test") + ) + + hidden_rows = self.backend._embeddings_table.search().where( + f"collection = 'test' AND file_path = '{path}' AND active = 0" + ).to_list() + self.assertGreater(len(hidden_rows), 0) + + promoted = self.backend.promote_index_batch( + batch_id=batch_id, + collection="test", + logical_path=path, + ) + self.assertGreater(promoted["activated_embeddings"], 0) + self.assertGreater(promoted["deactivated_embeddings"], 0) + + memory_after = self.backend.get_memory(path=path, collection="test") + self.assertIsNotNone(memory_after) + self.assertEqual(self.backend.count_documents(), 1) + self.assertIn("Globex Labs", memory_after["summary"]) + self.assertNotIn("Acme Robotics", memory_after["summary"]) + self.assertFalse( + self.backend.list_memory_entities(path=path, entity="Acme Robotics", collection="test") + ) + self.assertTrue( + self.backend.list_memory_entities(path=path, entity="Globex Labs", collection="test") + ) + def test_delete_memory_deactivates_doc_and_removes_embeddings(self): self.backend.upsert_memory( path="notes/delete-me.md", @@ -970,6 +1032,49 @@ def test_matching_turns_roll_up_to_parent_conversation(self): evidence_paths = [result.memory_primary_evidence_path] + (result.memory_supporting_paths or []) self.assertIn("recallforge://test/threads/pricing-approval::turn:0002", evidence_paths) + def test_reindex_conversation_replaces_stale_children_as_one_batch(self): + path = "threads/reindex-consistency" + self.backend.index_conversation( + path=path, + title="Reindex Consistency", + turns=[ + {"role": "user", "content": "Acme Robotics asked about launch readiness."}, + {"role": "assistant", "content": "Globex Labs is the stale follow-up owner."}, + ], + collection="test", + embed_func=mock_embed, + model="mock-embedder", + ) + + first_memory = self.backend.get_memory(path=path, collection="test") + self.assertIsNotNone(first_memory) + self.assertEqual(len(first_memory["children"]), 2) + self.assertTrue( + self.backend.list_memory_entities(path=path, entity="Globex Labs", collection="test") + ) + + self.backend.index_conversation( + path=path, + title="Reindex Consistency", + turns=[ + {"role": "user", "content": "Initech is now the only launch readiness owner."}, + ], + collection="test", + embed_func=mock_embed, + model="mock-embedder", + ) + + updated_memory = self.backend.get_memory(path=path, collection="test") + self.assertIsNotNone(updated_memory) + self.assertEqual(len(updated_memory["children"]), 1) + self.assertEqual(updated_memory["children"][0]["path"], f"{path}::turn:0001") + self.assertFalse( + self.backend.list_memory_entities(path=path, entity="Globex Labs", collection="test") + ) + self.assertTrue( + self.backend.list_memory_entities(path=path, entity="Initech", collection="test") + ) + class TestFTSMissFallbackBehavior(unittest.TestCase): """Tests for P0: FTS miss fallback behavior - no BM25 fallback on empty results."""