Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,15 @@ def _pool_status(p) -> dict:
def _extra_health_fields() -> dict:
# Build the extra fields for the health payload: last successful poll time,
# probe rates, message-queue depth and DB pool status.
# NOTE(review): this text is a rendered diff with indentation and +/- markers
# stripped; the `hit_rate` / "probe_hit_rate" lines look like the removed side
# and the `probe_success_rate` lines like the added side -- confirm against
# the real file before relying on both keys being present.
lsp = scheduler._last_successful_poll
s = scheduler._last_probe_stats
# Denominator spans all five outcome counters; missing counters count as 0.
total = sum(s.get(k, 0) for k in ("hit_recent", "hit_old", "hit_no_lm", "miss", "error"))
# Numerator omits hit_no_lm; None when total is 0 (avoids division by zero).
hit_rate = (s.get("hit_recent", 0) + s.get("hit_old", 0)) / total if total > 0 else None
# HTTP 200 outcomes / non-skipped probe attempts (excludes skipped_discovered, skipped_in_index).
hits = s.get("hit_recent", 0) + s.get("hit_old", 0) + s.get("hit_no_lm", 0)
attempted = hits + s.get("miss", 0) + s.get("error", 0)
# None when nothing was attempted (avoids division by zero).
probe_success_rate = hits / attempted if attempted > 0 else None
return {
"last_successful_poll": (
# ISO-8601 string in UTC; None whenever lsp is falsy (None or 0).
datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None
),
"probe_hit_rate": hit_rate,
"probe_success_rate": probe_success_rate,
"mq_depth": mq.depth(),
"db_pool": _pool_status(pool),
}
Expand Down
77 changes: 66 additions & 11 deletions src/paperscout/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ def diff_snapshots(
):
updated_papers.append(paper)

new_papers.sort(key=lambda p: p.date or "", reverse=True)
def _paper_sort_key(p: Paper) -> tuple[str, str]:
return (p.date or "", p.id)

new_papers.sort(key=_paper_sort_key, reverse=True)
updated_papers.sort(key=_paper_sort_key, reverse=True)
return DiffResult(new_papers=new_papers, updated_papers=updated_papers)


Expand All @@ -71,6 +75,14 @@ class DPTransition:
discovered_at: float


@dataclass(slots=True)
class SeedResult:
"""Outcome of ``seed()``: probe hits from the seed cycle and whether DB had prior state."""

# Hits returned by the prober's cycle during seed; stays an empty list when
# the ISO probe is disabled in config.
probe_hits: list[ProbeHit]
# True when, at the start of seed, state.last_poll was > 0 or at least one
# discovered URL was already recorded.
had_prior_state: bool


class PollResult:
"""Outcome of one poll: index diff, probe hits, D→P transitions, per-user matches."""

Expand Down Expand Up @@ -117,8 +129,13 @@ def __init__(
self._last_probe_stats: dict[str, int] = {}
self._last_ops_alert: float | None = None

async def seed(self) -> None:
"""First-run: gather all current papers from all sources without notifying."""
async def seed(self) -> SeedResult:
"""Gather current index and probe state.

Cold first deploy: no notifications from seed. On restart (prior poll or
discovered URLs), ``poll_once`` may notify for recent probe hits from this seed cycle.
"""
had_prior_state = self.state.last_poll > 0 or len(self.state.get_all_discovered()) > 0
t0 = time.monotonic()
log.info("SEED-START seeding local database from all sources")

Expand All @@ -128,19 +145,20 @@ async def seed(self) -> None:

self._previous_papers = dict(self.index.papers)

hits: list[ProbeHit] = []
if self.cfg.enable_iso_probe:
hits = await self.prober.run_cycle()
for hit in hits:
self.state.mark_discovered(hit.url)
log.info("SEED isocpp.org probe existing=%d", len(hits))

self._seeded = True
log.info(
"SEED-DONE elapsed=%.1fs papers=%d discovered=%d",
"SEED-DONE elapsed=%.1fs papers=%d discovered=%d had_prior_state=%s",
time.monotonic() - t0,
len(self._previous_papers),
len(self.state.get_all_discovered()),
had_prior_state,
)
return SeedResult(probe_hits=hits, had_prior_state=had_prior_state)

async def poll_once(self) -> PollResult:
"""Refresh index (if enabled), diff, probe isocpp, compute matches, notify."""
Expand All @@ -149,13 +167,50 @@ async def poll_once(self) -> PollResult:
log.info("POLL-START poll=%d", self._poll_count)

if not self._seeded:
await self.seed()
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
return PollResult(
seed_result = await self.seed()
if not seed_result.had_prior_state:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
return PollResult(
diff=DiffResult(new_papers=[], updated_papers=[]),
probe_hits=[],
)

probe_hits = seed_result.probe_hits
recent_hits = [h for h in probe_hits if h.is_recent]
old_hits = [h for h in probe_hits if not h.is_recent]
if old_hits:
log.info(
"PROBE-OLD %d hits with Last-Modified outside %dh window "
"(recorded to discovered, no alert)",
len(old_hits),
self.cfg.alert_modified_hours,
)

per_user_matches = await run_blocking_io(
self.user_watchlist.matches_for_users,
[],
recent_hits,
)
for uid, m in per_user_matches.items():
log.info(
"WATCHLIST-MATCH user=%s papers=%d probe_hits=%d",
uid,
len(m.papers),
len(m.probe_hits),
)

result = PollResult(
diff=DiffResult(new_papers=[], updated_papers=[]),
probe_hits=[],
probe_hits=recent_hits,
dp_transitions=[],
per_user_matches=per_user_matches,
)
if self.notify_callback:
self.notify_callback(result)
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
return result

previous = dict(self._previous_papers)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def health_url_with_extras():
lambda: 42,
extra_fields_fn=lambda: {
"last_successful_poll": "2026-03-16T12:00:00+00:00",
"probe_hit_rate": 0.5,
"probe_success_rate": 0.5,
"mq_depth": 3,
"db_pool": {"max": 10, "in_use": 1, "available": 9},
},
Expand Down Expand Up @@ -108,6 +108,6 @@ def test_health_extra_fields_merged(self, health_url_with_extras):
assert "version" in data
assert "last_successful_poll" in data
assert data["last_successful_poll"] == "2026-03-16T12:00:00+00:00"
assert data["probe_hit_rate"] == 0.5
assert data["probe_success_rate"] == 0.5
assert data["mq_depth"] == 3
assert data["db_pool"] == {"max": 10, "in_use": 1, "available": 9}
77 changes: 75 additions & 2 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ def test_new_papers_sorted_by_date_descending(self):
dates = [p.date for p in result.new_papers]
assert dates == sorted(dates, reverse=True)

def test_updated_papers_sorted_by_date_descending(self):
# Checks that diff_snapshots returns updated_papers ordered by date, newest first.
prev = {
"P2300R10": self._paper("P2300R10", title="Old A", date="2024-01-01"),
"P2301R0": self._paper("P2301R0", title="Old B", date="2024-03-01"),
"P2302R0": self._paper("P2302R0", title="Old C", date="2024-06-01"),
}
# Same three ids as prev, each with a changed title; P2301R0 and P2302R0
# also swap dates so the input order differs from the expected output order.
curr = {
"P2300R10": self._paper("P2300R10", title="New A", date="2024-01-01"),
"P2301R0": self._paper("P2301R0", title="New B", date="2024-06-01"),
"P2302R0": self._paper("P2302R0", title="New C", date="2024-03-01"),
}
result = diff_snapshots(prev, curr)
dates = [p.date for p in result.updated_papers]
# Dates here are "YYYY-MM-DD" strings, so string order equals chronological order.
assert dates == sorted(dates, reverse=True)

def test_empty_to_empty(self):
# Diffing two empty snapshots must yield no new and no updated papers.
result = diff_snapshots({}, {})
assert result.new_papers == [] and result.updated_papers == []
Expand Down Expand Up @@ -168,6 +183,7 @@ def _make_scheduler(fake_pool, **cfg_overrides):
index.papers = {}
prober = MagicMock(spec=ISOProber)
prober.run_cycle = AsyncMock(return_value=[])
prober.snapshot_stats = MagicMock(return_value={})
prober._stats = {}
user_watchlist = MagicMock(spec=UserWatchlist)
user_watchlist.matches_for_users.return_value = {}
Expand Down Expand Up @@ -351,6 +367,57 @@ async def test_poll_once_calls_notify_callback(self, fake_pool):
await scheduler.poll_once() # real poll
assert len(notified) == 1

async def test_cold_start_first_poll_does_not_notify(self, fake_pool):
# First poll with no prior poll or discovered URL recorded by this test:
# the notify callback must not fire and the result carries no probe hits.
# NOTE(review): assumes _make_scheduler starts from empty state -- confirm.
notified = []
scheduler, _, _, _, _ = _make_scheduler(fake_pool)
scheduler.notify_callback = notified.append
result = await scheduler.poll_once()
assert notified == []
assert result.probe_hits == []

async def test_restart_with_prior_poll_notifies_seed_hits(self, fake_pool):
# Restart scenario: a prior poll is on record, so a recent probe hit found
# during the seed cycle should be notified and returned from the first poll.
notified = []
scheduler, _, prober, user_watchlist, state = _make_scheduler(fake_pool)
scheduler.notify_callback = notified.append
# Presumably sets state.last_poll to a positive value, which seed() reads
# as prior state -- confirm against the state implementation.
state.touch_poll()
hit = _recent_hit()
prober.run_cycle = AsyncMock(return_value=[hit])
# One user matches the hit, so per-user matches are non-empty.
user_watchlist.matches_for_users.return_value = {
"U123": PerUserMatches(papers=[], probe_hits=[(hit, "author")])
}
result = await scheduler.poll_once()
# Exactly one callback invocation, carrying the single recent hit.
assert len(notified) == 1
assert len(result.probe_hits) == 1
assert result.probe_hits[0].is_recent is True

async def test_restart_with_discovered_urls_notifies(self, fake_pool):
# Restart scenario signalled by an already-discovered URL rather than a
# prior poll: the first poll should still notify for the recent seed hit.
notified = []
scheduler, _, prober, user_watchlist, state = _make_scheduler(fake_pool)
scheduler.notify_callback = notified.append
# Pre-populates discovered state; seed() treats a non-empty discovered set
# as prior state.
state.mark_discovered("https://isocpp.org/files/papers/D1111R0.pdf")
hit = _recent_hit()
prober.run_cycle = AsyncMock(return_value=[hit])
user_watchlist.matches_for_users.return_value = {
"U123": PerUserMatches(papers=[], probe_hits=[(hit, "author")])
}
result = await scheduler.poll_once()
assert len(notified) == 1
assert len(result.probe_hits) == 1

async def test_restart_seed_old_hits_not_in_result(self, fake_pool, caplog):
# On restart, a non-recent seed hit must be left out of the poll result
# while still producing the "PROBE-OLD" log line.
import logging

notified = []
scheduler, _, prober, _, state = _make_scheduler(fake_pool)
scheduler.notify_callback = notified.append
# Record a prior poll so the restart path is taken (presumably sets
# state.last_poll > 0 -- confirm).
state.touch_poll()
# Presumably a hit whose is_recent is False -- confirm against _old_hit.
old = _old_hit()
prober.run_cycle = AsyncMock(return_value=[old])
# Capture INFO so the PROBE-OLD message emitted via log.info is visible.
with caplog.at_level(logging.INFO):
result = await scheduler.poll_once()
assert result.probe_hits == []
assert "PROBE-OLD" in caplog.text

async def test_poll_once_skips_refresh_when_disabled(self, fake_pool):
scheduler, index, _, _, _ = _make_scheduler(fake_pool, enable_bulk_wg21=False)
scheduler._seeded = True
Expand All @@ -368,8 +435,14 @@ async def test_poll_once_skips_probe_when_disabled(self, fake_pool):
async def test_seed_marks_discovered(self, fake_pool):
# Checks that after seed() the hit is returned in SeedResult.probe_hits and
# its URL is marked discovered in state.
# NOTE(review): this text is a rendered diff with +/- markers stripped; the
# first `prober.run_cycle = AsyncMock(return_value=[hit])` / bare
# `await scheduler.seed()` pair looks like the removed side -- confirm.
scheduler, _, prober, _, state = _make_scheduler(fake_pool)
hit = _recent_hit()
prober.run_cycle = AsyncMock(return_value=[hit])
await scheduler.seed()

# Stand-in for the prober cycle that marks the URL discovered itself before
# returning the hit.
async def fake_run_cycle():
state.mark_discovered(hit.url)
return [hit]

prober.run_cycle = AsyncMock(side_effect=fake_run_cycle)
seed_result = await scheduler.seed()
assert seed_result.probe_hits == [hit]
assert state.is_discovered(hit.url)

async def test_run_forever_calls_poll_and_breaks_on_cancel(self, fake_pool):
Expand Down