From 8e4639a0689b4275e3ce623b552b0b0489957a4b Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Wed, 27 May 2026 07:38:43 +0300 Subject: [PATCH] Prioritize watcher drain writes --- src/brainlayer/dedupe.py | 8 +- src/brainlayer/drain.py | 28 ++- src/brainlayer/vector_store.py | 46 ++++- tests/test_arbitration.py | 291 +++++++++++++++++++++++++++++++ tests/test_search_trigram_fts.py | 33 ++++ 5 files changed, 400 insertions(+), 6 deletions(-) diff --git a/src/brainlayer/dedupe.py b/src/brainlayer/dedupe.py index 9c090b9a..c29f20be 100644 --- a/src/brainlayer/dedupe.py +++ b/src/brainlayer/dedupe.py @@ -373,6 +373,10 @@ def _loads_tags(value: Any) -> set[str]: return set() +def _tags_json_equivalent(left: Any, right: Any) -> bool: + return _loads_tags(left) == _loads_tags(right) + + def _max_optional_number(left: Any, right: Any) -> Any: values = [] for value in (left, right): @@ -640,11 +644,13 @@ def merge_existing_chunk_seen(conn: Any, *, chunk_id: str, incoming: dict[str, A last_seen_at = _latest_timestamp( existing_last, existing_created, incoming.get("last_seen_at"), incoming.get("created_at") ) + merged_tags_json = json.dumps(merged_tags) if merged_tags else None updates: dict[str, Any] = { - "tags": json.dumps(merged_tags) if merged_tags else None, "seen_count": int(existing_seen or 1) + int(incoming.get("seen_count") or 1), "last_seen_at": last_seen_at, } + if not _tags_json_equivalent(existing_tags, merged_tags_json): + updates["tags"] = merged_tags_json if merged_importance is not None: updates["importance"] = merged_importance if merged_half_life is not None: diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index 8790b069..6eb12110 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -97,8 +97,21 @@ def _columns(conn: apsw.Connection, table: str) -> set[str]: return {row[1] for row in conn.execute(f"PRAGMA table_info({table})")} +def _content_hash(content: str) -> str: + return hashlib.sha256(content.strip().encode("utf-8")).hexdigest() + + +def _preview_text(values: dict[str, Any]) -> str: + summary = str(values.get("summary") or "").strip() + content = str(values.get("content") or "").strip() + source = summary or content + return source.replace("\n", " ").replace("\r", " ").replace("\t", " ")[:220] + + def _insert_chunk(conn: apsw.Connection, values: dict[str, Any]) -> None: cols = _columns(conn, "chunks") + if "preview_text" in cols and not str(values.get("preview_text") or "").strip(): + values = {**values, "preview_text": _preview_text(values)} if "content" in values: fields = compute_dedupe_fields(str(values["content"]), values.get("created_at")) values = { @@ -226,6 +239,7 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: "summary": content[:200], "tags": json.dumps(tags) if tags else None, "importance": float(event["importance"]) if event.get("importance") is not None else None, + "content_hash": _content_hash(content), "chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")), }, ) @@ -296,6 +310,7 @@ def _apply_watcher(conn: apsw.Connection, event: dict[str, Any]) -> None: "conversation_id": event.get("conversation_id"), "sender": event.get("sender"), "tags": json.dumps(tags) if tags else None, + "content_hash": _content_hash(content), "chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")), }, ) @@ -344,6 +359,7 @@ def _apply_hook(conn: apsw.Connection, event: dict[str, Any]) -> None: "created_at": datetime.fromtimestamp(timestamp, timezone.utc).isoformat(), "conversation_id": session_id, "importance": 5, + "content_hash": _content_hash(content), "chunk_origin": detect_chunk_origin(content, event.get("chunk_origin")), }, ) @@ -356,6 +372,14 @@ def _apply_enrichment(conn: apsw.Connection, event: dict[str, Any]) -> None: return enrichment = event.get("enrichment") or {} cols = _columns(conn, "chunks") + if "content_hash" in cols and event.get("content_hash"): + row = conn.execute("SELECT content_hash, content FROM chunks WHERE id = ?", (chunk_id,)).fetchone() + if not row: + return + current_hash = row[0] or _content_hash(str(row[1] or "")) + if current_hash and current_hash != event["content_hash"]: + logger.warning("Skipping stale enrichment for chunk_id=%s content_hash mismatch", chunk_id) + return updates: dict[str, Any] = {} mappings = { "summary": "summary", @@ -507,7 +531,9 @@ def drain_once( lock_fd = _acquire_queue_lock(queue_dir) try: - files = sorted(queue_dir.glob("*.jsonl"))[:batch_size] + files = sorted(queue_dir.glob("*.jsonl"), key=lambda path: (path.name.startswith("enrichment-"), path.name))[ + :batch_size + ] if not files: return 0 _log(log_path, f"queue_depth={len(files)}") diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index a35b453b..a96df447 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -801,6 +801,22 @@ def _init_db(self) -> None: ) """) self._trigram_fts_available = True + cursor.execute(""" + CREATE TABLE IF NOT EXISTS chunk_fts_rowids ( + chunk_id TEXT PRIMARY KEY, + fts_rowid INTEGER, + trigram_rowid INTEGER + ) + """) + cursor.execute(""" + INSERT OR IGNORE INTO chunk_fts_rowids(chunk_id, fts_rowid) + SELECT chunk_id, rowid FROM chunks_fts WHERE chunk_id IS NOT NULL + """) + cursor.execute(""" + INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid) + SELECT chunk_id, rowid FROM chunks_fts_trigram WHERE chunk_id IS NOT NULL + ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid + """) # FTS5 sync triggers — keep summary/tags/resolved_query in sync cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_insert") @@ -816,6 +832,9 @@ def _init_db(self) -> None: new.resolved_queries, new.id ); + INSERT INTO chunk_fts_rowids(chunk_id, fts_rowid) + VALUES (new.id, last_insert_rowid()) + ON CONFLICT(chunk_id) DO UPDATE SET fts_rowid = excluded.fts_rowid; END """) cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_insert") @@ -831,25 +850,37 @@ def _init_db(self) -> None: new.resolved_queries, new.id ); + INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid) + VALUES (new.id, last_insert_rowid()) + ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid; END """) cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_delete") cursor.execute(""" CREATE TRIGGER IF NOT EXISTS chunks_fts_delete AFTER DELETE ON chunks BEGIN - DELETE FROM chunks_fts WHERE chunk_id = old.id; + DELETE FROM chunks_fts + WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); + DELETE FROM chunks_fts_trigram + WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); + DELETE FROM chunk_fts_rowids WHERE chunk_id = old.id; END """) cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_delete") cursor.execute(""" CREATE TRIGGER IF NOT EXISTS chunks_fts_trigram_delete AFTER DELETE ON chunks BEGIN - DELETE FROM chunks_fts_trigram WHERE chunk_id = old.id; + DELETE FROM chunks_fts + WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); + DELETE FROM chunks_fts_trigram + WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); + DELETE FROM chunk_fts_rowids WHERE chunk_id = old.id; END """) cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_update") cursor.execute(""" CREATE TRIGGER IF NOT EXISTS chunks_fts_update AFTER UPDATE OF content, summary, tags, resolved_query, key_facts, resolved_queries ON chunks BEGIN - DELETE FROM chunks_fts WHERE chunk_id = old.id; + DELETE FROM chunks_fts + WHERE rowid = (SELECT fts_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); INSERT INTO chunks_fts(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id) VALUES ( new.content, @@ -860,13 +891,17 @@ def _init_db(self) -> None: new.resolved_queries, new.id ); + INSERT INTO chunk_fts_rowids(chunk_id, fts_rowid) + VALUES (new.id, last_insert_rowid()) + ON CONFLICT(chunk_id) DO UPDATE SET fts_rowid = excluded.fts_rowid; END """) cursor.execute("DROP TRIGGER IF EXISTS chunks_fts_trigram_update") cursor.execute(""" CREATE TRIGGER IF NOT EXISTS chunks_fts_trigram_update AFTER UPDATE OF content, summary, tags, resolved_query, key_facts, resolved_queries ON chunks BEGIN - DELETE FROM chunks_fts_trigram WHERE chunk_id = old.id; + DELETE FROM chunks_fts_trigram + WHERE rowid = (SELECT trigram_rowid FROM chunk_fts_rowids WHERE chunk_id = old.id); INSERT INTO chunks_fts_trigram(content, summary, tags, resolved_query, key_facts, resolved_queries, chunk_id) VALUES ( new.content, @@ -877,6 +912,9 @@ def _init_db(self) -> None: new.resolved_queries, new.id ); + INSERT INTO chunk_fts_rowids(chunk_id, trigram_rowid) + VALUES (new.id, last_insert_rowid()) + ON CONFLICT(chunk_id) DO UPDATE SET trigram_rowid = excluded.trigram_rowid; END """) diff --git a/tests/test_arbitration.py b/tests/test_arbitration.py index 913df96d..315594eb 100644 --- a/tests/test_arbitration.py +++ b/tests/test_arbitration.py @@ -1,3 +1,4 @@ +import hashlib import json import multiprocessing as mp import re @@ -45,6 +46,7 @@ def _create_minimal_db(path: Path) -> None: summary TEXT, tags TEXT, importance REAL, + content_hash TEXT, superseded_by TEXT ); CREATE TABLE kg_entities ( @@ -80,6 +82,56 @@ def _create_minimal_db(path: Path) -> None: conn.close() +def _create_preview_trigger_db(path: Path) -> None: + conn = sqlite3.connect(path) + try: + conn.executescript( + """ + PRAGMA journal_mode=WAL; + CREATE TABLE chunks ( + id TEXT PRIMARY KEY, + content TEXT NOT NULL, + metadata TEXT NOT NULL, + source_file TEXT NOT NULL, + project TEXT, + content_type TEXT, + value_type TEXT, + char_count INTEGER, + source TEXT, + created_at TEXT, + summary TEXT, + tags TEXT, + importance REAL, + preview_text TEXT, + seen_count INTEGER DEFAULT 1, + last_seen_at TEXT, + dedupe_hash TEXT, + simhash TEXT, + simhash_band_0 TEXT, + simhash_band_1 TEXT, + simhash_band_2 TEXT, + simhash_band_3 TEXT, + archived INTEGER DEFAULT 0, + archived_at TEXT, + superseded_by TEXT + ); + CREATE TABLE preview_trigger_hits (chunk_id TEXT NOT NULL); + CREATE TRIGGER chunks_preview_text_insert + AFTER INSERT ON chunks + WHEN new.preview_text IS NULL OR trim(new.preview_text) = '' + BEGIN + INSERT INTO preview_trigger_hits(chunk_id) VALUES (new.id); + UPDATE chunks + SET preview_text = trim(substr(replace(replace(replace(content, char(10), ' '), char(13), ' '), char(9), ' '), 1, 220)) + WHERE rowid = new.rowid; + END; + """ + ) + conn.commit() + finally: + conn.close() + + def _connect_apsw(path: Path) -> apsw.Connection: conn = apsw.Connection(str(path)) conn.enableloadextension(True) @@ -136,6 +188,245 @@ def test_drain_default_queue_dir_expands_env_tilde(monkeypatch): assert queue_dir == Path.home() / "brainlayer-arbitration-test" +def test_drain_prioritizes_writes_before_enrichment(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + (queue_dir / "enrichment-000.jsonl").write_text( + json.dumps({"kind": "enrichment_update", "chunk_id": "missing", "enrichment": {"summary": "later"}}) + "\n", + encoding="utf-8", + ) + (queue_dir / "watcher-999.jsonl").write_text( + json.dumps( + { + "kind": "watcher_chunk", + "chunk_id": "watcher-priority", + "content": "Queued watcher writes must drain before enrichment backlog.", + "created_at": "2026-05-27T04:00:00Z", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + assert not (queue_dir / "watcher-999.jsonl").exists() + assert (queue_dir / "enrichment-000.jsonl").exists() + with sqlite3.connect(db_path) as conn: + assert conn.execute("SELECT id FROM chunks WHERE id = 'watcher-priority'").fetchone() == ("watcher-priority",) + + +def test_drain_skips_stale_enrichment_for_rewritten_chunk(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + old_hash = hashlib.sha256("old content".encode("utf-8")).hexdigest() + new_hash = hashlib.sha256("new content".encode("utf-8")).hexdigest() + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + INSERT INTO chunks(id, content, metadata, source_file, summary, content_hash) + VALUES ('rewritten', 'new content', '{}', 'watcher', 'new summary', ?) + """, + (new_hash,), + ) + (queue_dir / "enrichment-000.jsonl").write_text( + json.dumps( + { + "kind": "enrichment_update", + "chunk_id": "rewritten", + "content_hash": old_hash, + "enrichment": {"summary": "old stale summary", "tags": ["old"]}, + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + assert conn.execute("SELECT summary, tags, content_hash FROM chunks WHERE id = 'rewritten'").fetchone() == ( + "new summary", + None, + new_hash, + ) + + +def test_drain_sets_preview_text_on_initial_insert(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_preview_trigger_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + (queue_dir / "watcher.jsonl").write_text( + json.dumps( + { + "kind": "watcher_chunk", + "chunk_id": "watcher-preview", + "content": "Watcher preview text is written during the insert.", + "created_at": "2026-05-27T04:10:00Z", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + preview_text, trigger_hits = conn.execute( + """ + SELECT c.preview_text, COUNT(p.chunk_id) + FROM chunks c + LEFT JOIN preview_trigger_hits p ON p.chunk_id = c.id + WHERE c.id = 'watcher-preview' + GROUP BY c.id + """ + ).fetchone() + + assert preview_text == "Watcher preview text is written during the insert." + assert trigger_hits == 0 + + +def test_drain_preview_text_uses_content_when_summary_is_blank(tmp_path, monkeypatch): + from brainlayer.drain import _insert_chunk + + db_path = tmp_path / "brainlayer.db" + _create_preview_trigger_db(db_path) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + with _connect_apsw(db_path) as conn: + _insert_chunk( + conn, + { + "id": "store-preview", + "content": "Content should win when summary is whitespace.", + "summary": "\n\t ", + "metadata": "{}", + "source_file": "test", + }, + ) + + with sqlite3.connect(db_path) as conn: + preview_text, trigger_hits = conn.execute( + """ + SELECT c.preview_text, COUNT(p.chunk_id) + FROM chunks c + LEFT JOIN preview_trigger_hits p ON p.chunk_id = c.id + WHERE c.id = 'store-preview' + GROUP BY c.id + """ + ).fetchone() + + assert preview_text == "Content should win when summary is whitespace." + assert trigger_hits == 0 + + +def test_drain_seen_merge_does_not_touch_unchanged_tags(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.executescript( + """ + CREATE TABLE tag_update_hits (chunk_id TEXT NOT NULL); + CREATE TRIGGER chunks_tags_update_seen_test + AFTER UPDATE OF tags ON chunks + BEGIN + INSERT INTO tag_update_hits(chunk_id) VALUES (new.id); + END; + """ + ) + conn.execute( + """ + INSERT INTO chunks(id, content, metadata, source_file, content_type, char_count, created_at, tags) + VALUES ('watcher-seen', 'Same watcher content', '{}', 'watcher', 'assistant_text', 20, + '2026-05-27T04:00:00Z', NULL) + """ + ) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + (queue_dir / "watcher.jsonl").write_text( + json.dumps( + { + "kind": "watcher_chunk", + "chunk_id": "watcher-seen", + "content": "Same watcher content", + "content_type": "assistant_text", + "created_at": "2026-05-27T04:10:00Z", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + assert conn.execute("SELECT COUNT(*) FROM tag_update_hits").fetchone()[0] == 0 + + +def test_drain_seen_merge_compares_tags_semantically(tmp_path, monkeypatch): + from brainlayer.drain import drain_once + + db_path = tmp_path / "brainlayer.db" + queue_dir = tmp_path / "queue" + queue_dir.mkdir() + _create_minimal_db(db_path) + with sqlite3.connect(db_path) as conn: + conn.executescript( + """ + CREATE TABLE tag_update_hits (chunk_id TEXT NOT NULL); + CREATE TRIGGER chunks_tags_update_seen_test + AFTER UPDATE OF tags ON chunks + BEGIN + INSERT INTO tag_update_hits(chunk_id) VALUES (new.id); + END; + """ + ) + conn.execute( + """ + INSERT INTO chunks(id, content, metadata, source_file, content_type, char_count, created_at, tags) + VALUES ('watcher-tags', 'Same watcher content', '{}', 'watcher', 'assistant_text', 20, + '2026-05-27T04:00:00Z', '["b", "a"]') + """ + ) + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + (queue_dir / "watcher.jsonl").write_text( + json.dumps( + { + "kind": "watcher_chunk", + "chunk_id": "watcher-tags", + "content": "Same watcher content", + "content_type": "assistant_text", + "tags": ["a", "b"], + "created_at": "2026-05-27T04:10:00Z", + } + ) + + "\n", + encoding="utf-8", + ) + + assert drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=1) == 1 + + with sqlite3.connect(db_path) as conn: + assert conn.execute("SELECT COUNT(*) FROM tag_update_hits").fetchone()[0] == 0 + + def test_drain_daemon_serializes_three_concurrent_producers(tmp_path, monkeypatch): from brainlayer.drain import drain_once diff --git a/tests/test_search_trigram_fts.py b/tests/test_search_trigram_fts.py index e9251668..96caad7c 100644 --- a/tests/test_search_trigram_fts.py +++ b/tests/test_search_trigram_fts.py @@ -41,6 +41,39 @@ def test_vector_store_creates_trigram_fts_table(tmp_path): store.close() +def test_fts_update_triggers_delete_by_mapped_rowid(tmp_path): + store = VectorStore(tmp_path / "trigram-rowid.db") + try: + _insert_chunk(store, chunk_id="chunk-rowid", content="stalker-golem queue note") + + trigger_sql = { + row[0]: row[1] + for row in store.conn.cursor().execute( + """ + SELECT name, sql FROM sqlite_master + WHERE type = 'trigger' AND name IN ('chunks_fts_update', 'chunks_fts_trigram_update') + """ + ) + } + assert "DELETE FROM chunks_fts WHERE chunk_id = old.id" not in trigger_sql["chunks_fts_update"] + assert "DELETE FROM chunks_fts_trigram WHERE chunk_id = old.id" not in trigger_sql["chunks_fts_trigram_update"] + assert "chunk_fts_rowids" in trigger_sql["chunks_fts_update"] + assert "chunk_fts_rowids" in trigger_sql["chunks_fts_trigram_update"] + + store.update_enrichment("chunk-rowid", summary="fresh rowid summary", tags=["rowid"]) + + cursor = store.conn.cursor() + assert cursor.execute("SELECT COUNT(*) FROM chunks_fts WHERE chunk_id = 'chunk-rowid'").fetchone()[0] == 1 + assert ( + cursor.execute("SELECT COUNT(*) FROM chunks_fts_trigram WHERE chunk_id = 'chunk-rowid'").fetchone()[0] == 1 + ) + assert cursor.execute("SELECT summary FROM chunks_fts WHERE chunk_id = 'chunk-rowid'").fetchone()[0] == ( + "fresh rowid summary" + ) + finally: + store.close() + + def test_hybrid_search_uses_trigram_fts_for_identifier_substrings(tmp_path): store = VectorStore(tmp_path / "trigram-search.db") try: