
Support for NATS dead letter queue #1175

Open
carlosgjs wants to merge 24 commits into RolnickLab:main from uw-ssec:carlos/natsdlq

Conversation


@carlosgjs carlosgjs commented Mar 4, 2026

Summary

This pull request introduces dead letter queue (DLQ) support for job tasks, allowing for better tracking and cleanup of failed tasks in the NATS orchestration layer.

Helps address #1168

Dead Letter Queue Management

  • Added a new management command check_dead_letter_queue.py to allow users to check for dead letter queue messages for a specific job via the command line.
  • Implemented get_dead_letter_task_ids in TaskQueueManager to retrieve task IDs that failed after max delivery attempts, using a durable advisory consumer for persistence.
  • Added _setup_advisory_stream to ensure advisory streams are created when a connection is opened, capturing max-delivery events for DLQ tracking.

Reliability Improvements

  • Ensured that acknowledgements (ACKs) for tasks and advisory messages are flushed to the NATS socket to avoid silent drops when subscriptions are torn down.
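The flush-after-ACK pattern can be sketched as below. `msg.ack()` and `nc.flush()` are real nats-py calls; the wrapper name is illustrative, and mocks stand in for a live server so the example runs offline:

```python
import asyncio
from unittest.mock import AsyncMock

async def ack_and_flush(msg, nc) -> None:
    """ACK a message, then flush the client's pending output so the
    ACK reaches the server before the subscription is torn down."""
    await msg.ack()   # only enqueues the ACK on the client side
    await nc.flush()  # forces pending writes onto the socket

# Demonstrate with mocks in place of a live NATS connection.
msg, nc = AsyncMock(), AsyncMock()
asyncio.run(ack_and_flush(msg, nc))
msg.ack.assert_awaited_once()
nc.flush.assert_awaited_once()
```

Without the flush, an ACK buffered in the client can be silently dropped if the connection or subscription is closed immediately afterwards.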

How to Test the Changes

  • Temporarily change TASK_TTR in nats_queue.py to a small value (e.g. 10 seconds) for easier testing, and set max_deliver=2.
  • Create an async job with a small number of images; don't run any workers.
  • Using the code from PR #112, run python trapdata/antenna/benchmark.py --job-id <job id> --skip-acks. This drains the tasks without ACKing them, simulating a crashing worker. Wait ~10 seconds and run it again.
  • Run docker compose run --rm django python manage.py check_dead_letter_queue <job_id>

Example output:

INFO 2026-03-04 14:15:34,238 nats_queue 1 131574005512000 Advisory stream created
INFO 2026-03-04 14:15:34,241 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 1
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 2
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 3
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 4
INFO 2026-03-04 14:15:34,243 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 5
Found 5 dead letter task(s) for job 196:
  - Image ID: 28324
  - Image ID: 35223
  - Image ID: 22865
  - Image ID: 1528
  - Image ID: 3781

Once #1025 is in place, we can use this method to periodically check for failed images and then log a message to the job such as Workers have failed to process Image 123 5 times, giving up, perhaps capped at a maximum number of errors.

Summary by CodeRabbit

  • New Features

    • Added a CLI command to list dead-letter queue entries for a job, reporting failed task image IDs.
  • Improvements

    • Added a shared advisory stream to surface dead-letter events, improved advisory consumer handling and removal during job cleanup, and made acknowledgements more reliable by flushing after publish.
  • Tests

    • Expanded tests for advisory stream setup, dead-letter retrieval paths, and cleanup behavior.

carlos-irreverentlabs and others added 17 commits January 16, 2026 11:25
* fix: prevent NATS connection flooding and stale job task fetching

- Add connect_timeout=5, allow_reconnect=False to NATS connections to
  prevent leaked reconnection loops from blocking Django's event loop
- Guard /tasks endpoint against terminal-status jobs (return empty tasks
  instead of attempting NATS reserve)
- IncompleteJobFilter now excludes jobs by top-level status in addition
  to progress JSON stages
- Add stale worker cleanup to integration test script

Found during PSv2 integration testing where stale ADC workers with
default DataLoader parallelism overwhelmed the single uvicorn worker
thread by flooding /tasks with concurrent NATS reserve requests.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: PSv2 integration test session notes and NATS flooding findings

Session notes from 2026-02-16 integration test including root cause
analysis of stale worker task competition and NATS connection issues.
Findings doc tracks applied fixes and remaining TODOs with priorities.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: update session notes with successful test run #3

PSv2 integration test passed end-to-end (job 1380, 20/20 images).
Identified ack_wait=300s as cause of ~5min idle time when GPU
processes race for NATS tasks.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: batch NATS task fetch to prevent HTTP timeouts

Replace N×1 reserve_task() calls with single reserve_tasks() batch
fetch. The previous implementation created a new pull subscription per
message (320 NATS round trips for batch=64), causing the /tasks endpoint
to exceed HTTP client timeouts. The new approach uses one psub.fetch()
call for the entire batch.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: add next session prompt

* feat: add pipeline__slug__in filter for multi-pipeline job queries

Workers that handle multiple pipelines can now fetch jobs for all of them
in a single request: ?pipeline__slug__in=slug1,slug2

Co-Authored-By: Claude <noreply@anthropic.com>

* chore: remove local-only docs and scripts from branch

These files are session notes, planning docs, and test scripts that
should stay local rather than be part of the PR.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: set job dispatch_mode at creation time based on project feature flags

ML jobs with a pipeline now get dispatch_mode set during setup() instead
of waiting until run() is called by the Celery worker. This lets the UI
show the correct mode immediately after job creation.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: add timeouts to all JetStream operations and restore reconnect policy

Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations
via asyncio.wait_for() so a hung NATS connection fails fast instead of
blocking the caller's thread indefinitely. Also restore the intended
reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: propagate NATS timeouts as 503 instead of swallowing them

asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was
caught by the broad `except Exception` in reserve_tasks(), silently
returning [] and making NATS outages indistinguishable from empty queues.
Workers would then poll immediately, recreating the flooding problem.

- Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
- Catch TimeoutError and OSError in the /tasks view, return 503
- Restore allow_reconnect=False (fail-fast on connection issues)
- Add return type annotation to get_connection()

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: address review comments (log level, fetch timeout, docstring)

- Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid
  log spam from frequent polling)
- Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker
  for 5s on empty queues
- Fix docstring examples using string 'job123' for int-typed job_id

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses

NoServersError, ConnectionClosedError, and other NATS exceptions inherit
from nats.errors.Error (not OSError), so they escaped the handler and
returned 500 instead of 503.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
…olnickLab#1142)

* feat: configurable NATS tuning and gunicorn worker management

Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: address PR review comments

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)

The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in
the response to know which pipeline to run. Without it, Pydantic
validation fails and the worker skips the job.

Co-Authored-By: Claude <noreply@anthropic.com>

* Update ami/jobs/serializers.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

netlify bot commented Mar 4, 2026

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it

Name Link
🔨 Latest commit e4564fb


netlify bot commented Mar 4, 2026

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit e4564fb
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69b08899d364b200087a2bf4


coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Walkthrough

Adds a Django management command to inspect dead-letter messages for a job and extends the NATS TaskQueueManager with an advisory stream, DLQ consumer helpers (list/delete), advisory setup on connect, ACK flush behavior, and tests for DLQ paths.

Changes

  • Management command (ami/ml/management/commands/check_dead_letter_queue.py): New Django Command that accepts job_id, invokes an async _check_dead_letter_queue via TaskQueueManager, prints DLQ image IDs or a success message, and raises CommandError on failure.
  • NATS orchestration / DLQ support (ami/ml/orchestration/nats_queue.py): Adds ADVISORY_STREAM_NAME, advisory stream setup on connect (_setup_advisory_stream), generic _stream_exists and job-specific _job_stream_exists, DLQ helpers (_get_dlq_consumer_name, get_dead_letter_image_ids, delete_dlq_consumer), ensures flush after ACK publish, expands TimeoutError handling, and extends cleanup_job_resources to remove the DLQ advisory consumer.
  • Tests (ami/ml/orchestration/tests/test_nats_queue.py): Updates mocks/assertions for advisory stream creation and extra consumer deletion; adds tests for get_dead_letter_image_ids success and timeout paths; adds an nc.flush mock and references ADVISORY_STREAM_NAME in assertions.

Sequence Diagram

sequenceDiagram
    participant CLI
    participant Cmd as check_dead_letter_queue (Django Command)
    participant TQM as TaskQueueManager
    participant NATS as NATS Client
    participant ADVISORY as Advisory Stream

    CLI->>Cmd: run check_dead_letter_queue(job_id)
    Cmd->>TQM: call _check_dead_letter_queue(job_id)
    TQM->>NATS: __aenter__ (open connection/context)
    NATS->>ADVISORY: ensure advisory stream exists (_setup_advisory_stream)
    ADVISORY-->>NATS: advisory stream ready
    TQM->>NATS: get_dead_letter_image_ids(job_id, n)
    NATS->>ADVISORY: pull/fetch advisory messages (durable consumer)
    ADVISORY-->>NATS: advisory events referencing original messages
    NATS->>NATS: fetch original messages -> extract image_ids
    NATS->>ADVISORY: ack advisory messages / publish ACKs
    NATS->>NATS: flush to ensure ACK delivery
    NATS-->>TQM: return list of image_ids
    TQM-->>Cmd: return result
    Cmd-->>CLI: print found IDs or "no dead letter images"

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested reviewers

  • mihow

Poem

🐇 I sniff the streams where lost messages hide,
I tap advisories, follow each guide,
I fetch tiny IDs from shadowy queues,
I hop to ack, then flush away the clues,
Hooray — the dead letters sleep light and spry.

🚥 Pre-merge checks (3 passed)
  • Title check: ✅ Passed. The title clearly and concisely summarizes the main change, adding support for NATS dead letter queue functionality, which aligns with all major changes in the changeset.
  • Description check: ✅ Passed. The description covers the summary, list of changes, related issues, detailed testing instructions, and a checklist, matching the repository template requirements.
  • Docstring coverage: ✅ Passed. Docstring coverage is 80.77%, above the required threshold of 80.00%.




@carlosgjs carlosgjs requested a review from mihow March 4, 2026 19:27
@carlosgjs carlosgjs changed the title Carlos/natsdlq Support for NATS dead letter queue Mar 6, 2026
@carlosgjs carlosgjs marked this pull request as ready for review March 6, 2026 21:57
Copilot AI review requested due to automatic review settings March 6, 2026 21:57

Copilot AI left a comment


Pull request overview

Adds dead letter queue (DLQ) support to the NATS/JetStream-backed async job task queue by capturing max-delivery advisories, enabling operators to query failed tasks and improving ACK reliability during teardown.

Changes:

  • Create/use a shared JetStream “advisories” stream and a per-job durable advisory consumer to track max-delivery (DLQ) events.
  • Add TaskQueueManager.get_dead_letter_task_ids() and DLQ consumer cleanup hooks.
  • Introduce a Django management command to query DLQ entries for a job from the CLI.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

File Description
ami/ml/orchestration/nats_queue.py Adds advisory stream setup on connect, DLQ query method, extra flushes, and DLQ consumer cleanup.
ami/ml/orchestration/tests/test_nats_queue.py Updates unit tests for new advisory stream creation and additional consumer deletion.
ami/ml/management/commands/check_dead_letter_queue.py Adds CLI command to print DLQ task/image IDs for a job.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (4)
ami/ml/orchestration/nats_queue.py (1)

437-460: DLQ consumer deletion logs warning on any failure, including "not found".

If a DLQ consumer was never created (no tasks exceeded max deliveries), delete_dlq_consumer will log a warning at line 459. Consider catching NotFoundError separately and logging at debug level to reduce noise for the common case where no DLQ consumer exists.

♻️ Proposed refinement
     async def delete_dlq_consumer(self, job_id: int) -> bool:
         ...
         dlq_consumer_name = self._get_dlq_consumer_name(job_id)
         try:
             await asyncio.wait_for(
                 self.js.delete_consumer("advisories", dlq_consumer_name),
                 timeout=NATS_JETSTREAM_TIMEOUT,
             )
             logger.info(f"Deleted DLQ consumer {dlq_consumer_name} for job '{job_id}'")
             return True
+        except nats.js.errors.NotFoundError:
+            logger.debug(f"DLQ consumer {dlq_consumer_name} not found for job '{job_id}' (never created)")
+            return True  # Not an error - consumer simply never existed
         except Exception as e:
             logger.warning(f"Failed to delete DLQ consumer for job '{job_id}': {e}")
             return False
ami/ml/orchestration/tests/test_nats_queue.py (1)

149-160: Consider adding dedicated tests for new DLQ methods.

The existing test verifies cleanup_job_resources calls delete_consumer twice, but there are no dedicated tests for:

  • get_dead_letter_task_ids (the core new functionality)
  • delete_dlq_consumer
  • _setup_advisory_stream

These methods have non-trivial logic (JSON parsing, message lookups, error handling) that would benefit from unit test coverage.

Would you like me to help generate test cases for these new methods?

ami/ml/management/commands/check_dead_letter_queue.py (2)

21-26: Consider adding optional arguments for count and verbosity.

The command currently retrieves only 10 DLQ entries (the default in get_dead_letter_task_ids). For jobs with many failed tasks, operators may want to retrieve more. Consider adding:

  • --count / -n to control how many entries to fetch
  • --all flag to fetch all available entries
♻️ Example enhancement
     def add_arguments(self, parser):
         parser.add_argument(
             "job_id",
             type=int,
             help="Job ID to check for dead letter queue messages",
         )
+        parser.add_argument(
+            "-n", "--count",
+            type=int,
+            default=10,
+            help="Maximum number of dead letter tasks to retrieve (default: 10)",
+        )

-    async def _check_dead_letter_queue(self, job_id: int) -> list[str]:
+    async def _check_dead_letter_queue(self, job_id: int, count: int = 10) -> list[str]:
         """Check for dead letter queue messages using TaskQueueManager."""
         async with TaskQueueManager() as manager:
-            return await manager.get_dead_letter_task_ids(job_id)
+            return await manager.get_dead_letter_task_ids(job_id, n=count)

Also applies to: 46-49


43-44: Preserve exception chain for better debugging.

Using raise ... from e preserves the original traceback, making it easier to debug issues when the command fails.

♻️ Proposed fix
         except Exception as e:
-            raise CommandError(f"Failed to check dead letter queue: {e}")
+            raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 474-479: The current cleanup aggregates delete_consumer,
delete_stream and delete_dlq_consumer results so a DLQ deletion failure makes
the whole cleanup fail; change this so delete_dlq_consumer is attempted but
treated best-effort: keep calling await self.delete_dlq_consumer(job_id) (and
optionally log a warning on failure) but return only the combined result of
await self.delete_consumer(job_id) and await self.delete_stream(job_id) (i.e.,
use consumer_deleted and stream_deleted for the final boolean), leaving
delete_dlq_consumer failure from affecting the overall return; refer to the
identifiers delete_consumer, delete_stream and delete_dlq_consumer in
nats_queue.py.
- Line 389: The pull_subscribe call using durable=dlq_consumer_name is missing
the stream parameter, so ensure the durable consumer is created on the correct
stream by adding stream="advisories" to the self.js.pull_subscribe call (the
call that assigns psub). Update the invocation of
self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name) to include
stream="advisories" so the durable consumer is associated with the "advisories"
stream.
- Around line 361-435: The review notes to change the error logging in
get_dead_letter_task_ids to include a stack trace: replace the logger.error call
in the outer exception handler with logger.exception (on the same logger) so the
exception context/traceback is automatically logged instead of manually
formatting the exception string; keep the same message text but remove the
manual "{e}" formatting when calling logger.exception to avoid duplication.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5c3ca6a5-cb22-4bb1-bfc5-0c9809717a0b

📥 Commits

Reviewing files that changed from the base of the PR and between dafb83a and db05526.

📒 Files selected for processing (3)
  • ami/ml/management/commands/check_dead_letter_queue.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/ml/management/commands/check_dead_letter_queue.py (1)

43-44: Preserve exception chain for better debugging.

Per static analysis, the exception should be chained using from e to preserve the original traceback, which helps with debugging when the command fails.

🔧 Proposed fix
         except Exception as e:
-            raise CommandError(f"Failed to check dead letter queue: {e}")
+            raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 452-456: The call to self.js.delete_consumer currently hardcodes
"advisories"; replace that literal with the ADVISORY_STREAM_NAME constant to
match the rest of the file and avoid discrepancies—update the delete_consumer
invocation (and any nearby occurrences) to use ADVISORY_STREAM_NAME and keep
dlq_consumer_name and NATS_JETSTREAM_TIMEOUT as-is.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 551d6559-02e6-406b-8914-5b34b70bac6c

📥 Commits

Reviewing files that changed from the base of the PR and between db05526 and 0102ee7.

📒 Files selected for processing (3)
  • ami/ml/management/commands/check_dead_letter_queue.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 433-436: In get_dead_letter_task_ids (the function using job_id,
dead_letter_ids and n) don't swallow all exceptions: after logging the error via
logger.error(f"Failed to get dead letter task IDs for job '{job_id}': {e}"),
re-raise the exception so callers (like check_dead_letter_queue) can surface a
CommandError; alternatively, if you intentionally want to return an empty list
for a specific, known/expected exception, catch only that specific exception
type and handle it, but remove the broad "except Exception as e" that hides
NATS/advisory failures.
- Around line 73-78: __aenter__ opens the NATS connection via
get_connection(self.nats_url) then calls _setup_advisory_stream(); if
_setup_advisory_stream() raises the connection is leaked because __aexit__ won't
run. Wrap the call to self._setup_advisory_stream() in a try/except, and on
exception explicitly close the opened connection (await self.nc.close()), clear
self.nc/self.js as needed, then re-raise the exception so the error propagates;
keep get_connection, __aenter__, and _setup_advisory_stream names to locate the
change.
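The cleanup pattern for `__aenter__` described above can be sketched as follows. Everything here is a simplified stand-in: `connect` and `setup_advisory` are injected callables replacing the real `get_connection()` and `_setup_advisory_stream()`, and `FakeConn` fakes a NATS connection so the flow is runnable without a server.

```python
import asyncio


class TaskQueueManagerSketch:
    """Hypothetical sketch of the manager's async context entry.

    Only the cleanup flow matters: if setup fails after the connection
    opens, __aexit__ never runs, so __aenter__ must close it itself.
    """

    def __init__(self, connect, setup_advisory):
        self._connect = connect
        self._setup_advisory = setup_advisory
        self.nc = None

    async def __aenter__(self):
        self.nc = await self._connect()
        try:
            await self._setup_advisory()
        except Exception:
            # Close the freshly opened connection, clear state, and
            # re-raise so the error still propagates to the caller.
            await self.nc.close()
            self.nc = None
            raise
        return self

    async def __aexit__(self, *exc_info):
        if self.nc is not None:
            await self.nc.close()


class FakeConn:
    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


async def demo():
    conn = FakeConn()

    async def connect():
        return conn

    async def failing_setup():
        raise RuntimeError("advisory stream creation failed")

    mgr = TaskQueueManagerSketch(connect, failing_setup)
    try:
        await mgr.__aenter__()
    except RuntimeError:
        pass
    return conn.closed, mgr.nc


closed, nc = asyncio.run(demo())
assert closed and nc is None  # connection released despite setup failure
```

The same guarantee could also be achieved with `contextlib.AsyncExitStack`, but an explicit try/except keeps the fix local to `__aenter__`.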
- Around line 403-418: The advisory ACK is unconditional and must be gated: in
the advisory-processing loop around self.js.get_msg(...) only call msg.ack()
when you have successfully extracted image_id (appended to dead_letter_ids) or
when get_msg raises a terminal NotFoundError indicating the message is gone; do
not ack on transient exceptions or JSON decode failures so they can be retried.
Update the block that calls self.js.get_msg and handles exceptions to detect
NotFoundError (import/compare the correct exception type used by your NATS
client), ack in the success branch after appending the image_id, and ack in the
NotFoundError terminal branch (with a clear log); do not ack inside the generic
except that catches transient errors. Ensure msg.ack() remains reachable only
from those two confirmed-terminal pathways (success or NotFoundError).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: f370c226-6231-44b9-a461-f2e098446844

📥 Commits

Reviewing files that changed from the base of the PR and between 0102ee7 and b44f5b0.

📒 Files selected for processing (1)
  • ami/ml/orchestration/nats_queue.py

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
ami/ml/orchestration/nats_queue.py (2)

443-443: Redundant slice operation.

The [:n] slice is unnecessary since fetch(n, timeout=1.0) at line 399 already limits the number of messages to at most n, and each message yields at most one image_id.

Suggested simplification
-        return dead_letter_ids[:n]
+        return dead_letter_ids
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` at line 443, The returned list is
unnecessarily sliced; since fetch(n, timeout=1.0) already limits results to at
most n and each message yields one image_id, remove the redundant [:n] and
return dead_letter_ids directly. Update the return statement that currently
reads return dead_letter_ids[:n] to simply return dead_letter_ids (keeping the
surrounding logic that collects dead_letter_ids and the fetch(n, timeout=1.0)
call intact).

419-431: Design tradeoff: unconditional ACK may lose advisories on transient failures.

If get_msg fails due to a transient error (e.g., network blip), the advisory is still ACKed and won't be retried. This means the dead-letter entry could be lost.

The comment at lines 425-427 documents your reasoning that this "shouldn't happen" because stream_seq is part of the schema and all messages have image_id. This is reasonable for expected failure modes (message already discarded, malformed data).

Consider whether distinguishing NotFoundError (terminal - message gone, safe to ACK) from other exceptions (transient - skip ACK for retry) would be worthwhile for resilience:

Alternative: conditional ACK for transient failures
                     try:
                         job_msg = await self.js.get_msg(stream_name, stream_seq)

                         if job_msg and job_msg.data:
                             task_data = json.loads(job_msg.data.decode())

                             if "image_id" in task_data:
                                 dead_letter_ids.append(str(task_data["image_id"]))
                             else:
                                 logger.warning(f"No image_id found in task data: {task_data}")
+                        # ACK after successful processing or terminal "missing data" case
+                        await msg.ack()
+                        logger.info(f"Acknowledged advisory message for stream_seq {stream_seq}")
+                    except nats.js.errors.NotFoundError:
+                        # Message was discarded - terminal, safe to ACK
+                        logger.warning(f"Message {stream_seq} not found in {stream_name} (discarded)")
+                        await msg.ack()
                     except Exception as e:
+                        # Transient failure - don't ACK, will retry on next call
                         logger.warning(f"Could not retrieve message {stream_seq} from {stream_name}: {e}")
-                        # The message might have been discarded after max_deliver exceeded
                 else:
                     logger.warning(f"No stream_seq in advisory data: {advisory_data}")
+                    await msg.ack()  # Malformed advisory - ACK to avoid infinite loop

-                # Acknowledge even if we couldn't find the stream_seq or image_id ...
-                await msg.ack()
-                logger.info(
-                    f"Acknowledged advisory message for stream_seq {advisory_data.get('stream_seq', 'unknown')}"
-                )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 419 - 431, The advisory ACK
is unconditional which hides transient get_msg failures; update the exception
handling around the get_msg call in the advisory processing routine (the block
that currently has "except Exception as e" and the subsequent "await msg.ack()")
so you only ACK terminal errors (e.g., message-not-found / NotFoundError) but do
not ACK for other transient exceptions so the advisory can be retried;
specifically, catch the concrete NotFoundError (or whatever JetStream "not
found" exception type your client exposes) and ACK in that branch with a clear
log, while re-raising or skipping the ACK for other exceptions and logging them
as transient failures so the message is not lost.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Line 443: The returned list is unnecessarily sliced; since fetch(n,
timeout=1.0) already limits results to at most n and each message yields one
image_id, remove the redundant [:n] and return dead_letter_ids directly. Update
the return statement that currently reads return dead_letter_ids[:n] to simply
return dead_letter_ids (keeping the surrounding logic that collects
dead_letter_ids and the fetch(n, timeout=1.0) call intact).
- Around line 419-431: The advisory ACK is unconditional which hides transient
get_msg failures; update the exception handling around the get_msg call in the
advisory processing routine (the block that currently has "except Exception as
e" and the subsequent "await msg.ack()") so you only ACK terminal errors (e.g.,
message-not-found / NotFoundError) but do not ACK for other transient exceptions
so the advisory can be retried; specifically, catch the concrete NotFoundError
(or whatever JetStream "not found" exception type your client exposes) and ACK
in that branch with a clear log, while re-raising or skipping the ACK for other
exceptions and logging them as transient failures so the message is not lost.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 66b28f54-dd96-43a3-8bcc-78b9b14a8cc6

📥 Commits

Reviewing files that changed from the base of the PR and between b44f5b0 and e09fd9a.

📒 Files selected for processing (1)
  • ami/ml/orchestration/nats_queue.py

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
ami/ml/management/commands/check_dead_letter_queue.py (1)

42-43: Preserve exception chain with raise ... from e.

Using raise ... from e preserves the original traceback, which aids debugging when the command fails.

Suggested fix
         except Exception as e:
-            raise CommandError(f"Failed to check dead letter queue: {e}")
+            raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/management/commands/check_dead_letter_queue.py` around lines 42 - 43,
The except block currently re-raises a CommandError without preserving the
original traceback; update the exception handling to chain the original
exception by using "raise CommandError(f'Failed to check dead letter queue:
{e}') from e" so the original exception (e) and its traceback are preserved when
raising CommandError in the check_dead_letter_queue command.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@ami/ml/management/commands/check_dead_letter_queue.py`:
- Around line 42-43: The except block currently re-raises a CommandError without
preserving the original traceback; update the exception handling to chain the
original exception by using "raise CommandError(f'Failed to check dead letter
queue: {e}') from e" so the original exception (e) and its traceback are
preserved when raising CommandError in the check_dead_letter_queue command.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d5c55d0d-6077-4280-acb8-dc8cd1bc8993

📥 Commits

Reviewing files that changed from the base of the PR and between e09fd9a and e4564fb.

📒 Files selected for processing (1)
  • ami/ml/management/commands/check_dead_letter_queue.py
