
fix: streamlined pod monitor #257

Merged
HardMax71 merged 7 commits into main from fix/backend-monitor
Mar 3, 2026

Conversation

HardMax71 (Owner) commented Mar 3, 2026


Summary by cubic

Streamlined the pod monitor by removing the raw event handler, routing watch events directly to processing, and switching to structured logging. Clarified aggregate_id vs execution_id across events and APIs for consistent publishing and querying, and fixed watch error metrics and reconnect counting.

  • Refactors
    • Removed _process_raw_event; list-then-watch sets the cursor; watch filters valid types, calls _process_pod_event, and stores the watch RV on exit.
    • Restricted publishing to PodMonitorEvent and exported it; Kafka key always uses execution_id.
    • Structured logging across monitor, worker, and runner; standardized pod delete/configmap warnings with status/reason.
    • Propagated aggregate_id in pod/result/saga events and worker commands; added get_events_by_execution_id (repo/service) and updated the API route; corrected watch error metrics and reconnect counts; updated e2e events adapter to DomainEvent.
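
The PodMonitorEvent union and execution-keyed publishing described above can be sketched as follows. This is an illustrative sketch only: the event class names, fields, and the `kafka_key` helper are assumptions, not the PR's exact code.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class PodRunningEvent:
    event_type: str
    execution_id: str
    aggregate_id: str


@dataclass
class PodTerminatedEvent:
    event_type: str
    execution_id: str
    aggregate_id: str
    exit_code: int


# Restricting the publisher's signature to this union (instead of a broad
# DomainEvent base) lets the type checker reject unrelated event types
# at the call site.
PodMonitorEvent = Union[PodRunningEvent, PodTerminatedEvent]


def kafka_key(event: PodMonitorEvent) -> bytes:
    # Keying every message by execution_id keeps all events for one
    # execution on a single partition, preserving their order.
    return event.execution_id.encode("utf-8")
```

One design consequence worth noting: because both members of the union carry `execution_id`, the publisher never needs to branch on event type to compute the key.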

Written for commit 7088dad. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Expose a consolidated PodMonitorEvent type and include aggregate_id across many emitted events.
    • Added API to fetch events by execution ID.
  • Bug Fixes

    • Standardized and clearer pod/configmap error logging.
    • Improved pod-monitor resilience: better error aggregation, reconnect handling, publishing keying, and processing metrics.
  • Tests

    • Updated tests to align with event typing, payloads, and export changes.

Copilot AI review requested due to automatic review settings on March 3, 2026, 14:06

coderabbitai bot commented Mar 3, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Adds aggregate_id to many domain events and constructors, introduces PodMonitorEvent union replacing DomainEvent in pod monitor, overhauls pod watch/process flow and logging, adds get_events_by_execution_id to repo/service and updates API, and propagates aggregate_id through result, saga, and worker event paths.
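
To make the aggregate_id propagation concrete, here is a minimal sketch of how it might be threaded through event construction. The class and field names are assumptions based on the walkthrough above, and the choice to mirror `execution_id` into `aggregate_id` is an illustrative guess; the real mapping may differ.

```python
from dataclasses import dataclass


@dataclass
class SagaStartedEvent:
    saga_id: str
    execution_id: str
    aggregate_id: str  # newly propagated correlation key


def start_saga(saga_id: str, execution_id: str) -> SagaStartedEvent:
    # Assumption for this sketch: the execution acts as the aggregate,
    # so aggregate_id mirrors execution_id. Consumers can then correlate
    # saga, result, and pod events without branching on event type.
    return SagaStartedEvent(
        saga_id=saga_id,
        execution_id=execution_id,
        aggregate_id=execution_id,
    )
```

Carrying the same correlation key on every event type is what makes the repository-level `get_events_by_execution_id` query useful across saga, result, and pod event streams.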

Changes

Cohort / File(s) Summary
Pod monitor core & mapping
backend/app/services/pod_monitor/monitor.py, backend/app/services/pod_monitor/event_mapper.py, backend/app/services/pod_monitor/__init__.py
Replaced DomainEvent with new PodMonitorEvent union; event constructors now include aggregate_id; _publish_event accepts PodMonitorEvent and publishes keyed by event.execution_id; removed resource_version from pod event types and simplified processing flow and logging.
Watch loop / runner
backend/workers/run_pod_monitor.py, backend/app/services/pod_monitor/monitor.py
Refactored watch cycle to use an error_type aggregator, treat ApiException 410 as resource-version-expired reset, defer metric recording until after exception handling, and unify error logging.
K8s worker & pod deletion
backend/app/services/k8s_worker/worker.py
Consolidated exception handling and warning logs for pod/configmap delete operations; included execution_id, status, and reason in warnings. Also add aggregate_id to PodCreated/CreationFailed event publishing.
Event store and API
backend/app/db/repositories/event_repository.py, backend/app/services/event_service.py, backend/app/api/routes/execution.py
Added get_events_by_execution_id to repository and service (query by execution_id, optional event_types, limit); updated route to call get_events_by_execution_id.
Result publishing & saga
backend/app/services/result_processor/processor.py, backend/app/services/saga/execution_saga.py, backend/app/services/saga/saga_orchestrator.py
Propagated aggregate_id into result publish helpers and events (ResultStoredEvent, ResultFailedEvent); added aggregate_id to command and saga events (CreatePodCommandEvent, DeletePodCommandEvent, SagaStartedEvent, SagaCancelledEvent).
Tests & exports
backend/tests/unit/services/pod_monitor/test_monitor.py, backend/tests/unit/services/pod_monitor/test_config_and_init.py, backend/tests/e2e/test_execution_routes.py
Updated tests to new event shapes: replaced ExecutionStartedEvent usages with ExecutionCompletedEvent, changed mocked event_type to string, removed resource_version assertions, exported PodMonitorEvent, and adjusted e2e adapters to use DomainEvent.
Small typing/signature changes
backend/app/services/pod_monitor/event_mapper.py, backend/app/services/pod_monitor/monitor.py, backend/app/services/k8s_worker/worker.py
Removed resource_version from PodEvent; changed _last_resource_version to `str
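
The watch-cycle error handling summarized above (an error_type aggregator that treats a 410 as a resource-version reset) can be sketched roughly as follows. The `ErrorType` member names and the `ApiException` stand-in are assumptions for illustration; the real code uses the Kubernetes client's `ApiException`.

```python
from enum import Enum


class ErrorType(str, Enum):
    RESOURCE_VERSION_EXPIRED = "resource_version_expired"
    API_ERROR = "api_error"
    UNEXPECTED = "unexpected"


class ApiException(Exception):
    # Simplified stand-in for kubernetes.client.exceptions.ApiException.
    def __init__(self, status: int, reason: str = ""):
        super().__init__(reason)
        self.status = status
        self.reason = reason


def classify_watch_error(exc: Exception) -> ErrorType:
    # HTTP 410 Gone means the stored resourceVersion is too old: the
    # caller should clear its cursor and re-list before watching again,
    # rather than treating this as a generic API failure.
    if isinstance(exc, ApiException):
        if exc.status == 410:
            return ErrorType.RESOURCE_VERSION_EXPIRED
        return ErrorType.API_ERROR
    return ErrorType.UNEXPECTED
```

Classifying once, in a single function, is what lets the runner defer metric recording until after exception handling and emit exactly one reconnect/error metric per cycle.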

Sequence Diagram(s)

sequenceDiagram
  participant Watcher
  participant K8sAPI
  participant PodMonitor
  participant Mapper
  participant Publisher
  participant Metrics

  Watcher->>K8sAPI: start/watch pods (resourceVersion)
  K8sAPI-->>Watcher: pod event
  Watcher->>PodMonitor: deliver pod event
  PodMonitor->>Mapper: map pod -> PodMonitorEvent(s)
  alt mapping error
    Mapper-->>PodMonitor: raises error
    PodMonitor->>Metrics: record PROCESSING_ERROR
  else mapping success
    Mapper-->>PodMonitor: PodMonitorEvent(s)
    PodMonitor->>Publisher: publish event (key=event.execution_id)
    Publisher-->>Metrics: record publish metrics
    alt terminal event
      PodMonitor->>K8sAPI: delete pod
      K8sAPI-->>PodMonitor: status/reason
      PodMonitor->>Metrics: record delete outcome
    end
  end
  PodMonitor->>Metrics: record processing duration

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐇 I hopped through watches, stitched IDs to each event's thread,
Mapped pods to messages, keyed them by execution's stead.
I logged the reasons, reset cursors when they slipped,
Published, pruned, and timed each hop — a tidy rabbit script.
Carrots for commits, a tiny hop — aggregate_id!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 61.90%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
✅ Passed checks (2 passed)
  • Title check: ✅ Passed. The title 'fix: streamlined pod monitor' accurately describes the main change: streamlining the pod monitor by removing raw event handling and simplifying the event processing pipeline.
  • Description check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



Comment @coderabbitai help to get the list of available commands and usage tips.


codecov-commenter commented Mar 3, 2026

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 82.69231% with 9 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
backend/app/services/pod_monitor/monitor.py 85.71% 4 Missing ⚠️
backend/app/services/result_processor/processor.py 57.14% 3 Missing ⚠️
backend/app/services/k8s_worker/worker.py 0.00% 2 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Flag Coverage Δ
backend-e2e 83.19% <34.61%> (+0.01%) ⬆️
backend-unit 67.78% <69.23%> (-0.05%) ⬇️
frontend-unit 86.86% <ø> (ø)

Flags with carried-forward coverage won't be shown.

Files with missing lines Coverage Δ
backend/app/api/routes/execution.py 100.00% <100.00%> (ø)
backend/app/db/repositories/event_repository.py 67.27% <100.00%> (+1.88%) ⬆️
backend/app/services/event_service.py 52.27% <100.00%> (+2.27%) ⬆️
backend/app/services/pod_monitor/__init__.py 100.00% <100.00%> (ø)
backend/app/services/pod_monitor/event_mapper.py 89.87% <100.00%> (+0.04%) ⬆️
backend/app/services/saga/execution_saga.py 100.00% <ø> (ø)
backend/app/services/saga/saga_orchestrator.py 75.00% <ø> (ø)
backend/app/services/k8s_worker/worker.py 20.00% <0.00%> (+0.28%) ⬆️
backend/app/services/result_processor/processor.py 83.33% <57.14%> (ø)
backend/app/services/pod_monitor/monitor.py 91.11% <85.71%> (+0.81%) ⬆️

... and 2 files with indirect coverage changes

Components Coverage Δ
Backend 90.05% <82.69%> (-0.06%) ⬇️
Frontend 86.86% <ø> (ø)


Copilot AI left a comment


Pull request overview

This PR streamlines pod-monitor error handling and logging, aiming to make watch-cycle metrics/logs more consistent and simplify pod event processing flow.

Changes:

  • Consolidates watch-cycle error classification so metrics/reconnect counters are emitted from a single path.
  • Refactors pod event processing to narrow the try/except scope and uses more structured logging.
  • Simplifies Kubernetes ApiException logging in pod deletion paths and updates a unit test mock accordingly.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

File Description
backend/workers/run_pod_monitor.py Centralizes watch error classification and reconnect metric emission; switches to structured error logging fields.
backend/app/services/pod_monitor/monitor.py Refactors _process_pod_event flow and simplifies _delete_pod exception logging.
backend/app/services/k8s_worker/worker.py Simplifies ApiException logging for pod deletion and ConfigMap ownerReference patching.
backend/tests/unit/services/pod_monitor/test_monitor.py Updates mocked event_type shape/value in the full-flow unit test.
Comments suppressed due to low confidence (3)

backend/app/services/k8s_worker/worker.py:100

  • handle_delete_pod_command now logs all ApiExceptions as warnings, which downgrades genuinely unexpected failures (e.g., 403/500/timeouts) that previously logged as errors. Consider restoring the status-based severity (e.g., keep 404 as warning/expected, but log non-404 as error and include exc_info=True if useful) so operational alerting doesn’t miss real delete failures.
        except ApiException as e:
            self.logger.warning("Failed to delete pod", execution_id=execution_id, status=e.status, reason=e.reason)

backend/app/services/pod_monitor/monitor.py:227

  • _delete_pod used to treat 404 (already deleted) as a debug-level, expected condition; it now emits a warning for all delete failures. If 404s are common during reconciliation/watch races, this will create noisy warnings and may trigger alerts. Consider special-casing 404 again (debug/info) while keeping warning/error for unexpected statuses.
        except ApiException as e:
            self.logger.warning("Failed to delete pod", pod_name=pod_name, status=e.status, reason=e.reason)
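
A minimal sketch of the status-based severity both reviewers suggest restoring: treat 404 (pod already gone) as an expected race at debug level, and keep everything else at error level so alerting still sees real failures. The logger call shapes and the `ApiException` stand-in are simplified assumptions, not the project's actual code.

```python
import logging


class ApiException(Exception):
    # Simplified stand-in for kubernetes.client.exceptions.ApiException.
    def __init__(self, status: int, reason: str = ""):
        super().__init__(reason)
        self.status = status
        self.reason = reason


def log_delete_failure(logger: logging.Logger, pod_name: str, e: ApiException) -> str:
    """Log a pod-delete failure at a severity matching its status.

    Returns the level name used, so tests can assert on routing.
    """
    if e.status == 404:
        # Already deleted: common during watch/reconcile races, not actionable.
        logger.debug("Pod already deleted", extra={"pod_name": pod_name})
        return "debug"
    # 403/409/5xx and friends are genuine failures alerting should see.
    logger.error(
        "Failed to delete pod",
        extra={"pod_name": pod_name, "status": e.status, "reason": e.reason},
    )
    return "error"
```

Splitting the severity at the call site like this keeps the common race quiet without hiding the failures the cubic and Copilot comments are worried about.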

backend/tests/unit/services/pod_monitor/test_monitor.py:281

  • The unit test’s mock event sets event_type = "pod.running", but the codebase’s canonical pod event types are EventType values (e.g. EventType.POD_RUNNING which is "pod_running"; see backend/app/domain/enums/events.py:18-25). Using a nonstandard string makes the test less representative and can silently diverge from metrics/logging labels; consider switching the mock to EventType.POD_RUNNING (or at least "pod_running").
            class Event:
                event_type = "pod.running"
                aggregate_id = "agg1"



cubic-dev-ai bot left a comment


3 issues found across 4 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="backend/app/services/pod_monitor/monitor.py">

<violation number="1" location="backend/app/services/pod_monitor/monitor.py:189">
P1: Exception handling was narrowed too much: errors during terminal pod deletion or post-processing metrics/logging can now escape `_process_pod_event` and stop the monitor loop.</violation>

<violation number="2" location="backend/app/services/pod_monitor/monitor.py:226">
P2: 404 delete races are now logged as warnings, which can create alert noise; keep 404 as debug and warn only on unexpected statuses.</violation>
</file>

<file name="backend/app/services/k8s_worker/worker.py">

<violation number="1" location="backend/app/services/k8s_worker/worker.py:99">
P2: Do not downgrade all pod deletion API failures to warning; keep non-404 failures at error level so real compensation failures are visible.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
backend/workers/run_pod_monitor.py (1)

59-59: Prefer an explicit monitor API over private-attribute mutation.

Line 59 reaches into monitor._last_resource_version; consider a public reset_resource_version() method on PodMonitor.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/workers/run_pod_monitor.py` at line 59, The code directly mutates the
private attribute monitor._last_resource_version; instead add a public
PodMonitor.reset_resource_version() method that sets its internal state
(clearing last_resource_version) and call monitor.reset_resource_version() here;
update the PodMonitor class (the constructor/state and any watchers) to expose
that method rather than touching _last_resource_version externally so callers
use reset_resource_version() instead of accessing the private attribute.
backend/app/services/k8s_worker/worker.py (1)

226-226: Include object identity in ownerReference patch failure log.

Line 226 logs only reason; adding config_map_name and status will make triage much faster.

Suggested logging enrichment
-            self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason)
+            self.logger.warning(
+                "Failed to set ownerReference on ConfigMap",
+                config_map_name=config_map.metadata.name if config_map.metadata else None,
+                owner_pod_name=owner_pod.metadata.name if owner_pod.metadata else None,
+                status=e.status,
+                reason=e.reason,
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/k8s_worker/worker.py` at line 226, The ownerReference
patch failure log currently calls self.logger.warning("Failed to set
ownerReference on ConfigMap", reason=e.reason); update that log to include the
ConfigMap identity and HTTP status by adding config_map_name (or the variable
holding the ConfigMap name) and the status (e.status or e.status_code /
response.status if available) to the structured log call so it becomes something
like self.logger.warning("Failed to set ownerReference on ConfigMap",
config_map_name=config_map_name, status=status, reason=e.reason) — locate this
in worker.py where the ownerReference patch is attempted (the
self.logger.warning line) and ensure the referenced variable names match the
local names used for the ConfigMap name and the exception status.
backend/app/services/pod_monitor/monitor.py (1)

176-178: Record processing-duration metric even on early-return/error paths.

Line 178 and Line 187 return before Line 200–201, so duration metrics are dropped for ignored/error events.

Move duration recording into a finally block
     async def _process_pod_event(self, event: PodEvent) -> None:
         """Process a pod event."""
         start_time = time.time()
-
-        # Update resource version for crash recovery
-        if event.resource_version:
-            self._last_resource_version = event.resource_version
-
-        # Skip ignored phases
-        pod_phase = event.pod.status.phase if event.pod.status else None
-        if pod_phase in self.config.ignored_pod_phases:
-            return
-
-        try:
-            app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type)
-            for app_event in app_events:
-                await self._publish_event(app_event, event.pod)
-        except Exception:
-            self.logger.error("Error processing pod event", exc_info=True)
-            self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
-            return
+        try:
+            # Update resource version for crash recovery
+            if event.resource_version:
+                self._last_resource_version = event.resource_version
+
+            # Skip ignored phases
+            pod_phase = event.pod.status.phase if event.pod.status else None
+            if pod_phase in self.config.ignored_pod_phases:
+                return
+
+            try:
+                app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type)
+                for app_event in app_events:
+                    await self._publish_event(app_event, event.pod)
+            except Exception:
+                self.logger.error("Error processing pod event", exc_info=True)
+                self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
+                return
+
+            if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events):
+                await self._delete_pod(event.pod)
+
+            if app_events:
+                pod_name = event.pod.metadata.name if event.pod.metadata else "unknown"
+                self.logger.info(
+                    "Processed pod event",
+                    watch_event_type=event.event_type,
+                    pod_name=pod_name,
+                    pod_phase=pod_phase or "Unknown",
+                    published_events=len(app_events),
+                )
+        finally:
+            duration = time.time() - start_time
+            self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type)
-
-        if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events):
-            await self._delete_pod(event.pod)
-
-        if app_events:
-            ...
-
-        duration = time.time() - start_time
-        self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type)

Also applies to: 184-201

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 193-198: The info log currently interpolates values via an
f-string in the monitor class; change self.logger.info(...) to use structured
logging: provide a static message string (e.g., "Processed pod event") and pass
event_type=event.event_type, pod_name=pod_name, pod_phase=pod_phase or
"Unknown", published_count=len(app_events) as keyword args to self.logger
(BoundLogger) instead of embedding them in the message; update the call site
where pod_name is computed and ensure no user-controlled fields are formatted
into the message string.

---

Nitpick comments:
In `@backend/app/services/k8s_worker/worker.py`:
- Line 226: The ownerReference patch failure log currently calls
self.logger.warning("Failed to set ownerReference on ConfigMap",
reason=e.reason); update that log to include the ConfigMap identity and HTTP
status by adding config_map_name (or the variable holding the ConfigMap name)
and the status (e.status or e.status_code / response.status if available) to the
structured log call so it becomes something like self.logger.warning("Failed to
set ownerReference on ConfigMap", config_map_name=config_map_name,
status=status, reason=e.reason) — locate this in worker.py where the
ownerReference patch is attempted (the self.logger.warning line) and ensure the
referenced variable names match the local names used for the ConfigMap name and
the exception status.

In `@backend/workers/run_pod_monitor.py`:
- Line 59: The code directly mutates the private attribute
monitor._last_resource_version; instead add a public
PodMonitor.reset_resource_version() method that sets its internal state
(clearing last_resource_version) and call monitor.reset_resource_version() here;
update the PodMonitor class (the constructor/state and any watchers) to expose
that method rather than touching _last_resource_version externally so callers
use reset_resource_version() instead of accessing the private attribute.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a9b4b7a and 50654e4.

📒 Files selected for processing (4)
  • backend/app/services/k8s_worker/worker.py
  • backend/app/services/pod_monitor/monitor.py
  • backend/tests/unit/services/pod_monitor/test_monitor.py
  • backend/workers/run_pod_monitor.py


coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
backend/app/services/pod_monitor/monitor.py (1)

193-197: ⚠️ Potential issue | 🟠 Major

Use structured fields instead of interpolated event data in logs.

Line 193 logs user/pod fields via an f-string; move them to keyword fields on the bound logger.

Suggested fix
-        if app_events:
-            pod_name = event.pod.metadata.name if event.pod.metadata else "unknown"
-            self.logger.info(
-                f"Processed {event.event_type} event for pod {pod_name} "
-                f"(phase: {pod_phase or 'Unknown'}), "
-                f"published {len(app_events)} events"
-            )
+        if app_events:
+            pod_name = event.pod.metadata.name if event.pod.metadata else "unknown"
+            self.logger.info(
+                "Processed pod event",
+                watch_event_type=event.event_type,
+                pod_name=pod_name,
+                pod_phase=pod_phase or "Unknown",
+                published_events=len(app_events),
+            )
As per coding guidelines, "Use only structured logging via injected structlog.stdlib.BoundLogger (never call logging.getLogger() directly); pass user-controlled data as keyword arguments, never interpolate into log messages".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/pod_monitor/monitor.py` around lines 193 - 197, The log
call in monitor.py currently interpolates user/pod data into the message; change
the self.logger.info(...) call inside the pod event processing (the block that
references event.event_type, pod_name, pod_phase, and app_events) to use a
static message string and pass event_type, pod_name, pod_phase, and
published_event_count as keyword arguments to the BoundLogger (e.g.,
event_type=event.event_type, pod_name=pod_name, pod_phase=pod_phase or
"Unknown", published_event_count=len(app_events)); do not interpolate these
values into the message and keep using the injected self.logger
(structlog.stdlib.BoundLogger).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 170-173: The code advances self._last_resource_version from
event.resource_version before attempting to map/publish the event, so failures
during mapping/publishing can cause skipped events on restart; move the
assignment of self._last_resource_version so it happens only after successful
processing (i.e., after the calls that map the event and publish the message —
the code paths around map_event_to_message and the
publisher.publish/self.publish_message calls), and apply the same change for the
other block covering lines 179-186 (only set _last_resource_version when
mapping/publishing completes without exception).

---

Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 193-197: The log call in monitor.py currently interpolates
user/pod data into the message; change the self.logger.info(...) call inside the
pod event processing (the block that references event.event_type, pod_name,
pod_phase, and app_events) to use a static message string and pass event_type,
pod_name, pod_phase, and published_event_count as keyword arguments to the
BoundLogger (e.g., event_type=event.event_type, pod_name=pod_name,
pod_phase=pod_phase or "Unknown", published_event_count=len(app_events)); do not
interpolate these values into the message and keep using the injected
self.logger (structlog.stdlib.BoundLogger).

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 50654e4 and 5e429bd.

📒 Files selected for processing (5)
  • backend/app/services/pod_monitor/__init__.py
  • backend/app/services/pod_monitor/event_mapper.py
  • backend/app/services/pod_monitor/monitor.py
  • backend/tests/unit/services/pod_monitor/test_config_and_init.py
  • backend/tests/unit/services/pod_monitor/test_monitor.py


coderabbitai bot left a comment


♻️ Duplicate comments (2)
backend/app/services/pod_monitor/monitor.py (2)

162-168: ⚠️ Potential issue | 🟠 Major

Use structured fields for the processed-event info log.

Line 165-167 still interpolates event data into message text. Keep a static message and pass fields as kwargs.

Suggested fix
         if app_events:
             pod_name = event.pod.metadata.name if event.pod.metadata else "unknown"
             self.logger.info(
-                f"Processed {event.event_type} event for pod {pod_name} "
-                f"(phase: {pod_phase or 'Unknown'}), "
-                f"published {len(app_events)} events"
+                "Processed pod event",
+                watch_event_type=event.event_type,
+                pod_name=pod_name,
+                pod_phase=pod_phase or "Unknown",
+                published_events=len(app_events),
             )

As per coding guidelines, "Use only structured logging via injected structlog.stdlib.BoundLogger (never call logging.getLogger() directly); pass user-controlled data as keyword arguments, never interpolate into log messages."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/pod_monitor/monitor.py` around lines 162 - 168, The log
message in monitor.py currently interpolates event data into the message string;
change it to a static message and pass all dynamic data as structured fields to
the BoundLogger: replace the formatted self.logger.info(...) call around
self.logger.info(f"Processed ...") with a static message like "Processed pod
event" and supply keyword args for event_type (event.event_type), pod_name
(pod_name), pod_phase (pod_phase or "Unknown"), and published (len(app_events))
so all user-controlled data (event, pod_name, pod_phase, app_events) is passed
as kwargs to self.logger.info.

115-117: ⚠️ Potential issue | 🔴 Critical

Do not swallow processing failures while still advancing the watch cursor.

Line 150-157 returns after map/publish exceptions, while Line 115-117 still commits _last_resource_version in finally. A transient mapper/Kafka failure can be dropped permanently on restart.

Suggested fix
 async def watch_pod_events(self) -> None:
+    watch_error: ErrorType | None = None
     try:
         async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs):
             if event["type"] not in WatchEventType:
                 continue
-            await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"]))
+            try:
+                await self._process_pod_event(
+                    PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"])
+                )
+            except Exception:
+                watch_error = ErrorType.PROCESSING_ERROR
+                raise
+    except ApiException as e:
+        watch_error = ErrorType.RESOURCE_VERSION_EXPIRED if e.status == 410 else ErrorType.API_ERROR
+        raise
+    except Exception:
+        watch_error = watch_error or ErrorType.UNEXPECTED
+        raise
     finally:
-        if self._watch.resource_version:
+        if watch_error:
+            self._metrics.record_pod_monitor_watch_error(watch_error)
+        elif self._watch.resource_version:
             self._last_resource_version = self._watch.resource_version
@@
-        except Exception:
+        except Exception:
             self.logger.error("Error processing pod event", exc_info=True)
-            self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
-            return
+            raise

As per coding guidelines, "Services in app/services/ must use constructor injection for repositories, UnifiedProducer, structlog logger, Settings, and metrics; orchestrate domain logic only; check None from repos and raise domain-specific NotFoundError subclasses (never return None to routes); record all significant operations via metrics."

Also applies to: 150-157

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/pod_monitor/monitor.py` around lines 115 - 117, The
finally block currently always advances self._last_resource_version from
self._watch.resource_version even when mapping/publishing fails, which risks
permanently skipping events; change the logic so that
self._last_resource_version is only updated after successful processing: move
the assignment out of the finally and into the success path immediately after
the map/publish steps in the method (where exceptions are currently caught
around map/publish), ensure exceptions are not swallowed (re-raise or handle so
the watch cursor is not advanced on failure), and add a metric increment/logging
(using the service's injected metrics/logger) for both successful commits and
processing failures to record these significant operations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 162-168: The log message in monitor.py currently interpolates
event data into the message string; change it to a static message and pass all
dynamic data as structured fields to the BoundLogger: replace the formatted
self.logger.info(...) call around self.logger.info(f"Processed ...") with a
static message like "Processed pod event" and supply keyword args for event_type
(event.event_type), pod_name (pod_name), pod_phase (pod_phase or "Unknown"), and
published (len(app_events)) so all user-controlled data (event, pod_name,
pod_phase, app_events) is passed as kwargs to self.logger.info.
- Around line 115-117: The finally block currently always advances
self._last_resource_version from self._watch.resource_version even when
mapping/publishing fails, which risks permanently skipping events; change the
logic so that self._last_resource_version is only updated after successful
processing: move the assignment out of the finally and into the success path
immediately after the map/publish steps in the method (where exceptions are
currently caught around map/publish), ensure exceptions are not swallowed
(re-raise or handle so the watch cursor is not advanced on failure), and add a
metric increment/logging (using the service's injected metrics/logger) for both
successful commits and processing failures to record these significant
operations.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5e429bd and 8bafd3a.

📒 Files selected for processing (2)
  • backend/app/services/pod_monitor/monitor.py
  • backend/tests/unit/services/pod_monitor/test_monitor.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/tests/unit/services/pod_monitor/test_monitor.py

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 2 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="backend/app/services/pod_monitor/monitor.py">

<violation number="1" location="backend/app/services/pod_monitor/monitor.py:112">
P1: `event["type"] not in WatchEventType` raises `TypeError` for string values on StrEnum, so the watch loop will crash on the first event. Use a safe cast with try/except (or check against `__members__`) before continuing.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
backend/app/services/pod_monitor/monitor.py (1)

111-119: ⚠️ Potential issue | 🟠 Major

Do not commit watch cursor when event processing fails.

_process_pod_event swallows failures, but the watch cursor is still committed in finally. That can permanently skip failed events after restart.

Suggested fix
 async def watch_pod_events(self) -> None:
+    processing_failed = False
     try:
         async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs):
             if event["type"] not in WatchEventType:
                 continue
-            await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"]))
+            ok = await self._process_pod_event(
+                PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"])
+            )
+            if not ok:
+                processing_failed = True
+                break
     finally:
-        if self._watch.resource_version:
+        if not processing_failed and self._watch.resource_version:
             self._last_resource_version = self._watch.resource_version

-async def _process_pod_event(self, event: PodEvent) -> None:
+async def _process_pod_event(self, event: PodEvent) -> bool:
     ...
     except Exception:
         self.logger.error("Error processing pod event", exc_info=True)
         self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR)
-        return
+        return False
+    return True

Also applies to: 153-160

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/pod_monitor/monitor.py` around lines 111 - 119, The code
currently sets self._last_resource_version in the finally block even if
processing failed, which can skip events; change the flow so the watch cursor is
only committed after successful processing: inside the async for loop, call
await self._process_pod_event(...) inside a try/except that only swallows/logs
expected non-fatal errors, and on success update self._last_resource_version =
self._watch.resource_version (or a resource_version derived from event) — move
the commit out of the outer finally and into the successful-path after
processing; ensure any unhandled exceptions prevent updating
self._last_resource_version so failed events are retried after restart.
🧹 Nitpick comments (3)
backend/app/services/result_processor/processor.py (2)

149-171: Update helper docstrings to Google style after signature changes.

Both helpers now take additional arguments, but docstrings are still one-line summaries. Add Args and Returns sections for compliance and maintainability.

Proposed docstring update
     async def _publish_result_stored(
         self, result: ExecutionResultDomain, user_id: str, aggregate_id: str | None
     ) -> None:
-        """Publish result stored event."""
+        """Publish a result stored event.
+
+        Args:
+            result: Persisted execution result.
+            user_id: User identifier from event metadata.
+            aggregate_id: Aggregate identifier propagated from source event.
+
+        Returns:
+            None
+        """
@@
     async def _publish_result_failed(
         self, execution_id: str, error_message: str, user_id: str, aggregate_id: str | None
     ) -> None:
-        """Publish result processing failed event."""
+        """Publish a result processing failed event.
+
+        Args:
+            execution_id: Execution identifier.
+            error_message: Error message from processing failure.
+            user_id: User identifier from event metadata.
+            aggregate_id: Aggregate identifier propagated from source event.
+
+        Returns:
+            None
+        """

As per coding guidelines: "Use Google-style docstrings with Args/Returns/Raises sections."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/result_processor/processor.py` around lines 149 - 171,
Update the one-line docstrings for _publish_result_stored and
_publish_result_failed to Google-style docstrings: include an initial short
description, an Args section listing each parameter (result:
ExecutionResultDomain, user_id: str, aggregate_id: str | None for
_publish_result_stored; execution_id: str, error_message: str, user_id: str,
aggregate_id: str | None for _publish_result_failed), a Returns section (None),
and a Raises section if the method can raise exceptions; keep descriptions
concise and reflect the updated signatures and behavior of these helper methods.

79-82: Add focused tests for aggregate_id pass-through in success and exception branches.

The new behavior now depends on propagating aggregate_id through three handlers and two event constructors; please add targeted unit coverage for both stored and failed publish paths.

Also applies to: 111-114, 144-147, 156-156, 174-174

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/result_processor/processor.py` around lines 79 - 82, Add
unit tests that verify aggregate_id is passed unchanged through the success and
exception code paths: write one test that simulates handling an
ExecutionCompletedEvent leading to a successful call and asserts that
_publish_result_stored was called with the same aggregate_id, and another test
that forces an exception in the handler and asserts _publish_result_failed was
called with the same aggregate_id. In each test, construct an
ExecutionCompletedEvent with a specific aggregate_id, mock/patch the Processor
instance methods _publish_result_stored and _publish_result_failed (and any
dependent handlers) to capture arguments, invoke the code path that calls
self._publish_result_stored or raises to call self._publish_result_failed, and
assert the aggregate_id argument matches the event.aggregate_id. Ensure similar
tests cover the other affected call sites referenced
(_publish_result_stored/_publish_result_failed usages around lines noted) so
both stored and failed publish paths are covered.
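The test shape suggested above can be sketched with `unittest.mock.AsyncMock`. Everything here is a simplified stand-in — the real handler, event class, and processor come from the processor module, and the argument positions are assumed from the review comment's signatures:

```python
import asyncio
from unittest.mock import AsyncMock


class FakeEvent:
    """Hypothetical minimal event carrying the two ids under test."""

    def __init__(self, execution_id: str, aggregate_id: str) -> None:
        self.execution_id = execution_id
        self.aggregate_id = aggregate_id


class FakeProcessor:
    """Stand-in processor whose publish helpers are recorded by AsyncMock."""

    def __init__(self, fail: bool = False) -> None:
        self.store_result = AsyncMock(
            side_effect=RuntimeError("boom") if fail else None,
            return_value="stored-result",
        )
        self._publish_result_stored = AsyncMock()
        self._publish_result_failed = AsyncMock()


async def handle_completed(processor: FakeProcessor, event: FakeEvent, user_id: str) -> None:
    """Simplified handler mirroring the success and exception branches."""
    try:
        result = await processor.store_result(event)
        await processor._publish_result_stored(result, user_id, event.aggregate_id)
    except Exception as exc:
        await processor._publish_result_failed(
            event.execution_id, str(exc), user_id, event.aggregate_id
        )


event = FakeEvent("exec-1", "agg-1")
ok = FakeProcessor()
asyncio.run(handle_completed(ok, event, "user-1"))       # success path

bad = FakeProcessor(fail=True)
asyncio.run(handle_completed(bad, event, "user-1"))      # exception path
```

Both mocks record their call arguments, so each test can assert that the `aggregate_id` from the event reaches the publish helper unchanged.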
backend/app/services/k8s_worker/worker.py (1)

226-226: Add identifiers to owner-reference warning for operability.

This warning only emits reason. Include config_map_name, pod_name, and status so failures can be triaged per object without extra repro steps.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/k8s_worker/worker.py` at line 226, The warning currently
logged by self.logger.warning("Failed to set ownerReference on ConfigMap",
reason=e.reason) lacks object identifiers; update the log call in the method
that sets the ownerReference (the code invoking self.logger.warning) to include
config_map_name, pod_name and status along with reason (e.g.,
self.logger.warning("Failed to set ownerReference on ConfigMap",
config_map_name=config_map_name, pod_name=pod_name, status=getattr(e, "status",
None), reason=e.reason)) so each failure includes the ConfigMap name, Pod name,
status and the original reason; ensure those local variables (config_map_name,
pod_name) are available in scope or derive them from the objects being operated
on before logging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 86-95: The query in get_events_by_execution_id currently only
matches EventDocument.execution_id and must also match
EventDocument.aggregate_id to preserve existing execution-event semantics;
update the conditions so the base filter is an OR between
Eq(EventDocument.execution_id, execution_id) and Eq(EventDocument.aggregate_id,
execution_id) (e.g., using the repository's Or operator or equivalent), keep the
optional In(EventDocument.event_type, event_types) and the existing sort/limit,
and return the same DomainEventAdapter.validate_python(d) results.

In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 153-160: The exception path returns early and skips recording the
pod-watch duration metric; change the try/except so the duration metric is
recorded in a finally block. Specifically, for the block that calls
self._event_mapper.map_pod_event(...) and awaits self._publish_event(...), wrap
the processing in try/except as-is but move any call that records duration (the
pod-watch duration metric) into a finally clause so it always runs, and keep the
existing
self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) in the
except. Apply the same try/except/finally pattern to the other similar block
referenced (the block around lines 175-176) so failures there also record
duration.

In `@backend/app/services/result_processor/processor.py`:
- Around line 80-82: The except blocks currently interpolate the exception into
f-strings (e.g., logger.error(f"Failed to handle ExecutionCompletedEvent: {e}",
...)); change them to use structured logging: keep the message static (e.g.,
"Failed to handle ExecutionCompletedEvent"), call self.logger.error with
exc_info=True (or pass exception=e as a keyword like exc=e) and pass
user-controlled values as separate keyword fields
(execution_id=event.execution_id, user_id=meta.user_id,
aggregate_id=event.aggregate_id) rather than interpolating them; do this for all
three handlers that call await self._publish_result_failed(...) so the log uses
structured fields and the exception is passed as metadata instead of embedded in
the message.

---

Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 111-119: The code currently sets self._last_resource_version in
the finally block even if processing failed, which can skip events; change the
flow so the watch cursor is only committed after successful processing: inside
the async for loop, call await self._process_pod_event(...) inside a try/except
that only swallows/logs expected non-fatal errors, and on success update
self._last_resource_version = self._watch.resource_version (or a
resource_version derived from event) — move the commit out of the outer finally
and into the successful-path after processing; ensure any unhandled exceptions
prevent updating self._last_resource_version so failed events are retried after
restart.

---

Nitpick comments:
In `@backend/app/services/k8s_worker/worker.py`:
- Line 226: The warning currently logged by self.logger.warning("Failed to set
ownerReference on ConfigMap", reason=e.reason) lacks object identifiers; update
the log call in the method that sets the ownerReference (the code invoking
self.logger.warning) to include config_map_name, pod_name and status along with
reason (e.g., self.logger.warning("Failed to set ownerReference on ConfigMap",
config_map_name=config_map_name, pod_name=pod_name, status=getattr(e, "status",
None), reason=e.reason)) so each failure includes the ConfigMap name, Pod name,
status and the original reason; ensure those local variables (config_map_name,
pod_name) are available in scope or derive them from the objects being operated
on before logging.

In `@backend/app/services/result_processor/processor.py`:
- Around line 149-171: Update the one-line docstrings for _publish_result_stored
and _publish_result_failed to Google-style docstrings: include an initial short
description, an Args section listing each parameter (result:
ExecutionResultDomain, user_id: str, aggregate_id: str | None for
_publish_result_stored; execution_id: str, error_message: str, user_id: str,
aggregate_id: str | None for _publish_result_failed), a Returns section (None),
and a Raises section if the method can raise exceptions; keep descriptions
concise and reflect the updated signatures and behavior of these helper methods.
- Around line 79-82: Add unit tests that verify aggregate_id is passed unchanged
through the success and exception code paths: write one test that simulates
handling an ExecutionCompletedEvent leading to a successful call and asserts
that _publish_result_stored was called with the same aggregate_id, and another
test that forces an exception in the handler and asserts _publish_result_failed
was called with the same aggregate_id. In each test, construct an
ExecutionCompletedEvent with a specific aggregate_id, mock/patch the Processor
instance methods _publish_result_stored and _publish_result_failed (and any
dependent handlers) to capture arguments, invoke the code path that calls
self._publish_result_stored or raises to call self._publish_result_failed, and
assert the aggregate_id argument matches the event.aggregate_id. Ensure similar
tests cover the other affected call sites referenced
(_publish_result_stored/_publish_result_failed usages around lines noted) so
both stored and failed publish paths are covered.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8bafd3a and 2a2cb94.

📒 Files selected for processing (9)
  • backend/app/api/routes/execution.py
  • backend/app/db/repositories/event_repository.py
  • backend/app/services/event_service.py
  • backend/app/services/k8s_worker/worker.py
  • backend/app/services/pod_monitor/event_mapper.py
  • backend/app/services/pod_monitor/monitor.py
  • backend/app/services/result_processor/processor.py
  • backend/app/services/saga/execution_saga.py
  • backend/app/services/saga/saga_orchestrator.py

@sonarqubecloud
sonarqubecloud bot commented Mar 3, 2026

@HardMax71 HardMax71 merged commit b72fd23 into main Mar 3, 2026
21 of 22 checks passed
@HardMax71 HardMax71 deleted the fix/backend-monitor branch March 3, 2026 16:56