Support for NATS dead letter queue #1175
Conversation
* fix: prevent NATS connection flooding and stale job task fetching
  - Add connect_timeout=5, allow_reconnect=False to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
  - Guard the /tasks endpoint against terminal-status jobs (return empty tasks instead of attempting a NATS reserve)
  - IncompleteJobFilter now excludes jobs by top-level status in addition to progress JSON stages
  - Add stale-worker cleanup to the integration test script

  Found during PSv2 integration testing, where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding /tasks with concurrent NATS reserve requests.

  Co-Authored-By: Claude <noreply@anthropic.com>
* docs: PSv2 integration test session notes and NATS flooding findings
  Session notes from the 2026-02-16 integration test, including root-cause analysis of stale-worker task competition and NATS connection issues. The findings doc tracks applied fixes and remaining TODOs with priorities.
* docs: update session notes with successful test run #3
  PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified ack_wait=300s as the cause of ~5 min of idle time when GPU processes race for NATS tasks.
* fix: batch NATS task fetch to prevent HTTP timeouts
  Replace N×1 reserve_task() calls with a single reserve_tasks() batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the /tasks endpoint to exceed HTTP client timeouts. The new approach uses one psub.fetch() call for the entire batch.
* docs: add next session prompt
* feat: add pipeline__slug__in filter for multi-pipeline job queries
  Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: ?pipeline__slug__in=slug1,slug2
* chore: remove local-only docs and scripts from branch
  These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR.
* feat: set job dispatch_mode at creation time based on project feature flags
  ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation.
* fix: add timeouts to all JetStream operations and restore reconnect policy
  Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.
* fix: propagate NATS timeouts as 503 instead of swallowing them
  asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.
  - Add an explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
  - Catch TimeoutError and OSError in the /tasks view and return 503
  - Restore allow_reconnect=False (fail fast on connection issues)
  - Add a return type annotation to get_connection()
* fix: address review comments (log level, fetch timeout, docstring)
  - Downgrade the reserve_tasks log to DEBUG when zero tasks are reserved (avoids log spam from frequent polling)
  - Pass timeout=0.5 from the /tasks endpoint to avoid blocking the worker for 5s on empty queues
  - Fix docstring examples that used the string 'job123' for the int-typed job_id
* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses
  NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503.
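The timeout and 503 behavior described in these commits can be sketched in isolation. This is a minimal stand-in, not the project's code: the names `reserve_tasks` and `NATS_JETSTREAM_TIMEOUT` come from the commit messages, the bodies are illustrative.

```python
import asyncio

# Name taken from the commit messages; the value is shortened here so the
# demo runs quickly (the PR uses 10s).
NATS_JETSTREAM_TIMEOUT = 0.05

async def reserve_tasks(fetch):
    """Run a JetStream fetch with a hard timeout.

    TimeoutError is re-raised instead of being swallowed by a broad
    `except Exception`, so callers can tell a NATS outage apart from an
    empty queue.
    """
    try:
        return await asyncio.wait_for(fetch(), NATS_JETSTREAM_TIMEOUT)
    except asyncio.TimeoutError:
        raise  # propagate; the view maps this to HTTP 503

async def tasks_endpoint(fetch):
    """Sketch of the /tasks view: empty queue -> 200 [], outage -> 503."""
    try:
        return 200, await reserve_tasks(fetch)
    except (asyncio.TimeoutError, OSError):
        return 503, []

async def hung_fetch():
    await asyncio.sleep(10)  # simulates a hung NATS connection

async def empty_fetch():
    return []  # healthy NATS, nothing queued

if __name__ == "__main__":
    print(asyncio.run(tasks_endpoint(hung_fetch)))   # (503, [])
    print(asyncio.run(tasks_endpoint(empty_fetch)))  # (200, [])
```

With this shape, workers that receive a 503 can back off, instead of immediately re-polling an apparently empty queue.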
…olnickLab#1142)
* feat: configurable NATS tuning and gunicorn worker management
  Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:
  - Make TASK_TTR configurable via the NATS_TASK_TTR Django setting (default 30s)
  - Make max_ack_pending configurable via the NATS_MAX_ACK_PENDING setting (default 100)
  - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with a USE_UVICORN=1 escape hatch for raw uvicorn
  - Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment
* fix: address PR review comments
  - Fix the max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
  - Update the TaskQueueManager docstring with an Args section
  - Simplify the production WEB_CONCURRENCY fallback (just use nproc)

Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
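The falsy-zero guard called out above is easy to get wrong; a self-contained illustration (the function name is made up for this example):

```python
def resolve_max_ack_pending(setting_value, default=100):
    """Falsy-zero guard: `setting_value or default` silently replaces an
    explicit 0 with the default, so compare against None instead."""
    return setting_value if setting_value is not None else default

# The buggy `or` pattern loses an explicit zero; the guard keeps it:
assert (0 or 100) == 100
assert resolve_max_ack_pending(0) == 0
assert resolve_max_ack_pending(None) == 100
```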
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)
  The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job.
* Update ami/jobs/serializers.py

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
📝 Walkthrough
Adds a Django management command to inspect dead-letter messages for a job and extends the NATS TaskQueueManager with an advisory stream, DLQ consumer helpers (list/delete), advisory setup on connect, ACK flush behavior, and tests for DLQ paths.
Sequence Diagram
sequenceDiagram
participant CLI
participant Cmd as check_dead_letter_queue (Django Command)
participant TQM as TaskQueueManager
participant NATS as NATS Client
participant ADVISORY as Advisory Stream
CLI->>Cmd: run check_dead_letter_queue(job_id)
Cmd->>TQM: call _check_dead_letter_queue(job_id)
TQM->>NATS: __aenter__ (open connection/context)
NATS->>ADVISORY: ensure advisory stream exists (_setup_advisory_stream)
ADVISORY-->>NATS: advisory stream ready
TQM->>NATS: get_dead_letter_image_ids(job_id, n)
NATS->>ADVISORY: pull/fetch advisory messages (durable consumer)
ADVISORY-->>NATS: advisory events referencing original messages
NATS->>NATS: fetch original messages -> extract image_ids
NATS->>ADVISORY: ack advisory messages / publish ACKs
NATS->>NATS: flush to ensure ACK delivery
NATS-->>TQM: return list of image_ids
TQM-->>Cmd: return result
Cmd-->>CLI: print found IDs or "no dead letter images"
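The "pull/fetch advisory messages" step in the diagram relies on JetStream's max-deliveries advisory events. A minimal sketch of the subject filter and payload parsing; the job-based stream and consumer names below are assumptions for illustration, not the project's actual naming scheme.

```python
import json

# JetStream publishes a max-deliveries advisory on this subject when a
# message exceeds its consumer's max_deliver.
ADVISORY_SUBJECT = "$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.{stream}.{consumer}"

def dlq_subject_filter(job_id: int) -> str:
    """Subject filter matching one job's max-delivery advisories
    (hypothetical naming scheme)."""
    return ADVISORY_SUBJECT.format(stream=f"job-{job_id}", consumer=f"job-{job_id}-worker")

def parse_advisory(payload: bytes) -> tuple[str, int]:
    """Extract the original stream name and sequence from an advisory event,
    which is what the diagram's 'fetch original messages' step needs."""
    data = json.loads(payload.decode())
    return data["stream"], data["stream_seq"]
```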
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 3 passed
Pull request overview
Adds dead letter queue (DLQ) support to the NATS/JetStream-backed async job task queue by capturing max-delivery advisories, enabling operators to query failed tasks, and improving ACK reliability during teardown.
Changes:
- Create/use a shared JetStream "advisories" stream and a per-job durable advisory consumer to track max-delivery (DLQ) events.
- Add `TaskQueueManager.get_dead_letter_task_ids()` and DLQ consumer cleanup hooks.
- Introduce a Django management command to query DLQ entries for a job from the CLI.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `ami/ml/orchestration/nats_queue.py` | Adds advisory stream setup on connect, DLQ query method, extra flushes, and DLQ consumer cleanup. |
| `ami/ml/orchestration/tests/test_nats_queue.py` | Updates unit tests for new advisory stream creation and additional consumer deletion. |
| `ami/ml/management/commands/check_dead_letter_queue.py` | Adds CLI command to print DLQ task/image IDs for a job. |
Actionable comments posted: 3
🧹 Nitpick comments (4)
ami/ml/orchestration/nats_queue.py (1)
437-460: DLQ consumer deletion logs a warning on any failure, including "not found".
If a DLQ consumer was never created (no tasks exceeded max deliveries), `delete_dlq_consumer` will log a warning at line 459. Consider catching `NotFoundError` separately and logging at debug level to reduce noise for the common case where no DLQ consumer exists.
♻️ Proposed refinement

      async def delete_dlq_consumer(self, job_id: int) -> bool:
          ...
          dlq_consumer_name = self._get_dlq_consumer_name(job_id)
          try:
              await asyncio.wait_for(
                  self.js.delete_consumer("advisories", dlq_consumer_name),
                  timeout=NATS_JETSTREAM_TIMEOUT,
              )
              logger.info(f"Deleted DLQ consumer {dlq_consumer_name} for job '{job_id}'")
              return True
  +       except nats.js.errors.NotFoundError:
  +           logger.debug(f"DLQ consumer {dlq_consumer_name} not found for job '{job_id}' (never created)")
  +           return True  # Not an error - consumer simply never existed
          except Exception as e:
              logger.warning(f"Failed to delete DLQ consumer for job '{job_id}': {e}")
              return False
ami/ml/orchestration/tests/test_nats_queue.py (1)
149-160: Consider adding dedicated tests for the new DLQ methods.
The existing test verifies that `cleanup_job_resources` calls `delete_consumer` twice, but there are no dedicated tests for:
- `get_dead_letter_task_ids` (the core new functionality)
- `delete_dlq_consumer`
- `_setup_advisory_stream`

These methods have non-trivial logic (JSON parsing, message lookups, error handling) that would benefit from unit test coverage.
Would you like me to help generate test cases for these new methods?
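A hedged sketch of what such a test could look like; `extract_image_ids` below is a stand-in mirroring only the parsing loop described in this review, not the real `get_dead_letter_task_ids`:

```python
import asyncio
import json
from unittest.mock import AsyncMock

class FakeMsg:
    """Minimal stand-in for a stored JetStream message."""
    def __init__(self, data: bytes):
        self.data = data

async def extract_image_ids(js, stream_name: str, seqs: list[int]) -> list[str]:
    """Stand-in for the advisory-processing loop: look up each original
    message by sequence and pull out its image_id, skipping bad data."""
    ids = []
    for seq in seqs:
        try:
            msg = await js.get_msg(stream_name, seq)
            task = json.loads(msg.data.decode())
            if "image_id" in task:
                ids.append(str(task["image_id"]))
        except Exception:
            continue  # malformed JSON or missing message: skip, don't crash
    return ids

def test_extract_image_ids():
    js = AsyncMock()
    js.get_msg.side_effect = [
        FakeMsg(b'{"image_id": 123}'),  # valid task
        FakeMsg(b"not json"),           # decode failure is skipped
        FakeMsg(b'{"other": 1}'),       # missing image_id is skipped
    ]
    assert asyncio.run(extract_image_ids(js, "tasks", [1, 2, 3])) == ["123"]

test_extract_image_ids()
```

The same `AsyncMock` pattern (patching the JetStream client and asserting call counts) extends naturally to `delete_dlq_consumer` and `_setup_advisory_stream`.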
ami/ml/management/commands/check_dead_letter_queue.py (2)
21-26: Consider adding optional arguments for count and verbosity.
The command currently retrieves only 10 DLQ entries (the default in `get_dead_letter_task_ids`). For jobs with many failed tasks, operators may want to retrieve more. Consider adding:
- `--count`/`-n` to control how many entries to fetch
- an `--all` flag to fetch all available entries

♻️ Example enhancement

      def add_arguments(self, parser):
          parser.add_argument(
              "job_id",
              type=int,
              help="Job ID to check for dead letter queue messages",
          )
  +       parser.add_argument(
  +           "-n", "--count",
  +           type=int,
  +           default=10,
  +           help="Maximum number of dead letter tasks to retrieve (default: 10)",
  +       )

  -   async def _check_dead_letter_queue(self, job_id: int) -> list[str]:
  +   async def _check_dead_letter_queue(self, job_id: int, count: int = 10) -> list[str]:
          """Check for dead letter queue messages using TaskQueueManager."""
          async with TaskQueueManager() as manager:
  -           return await manager.get_dead_letter_task_ids(job_id)
  +           return await manager.get_dead_letter_task_ids(job_id, n=count)

Also applies to: 46-49
43-44: Preserve the exception chain for better debugging.
Using `raise ... from e` preserves the original traceback, making it easier to debug issues when the command fails.
♻️ Proposed fix

      except Exception as e:
  -       raise CommandError(f"Failed to check dead letter queue: {e}")
  +       raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `ami/ml/orchestration/nats_queue.py`:
- Around lines 474-479: The cleanup aggregates the results of `delete_consumer`, `delete_stream`, and `delete_dlq_consumer`, so a DLQ deletion failure makes the whole cleanup fail. Treat `delete_dlq_consumer` as best-effort: still call `await self.delete_dlq_consumer(job_id)` (optionally logging a warning on failure), but return only the combined result of `await self.delete_consumer(job_id)` and `await self.delete_stream(job_id)`, so a DLQ consumer deletion failure does not affect the overall return value.
- Line 389: The `pull_subscribe` call using `durable=dlq_consumer_name` is missing the `stream` parameter. Add `stream="advisories"` to the `self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)` call so the durable consumer is created on the correct stream.
- Around lines 361-435: In the outer exception handler of `get_dead_letter_task_ids`, replace the `logger.error` call with `logger.exception` so the traceback is logged automatically; keep the same message text but drop the manual `{e}` formatting to avoid duplication.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5c3ca6a5-cb22-4bb1-bfc5-0c9809717a0b
📒 Files selected for processing (3)
- ami/ml/management/commands/check_dead_letter_queue.py
- ami/ml/orchestration/nats_queue.py
- ami/ml/orchestration/tests/test_nats_queue.py
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/ml/management/commands/check_dead_letter_queue.py (1)
43-44: Preserve the exception chain for better debugging.
Per static analysis, the exception should be chained using `from e` to preserve the original traceback, which helps with debugging when the command fails.
🔧 Proposed fix

      except Exception as e:
  -       raise CommandError(f"Failed to check dead letter queue: {e}")
  +       raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `ami/ml/orchestration/nats_queue.py`:
- Around lines 452-456: The call to `self.js.delete_consumer` hardcodes "advisories"; replace the literal with the `ADVISORY_STREAM_NAME` constant to match the rest of the file and avoid discrepancies, keeping `dlq_consumer_name` and `NATS_JETSTREAM_TIMEOUT` as-is.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 551d6559-02e6-406b-8914-5b34b70bac6c
📒 Files selected for processing (3)
- ami/ml/management/commands/check_dead_letter_queue.py
- ami/ml/orchestration/nats_queue.py
- ami/ml/orchestration/tests/test_nats_queue.py
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `ami/ml/orchestration/nats_queue.py`:
- Around lines 433-436: In `get_dead_letter_task_ids`, don't swallow all exceptions: after logging via `logger.error(f"Failed to get dead letter task IDs for job '{job_id}': {e}")`, re-raise so callers (like `check_dead_letter_queue`) can surface a CommandError. Alternatively, if returning an empty list is intended for a specific, expected failure, catch only that exception type; remove the broad `except Exception as e` that hides NATS/advisory failures.
- Around lines 73-78: `__aenter__` opens the NATS connection via `get_connection(self.nats_url)` and then calls `_setup_advisory_stream()`; if `_setup_advisory_stream()` raises, the connection is leaked because `__aexit__` won't run. Wrap the call in try/except, explicitly close the opened connection (`await self.nc.close()`) and clear `self.nc`/`self.js` on failure, then re-raise so the error propagates.
- Around lines 403-418: The advisory ACK is unconditional and should be gated: only call `msg.ack()` when an `image_id` was successfully extracted (appended to `dead_letter_ids`) or when `get_msg` raises a terminal `NotFoundError` indicating the message is gone. Do not ack on transient exceptions or JSON decode failures, so they can be retried. Ensure `msg.ack()` is reachable only from those two confirmed-terminal pathways (success or NotFoundError).
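The `__aenter__` leak described above can be sketched in isolation; the class below is a stand-in for the pattern and assumes nothing about the real TaskQueueManager beyond what the comment states:

```python
import asyncio

class QueueManagerSketch:
    """If setup fails after the connection opens, close the connection
    before re-raising: __aexit__ never runs when __aenter__ raises."""

    def __init__(self, connect, setup):
        self._connect = connect  # stand-in for get_connection()
        self._setup = setup      # stand-in for _setup_advisory_stream()
        self.nc = None

    async def __aenter__(self):
        self.nc = await self._connect()
        try:
            await self._setup()
        except Exception:
            await self.nc.close()  # don't leak the open connection
            self.nc = None
            raise
        return self

    async def __aexit__(self, *exc):
        if self.nc is not None:
            await self.nc.close()

class FakeConn:
    def __init__(self):
        self.closed = False
    async def close(self):
        self.closed = True

async def demo() -> bool:
    conn = FakeConn()
    async def connect():
        return conn
    async def bad_setup():
        raise RuntimeError("advisory stream setup failed")
    try:
        async with QueueManagerSketch(connect, bad_setup):
            pass
    except RuntimeError:
        pass
    return conn.closed

if __name__ == "__main__":
    print(asyncio.run(demo()))  # True: connection closed despite the error
```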
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f370c226-6231-44b9-a461-f2e098446844
📒 Files selected for processing (1)
ami/ml/orchestration/nats_queue.py
🧹 Nitpick comments (2)
ami/ml/orchestration/nats_queue.py (2)
443: Redundant slice operation.
The `[:n]` slice is unnecessary since `fetch(n, timeout=1.0)` at line 399 already limits the number of messages to at most `n`, and each message yields at most one `image_id`.
Suggested simplification

  -       return dead_letter_ids[:n]
  +       return dead_letter_ids
419-431: Design tradeoff: unconditional ACK may lose advisories on transient failures.
If `get_msg` fails due to a transient error (e.g., a network blip), the advisory is still ACKed and won't be retried, so the dead-letter entry could be lost.
The comment at lines 425-427 documents your reasoning that this "shouldn't happen" because `stream_seq` is part of the schema and all messages have an `image_id`. This is reasonable for the expected failure modes (message already discarded, malformed data).
Consider whether distinguishing `NotFoundError` (terminal: message gone, safe to ACK) from other exceptions (transient: skip the ACK so the advisory is retried) would be worthwhile for resilience:
Alternative: conditional ACK for transient failures

      try:
          job_msg = await self.js.get_msg(stream_name, stream_seq)
          if job_msg and job_msg.data:
              task_data = json.loads(job_msg.data.decode())
              if "image_id" in task_data:
                  dead_letter_ids.append(str(task_data["image_id"]))
              else:
                  logger.warning(f"No image_id found in task data: {task_data}")
  +       # ACK after successful processing or terminal "missing data" case
  +       await msg.ack()
  +       logger.info(f"Acknowledged advisory message for stream_seq {stream_seq}")
  +   except nats.js.errors.NotFoundError:
  +       # Message was discarded - terminal, safe to ACK
  +       logger.warning(f"Message {stream_seq} not found in {stream_name} (discarded)")
  +       await msg.ack()
      except Exception as e:
  +       # Transient failure - don't ACK, will retry on next call
          logger.warning(f"Could not retrieve message {stream_seq} from {stream_name}: {e}")
  -       # The message might have been discarded after max_deliver exceeded
      else:
          logger.warning(f"No stream_seq in advisory data: {advisory_data}")
  +       await msg.ack()  # Malformed advisory - ACK to avoid infinite loop
  -   # Acknowledge even if we couldn't find the stream_seq or image_id
      ...
  -   await msg.ack()
  -   logger.info(
  -       f"Acknowledged advisory message for stream_seq {advisory_data.get('stream_seq', 'unknown')}"
  -   )
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 66b28f54-dd96-43a3-8bcc-78b9b14a8cc6
📒 Files selected for processing (1)
ami/ml/orchestration/nats_queue.py
🧹 Nitpick comments (1)
ami/ml/management/commands/check_dead_letter_queue.py (1)
42-43: Preserve the exception chain with `raise ... from e`.
Using `raise ... from e` preserves the original traceback, which aids debugging when the command fails.
Suggested fix

      except Exception as e:
  -       raise CommandError(f"Failed to check dead letter queue: {e}")
  +       raise CommandError(f"Failed to check dead letter queue: {e}") from e
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d5c55d0d-6077-4280-acb8-dc8cd1bc8993
📒 Files selected for processing (1)
ami/ml/management/commands/check_dead_letter_queue.py
Summary
This pull request introduces dead letter queue (DLQ) support for job tasks, allowing for better tracking and cleanup of failed tasks in the NATS orchestration layer.
Helps address #1168
Dead Letter Queue Management
- Added `check_dead_letter_queue.py` to allow users to check for dead letter queue messages for a specific job via the command line.
- Added `get_dead_letter_task_ids` in `TaskQueueManager` to retrieve task IDs that failed after max delivery attempts, using a durable advisory consumer for persistence.
- Added `_setup_advisory_stream` to ensure advisory streams are created when a connection is opened, capturing max-delivery events for DLQ tracking.

Reliability Improvements
- Acknowledgments (ACKs) for tasks and advisory messages are flushed to the NATS socket to avoid silent drops when subscriptions are torn down.

How to Test the Changes
1. Set `TASK_TTR` in `nats_queue.py` to a small value (e.g. 10 seconds) for easier testing. Also change `max_deliver=2`.
2. Run `python trapdata/antenna/benchmark.py --job-id <job id> --skip-acks`. This drains the tasks without ACKing them, similar to a crashing worker. Wait ~10 seconds and run it again.
3. Run `docker compose run --rm django python manage.py check_dead_letter_queue <job_id>`

Example output:
Once #1025 is in place, we can use this method to periodically check for failed images and then log something like "Workers have failed to process Image 123 5 times, giving up" to the job, perhaps only up to a max number of errors.

Summary by CodeRabbit
New Features
Improvements
Tests