From 1bedeed2b9fdf69ab2430cba5004453b1e5ecb97 Mon Sep 17 00:00:00 2001 From: MollyAI Date: Sun, 17 May 2026 14:17:18 -0400 Subject: [PATCH] Gate media reranking behind bounded cascade --- docs/ENV_VARS.md | 12 +++ docs/RELEASE.md | 1 + src/recallforge/__init__.py | 2 + src/recallforge/search.py | 101 +++++++++++++++++++++++- tests/test_cross_modal_pipeline.py | 15 +++- tests/test_search_pipeline.py | 120 +++++++++++++++++++++++++++++ 6 files changed, 244 insertions(+), 7 deletions(-) diff --git a/docs/ENV_VARS.md b/docs/ENV_VARS.md index 23e3648..1125425 100644 --- a/docs/ENV_VARS.md +++ b/docs/ENV_VARS.md @@ -62,6 +62,18 @@ This is the canonical reference for all `RECALLFORGE_*` environment variables us - `RECALLFORGE_ENABLE_MEDIA_RERANKING` Enable multimodal reranking for image/video-involved searches. Disabled by default. +- `RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K` + Strict cap for opt-in reranking when the query itself is an image or video. Defaults to `min(RECALLFORGE_RERANK_TOP_K, 5)`. + +- `RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K` + Strict cap for opt-in reranking when text queries retrieve image/video candidates. Defaults to `min(RECALLFORGE_RERANK_TOP_K, 10)`. + +- `RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY` + Keep opt-in media reranking gated to ambiguous RRF results. Enabled by default; set to `0` to rerank the capped media set whenever media reranking is enabled. + +- `RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN` + Relative RRF margin for skipping expensive media reranking when the cheap stage has a clear winner. Default: `0.25`. + - `RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING` Enable raw video query embedding. On MLX, RecallForge now defaults to safer caption/transcript-first retrieval unless you explicitly enable this. diff --git a/docs/RELEASE.md b/docs/RELEASE.md index 9c9a87d..40d4a97 100644 --- a/docs/RELEASE.md +++ b/docs/RELEASE.md @@ -96,6 +96,7 @@ When you omit `--output`, the benchmark now keeps profile-specific filenames for - On MLX, raw video query embedding is no longer the default hot path. RecallForge prefers caption/transcript-first retrieval unless `RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING=1` is set. - On MLX, qwen-vl-utils native video decoding is now also opt-in. RecallForge defaults to frame/caption fallbacks unless `RECALLFORGE_ENABLE_MLX_NATIVE_VIDEO_PROCESSING=1` is set. - If you do opt back into native MLX video decoding, prefer `FORCE_QWENVL_VIDEO_READER=torchcodec` per Qwen's upstream guidance. +- Media reranking is still opt-in via `RECALLFORGE_ENABLE_MEDIA_RERANKING=1`. When enabled, it is capped by `RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K` / `RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K` and skipped when RRF has a clear winner unless `RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY=0`. - Direct image/video indexing and query expansion now schedule an MLX captioner idle unload. Tune with `RECALLFORGE_CAPTIONER_IDLE_SECONDS`; batch ingest still unloads the captioner immediately after the batch. - The raw-video path now has explicit frame and pixel budget knobs: - `RECALLFORGE_MLX_VIDEO_SAMPLE_FPS` diff --git a/src/recallforge/__init__.py b/src/recallforge/__init__.py index dfd2cad..e496eff 100644 --- a/src/recallforge/__init__.py +++ b/src/recallforge/__init__.py @@ -62,6 +62,8 @@ def _has_torch() -> bool: "RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING": "Enable raw video query embedding; MLX defaults to safer caption/transcript-first retrieval unless explicitly enabled.", "RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "Rerank cap for query-side image/video searches.", "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "Rerank cap when text queries retrieve image/video candidates.", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "Skip opt-in media reranking unless the cheap RRF stage is ambiguous.", + "RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN": "Relative RRF margin used to skip expensive media reranking when the cheap stage has a clear winner.", "RECALLFORGE_DISABLE_MLX": "Force-disable MLX backend detection (1=true).", "RECALLFORGE_BM25_FALLBACK_MAX_ROWS": "Row limit for BM25 fallback recovery path.", "RECALLFORGE_BULK_FLUSH_DOCS": "Batch flush threshold for document table writes.", diff --git a/src/recallforge/search.py b/src/recallforge/search.py index 4d5e260..c9204b4 100644 --- a/src/recallforge/search.py +++ b/src/recallforge/search.py @@ -479,10 +479,17 @@ def __init__( min(env_rerank_top_k, 10), ) ) + env_media_rerank_min_rrf_margin = float( + os.environ.get("RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN", "0.25") + ) self.enable_media_reranking = _env_flag( "RECALLFORGE_ENABLE_MEDIA_RERANKING", False, ) + self.media_rerank_require_ambiguity = _env_flag( + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY", + True, + ) self.enable_raw_video_query_embedding = _env_flag( "RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING", not _is_mlx_backend(self.backend), @@ -499,6 +506,7 @@ def __init__( 0, min(self.rerank_top_k, env_media_result_rerank_top_k), ) + self.media_rerank_min_rrf_margin = max(0.0, env_media_rerank_min_rrf_margin) self.cache: EmbeddingCache = cache if cache is not None else EmbeddingCache() self.intent = intent self.expand = expand @@ -1016,6 +1024,44 @@ def _select_best_chunk(self, result: SearchResult) -> Dict[str, Any]: elif result.content_type == "video": chunk['video_path'] = str(p) return chunk + + def _chunk_has_rerank_content(self, chunk: Dict[str, Any]) -> bool: + """Return whether a candidate has enough content for expensive reranking.""" + text = str(chunk.get("text") or chunk.get("text_body") or "").strip() + return bool(text or chunk.get("image_path") or chunk.get("video_path")) + + def _media_rerank_skip_reason( + self, + candidates_by_rrf: List[SearchResult], + *, + has_query_media: bool, + has_media_candidates: bool, + ) -> Optional[str]: + """Return a media rerank skip reason when the cheap stage is decisive.""" + if not (has_query_media or has_media_candidates): + return None + if not self.media_rerank_require_ambiguity: + return None + if len(candidates_by_rrf) < 2: + return None + + top_score = float(getattr(candidates_by_rrf[0], "score", 0.0) or 0.0) + second_score = float(getattr(candidates_by_rrf[1], "score", 0.0) or 0.0) + if top_score <= 0: + return None + + relative_margin = (top_score - second_score) / max(abs(top_score), 1e-9) + if relative_margin >= self.media_rerank_min_rrf_margin: + logger.debug( + "reranker_path path=media_confident_skip reason=rrf_margin " + "relative_margin=%.4f threshold=%.4f top_score=%.6f second_score=%.6f", + relative_margin, + self.media_rerank_min_rrf_margin, + top_score, + second_score, + ) + return "media_confident_skip" + return None def _rerank_candidates( self, @@ -1078,8 +1124,45 @@ def _rerank_candidates( if rerank_limit <= 0: return {c.filepath: 0.5 for c in candidates}, "skipped" - rerank_candidates = candidates_by_rrf[:rerank_limit] - chunks = [self._select_best_chunk(c) for c in rerank_candidates] + media_skip_reason = self._media_rerank_skip_reason( + candidates_by_rrf[:rerank_limit], + has_query_media=has_query_media, + has_media_candidates=has_media_candidates, + ) + if media_skip_reason: + _log_stage_metrics( + "reranker", + candidates, + start_time=t0, + extra={"path": media_skip_reason, "rerank_top_k": rerank_limit}, + ) + return {c.filepath: 0.5 for c in candidates}, media_skip_reason + + rerank_candidates: List[SearchResult] = [] + chunks: List[Dict[str, Any]] = [] + for candidate in candidates_by_rrf[:rerank_limit]: + chunk = self._select_best_chunk(candidate) + if self._chunk_has_rerank_content(chunk): + rerank_candidates.append(candidate) + chunks.append(chunk) + else: + logger.debug( + "reranker_prefilter skip=empty_candidate filepath=%s content_type=%s", + getattr(candidate, "filepath", ""), + getattr(candidate, "content_type", "unknown"), + ) + + if not rerank_candidates: + path = "media_no_rerankable_candidates" if (has_query_media or has_media_candidates) else "no_rerankable_candidates" + logger.debug("reranker_path path=%s reason=empty_prefilter", path) + _log_stage_metrics( + "reranker", + candidates, + start_time=t0, + extra={"path": path, "rerank_top_k": rerank_limit}, + ) + return {c.filepath: 0.5 for c in candidates}, path + effective_query = query or "" # Determine expected reranker scoring path for telemetry @@ -1104,14 +1187,24 @@ def _rerank_candidates( query_video_path=query_video_path, ) logger.debug( - "reranker_path path=%s candidate_count=%d base_candidate_count=%d", + "reranker_path path=%s candidate_count=%d base_candidate_count=%d rerank_top_k=%d", path, len(rerank_candidates), len(candidates_by_rrf), + rerank_limit, ) rerank_scores = {c.filepath: 0.5 for c in candidates} rerank_scores.update({c.filepath: s for c, s in zip(rerank_candidates, scores)}) - _log_stage_metrics("reranker", candidates, start_time=t0, extra={"path": path}) + _log_stage_metrics( + "reranker", + candidates, + start_time=t0, + extra={ + "path": path, + "rerank_top_k": rerank_limit, + "reranked_count": len(rerank_candidates), + }, + ) return rerank_scores, path except Exception as e: logger.error("Reranking failed: %s", e) diff --git a/tests/test_cross_modal_pipeline.py b/tests/test_cross_modal_pipeline.py index 8f49b3a..062b00e 100644 --- a/tests/test_cross_modal_pipeline.py +++ b/tests/test_cross_modal_pipeline.py @@ -231,7 +231,10 @@ class TestRerankerReceivesNonEmptyContent(unittest.TestCase): def setUp(self): self.env_patch = patch.dict( os.environ, - {"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"}, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, ) self.env_patch.start() self.backend = StubBackend(mode="hybrid") @@ -334,7 +337,10 @@ class TestMixedModalityHandling(unittest.TestCase): def setUp(self): self.env_patch = patch.dict( os.environ, - {"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"}, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, ) self.env_patch.start() self.backend = StubBackend(mode="hybrid") @@ -660,7 +666,10 @@ class TestCrossModalRegressionScenarios(unittest.TestCase): def setUp(self): self.env_patch = patch.dict( os.environ, - {"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"}, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, ) self.env_patch.start() self.backend = StubBackend(mode="hybrid") diff --git a/tests/test_search_pipeline.py b/tests/test_search_pipeline.py index c4a9b19..69fc75f 100644 --- a/tests/test_search_pipeline.py +++ b/tests/test_search_pipeline.py @@ -153,6 +153,25 @@ def test_media_reranking_reads_env_override(self): searcher = HybridSearcher(backend=backend, storage=storage) self.assertTrue(searcher.enable_media_reranking) + def test_media_reranking_caps_read_env_overrides(self): + backend = StubBackend() + storage = StubStorage() + with patch.dict( + os.environ, + { + "RECALLFORGE_RERANK_TOP_K": "9", + "RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "3", + "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "4", + "RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN": "0.4", + }, + ): + searcher = HybridSearcher(backend=backend, storage=storage) + + self.assertEqual(searcher.rerank_top_k, 9) + self.assertEqual(searcher.media_query_rerank_top_k, 3) + self.assertEqual(searcher.media_result_rerank_top_k, 4) + self.assertEqual(searcher.media_rerank_min_rrf_margin, 0.4) + class TestBM25Probe(unittest.TestCase): def test_bm25_probe_delegates_to_storage(self): @@ -349,6 +368,107 @@ def test_rerank_top_k_zero_skips_reranker(self): backend.rerank.assert_not_called() self.assertEqual(scores["doc.md"], 0.5) + def test_media_query_reranking_uses_media_query_top_k_cap(self): + backend = StubBackend(mode="hybrid") + backend.rerank = MagicMock(return_value=[0.91, 0.72]) + candidates = [ + _make_search_result("a.md", 0.95), + _make_search_result("b.md", 0.92), + _make_search_result("c.md", 0.89), + _make_search_result("d.md", 0.86), + ] + with patch.dict( + os.environ, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "2", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, + ): + searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10) + + scores, path = searcher._rerank_candidates( + candidates, + query="caption text", + query_image_path="/tmp/query.png", + ) + + backend.rerank.assert_called_once() + rerank_docs = backend.rerank.call_args[0][1] + self.assertEqual([d["filepath"] for d in rerank_docs], ["a.md", "b.md"]) + self.assertEqual(scores["a.md"], 0.91) + self.assertEqual(scores["b.md"], 0.72) + self.assertEqual(scores["c.md"], 0.5) + self.assertEqual(path, "vl_image") + + def test_media_result_reranking_uses_media_result_top_k_cap(self): + backend = StubBackend(mode="hybrid") + backend.rerank = MagicMock(return_value=[0.81]) + candidates = [ + _make_search_result("img.png", 0.95, content_type="image"), + _make_search_result("doc.md", 0.92), + _make_search_result("other.md", 0.89), + ] + with patch.dict( + os.environ, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "1", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, + ): + searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10) + + scores, path = searcher._rerank_candidates(candidates, query="diagram") + + backend.rerank.assert_called_once() + rerank_docs = backend.rerank.call_args[0][1] + self.assertEqual([d["filepath"] for d in rerank_docs], ["img.png"]) + self.assertEqual(scores["img.png"], 0.81) + self.assertEqual(scores["doc.md"], 0.5) + self.assertEqual(path, "text") + + def test_media_reranking_skips_when_rrf_margin_is_confident(self): + backend = StubBackend(mode="hybrid") + backend.rerank = MagicMock() + candidates = [ + _make_search_result("img.png", 1.0, content_type="image"), + _make_search_result("doc.md", 0.5), + ] + with patch.dict(os.environ, {"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"}): + searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10) + + scores, path = searcher._rerank_candidates(candidates, query="diagram") + + backend.rerank.assert_not_called() + self.assertEqual(scores, {"img.png": 0.5, "doc.md": 0.5}) + self.assertEqual(path, "media_confident_skip") + + def test_media_reranking_prefilters_empty_media_candidates(self): + backend = StubBackend(mode="hybrid") + backend.rerank = MagicMock(return_value=[0.73]) + empty_media = _make_search_result("missing.png", 0.95, content_type="image") + empty_media.body = "" + text_candidate = _make_search_result("doc.md", 0.92) + candidates = [empty_media, text_candidate] + with patch.dict( + os.environ, + { + "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1", + "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0", + }, + ): + searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=2) + + scores, path = searcher._rerank_candidates(candidates, query="diagram") + + backend.rerank.assert_called_once() + rerank_docs = backend.rerank.call_args[0][1] + self.assertEqual([d["filepath"] for d in rerank_docs], ["doc.md"]) + self.assertEqual(scores["missing.png"], 0.5) + self.assertEqual(scores["doc.md"], 0.73) + self.assertEqual(path, "text") + def test_embed_mode_returns_default_score(self): backend = StubBackend(mode="embed") searcher = HybridSearcher(backend=backend, storage=StubStorage())