Change MetricsAggregatorService to publisher to support live metrics without shmem #306
Conversation
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
Code Review
This pull request refactors the metrics aggregation system to use a registry-based architecture with HDR histograms and a generalized pub/sub transport layer, replacing the legacy mmap-backed storage. The update introduces periodic snapshot publishing with disk fallback and updates reporting logic to consume these snapshots. Feedback suggests improving encapsulation by exposing in-flight task metrics through public properties and adopting a more numerically stable variance formula for high-precision latency calculations. Additionally, several legacy tests have been skipped pending migration to the new system.
(superseded — see #pullrequestreview-4237753512 for the full review-council output)
arekay-nv left a comment
Review Council — Multi-AI Code Review
Reviewed by: Codex + Claude | Depth: thorough
Found 12 issues across 7 files (excluding tests, pyproject.toml, uv.lock).
⚠️ GitHub's review-comment API is currently returning a persistent `pull_request_review_thread.base` internal error on this PR for inline comments via `line+side` and position-based posts (the github-code-quality and gemini-code-assist bot comments from yesterday were unaffected). Falling back to a single review with file:line links — clicking each link opens the PR diff at the cited line.
Note: the existing gemini comments on aggregator.py:286/342/353/354 already cover the encapsulation concern around table._in_flight_tasks direct access, so that issue is intentionally not duplicated here. The cancel-without-await issue (#4 below) is a separate concurrency concern at the same file:line.
🔴 Must Fix (critical/high)
Issues that will cause incorrect behavior, data loss, or security problems in production.
| # | File:Line | Reviewer | Category | Summary |
|---|---|---|---|---|
| 1 | src/inference_endpoint/commands/benchmark/execute.py:482 | Claude | data-integrity | Subscriber late-binding can drop initial ticks (ZMQ slow-joiner) |
| 2 | src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:168 | Claude | performance | _write_atomic_fallback runs blocking I/O on the event loop |
| 3 | src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:208 | Codex | performance | Unbounded raw-sample retention in SeriesSampler |
🟡 Should Fix (medium)
Real issues that trigger under specific conditions or design flaws that will compound.
| # | File:Line | Reviewer | Category | Summary |
|---|---|---|---|---|
| 4 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:353 | Claude | error-handling | Cancellations not awaited before reading n_pending |
| 5 | src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:161 | Claude | bug | HdrHistogram(low, high, sig_figs) constructed without high >= 2*low check |
| 6 | src/inference_endpoint/async_utils/services/metrics_aggregator/subscriber.py:55 | Claude | data-integrity | conflate=True default for the Report consumer is fragile |
| 7 | src/inference_endpoint/commands/benchmark/execute.py:423 | Codex | data-integrity | Stale final_snapshot.msgpack not cleared on report-dir reuse |
🔵 Consider (low)
Valid improvements that could be follow-ups.
| # | File:Line | Reviewer | Category | Summary |
|---|---|---|---|---|
| 8 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:281 | Claude | bug | Double STARTED reassigns and orphans the tick task |
| 9 | src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:195 | Claude | concurrency | close() cancels tick task but doesn't await it |
| 10 | src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py:166 | Claude | error-handling | No top-level exception handling around run_until_complete(main()) |
| 11 | src/inference_endpoint/load_generator/session.py:408 | Claude | design | ERROR-before-COMPLETE relies on an undocumented publisher ordering contract |
| 12 | src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:132 | Claude | bug | SIGTERM bypasses publish_final |
Detailed findings
1. 🔴 high · data-integrity · [Claude]
src/inference_endpoint/commands/benchmark/execute.py:482
Subscriber late-binding can drop initial ticks (ZMQ slow-joiner)
MetricsSnapshotSubscriber is constructed (482) and start()ed (485) AFTER launcher.launch() waits for aggregator readiness. The aggregator can begin publishing as soon as STARTED is observed; ZMQ slow-joiner means snapshots emitted before the SUB handshake completes are dropped. The comment on line 480 acknowledges this. On a busy host the subscriber may also miss COMPLETE if the publisher closes before the subscription warms up, silently degrading to LIVE/DRAINING (incomplete report). Fix: move subscriber construction + start() BEFORE launcher.launch(). Connecting to a not-yet-bound IPC path is fine — ZMQ retries connect transparently.
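The slow-joiner drop and the connect-before-bind fix are easy to demonstrate with raw pyzmq; a minimal sketch (hypothetical IPC path and toy timing, not the PR's wrapper classes):

```python
import asyncio
import zmq
import zmq.asyncio

async def main() -> None:
    ctx = zmq.asyncio.Context()
    # Connect the SUB *before* the PUB binds: ZMQ retries the connect
    # transparently until a binder appears, so the subscription is already
    # established when the first snapshot goes out.
    sub = ctx.socket(zmq.SUB)
    sub.connect("ipc:///tmp/metrics_demo.sock")
    sub.setsockopt(zmq.SUBSCRIBE, b"")

    pub = ctx.socket(zmq.PUB)
    pub.bind("ipc:///tmp/metrics_demo.sock")
    await asyncio.sleep(0.2)        # toy stand-in for the subprocess-spawn window
    await pub.send(b"snapshot-0")   # dropped if the SUB had joined late instead
    print(await sub.recv())         # b'snapshot-0'
    ctx.destroy(linger=0)

asyncio.run(main())
```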
2. 🔴 high · performance · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:168
_write_atomic_fallback runs blocking I/O on the event loop
f.write + f.flush + two os.fsync (file + parent dir) + os.rename execute synchronously on the aggregator subprocess's main event loop (awaited via publish_final from aggregator.py:362). On a busy box os.fsync can block tens-to-hundreds of ms — long enough to back-pressure event-record processing if any events are still in flight at finalization. Fix: wrap with await asyncio.to_thread(self._write_atomic_fallback, payload).
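A sketch of the suggested offload, assuming the write sequence described above (the standalone function here is a stand-in for the publisher's private helper):

```python
import asyncio
import os

def _write_atomic_fallback(path: str, payload: bytes) -> None:
    # tmp-write + fsync(file) + rename + fsync(parent dir): exactly the
    # blocking POSIX sequence the finding describes, now off the event loop.
    tmp = path + ".tmp"
    with open(tmp, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())          # durability of the file contents
    os.rename(tmp, path)              # atomic replace on POSIX
    dir_fd = os.open(os.path.dirname(path) or ".", os.O_RDONLY)
    try:
        os.fsync(dir_fd)              # durability of the rename itself
    finally:
        os.close(dir_fd)

async def publish_final(path: str, payload: bytes) -> None:
    # The loop stays free to process event records while fsync blocks.
    await asyncio.to_thread(_write_atomic_fallback, path, payload)
```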
3. 🔴 high · performance · [Codex]
src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:208
Unbounded raw-sample retention in SeriesSampler
self._raw.append(value) retains every observed value in an array.array for the full run so the final snapshot can recompute exact percentiles. The prior mmap-backed store spilled this to disk; the new path keeps it in the aggregator subprocess RAM, so memory now scales linearly with run length. A 5-min 50k-QPS run produces ~15M samples (~120MB per int64 series); once latency/ISL/OSL/TTFT/TPOT are all tracked, several hundred MB; longer runs OOM or swap heavily before the report is emitted. Fix: periodically spill _raw to disk, cap with reservoir sampling, or rely solely on the HDR digest for percentiles.
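Of the suggested mitigations, reservoir sampling is the simplest to sketch; an illustrative Algorithm R sampler (not the PR's code) that caps memory while keeping percentiles approximately unbiased:

```python
import random

class Reservoir:
    """Fixed-memory stand-in for the unbounded _raw array: retains a uniform
    random subset of everything seen, so approximate percentiles survive
    arbitrarily long runs at a constant memory budget."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.samples: list[int] = []
        self.seen = 0

    def append(self, value: int) -> None:
        self.seen += 1
        if len(self.samples) < self.capacity:
            self.samples.append(value)
            return
        # Replace a random slot with probability capacity/seen; every
        # observed value ends up retained with equal probability.
        j = random.randrange(self.seen)
        if j < self.capacity:
            self.samples[j] = value
```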
4. 🟡 medium · error-handling · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:353
Cancellations not awaited before reading n_pending
After the drain timeout, t.cancel() only schedules cancellation at the next await point. n_pending = len(table._in_flight_tasks) on the next line therefore reads a count that's still high, and the not-yet-cancelled tasks may still be running when _publisher.close() and the loop teardown happen → "Task was destroyed but it is pending!" warnings on shutdown. Fix: after t.cancel(), do await asyncio.gather(*tasks, return_exceptions=True) (bounded by a short timeout) before recomputing n_pending. Independent of the encapsulation point gemini already raised.
5. 🟡 medium · bug · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/registry.py:161
HdrHistogram(low, high, sig_figs) constructed without high >= 2*low check
hdrh requires high >= 2*low. Current bounds (1ns–3.6e12ns, 1–1e7 tokens) satisfy this, but a future caller passing e.g. hdr_low=1, hdr_high=1 (or any hdr_high < 2*hdr_low) gets an opaque ValueError from deep inside the C library. Fix: explicit pre-check after the clamps: if self._hdr_high < self._hdr_low * 2: raise ValueError(...).
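Shape of the suggested pre-check as a standalone helper (names illustrative):

```python
def check_hdr_bounds(series: str, hdr_low: int, hdr_high: int) -> None:
    """Fail fast with a readable message; hdrhistogram's C layer requires
    high >= 2*low but surfaces only an opaque error when it doesn't hold."""
    if hdr_high < 2 * hdr_low:
        raise ValueError(
            f"series {series!r}: hdr_high ({hdr_high}) must be >= "
            f"2 * hdr_low ({hdr_low}) to construct an HdrHistogram"
        )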
6. 🟡 medium · data-integrity · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/subscriber.py:55
conflate=True default for the Report consumer is fragile
The docstring argues this is safe because COMPLETE is the publisher's last message, but ZMQ CONFLATE only keeps the most recent unread message. If the subscriber's loop is starved (main process busy in wait_for_exit) and the publisher closes before the SUB sees COMPLETE (LINGER expires, IPC socket unlinked), complete never fires and latest may be a stale LIVE. The 2 s wait_for_complete timeout in execute.py:548 papers over most cases, but consider conflate=False for the Report consumer (a single producer at a few snapshots/sec, not a TUI).
7. 🟡 medium · data-integrity · [Codex]
src/inference_endpoint/commands/benchmark/execute.py:423
Stale final_snapshot.msgpack not cleared on report-dir reuse
metrics_output_dir = ctx.report_dir / "metrics" is created (423) but an existing final_snapshot.msgpack from a prior run is never removed. If a user reuses --report-dir and the new run exits before publishing a fresh final snapshot (SIGTERM, crash), _load_final_snapshot_from_disk() decodes the previous run's file and silently builds the new report from stale metrics. Fix: unlink any pre-existing final_snapshot.msgpack here, OR refuse to decode a snapshot whose timestamp/run-id doesn't match the current run.
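The unlink variant of the fix, sketched as a hypothetical helper on the parent's setup path:

```python
from pathlib import Path

def prepare_metrics_dir(report_dir: Path) -> Path:
    metrics_dir = report_dir / "metrics"
    metrics_dir.mkdir(parents=True, exist_ok=True)
    # A leftover snapshot from a previous run must never be mistaken for
    # this run's output if the new run dies before publishing its own.
    (metrics_dir / "final_snapshot.msgpack").unlink(missing_ok=True)
    return metrics_dir
```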
8. 🔵 low · bug · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:281
Double STARTED reassigns and orphans the tick task
self._publisher.start(...) is called every time a SessionEventType.STARTED record is observed. If two STARTED events ever land in the EventRecord stream (replay, buggy producer, test fixture), start() reassigns _tick_task; the previous task keeps running until garbage-collected and races with the new task to publish snapshots. Fix: guard with if self._tick_task is None: ... inside publisher.start(), or only call start() on the first STARTED in process().
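Minimal shape of the idempotency guard (class and method names assumed from the review):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class MetricsPublisher:
    """Stand-in showing only the idempotent-start guard."""

    def __init__(self) -> None:
        self._tick_task: asyncio.Task | None = None

    def start(self) -> None:
        if self._tick_task is not None:
            # Duplicate STARTED: keep the original task. Reassigning here
            # would orphan it and race two tick loops against one socket.
            logger.warning("publisher already started; ignoring duplicate STARTED")
            return
        self._tick_task = asyncio.create_task(self._tick())

    async def _tick(self) -> None:
        ...  # periodic snapshot publishing loop
```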
9. 🔵 low · concurrency · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/publisher.py:195
close() cancels tick task but doesn't await it
self._tick_task.cancel() (205) is followed immediately by self._publisher.close() (206). If close() is invoked from a sync path, the cancelled task may still be runnable when the underlying ZMQ publisher socket is closed → brief window where _tick could call publish() on a closed transport. ZmqMessagePublisher.publish() early-returns on is_closed so this isn't fatal, but produces noisy CancelledError-during-shutdown traces. Fix: make close() async-aware (mirror publish_final) or document that callers must await publish_final (which already cancels the task) before close.
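The cancel-then-await pattern the finding asks for, as a generic helper (a sketch, not the `aclose()` a later commit adds):

```python
import asyncio

async def cancel_and_await(task: asyncio.Task) -> None:
    """cancel() only schedules cancellation; awaiting the task afterwards
    guarantees it has exited before dependent resources (here the ZMQ
    transport) are closed."""
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the task ended via our cancel
```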
10. 🔵 low · error-handling · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py:166
No top-level exception handling around run_until_complete(main())
If main() raises (bad CLI args, ZMQ bind failure, tokenizer load error), the subprocess exits with a stack trace but the parent's ServiceLauncher.wait_for_exit sees only a non-zero exit code with no diagnostic context propagated upward. Fix: wrap with try/except + structured logging so failures are surfaced in the parent's logs alongside the exit code.
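A sketch of the suggested wrapper; `main()` stands in for the module's entry coroutine:

```python
import asyncio
import logging
import sys

logger = logging.getLogger("metrics_aggregator")

async def main() -> None:
    ...  # argparse, ZMQ bind, tokenizer load, run loop

def entrypoint() -> None:
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    except SystemExit:
        raise  # keep argparse usage / explicit sys.exit user-facing
    except Exception:
        # Structured context for the parent, which otherwise sees only a
        # non-zero exit code from ServiceLauncher.wait_for_exit.
        logger.exception("metrics aggregator subprocess failed")
        sys.exit(1)
    finally:
        loop.close()
```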
11. 🔵 low · design · [Claude]
src/inference_endpoint/load_generator/session.py:408
ERROR-before-COMPLETE relies on an undocumented publisher ordering contract
The aggregator's TRACKED_SAMPLES_FAILED logic (aggregator.py:303–305) assumes ERROR is observed BEFORE COMPLETE removes the row. The ordering is correct here, but it depends on ZmqMessagePublisher preserving publish() order through batching AND ZMQ PUB→SUB delivering in order to a single SUB (both currently true). A short comment in session.py noting "publisher must preserve publish-order" would harden this against future transport refactors.
12. 🔵 low · bug · [Claude]
src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:132
SIGTERM bypasses publish_final
_finalize() only sets _shutdown_event after publish_final and _publisher.close() complete, but __main__.py doesn't install a SIGTERM handler. On parent-side launcher.kill_all (or any external SIGTERM before ENDED arrives), neither pub/sub COMPLETE nor disk fallback runs, leaving the parent's triple-redundant snapshot path empty. Fix: loop.add_signal_handler(SIGTERM, ...) to flush the final snapshot defensively before exit.
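A sketch of the defensive flush; `flush_final_snapshot` is a stand-in for publish_final plus the disk fallback:

```python
import asyncio
import signal

async def flush_final_snapshot() -> None:
    ...  # publish_final + disk fallback would run here

async def main() -> None:
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # Route SIGTERM into the loop instead of letting it kill the process
    # outright, so the final snapshot is flushed before exit.
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    await stop.wait()
    await flush_final_snapshot()
```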
Generated by /review-council — Codex gpt-5.4 (review of git diff against main) + Claude (direct file review with HEAD-source line verification).
Two high-severity issues raised by the review-council pass on PR #306:

1. (#306-1) Subscriber late-binding could drop early ticks via the ZMQ slow-joiner pattern. Move MetricsSnapshotSubscriber construction + start() BEFORE launcher.launch() so the SUB handshake completes during the subprocess-spawn window. ZMQ tolerates connect-before-bind on IPC — the connect resolves once the binder appears. The prior ordering (subscribe AFTER launch returns) had a window where the aggregator could begin ticking on STARTED before the SUB subscription warmed up, dropping early live snapshots and, in the worst case, missing COMPLETE entirely.

2. (#306-2) MetricsPublisher._write_atomic_fallback runs synchronous f.flush + fsync(file) + fsync(parent dir) + rename on the aggregator's event loop. On a busy host this can block tens-to-hundreds of ms — long enough to back-pressure event-record processing. Wrap with asyncio.to_thread inside publish_final.

Both fixes are localized — no API changes, no test changes required. Existing integration tests (test_concurrency_benchmark, test_end_to_end_oracle) exercise both paths end-to-end and still pass.

The third P0 item (#306-3, unbounded raw-sample retention) is the agreed memory trade documented in metrics_pubsub_design_v5.md §11; addressed by adding "--persist-raw" as a tracked follow-up rather than a code change in this PR.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed from 85dfac6 to 4110447
arekay-nv left a comment
Please do a performance test as well to determine what the limits of the interval and qps are.
Address PR #306 review comments from gemini-code-assist (encapsulation) and github-code-quality (non-iterable enum loop):
- Add `MetricsTable.in_flight_tasks_count` property so the aggregator no longer reaches into `table._in_flight_tasks` to report pending-task counts on snapshots and drain logging.
- Add `MetricsTable.cancel_in_flight_tasks()` returning the list of cancelled tasks (sets up the T3 await-cancellations fix).
- Update aggregator.py call sites accordingly.
- Use `MetricCounterKey.__members__.values()` in test_report_builder to satisfy CodeQL's "non-iterable used in for-loop" check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review comments from arekay-nv asking for the "interval" naming convention used elsewhere in the repo (e.g. `check_interval` in worker_manager, `interval` in benchmark_httpclient).
- CLI flag `--refresh-hz <Hz>` → `--publish-interval <seconds>` (default 4.0 Hz → 0.25 s; same wire cadence).
- Constructor parameters `refresh_hz` (aggregator + publisher) → `publish_interval_s`. The `_s` suffix makes the unit explicit so call sites can't accidentally pass a frequency.
- Internal field `_refresh_hz` → `_publish_interval_s`.
- Drops the `period = 1.0 / refresh_hz` indirection in publisher.start.
- Tests / AGENTS.md updated accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 gemini-code-assist comment on report.py:53.

For ns-precision latency series (`SAMPLE_LATENCY_NS`, `TTFT_NS`, `TPOT_NS`, etc.) the rollups store `total` and `sum_sq` as Python ints that can grow to many digits. The previous formula `sum_sq - total*total / n` evaluates `total*total / n` as a float and catastrophically cancels against `sum_sq` when the variance is small relative to the mean, producing a negative variance numerator that the sqrt() then clamps to 0.

Use the exact integer numerator `n*sum_sq - total*total` when the inputs are ints (this is what the math.sqrt sees, no cancellation), falling back to the float form for series whose dtype is float (currently only TPOT, where the magnitudes are small enough that the naive form is fine).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
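A self-contained illustration of the cancellation described in the commit above and of the exact-integer fix (the helper name is hypothetical; the PR applies the same numerator inside report.py):

```python
import math

def stddev_from_rollup(n: int, total: int, sum_sq: int) -> float:
    """Population std-dev from integer rollups. The numerator
    n*sum_sq - total**2 stays in exact Python int arithmetic, so nothing
    cancels before the single conversion to float inside sqrt."""
    if n == 0:
        return 0.0
    return math.sqrt(n * sum_sq - total * total) / n

# ~1e9-ns latencies with +/-1 ns jitter: true population std-dev is
# sqrt(2/3), about 0.816.
samples = [10**9, 10**9 + 1, 10**9 - 1]
n, total = len(samples), sum(samples)
sum_sq = sum(v * v for v in samples)
print(stddev_from_rollup(n, total, sum_sq))                   # ~0.816
# The naive float form collapses to 0.0: sum_sq and total*total/n are both
# ~3e18, far beyond float's ~9e15 exact-integer range, so they cancel.
print(math.sqrt(max(0.0, (sum_sq - total * total / n) / n)))  # 0.0
```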
Address PR #306 review-council items #4, #9, #12.

#4 — Cancellations not awaited before reading n_pending
After a drain timeout, the aggregator's `t.cancel()` loop only *scheduled* cancellation; reading `n_pending` on the next line therefore reported a stale-high count and left the to-be-cancelled tasks runnable when the loop tore down. Now `await asyncio.gather(*cancelled, return_exceptions=True)` runs before `n_pending = table.in_flight_tasks_count`, so the snapshot reflects the post-cancellation set and the cancelled tasks actually exit.

#9 — close() cancels tick task but doesn't await it
Added `MetricsPublisher.aclose()` (async) that cancels the tick task AND awaits its exit before closing the underlying transport. Aggregator's post-publish_final path and __main__.py's finally block now use it. Sync `close()` is kept for sync error-path fallbacks with a docstring noting the race.

#12 — SIGTERM bypasses publish_final
Installed `SIGTERM` and `SIGINT` handlers in __main__.py that fire `publish_final` defensively before setting `shutdown_event`. Added `MetricsPublisher._finalized` so the SIGTERM-triggered and the ENDED-triggered paths are safe to race — only the first call publishes a COMPLETE frame.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 council review #5 (registry.py:161). The C-backed hdrhistogram constructor requires `high >= 2*low` but raises an opaque allocation error if that doesn't hold — making it hard to debug a misconfigured `register_series` call. Add an explicit pre-check after the low/high clamps so the error names the series and both values up front. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 council review #8 (aggregator.py:281). A repeat `SessionEventType.STARTED` (replay buffer, buggy producer, test fixture) used to make `MetricsPublisher.start` overwrite `_tick_task`, orphaning the first tick task — it kept running until GC and raced the new task to publish snapshots. Make `start` idempotent: if `_tick_task` is already set, log a warning and return without spawning a second task. The original task remains the one `publish_final` / `aclose` cancels and awaits. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 council review #10 (__main__.py:166). Wrap the top-level `run_until_complete(main())` so startup / bind / tokenizer-load failures emit a structured `logger.exception` before the interpreter prints the traceback. The parent's ServiceLauncher previously saw only the non-zero exit code and a raw stderr trace with no context to correlate against the parent's logs. `SystemExit` is re-raised untouched so argparse usage / explicit sys.exit paths stay user-facing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 council review #11 (session.py:408). The metrics aggregator's `TRACKED_SAMPLES_FAILED` accounting relies on the publisher delivering ERROR strictly before COMPLETE for a failed sample. The ordering is correct today (ZMQ PUB→SUB in-order delivery, ZmqMessagePublisher batches without reordering), but it's an implicit contract — a future transport refactor that breaks it would break tracked-failure counting silently. Document the invariant inline so that future refactors trip over it instead of past it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 arekay-nv inline comment on aggregator.py:99 ("This might need to be higher").

Analysis: at the system's design point (50k QPS short-context, default 2 tokenizer workers) the 30 s drain finishes in well under a second. Long-context tokenize workloads can push the backlog higher — a 32k-context 5k-QPS run with 2 workers can take ~100 s to drain. The right knob there is `--tokenizer-workers`, not the drain budget, but giving the user a CLI handle makes both ends tunable without redeploying.

Changes:
- Default drain budget bumped 30 s → 60 s. Covers normal + long-context at the default 2 workers without inflating the high-QPS short-context case (we exit early when drain_tasks returns).
- New `--drain-timeout <seconds>` CLI flag plumbed through the aggregator subprocess and into `MetricsAggregatorService` as a constructor arg `drain_timeout_s`. The kwarg is positionable (not a global) so callers can inject test values without monkey-patching the module-level constant.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 arekay-nv inline comments on snapshot.py:35 (add an `INITIALIZE` state preceding `LIVE`) and test_snapshot.py:67 (add state-check tests).
- Add `SessionState.INITIALIZE = "initialize"` to the wire schema; the aggregator now starts in INITIALIZE and transitions to LIVE on the first STARTED event. The state machine is forward-only: INITIALIZE → LIVE → DRAINING → COMPLETE.
- No INITIALIZE snapshot is emitted today (the tick task only starts on the first STARTED), but the state exists as the well-defined starting point and so a future setup-phase tick has a state to carry. Wire compatibility is preserved — INITIALIZE round-trips through the codec (test added).
- New `TestSessionStateTransitions` pins: member set, declaration order (consumers can rely on `list(SessionState)` for forward checks), the `complete = state == COMPLETE and n_pending_tasks == 0` rule across every state, and the INITIALIZE round-trip.
- AGENTS.md updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 arekay-nv inline comment on registry.py:118 ("can we add tests to ensure that the behavior is fixed and any changes are caught by tests, specifically the internal points/boundaries").

New `TestSeriesSamplerBoundaries` class pins:
- HDR construction-time invariants: `high < 2*low` rejected, equality case accepted, `low=0` coerced to 1, unsupported dtype rejected.
- Clamp behavior at the HDR bounds: values exactly at `hdr_low` / `hdr_high` are unclamped and don't trip the warn-once flag.
- Under- and over-bound clamping: warn-once fires exactly once per sampler, raw values stay un-clamped (only HDR's view is clamped).
- Float dtype uses float comparison for the lower clamp (so sub-integer under-bound values are still detected).
- sig_figs at HDR-supported extremes (1 and 5) construct and record.
- Rollup edges: count==1 (min==max==total, sum_sq==v^2) and the empty case (count==0, histogram==[]).
- Warn-once flag is per-sampler, not process-global.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Decouple the two delivery paths so the Report consumer no longer depends on pub/sub terminal-frame survivability. Closes PR #306 council #6 (conflate=True fragility for the Report consumer).

Architecture change:
- `MetricsPublisher.publish_final(..., interrupted: bool = False)` now atomically writes `final_snapshot.json` (pretty-printed, dict form) as the **primary** Report source AND publishes the terminal-state snapshot over pub/sub as a **TUI shutdown signal**. Disk write and pub/sub send are independent best-effort paths.
- Signal handler in `__main__.py` invokes `publish_final(interrupted=True)` so SIGTERM/SIGINT writes a snapshot tagged `INTERRUPTED` (introduced in the prior commit) — distinguishes "user killed the run mid-execution" from a clean shutdown.
- `MetricsSnapshotSubscriber` is now TUI-only: stripped `complete`, `_complete_event`, `wait_for_complete`. `conflate=True` is the unambiguous default — no Report-consumer fragility to reason about.
- `execute.py` reads `final_snapshot.json` via `json.loads` straight to the dict form, drops the 2 s `wait_for_complete` window and the triple-redundant fallback chain. Single fallback: if the file is missing (SIGKILL/OOM before the signal handler ran), convert the subscriber's `latest` live snapshot via `snapshot_to_dict` and mark the report incomplete.
- `Report.from_snapshot` now accepts a dict (the consumer contract). All field reads use `dict.get(...)` with defaults that produce an honest "incomplete" report on missing fields rather than crashing. Surfaces a `state: str` field so `display()` renders an explicit INTERRUPTED indicator.
- New `snapshot_to_dict()` in `snapshot.py` is the one-way bridge from the wire `MetricsSnapshot` (array_like=True, compact msgpack) to the dict form used by both the file writer and any consumer that needs to feed a live Struct into Report. The inverse is intentionally absent — see `Report.from_snapshot` docstring for the rationale.

Tests rewritten:
- `test_publisher.py`: assertions read JSON from disk instead of msgpack, new test for `interrupted=True` writing `state=interrupted`.
- `test_aggregator_e2e.py`: covers both delivery paths (JSON file + pub/sub terminal frame).
- `test_report_builder.py`: routes through `snapshot_to_dict`; new tests for INTERRUPTED display, empty-dict defaults, and malformed metric entries.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Introduces the three primitives that the upcoming pub/sub metrics path will compose on top of:
- snapshot.py: MetricsSnapshot wire struct (msgspec, tagged union of CounterStat | SeriesStat) plus SessionState enum (LIVE / DRAINING / COMPLETE) and msgpack codec.
- registry.py: MetricsRegistry holding CounterSamplers and SeriesSamplers. Series samplers carry an HDR Histogram for cheap live percentiles, an array.array of raw values for exact-final computation, and exact rollup primitives. Histogram bucket edges are log-spaced over the observed [min, max] per snapshot, so they auto-zoom to data instead of wasting buckets on empty range.
- New unit tests cover the wire codec round-trip, sampler hot path, and registry registration/collision behavior.

Adds hdrhistogram==0.10.3 as a runtime dependency. Wiring of these primitives into the aggregator and removal of the old KVStore path follow in subsequent commits.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- publisher.py: MetricsPublisher owns the periodic tick task that publishes live MetricsSnapshots over IPC pub/sub at refresh_hz, plus publish_final() which is awaited by the aggregator on ENDED. Final delivery is dual-path:
  * pub/sub publish (best-effort, telemetry knobs sndhwm=4, linger=10s)
  * disk fallback (atomic: tmp + fsync(file) + rename + fsync(parent dir))
  Both paths are independently wrapped in try/except — neither failure suppresses the other. publish_final is async and awaits tick-task cancellation before publishing COMPLETE so a late LIVE/DRAINING tick can never land after COMPLETE on the wire.
- subscriber.py: MetricsSnapshotSubscriber tracks ``latest`` and the ``COMPLETE``-state snapshot. Defaults to conflate=True (TUI / report consumer) but parametrized for any consumer that needs every tick.
- New unit tests cover tick-task lifecycle, atomic disk fallback, independence of pub/sub vs disk failure paths, and the regression that publish_final must await tick-task cancellation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the mmap-backed BasicKVStore with the registry/publisher path introduced in the previous two commits.

Aggregator changes:
- MetricsAggregatorService now constructs a MetricsRegistry and MetricsPublisher on entry; trigger callbacks call registry.record / registry.increment instead of kv_store.update.
- Tracks SessionState (LIVE → DRAINING on ENDED → COMPLETE on publish_final). The publisher tick task captures (state, n_pending_tasks) per tick via a callback; consumers detect drain timeout as state == COMPLETE and n_pending_tasks > 0.
- Adds TRACKED_SAMPLES_FAILED counter, incremented on ERROR events whose tracked row still exists at processing time. Correctness depends on the load_generator emitting ERROR before COMPLETE; the matching test asserts that order.
- ENDED handler awaits drain_tasks (30s timeout), publish_final, and closes the publisher (linger=10s drains pending pub/sub frames).

Report changes:
- Replaces from_kv_reader with from_snapshot (pure function on a MetricsSnapshot). complete is derived from state == COMPLETE and n_pending_tasks == 0. Display warns when not complete.

Main-process changes (commands/benchmark/execute.py):
- Spawns a MetricsSnapshotSubscriber on the main loop. Triple-redundant report sourcing: pub/sub COMPLETE → disk fallback → latest live.
- Removes _setup_kv_reader, ARM tmpfs branching, and mmap salvage in _salvage_tmpfs (events.jsonl salvage is preserved).
- Awaits subscriber.wait_for_complete(timeout=2.0) after launcher exit so the loop can dispatch the COMPLETE frame before deciding the pub/sub path missed.

Removed:
- async_utils/services/metrics_aggregator/kv_store.py
- async_utils/services/metrics_aggregator/fs_check.py

Tests:
- Deletes test_kv_store.py.
- Marks test_aggregator.py / test_aggregator_e2e.py / test_metrics_table.py / test_report_builder.py / conftest.py with module-level skip + a TODO referencing the design doc; rewriting these on the new fixtures is a tracked follow-up.
- Adds test_aggregator_error_handler.py covering the TRACKED_SAMPLES_FAILED increment path and the negative case where COMPLETE arrives before ERROR (documents the bug the ERROR/COMPLETE swap fixes).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Swaps the publish order in BenchmarkSession._handle_response so that a QueryResult carrying an error emits ErrorEventType.GENERIC first, then SampleEventType.COMPLETE. This is required for metrics-aggregator correctness: COMPLETE causes MetricsTable.set_field to remove the tracked row, so an ERROR observed afterward has no row to inspect and TRACKED_SAMPLES_FAILED would silently stay at 0. Emitting ERROR first keeps the row alive long enough for the aggregator's error handler to identify the failure as tracked. EventLoggerService and other event consumers treat the two event types independently, so order is invisible to them. The test_failed_query_published_as_error_event test now asserts the order explicitly so a future revert is caught immediately, and the aggregator-side regression is covered by test_aggregator_error_handler. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Foundation commit for the JSON-file-final-snapshot refactor: add a
terminal state distinct from `COMPLETE` so signal-handler-triggered
final snapshots can be told apart from clean ENDED-driven ones.
- Add `SessionState.INTERRUPTED = "interrupted"` and document the
forward-only transition graph in the enum docstring:
`INITIALIZE → LIVE → DRAINING → {COMPLETE | INTERRUPTED}`
- Tighten the `state == COMPLETE and n_pending_tasks == 0` complete-
predicate test to cover both INTERRUPTED + n_pending=0 and
INTERRUPTED + n_pending>0 as "not complete".
- Add a wire-round-trip test for INTERRUPTED via the msgpack codec.
No call-site changes yet — the next commit wires the publisher /
signal handler / consumer to use INTERRUPTED, and switches the
persisted final snapshot from msgpack pub/sub fallback to a JSON
file as the primary source.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council finding (Claude #5): a duplicate `STARTED` event silently froze `total_duration_ns` for the rest of the run because the max-of-elapsed guard never beat the new smaller deltas computed against the later start timestamp. The producer contract is "STARTED exactly once per session". Treat a duplicate as a producer bug: log an error with both timestamps and DROP the duplicate (don't re-assign `_session_start_ns`). The publisher.start guard already rejects the second tick-task spawn (council #8); this commit defends the session-state side of the same invariant. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council finding (Claude #7): the persisted `final_snapshot.json` could contain literal `NaN` / `Infinity` tokens if any series recorded a non-finite float (e.g. division-by-zero in a future TPOT calc, clock-skew artifact, etc.). Python's `json.loads` reads those back fine, but `jq`, Go's `encoding/json`, JS strict mode, and most other strict-JSON consumers reject them — and the documented "cat / jq the file" workflow makes this a real interop tripwire.

Two changes:
1. `snapshot.py::snapshot_to_dict` scrubs non-finite floats to `None` on the numeric fields where they could land (counter value, series total/min/max/sum_sq/percentiles/histogram-edges). `None` is self-describing in the dict consumer: `Report.from_snapshot` uses `dict.get(..., default)` so the absence-mapping degrades gracefully to zero/empty.
2. `publisher.py::publish_final` switches `json.dumps` to `allow_nan=False`. With the scrub in place this should never raise; if it does, that's a producer-side bug that needs surfacing, not silencing into a non-strict JSON file.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council findings (Codex #1 + #2):

#1 — SIGTERM-driven `_signal_finalize` skipped the `tracked_duration_ns` refresh that the ENDED-driven path does at `aggregator.py:379-381`. Interrupted reports therefore showed `duration_ns=0` / `QPS=N/A` even after processing many tracked samples. Mirror the ENDED path: `registry.set_counter(..., table.total_tracked_duration_ns)` before `publish_final`.

#2 — On interactive ^C, the OS sends SIGINT to the whole foreground process group; the aggregator child received it and immediately called `publish_final(interrupted=True)`, writing the file from whatever state it had at signal time. Samples that completed during the parent's clean-shutdown window (between the SIGINT and the parent's eventual ENDED) never reached the file because `_finalized=True` made the subsequent ENDED-driven `publish_final` a no-op. Result: systematic undercount on interactive runs.

Fix: SIGINT registers a no-op handler that silences Python's default KeyboardInterrupt and lets the parent's ENDED path drive the aggregator's finalize. SIGTERM remains the only signal that finalizes — used by `ServiceLauncher.kill_all` when the parent decides to terminate the child before ENDED arrives.

New integration tests in `tests/integration/async_utils/services/metrics_aggregator/test_signal_handling.py` spawn the aggregator as a real subprocess and verify both paths end-to-end (SIGTERM writes `state=interrupted`; SIGINT does not write the file and the subprocess stays alive).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council finding (Claude #15): the consumer-side fallback ladder in `execute.py` had no test coverage. The three branches (file present / file absent / file malformed) plus the state→complete-flag→display contract are load-bearing for the "JSON file is the canonical Report source" architecture, but a regression that swapped precedence or mis-defaulted on a malformed file would go unnoticed until manual QA.

New `TestLoadFinalSnapshotFromDisk` pins:
- file missing → None (SIGKILL / OOM case)
- valid JSON → dict returned with state+pending fields intact
- malformed JSON → None + WARNING logged (graceful, not crash)

New `TestReportFromLoadedSnapshot` pins:
- Parametrized state × n_pending → expected `report.complete`, covering clean-COMPLETE, drain-timeout-COMPLETE, INTERRUPTED-0, and INTERRUPTED-with-pending.
- INTERRUPTED display() surfaces the signal-driven shutdown so a user reading the output knows the data is partial.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council follow-ups #6, #7, #8, #9, #14, #17.

#6 — Drop the bottom `if not self.complete` WARNING in `Report.display`. The top if/elif (state == "interrupted" vs not self.complete) already says everything needed and says it correctly. The bottom warning fired a second time for INTERRUPTED runs with the misleading "(drain timeout)" attribution.

#7 — Reword `execute.py` fallback log from "report will be marked incomplete" to "state may or may not be terminal" — the latest pub/sub frame may in fact be a terminal-state signal.

#8 — Update `MetricsSnapshot.state` field docstring to list all five states (INITIALIZE, LIVE, DRAINING, COMPLETE, INTERRUPTED) and note that COMPLETE / INTERRUPTED are both terminal.

#9 — Codify the "parent owns directory setup" contract. The parent (`commands/benchmark/execute.py:432-433`) already creates `<report_dir>/metrics/` before launching the aggregator subprocess. The child's redundant `mkdir` and the publisher's redundant `path.parent.mkdir` are both replaced with a fail-fast contract check in `__main__.py`: if the directory doesn't exist at startup, the child raises `SystemExit` with a clear message in its stderr. This prevents the prior failure mode where an mkdir error in the child caused a 30s parent-side launcher timeout with no visible diagnostic. (The parent-side fail-fast-on-early-subprocess-death piece remains a known follow-up against `ServiceLauncher`.)

#14 — Enforce the "p50 mandatory" contract at registration time. `MetricsRegistry.register_series` now rejects percentiles tuples that omit 50.0, with a clear error message naming the series. `_series_to_metric_dict` keeps the midrange fallback as defense-in-depth for hand-crafted snapshot dicts (e.g. manually-edited JSON files) that bypass the registry path, with a comment labeling it as approximate-only.

#17 — Expand the `publisher.py:publish_final` pub/sub-publish `except` comment to call out the legitimate ENDED-vs-signal race (a SIGTERM-driven publish_final reaching `aclose()` first leaves the underlying ZMQ socket closed when this publish runs). The dropped TUI frame in that race is acceptable because the JSON file is the authoritative Report source.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Address PR #306 review-council follow-ups #10, #12, #13.

#10 — Top-level exception handler in `__main__.py` caught `BaseException`, which includes `KeyboardInterrupt`. If SIGINT arrived before the per-loop signal handlers were registered (during argparse / `aggregator.start()` / tokenizer load), the user-initiated ^C was logged as "subprocess crashed" with a full traceback — misleading on a clean interactive shutdown. Narrow to `except Exception as e:` so KeyboardInterrupt and SystemExit propagate untouched, and log the concrete exception type up front for grep-ability.

#12 — `aggregator.process()` ENDED path called `publish_final` → `aclose()` → `_finalize()` as three top-level awaits. If `publish_final` raised (e.g. tick-task crashed with a non-CancelledError that escaped its `await self._tick_task`), the remaining two cleanup steps were skipped — and `_finalize()` is what sets `shutdown_event`. Without it, `await shutdown_event.wait()` in main() hangs forever absent a signal. Wrap in `try/finally` so the cleanup pair always runs, with the inner `aclose()` also wrapped so its own failure can't prevent `_finalize()` from completing.

#13 — `_write_atomic_json` in `publisher.py` didn't clean up the `.tmp` file on failure. If `os.rename` raised (EXDEV cross-device after a tmpfs flip, parent dir removed mid-write, permission change), the `.tmp` file leaked across runs. Wrap the write + rename sequence so any failure unlinks `tmp` (with `missing_ok=True` since rename may have consumed it just before the failure point).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Make room for non-dataset test fixtures (e.g. local tokenizer artifacts for tests that need ISL/OSL/TPOT triggers but can't depend on HuggingFace Hub access in CI). `tests/datasets/` was too narrowly named; `tests/assets/` will house both `datasets/` and other test artifacts under logical subdirectories.

Pure path rename — files move from `tests/datasets/<x>` to `tests/assets/datasets/<x>`, no content changes. References updated across:
- pyproject.toml (sdist include glob)
- README.md, docs/CLI_QUICK_REFERENCE.md, docs/LOCAL_TESTING.md, examples/02_ServerBenchmarking/README.md
- scripts/create_dummy_dataset.py, scripts/regenerate_templates.py
- src/inference_endpoint/config/templates/*.yaml (placeholder examples regenerated from the updated script)
- tests/conftest.py, tests/unit/commands/test_benchmark.py
- AGENTS.md (Test data section)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two integration tests in PR #306's metrics-aggregator path were flaky / slow in CI because of HuggingFace Hub:
- `TestTemplateIntegration::test_template_runs` (6 cases) called `AutoTokenizer.from_pretrained("TinyLlama/TinyLlama-1.1B-Chat-v1.0")` on the aggregator subprocess's startup path. Cold-cache CI runs paid the ~1 MB download + tokenizer-init cost, sometimes pushing subprocess startup past the parent launcher's 30 s timeout. Also required network egress / HF_TOKEN for some CI environments.
- `test_signal_handling.py` (new tests) were not affected (they don't pass `--tokenizer`), but the parent-owns-output-dir contract from the earlier #9 follow-up also applied — those tests now create the output dir themselves before spawning the subprocess.

Fix: drop in a local character-level tokenizer fixture at `tests/assets/tokenizers/char/`. ~3 KB total (`tokenizer.json` + `tokenizer_config.json`). Loaded via the existing `AutoTokenizer.from_pretrained(local_dir)` codepath — no test-only hooks in production code. Each character is one token, which is enough for the aggregator's ISL/OSL/TPOT triggers to produce deterministic counts (the e2e test path doesn't care about tokenization correctness, only that *some* count appears).

Effects: no network call on the aggregator startup path for these tests, no HF_TOKEN requirement, and tokenizer load completes in single-digit ms instead of seconds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Force-pushed from 1910057 to 924be7a
`TestTemplateIntegration::test_template_runs[concurrency_template.yaml]` consistently hits the 60 s `worker_initialization_timeout` in CI on cold-start. `concurrency_template.yaml` is alphabetically first in the parametrized lane, so it pays the full first-time-this-CI-job cost:
- Python `multiprocessing` `spawn`-mode re-import of the entire `inference_endpoint` package per worker subprocess (transformers, msgspec, pyzmq, etc.)
- First-time ZMQ IPC bind + connect handshake for the worker pool
- Concurrent aggregator subprocess cold-start contending for the same small-CI-runner CPU

Subsequent templates in the same lane benefit from warm module caches and don't approach the limit. Local Docker runs finish all 6 templates in ~40 s total (~6.5 s/template), but CI runners with less headroom (and `spawn` vs `fork`) consistently push the first test past 60 s.

Bump to 120 s in this test only — `_resolve_template` injects `settings.client.worker_initialization_timeout: 120.0` into each template before running. Production default (60 s) is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
arekay-nv left a comment
PR Review — ZMQ pub/sub metrics refactor
Five specialised reviews (general code, error-handling, tests, comments, type design) were run in parallel against the diff. Findings below are deduplicated and categorised by priority. Where multiple agents independently converged on the same issue, that is noted — those are the highest-confidence findings.
🔴 High Priority (correctness — fix before merge)
H1. Drain-timeout indicator n_pending_tasks is always 0 — state==COMPLETE and n_pending_tasks>0 contract is unenforceable
File: src/inference_endpoint/async_utils/services/metrics_aggregator/aggregator.py:371-393
AGENTS.md and the MetricsSnapshot.n_pending_tasks docstring both document the consumer contract: drain timeout is detected as state == COMPLETE and n_pending_tasks > 0. But the producer code does cancel_in_flight_tasks() → await asyncio.gather(*cancelled, return_exceptions=True), and each cancelled task's add_done_callback(self._in_flight_tasks.discard) fires before n_pending is read at line 390. The set is empty by then; the final snapshot reports n_pending_tasks=0; Report.complete renders clean success for a run that actually timed out draining.
Three independent agents (silent-failure, comments, tests) converged on this.
Fix: snapshot len(_in_flight_tasks) inside the except TimeoutError: block before cancellation; publish that count. Add a regression test with a forever-blocking MockTokenizePool and drain_timeout_s=0.05 asserting n_pending_tasks > 0 in the final snapshot.
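A sketch of the suggested ordering. The attribute and method names are taken from the finding; the `wait_for` framing is an assumption about the surrounding control flow:

```python
import asyncio


async def _finalize_with_drain(self) -> int:
    """Sketch of the H1 fix: capture the pending count before cancellation."""
    try:
        await asyncio.wait_for(self._drain_tasks(), timeout=self._drain_timeout_s)
        return 0
    except asyncio.TimeoutError:
        # Read the count BEFORE cancelling: each task's done-callback discards
        # it from _in_flight_tasks, so reading after gather() always sees 0.
        n_pending = len(self._in_flight_tasks)
        cancelled = [t for t in self._in_flight_tasks if t.cancel()]
        await asyncio.gather(*cancelled, return_exceptions=True)
        return n_pending  # publish this in the final snapshot
```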
H2. _signal_finalize task reference is discarded — GC race risks the entire INTERRUPTED delivery path
File: src/inference_endpoint/async_utils/services/metrics_aggregator/__main__.py:214
```python
def _on_sigterm() -> None:
    ...
    loop.create_task(_signal_finalize())  # ← return value discarded
```

Python's asyncio docs explicitly warn to save a reference, to avoid "a task disappearing mid-execution". `_signal_finalize` performs the atomic JSON write and pub/sub send that the entire INTERRUPTED contract depends on — exactly the failure mode the SIGTERM path exists to prevent.
Fix: hold the task in a module-level set, remove via task.add_done_callback.
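The standard strong-reference pattern from the asyncio docs, sketched against the handler named in the finding (the `_signal_finalize` body is elided):

```python
import asyncio

# Module-level strong references: the event loop only keeps weak refs to
# tasks, so a task whose create_task() result is discarded can be GC'd.
_pending_finalize_tasks: set[asyncio.Task] = set()


async def _signal_finalize() -> None:
    ...  # atomic JSON write + pub/sub send (body elided; see __main__.py)


def _on_sigterm() -> None:
    loop = asyncio.get_running_loop()
    task = loop.create_task(_signal_finalize())
    _pending_finalize_tasks.add(task)  # keep it alive until it finishes
    task.add_done_callback(_pending_finalize_tasks.discard)  # self-remove
```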
H3. NaN-scrubbed None percentiles crash Report.display()
Files: src/inference_endpoint/async_utils/services/metrics_aggregator/snapshot.py:239, src/inference_endpoint/metrics/report.py:306, 328
_scrub_nonfinite maps non-finite percentiles to None to keep json.dumps(allow_nan=False) honest, but report.py:306 does metric_dict[key] * scale_factor and :328 does val * scale_factor with no None guard. finalize_benchmark calls display() outside the report-build try/except, so any producer-side NaN takes down the whole run_benchmark flow with TypeError.
Fix: render N/A (or skip) for None values in _display_metric; add a round-trip test that puts NaN into registry.set_counter, builds snapshot, runs through snapshot_to_dict → Report.from_snapshot → display() without crashing.
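A minimal sketch of the guard, assuming a helper that formats one scalar at a time (the function shape is illustrative, not the actual report.py code):

```python
def _format_scaled(val: float | None, scale_factor: float) -> str:
    # _scrub_nonfinite maps NaN/Inf to None upstream; render it, don't multiply.
    if val is None:
        return "N/A"
    return f"{val * scale_factor:,.2f}"
```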
🟡 Medium Priority (rule violations, untested load-bearing code, doc/code mismatches)
M1. Docstrings claim SIGINT also calls publish_final — it doesn't
Files: publisher.py:91-94, 166-167, 193-194
All three say "the SIGTERM/SIGINT handler in __main__.py ... can both call it". But __main__.py:240-248 installs SIGINT as a log-only no-op handler. Only SIGTERM calls publish_final. Change all three to "SIGTERM" singular.
M2. SIGTERM-handler comment misdescribes its trigger
File: __main__.py:186-208
Comment says the primary reason for the SIGTERM handler is ServiceLauncher.kill_all — but kill_all calls proc.kill() (SIGKILL on POSIX, uncatchable). The handler never fires from that path. Either fix the comment to describe the real trigger (external operator kill -TERM), or change kill_all to terminate() + grace period + kill() so it actually exercises the handler.
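If the second option is taken, the launcher change is the usual escalation pattern. A sketch assuming a `subprocess.Popen`-like handle and a hypothetical grace period:

```python
import subprocess

GRACE_S = 5.0  # hypothetical grace period, not a value from this PR


def kill_all(procs: list[subprocess.Popen]) -> None:
    for proc in procs:
        proc.terminate()  # SIGTERM: gives the handler a chance to publish_final
    for proc in procs:
        try:
            proc.wait(timeout=GRACE_S)
        except subprocess.TimeoutExpired:
            proc.kill()  # SIGKILL fallback once the grace period expires
```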
M3. "Disk fallback" wording inverts the design
Files: execute.py:440, __main__.py:65, AGENTS.md:202
All three describe the JSON file as the "disk fallback". But publisher.py:48-55 and the narrative section of AGENTS.md call it the primary Report source, with pub/sub as the TUI signal. Inverted polarity. Pick one story (the JSON file is primary) and apply it consistently across docs and comments.
M4. publish_final idempotency has no test
File: tests/unit/async_utils/services/metrics_aggregator/test_publisher.py
publisher.py:196-198's if self._finalized: return is the explicit race protector for SIGTERM-vs-ENDED. Both _signal_finalize (__main__.py:216-237) and the ENDED path (aggregator.py:399) call publish_final. A regression that removed the _finalized guard would double-write the JSON file and the suite would stay green. Add: test_publish_final_idempotent asserting publish.call_count == 1 and unchanged file mtime after a second call.
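A sketch of the suggested regression test; the fixtures, the `snapshot` argument, and the `_transport.publish` mock path are hypothetical:

```python
async def test_publish_final_idempotent(publisher, snapshot, tmp_path):
    await publisher.publish_final(snapshot)
    path = tmp_path / "final_snapshot.json"
    first_mtime = path.stat().st_mtime_ns

    await publisher.publish_final(snapshot)  # must no-op via the _finalized guard

    assert publisher._transport.publish.call_count == 1  # single pub/sub send
    assert path.stat().st_mtime_ns == first_mtime  # file not rewritten
```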
M5. Drain-timeout recovery path is entirely untested
File: tests/unit/async_utils/services/metrics_aggregator/test_aggregator.py
aggregator.py:371-389 is the recovery for blocked tokenize tasks. No test exercises it. Compounds with H1: even after fixing the contract, there is no test to prevent regression. Add: integration test with a forever-blocking MockTokenizePool + drain_timeout_s=0.05.
M6. .tmp-cleanup leg of _write_atomic_json has no test
File: tests/unit/async_utils/services/metrics_aggregator/test_publisher.py
publisher.py:264-272's try / except BaseException: tmp.unlink(missing_ok=True); raise is the only thing preventing .tmp leak across aggregator restarts. The existing disk-failure test fails before .tmp is created. Add: patch os.rename to raise mid-call, assert no .tmp left behind.
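A sketch of the suggested test. `_write_atomic_json` is the function under test; patching `os.rename` as the injection point follows the finding, though the real implementation may use a different rename call:

```python
import os

import pytest

# Import path assumed from the file layout cited in this review.
from inference_endpoint.async_utils.services.metrics_aggregator.publisher import (
    _write_atomic_json,
)


def _explode(*args, **kwargs):
    raise OSError("simulated rename failure")


def test_tmp_unlinked_when_rename_fails(tmp_path, monkeypatch):
    # Fail after the .tmp file has been written but before it is renamed.
    monkeypatch.setattr(os, "rename", _explode)
    with pytest.raises(OSError):
        _write_atomic_json(tmp_path / "final_snapshot.json", {"state": "complete"})
    assert not list(tmp_path.glob("*.tmp"))  # cleanup leg must unlink the temp file
```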
M7. Bare except asyncio.CancelledError: pass violates AGENTS.md
File: src/inference_endpoint/load_generator/session.py:299-300
Project rules require every except block to contain a comment or logging. Adjacent blocks at publisher.py:203-205 and session.py:338 already comply. Add a one-line comment here.
M8. Default-to-INTERRUPTED on missing state masks malformed snapshots
File: src/inference_endpoint/metrics/report.py:200
```python
state = snap.get("state", "interrupted")
```

A snapshot dict missing `state` is malformed, not interrupted. Defaulting silently rewrites a schema violation as a normal "user pressed Ctrl-C" warning. Log at ERROR with the snapshot keys, then either raise or use a distinct `"malformed"` sentinel — see the sketch below.
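One possible shape, with the sentinel value as a placeholder and `snap`/`logger` assumed to come from the enclosing function:

```python
state = snap.get("state")
if state is None:
    # Schema violation, not a user interrupt: surface loudly and keep the
    # two cases distinguishable downstream.
    logger.error("snapshot missing 'state'; keys=%s", sorted(snap))
    state = "malformed"  # placeholder sentinel; raising is the alternative
```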
M9. _load_final_snapshot_from_disk collapses "missing" vs "corrupt"
File: src/inference_endpoint/commands/benchmark/execute.py:387-405
Both file-missing and corrupt-JSON return None; downstream log says "no file on disk" even when the file exists but failed to parse. Corrupt JSON is a producer-contract violation — should be ERROR level with a distinct message including the parse exception.
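A sketch of the split, with log levels per the finding (the function name mirrors the diff; the body is illustrative):

```python
import json
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def _load_final_snapshot_from_disk(path: Path) -> dict | None:
    if not path.exists():
        logger.info("no final snapshot on disk at %s", path)  # expected case
        return None
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError as exc:
        # Producer-contract violation: the file exists but did not parse.
        logger.error("final snapshot at %s is corrupt: %s", path, exc)
        return None
```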
M10. Fallback warning at execute.py:575-580 says "may or may not be terminal"
File: src/inference_endpoint/commands/benchmark/execute.py:575-580
metrics_subscriber.latest.state is already available. A LIVE state means the aggregator died mid-run (data lost); COMPLETE/INTERRUPTED means the disk write failed. These deserve different log messages.
M11. Add gc=False to CounterStat, SeriesStat, MetricsSnapshot
File: src/inference_endpoint/async_utils/services/metrics_aggregator/snapshot.py
Matches project convention (per AGENTS.md and core/types.py). All fields are scalars, frozen dicts of scalars, or lists of frozen-scalar-only Structs — no cycle path exists. One snapshot every publish-interval × N series is high-frequency enough to matter.
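The convention in question, as a minimal illustration with made-up fields (the real Structs' fields live in snapshot.py):

```python
import msgspec


class CounterStat(msgspec.Struct, frozen=True, array_like=True, gc=False):
    # gc=False is safe only because no field can participate in a reference
    # cycle: scalars, or frozen containers of scalars, only.
    name: str
    value: int
```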
🟢 Low Priority (suggestions)
Wire schema
- Rename `MetricsSnapshot.counter` → `seq`/`emit_seq` — it collides namespace-wise with `CounterStat`; the docstring already has to disclaim the confusion. Easier to fix before downstream consumers ship.
- Replace the `((lo, hi), count)` histogram tuple with a named `HistogramBucket` Struct — lossy round-trip through `snapshot_to_dict`; consumers index positionally (`h[0][0]`).
Encapsulation
- `MetricsSnapshotSubscriber.latest` should be a `@property` with a private backing field — it is currently public-mutable and could be silently overwritten (see the sketch after this list).
- `MetricsTable.is_tracking` / `session_started_ns` are public mutable attributes — consider `@property` accessors.
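A sketch of the `latest` accessor; the class and type names come from the review, the body is illustrative:

```python
from __future__ import annotations


class MetricsSnapshotSubscriber:
    def __init__(self) -> None:
        self._latest: MetricsSnapshot | None = None  # written only by the recv loop

    @property
    def latest(self) -> MetricsSnapshot | None:
        return self._latest  # read-only to callers; no silent overwrite
```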
Error handling polish
- `publisher.start()` should validate `publish_interval_s > 0` before the idempotency early-return (publisher.py:120-130) — the current ordering can swallow validation errors.
- The `_write_atomic_json` parent-dir fsync error is misattributed as "JSON snapshot write failed" — the file is on disk; only durability is at risk.
- `MetricsSnapshotCodec.on_decode_error` returns `None` silently (snapshot.py:270-277) — a `msgspec.DecodeError` is a schema bug; add a one-shot `logger.warning` with the frame size and exception.
- The `http_client.shutdown_async` cleanup logs `f"{e}"` (execute.py:548-552) — bump to `logger.exception` so operators investigating leftover workers have the full exception chain.
Test polish
- `test_aggregator_e2e.py` is marked `@pytest.mark.unit` but binds real ZMQ IPC — should be `@pytest.mark.integration`.
- The `time.sleep(2.0)` readiness wait in test_signal_handling.py:117, 166 — `__main__.py` already supports `--readiness-path`/`--readiness-id`; use it.
- Add a negative test for `register_series` rejecting a percentiles tuple without 50.0 — the registry.py:388-393 guard currently has no test.
- Add a `_scrub_nonfinite` round-trip test — inject NaN via `registry.set_counter`, assert `None` in the dict plus clean `json.dumps(allow_nan=False)`.
- Add aggregator `_session_state` transition assertions (INITIALIZE → LIVE → DRAINING) at unit level.
- test_publisher.py: the `MetricsPublisher.start()` double-start guard (publisher.py:120-126) has no test.
- test_publisher.py:170-176 has `try: publisher.close() except Exception: pass` — the comment suggests the author wasn't sure whether it should raise. Construct the mock so `close()` succeeds.
- `test_on_decode_error_drops_malformed` (test_snapshot.py:112-119) uses `try/except` instead of `pytest.raises` — it silently no-ops if msgspec ever stops raising.
- `test_offline_benchmark` (integration) should assert `final_snapshot.json` exists and `state == "complete"` — given the file is now primary, a regression that never writes it would not be caught.
Comment hygiene
- Stale ref: registry.py:379 says `Report._series_to_metric_dict` — the function is module-level in metrics/report.py now.
- The metrics_table.py:332-339 TpotTrigger docstring contains a literal `# NOTE(agents):` — it renders as `#`-prefixed text in `help()`. Also drop the "if tokenization throughput becomes a bottleneck, consider merging..." speculation (per AGENTS.md, TODOs must link an issue).
- report.py:200 has a bare TODO (`# TODO: surface session_started_ns via snapshot`) — link an issue or drop it.
- subscriber.py:57-60: the "no current callers do" parenthetical will rot — rephrase.
- aggregator.py:305: the comment "Now that we have an event loop running" is misleading (the loop is always running inside `async def`); the real precondition is that STARTED was observed.
Type design
- The `SampleRow` `gc=False` docstring should use the standard AT-RISK pattern. The struct is mutated post-construction; the current docstring ("no mutable container fields") understates the audit argument.
- `SeriesSampler._raw: array.array` is unbounded — documented but not type-expressed. Its lifetime is the run, not the snapshot interval.
- Lift `get_runtime_state: Callable[[], tuple[SessionState, int]]` to a named `RuntimeState` (NamedTuple or array_like Struct) — see the sketch after this list.
- Co-locate `MetricCounterKey` and `MetricSeriesKey` (currently aggregator.py vs metrics_table.py) into a single keys.py.
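The lift is small. A sketch; the second field's name is inferred from the `tracked_duration_ns` refresh mentioned elsewhere in this thread, not confirmed by the diff:

```python
from __future__ import annotations

from typing import NamedTuple


class RuntimeState(NamedTuple):
    session_state: SessionState  # enum from the aggregator module
    tracked_duration_ns: int  # field name inferred, not confirmed


# get_runtime_state: Callable[[], RuntimeState] replaces the anonymous tuple.
```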
Strengths (worth keeping)
- Excellent comment density on tricky invariants — session.py:411-421 (ERROR-before-COMPLETE), aggregator.py:284-301 (duplicate-STARTED), aggregator.py:381-389 (cancel-then-await), publisher.py:42-65 (dual-path contract). All describe current state with rationale, no history narration.
- Dual-path final delivery is genuinely independent — `test_disk_failure_does_not_block_pubsub` (test_publisher.py:140) proves disk failure does not block pub/sub. Exemplary defensive design.
- The `_finalized` flag is set before any `await` (publisher.py:196-198) — the SIGTERM-vs-ENDED double-publish race is closed by construction, not just narrowed.
- test_aggregator_error_handler.py:144-197 explicitly parametrises the ERROR-before-COMPLETE counter-example with `test_error_after_complete_misses_tracked_failed` — a textbook regression-resistant test.
- The HDR `high >= 2*low` pre-check (registry.py:160-167) rejects misconfiguration loudly instead of surfacing cryptic HdrHistogram errors.
- The registry.py:388-393 p50 contract enforcement rejects misregistration that would silently produce midrange-fallback medians.
- Disciplined msgspec usage (frozen + array_like + tagged unions) is the correct hot-path shape.
- Integration tests for SIGINT-no-op and SIGTERM-final-write are the right shape and in the right place.
- License headers present, no lazy imports, no history-narrating comments, no gitignored refs across the reviewed surface.
Recommended Action Plan
- High priority (H1–H3) are correctness bugs that materially mislead the user. Fix before merge.
- Medium priority (M1–M11) — tighten the SIGINT/SIGTERM/disk-fallback documentation (M1–M3) and add the missing tests for load-bearing recovery code (M4–M6). These are cheap and currently the docs lie about the design.
- Low priority can be batched into a follow-up PR.
- Re-run targeted reviews after fixes (`errors` + `tests` aspects).
🤖 Generated by /pr-review-toolkit:review-pr (5 specialised agents — code, errors, tests, comments, types).
arekay-nv
left a comment
@nv-alicheng
The three high priority changes seem important and small enough to fix.
Everything else can be deferred to a follow-up.
Please merge when ready, and collect baseline perf numbers pre/post commit to see whether there are any major perf implications.
Thanks!
Addresses the three high-priority findings from the review council:

H1: `drain_tasks` now owns the timeout + cancel-and-await sequence, so the pending count is captured before per-task done callbacks empty the in-flight set. Previously it read 0 unconditionally — the documented state==COMPLETE and n_pending_tasks>0 drain-timeout contract was unenforceable.

H2: Extract `_make_sigterm_handler` returning a strong-ref `set[Task]` that holds the spawned `_signal_finalize` task; the loop tracks tasks via weakref only, so a discarded `create_task()` return value can be GC'd mid-flight (per the Python asyncio docs) — exactly the failure the INTERRUPTED delivery path exists to prevent.

H3: `_scrub_nonfinite` maps producer-side NaN/Inf to None for strict JSON. `_display_metric` did `val * scale_factor` with no guard → TypeError on `display()`, which `finalize_benchmark` calls outside the report-build try/except. Render N/A for None across named scalars, histogram bucket edges, and percentiles.

Tests added (all verified failing pre-fix):

- test_drain_timeout_reports_pending_count: forever-blocking pool + drain_timeout_s=0.05, asserts publish_final receives n_pending>0
- test_sigterm_handler_holds_strong_reference_to_finalize_task: drives the handler, asserts the task is in the strong-ref set, survives gc.collect(), and self-removes via done-callback on completion
- test_sigterm_handler_refreshes_tracked_duration: the handler mirrors the ENDED path's tracked_duration_ns refresh before publish_final
- test_display_handles_scrubbed_nan_percentiles: a dict with scrubbed None percentile values does not crash display(); renders N/A
- test_scrub_nonfinite_round_trip_yields_none: registry-side NaN/Inf surfaces as None in snapshot_to_dict and round-trips through json.dumps(allow_nan=False)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
AGENTS.md forbids imports inside function bodies. The H3 round-trip test introduced lazy imports of math, json, MetricsSnapshot, and SeriesStat — move them to the top-level import block. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
What does this PR do?
The `shmem` implementation of KVStore in MetricsAggregatorService causes issues on ARM. Several solutions exist; this PR implements (2).
Type of change
Related issues
Testing
Checklist