Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/ENV_VARS.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ This is the canonical reference for all `RECALLFORGE_*` environment variables us
- `RECALLFORGE_ENABLE_MEDIA_RERANKING`
Enable multimodal reranking for image/video-involved searches. Disabled by default.

- `RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K`
Strict cap for opt-in reranking when the query itself is an image or video. Defaults to `min(RECALLFORGE_RERANK_TOP_K, 5)`.

- `RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K`
Strict cap for opt-in reranking when text queries retrieve image/video candidates. Defaults to `min(RECALLFORGE_RERANK_TOP_K, 10)`.

- `RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY`
Only run opt-in media reranking when the RRF results are ambiguous (no clear winner). Enabled by default; set to `0` to rerank the capped media set whenever media reranking is enabled.

- `RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN`
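Relative RRF margin, computed as `(top - second) / top` over the two best RRF candidates. When the margin is at or above this value the cheap stage is treated as having a clear winner and expensive media reranking is skipped. Negative values are clamped to `0`. Default: `0.25`.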

- `RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING`
Enable raw video query embedding. On MLX, RecallForge now defaults to safer caption/transcript-first retrieval unless you explicitly enable this.

Expand Down
1 change: 1 addition & 0 deletions docs/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ When you omit `--output`, the benchmark now keeps profile-specific filenames for
- On MLX, raw video query embedding is no longer the default hot path. RecallForge prefers caption/transcript-first retrieval unless `RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING=1` is set.
- On MLX, qwen-vl-utils native video decoding is now also opt-in. RecallForge defaults to frame/caption fallbacks unless `RECALLFORGE_ENABLE_MLX_NATIVE_VIDEO_PROCESSING=1` is set.
- If you do opt back into native MLX video decoding, prefer `FORCE_QWENVL_VIDEO_READER=torchcodec` per Qwen's upstream guidance.
- Media reranking is still opt-in via `RECALLFORGE_ENABLE_MEDIA_RERANKING=1`. When enabled, it is capped by `RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K` / `RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K` and skipped when RRF has a clear winner unless `RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY=0`.
- Direct image/video indexing and query expansion now schedule an MLX captioner idle unload. Tune with `RECALLFORGE_CAPTIONER_IDLE_SECONDS`; batch ingest still unloads the captioner immediately after the batch.
- The raw-video path now has explicit frame and pixel budget knobs:
- `RECALLFORGE_MLX_VIDEO_SAMPLE_FPS`
Expand Down
2 changes: 2 additions & 0 deletions src/recallforge/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def _has_torch() -> bool:
"RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING": "Enable raw video query embedding; MLX defaults to safer caption/transcript-first retrieval unless explicitly enabled.",
"RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "Rerank cap for query-side image/video searches.",
"RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "Rerank cap when text queries retrieve image/video candidates.",
"RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "Skip opt-in media reranking unless the cheap RRF stage is ambiguous.",
"RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN": "Relative RRF margin used to skip expensive media reranking when the cheap stage has a clear winner.",
"RECALLFORGE_DISABLE_MLX": "Force-disable MLX backend detection (1=true).",
"RECALLFORGE_BM25_FALLBACK_MAX_ROWS": "Row limit for BM25 fallback recovery path.",
"RECALLFORGE_BULK_FLUSH_DOCS": "Batch flush threshold for document table writes.",
Expand Down
101 changes: 97 additions & 4 deletions src/recallforge/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,17 @@ def __init__(
min(env_rerank_top_k, 10),
)
)
env_media_rerank_min_rrf_margin = float(
os.environ.get("RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN", "0.25")
)
self.enable_media_reranking = _env_flag(
"RECALLFORGE_ENABLE_MEDIA_RERANKING",
False,
)
self.media_rerank_require_ambiguity = _env_flag(
"RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY",
True,
)
self.enable_raw_video_query_embedding = _env_flag(
"RECALLFORGE_ENABLE_RAW_VIDEO_QUERY_EMBEDDING",
not _is_mlx_backend(self.backend),
Expand All @@ -499,6 +506,7 @@ def __init__(
0,
min(self.rerank_top_k, env_media_result_rerank_top_k),
)
self.media_rerank_min_rrf_margin = max(0.0, env_media_rerank_min_rrf_margin)
self.cache: EmbeddingCache = cache if cache is not None else EmbeddingCache()
self.intent = intent
self.expand = expand
Expand Down Expand Up @@ -1016,6 +1024,44 @@ def _select_best_chunk(self, result: SearchResult) -> Dict[str, Any]:
elif result.content_type == "video":
chunk['video_path'] = str(p)
return chunk

def _chunk_has_rerank_content(self, chunk: Dict[str, Any]) -> bool:
"""Return whether a candidate has enough content for expensive reranking."""
text = str(chunk.get("text") or chunk.get("text_body") or "").strip()
return bool(text or chunk.get("image_path") or chunk.get("video_path"))

def _media_rerank_skip_reason(
self,
candidates_by_rrf: List[SearchResult],
*,
has_query_media: bool,
has_media_candidates: bool,
) -> Optional[str]:
"""Return a media rerank skip reason when the cheap stage is decisive."""
if not (has_query_media or has_media_candidates):
return None
if not self.media_rerank_require_ambiguity:
return None
if len(candidates_by_rrf) < 2:
return None

top_score = float(getattr(candidates_by_rrf[0], "score", 0.0) or 0.0)
second_score = float(getattr(candidates_by_rrf[1], "score", 0.0) or 0.0)
if top_score <= 0:
return None

relative_margin = (top_score - second_score) / max(abs(top_score), 1e-9)
if relative_margin >= self.media_rerank_min_rrf_margin:
logger.debug(
"reranker_path path=media_confident_skip reason=rrf_margin "
"relative_margin=%.4f threshold=%.4f top_score=%.6f second_score=%.6f",
relative_margin,
self.media_rerank_min_rrf_margin,
top_score,
second_score,
)
return "media_confident_skip"
return None

def _rerank_candidates(
self,
Expand Down Expand Up @@ -1078,8 +1124,45 @@ def _rerank_candidates(
if rerank_limit <= 0:
return {c.filepath: 0.5 for c in candidates}, "skipped"

rerank_candidates = candidates_by_rrf[:rerank_limit]
chunks = [self._select_best_chunk(c) for c in rerank_candidates]
media_skip_reason = self._media_rerank_skip_reason(
candidates_by_rrf[:rerank_limit],
has_query_media=has_query_media,
has_media_candidates=has_media_candidates,
)
if media_skip_reason:
_log_stage_metrics(
"reranker",
candidates,
start_time=t0,
extra={"path": media_skip_reason, "rerank_top_k": rerank_limit},
)
return {c.filepath: 0.5 for c in candidates}, media_skip_reason

rerank_candidates: List[SearchResult] = []
chunks: List[Dict[str, Any]] = []
for candidate in candidates_by_rrf[:rerank_limit]:
chunk = self._select_best_chunk(candidate)
if self._chunk_has_rerank_content(chunk):
rerank_candidates.append(candidate)
chunks.append(chunk)
else:
logger.debug(
"reranker_prefilter skip=empty_candidate filepath=%s content_type=%s",
getattr(candidate, "filepath", ""),
getattr(candidate, "content_type", "unknown"),
)

if not rerank_candidates:
path = "media_no_rerankable_candidates" if (has_query_media or has_media_candidates) else "no_rerankable_candidates"
logger.debug("reranker_path path=%s reason=empty_prefilter", path)
_log_stage_metrics(
"reranker",
candidates,
start_time=t0,
extra={"path": path, "rerank_top_k": rerank_limit},
)
return {c.filepath: 0.5 for c in candidates}, path

effective_query = query or ""

# Determine expected reranker scoring path for telemetry
Expand All @@ -1104,14 +1187,24 @@ def _rerank_candidates(
query_video_path=query_video_path,
)
logger.debug(
"reranker_path path=%s candidate_count=%d base_candidate_count=%d",
"reranker_path path=%s candidate_count=%d base_candidate_count=%d rerank_top_k=%d",
path,
len(rerank_candidates),
len(candidates_by_rrf),
rerank_limit,
)
rerank_scores = {c.filepath: 0.5 for c in candidates}
rerank_scores.update({c.filepath: s for c, s in zip(rerank_candidates, scores)})
_log_stage_metrics("reranker", candidates, start_time=t0, extra={"path": path})
_log_stage_metrics(
"reranker",
candidates,
start_time=t0,
extra={
"path": path,
"rerank_top_k": rerank_limit,
"reranked_count": len(rerank_candidates),
},
)
return rerank_scores, path
except Exception as e:
logger.error("Reranking failed: %s", e)
Expand Down
15 changes: 12 additions & 3 deletions tests/test_cross_modal_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,10 @@ class TestRerankerReceivesNonEmptyContent(unittest.TestCase):
def setUp(self):
self.env_patch = patch.dict(
os.environ,
{"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"},
{
"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
"RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
},
)
self.env_patch.start()
self.backend = StubBackend(mode="hybrid")
Expand Down Expand Up @@ -334,7 +337,10 @@ class TestMixedModalityHandling(unittest.TestCase):
def setUp(self):
self.env_patch = patch.dict(
os.environ,
{"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"},
{
"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
"RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
},
)
self.env_patch.start()
self.backend = StubBackend(mode="hybrid")
Expand Down Expand Up @@ -660,7 +666,10 @@ class TestCrossModalRegressionScenarios(unittest.TestCase):
def setUp(self):
self.env_patch = patch.dict(
os.environ,
{"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1"},
{
"RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
"RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
},
)
self.env_patch.start()
self.backend = StubBackend(mode="hybrid")
Expand Down
120 changes: 120 additions & 0 deletions tests/test_search_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ def test_media_reranking_reads_env_override(self):
searcher = HybridSearcher(backend=backend, storage=storage)
self.assertTrue(searcher.enable_media_reranking)

def test_media_reranking_caps_read_env_overrides(self):
    """The searcher picks up the media rerank caps and margin from the environment."""
    backend = StubBackend()
    storage = StubStorage()
    env_overrides = {
        "RECALLFORGE_RERANK_TOP_K": "9",
        "RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "3",
        "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "4",
        "RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN": "0.4",
    }
    with patch.dict(os.environ, env_overrides):
        searcher = HybridSearcher(backend=backend, storage=storage)

    # Checked in the same order as the env overrides above.
    expected_attrs = (
        ("rerank_top_k", 9),
        ("media_query_rerank_top_k", 3),
        ("media_result_rerank_top_k", 4),
        ("media_rerank_min_rrf_margin", 0.4),
    )
    for attr_name, expected_value in expected_attrs:
        self.assertEqual(getattr(searcher, attr_name), expected_value)


class TestBM25Probe(unittest.TestCase):
def test_bm25_probe_delegates_to_storage(self):
Expand Down Expand Up @@ -349,6 +368,107 @@ def test_rerank_top_k_zero_skips_reranker(self):
backend.rerank.assert_not_called()
self.assertEqual(scores["doc.md"], 0.5)

def test_media_query_reranking_uses_media_query_top_k_cap(self):
    """With an image query, only the top ``MEDIA_QUERY_RERANK_TOP_K`` candidates are reranked."""
    backend = StubBackend(mode="hybrid")
    backend.rerank = MagicMock(return_value=[0.91, 0.72])
    scored_files = (
        ("a.md", 0.95),
        ("b.md", 0.92),
        ("c.md", 0.89),
        ("d.md", 0.86),
    )
    candidates = [_make_search_result(name, score) for name, score in scored_files]
    env_overrides = {
        "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
        "RECALLFORGE_MEDIA_QUERY_RERANK_TOP_K": "2",
        "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
    }
    with patch.dict(os.environ, env_overrides):
        searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10)

    scores, path = searcher._rerank_candidates(
        candidates,
        query="caption text",
        query_image_path="/tmp/query.png",
    )

    backend.rerank.assert_called_once()
    positional_args = backend.rerank.call_args[0]
    reranked_paths = [doc["filepath"] for doc in positional_args[1]]
    self.assertEqual(reranked_paths, ["a.md", "b.md"])
    # Reranked candidates take the backend scores; the rest keep the 0.5 default.
    self.assertEqual(scores["a.md"], 0.91)
    self.assertEqual(scores["b.md"], 0.72)
    self.assertEqual(scores["c.md"], 0.5)
    self.assertEqual(path, "vl_image")

def test_media_result_reranking_uses_media_result_top_k_cap(self):
    """A text query with media candidates reranks only the top ``MEDIA_RESULT_RERANK_TOP_K``."""
    backend = StubBackend(mode="hybrid")
    backend.rerank = MagicMock(return_value=[0.81])
    image_hit = _make_search_result("img.png", 0.95, content_type="image")
    doc_hit = _make_search_result("doc.md", 0.92)
    other_hit = _make_search_result("other.md", 0.89)
    env_overrides = {
        "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
        "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "1",
        "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
    }
    with patch.dict(os.environ, env_overrides):
        searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10)

    scores, path = searcher._rerank_candidates(
        [image_hit, doc_hit, other_hit],
        query="diagram",
    )

    backend.rerank.assert_called_once()
    positional_args = backend.rerank.call_args[0]
    reranked_paths = [doc["filepath"] for doc in positional_args[1]]
    self.assertEqual(reranked_paths, ["img.png"])
    # Only the capped candidate is rescored; the others keep the 0.5 default.
    self.assertEqual(scores["img.png"], 0.81)
    self.assertEqual(scores["doc.md"], 0.5)
    self.assertEqual(path, "text")

def test_media_reranking_skips_when_rrf_margin_is_confident(self):
    """A clear RRF winner skips the backend reranker for media candidates.

    The top score (1.0) leads the runner-up (0.5) by a relative margin of
    0.5, which is above the 0.25 threshold, so every candidate keeps the
    neutral 0.5 score and the path is reported as ``media_confident_skip``.
    """
    backend = StubBackend(mode="hybrid")
    backend.rerank = MagicMock()
    candidates = [
        _make_search_result("img.png", 1.0, content_type="image"),
        _make_search_result("doc.md", 0.5),
    ]
    env_overrides = {
        "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
        # Pin the settings under test explicitly so values exported in the
        # developer's or CI environment cannot flip the expected outcome.
        "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "1",
        "RECALLFORGE_MEDIA_RERANK_MIN_RRF_MARGIN": "0.25",
        "RECALLFORGE_MEDIA_RESULT_RERANK_TOP_K": "10",
    }
    with patch.dict(os.environ, env_overrides):
        searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=10)

    scores, path = searcher._rerank_candidates(candidates, query="diagram")

    backend.rerank.assert_not_called()
    self.assertEqual(scores, {"img.png": 0.5, "doc.md": 0.5})
    self.assertEqual(path, "media_confident_skip")

def test_media_reranking_prefilters_empty_media_candidates(self):
    """An image candidate with an empty body is dropped before the backend reranker runs."""
    backend = StubBackend(mode="hybrid")
    backend.rerank = MagicMock(return_value=[0.73])
    bodiless_image = _make_search_result("missing.png", 0.95, content_type="image")
    bodiless_image.body = ""
    text_doc = _make_search_result("doc.md", 0.92)
    env_overrides = {
        "RECALLFORGE_ENABLE_MEDIA_RERANKING": "1",
        "RECALLFORGE_MEDIA_RERANK_REQUIRE_AMBIGUITY": "0",
    }
    with patch.dict(os.environ, env_overrides):
        searcher = HybridSearcher(backend=backend, storage=StubStorage(), rerank_top_k=2)

    scores, path = searcher._rerank_candidates(
        [bodiless_image, text_doc],
        query="diagram",
    )

    backend.rerank.assert_called_once()
    positional_args = backend.rerank.call_args[0]
    reranked_paths = [doc["filepath"] for doc in positional_args[1]]
    self.assertEqual(reranked_paths, ["doc.md"])
    # The filtered-out candidate keeps 0.5; the text one takes the backend score.
    self.assertEqual(scores["missing.png"], 0.5)
    self.assertEqual(scores["doc.md"], 0.73)
    self.assertEqual(path, "text")

def test_embed_mode_returns_default_score(self):
backend = StubBackend(mode="embed")
searcher = HybridSearcher(backend=backend, storage=StubStorage())
Expand Down