Conversation
…inside except block)
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds aggregate_id to many domain events and constructors, introduces PodMonitorEvent union replacing DomainEvent in pod monitor, overhauls pod watch/process flow and logging, adds get_events_by_execution_id to repo/service and updates API, and propagates aggregate_id through result, saga, and worker event paths. Changes
Sequence Diagram(s)sequenceDiagram
participant Watcher
participant K8sAPI
participant PodMonitor
participant Mapper
participant Publisher
participant Metrics
Watcher->>K8sAPI: start/watch pods (resourceVersion)
K8sAPI-->>Watcher: pod event
Watcher->>PodMonitor: deliver pod event
PodMonitor->>Mapper: map pod -> PodMonitorEvent(s)
alt mapping error
Mapper-->>PodMonitor: raises error
PodMonitor->>Metrics: record PROCESSING_ERROR
else mapping success
Mapper-->>PodMonitor: PodMonitorEvent(s)
PodMonitor->>Publisher: publish event (key=event.execution_id)
Publisher-->>Metrics: record publish metrics
alt terminal event
PodMonitor->>K8sAPI: delete pod
K8sAPI-->>PodMonitor: status/reason
PodMonitor->>Metrics: record delete outcome
end
end
PodMonitor->>Metrics: record processing duration
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Codecov Report❌ Patch coverage is
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 2 files with indirect coverage changes
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR streamlines pod-monitor error handling and logging, aiming to make watch-cycle metrics/logs more consistent and simplify pod event processing flow.
Changes:
- Consolidates watch-cycle error classification so metrics/reconnect counters are emitted from a single path.
- Refactors pod event processing to narrow the try/except scope and uses more structured logging.
- Simplifies Kubernetes
ApiExceptionlogging in pod deletion paths and updates a unit test mock accordingly.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
backend/workers/run_pod_monitor.py |
Centralizes watch error classification and reconnect metric emission; switches to structured error logging fields. |
backend/app/services/pod_monitor/monitor.py |
Refactors _process_pod_event flow and simplifies _delete_pod exception logging. |
backend/app/services/k8s_worker/worker.py |
Simplifies ApiException logging for pod deletion and ConfigMap ownerReference patching. |
backend/tests/unit/services/pod_monitor/test_monitor.py |
Updates mocked event_type shape/value in the full-flow unit test. |
Comments suppressed due to low confidence (3)
backend/app/services/k8s_worker/worker.py:100
handle_delete_pod_commandnow logs allApiExceptions as warnings, which downgrades genuinely unexpected failures (e.g., 403/500/timeouts) that previously logged as errors. Consider restoring the status-based severity (e.g., keep 404 as warning/expected, but log non-404 as error and includeexc_info=Trueif useful) so operational alerting doesn’t miss real delete failures.
except ApiException as e:
self.logger.warning("Failed to delete pod", execution_id=execution_id, status=e.status, reason=e.reason)
backend/app/services/pod_monitor/monitor.py:227
_delete_podused to treat 404 (already deleted) as a debug-level, expected condition; it now emits a warning for all delete failures. If 404s are common during reconciliation/watch races, this will create noisy warnings and may trigger alerts. Consider special-casing 404 again (debug/info) while keeping warning/error for unexpected statuses.
except ApiException as e:
self.logger.warning("Failed to delete pod", pod_name=pod_name, status=e.status, reason=e.reason)
backend/tests/unit/services/pod_monitor/test_monitor.py:281
- The unit test’s mock event sets
event_type = "pod.running", but the codebase’s canonical pod event types areEventTypevalues (e.g.EventType.POD_RUNNINGwhich is"pod_running"; seebackend/app/domain/enums/events.py:18-25). Using a nonstandard string makes the test less representative and can silently diverge from metrics/logging labels; consider switching the mock toEventType.POD_RUNNING(or at least"pod_running").
class Event:
event_type = "pod.running"
aggregate_id = "agg1"
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
3 issues found across 4 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/services/pod_monitor/monitor.py">
<violation number="1" location="backend/app/services/pod_monitor/monitor.py:189">
P1: Exception handling was narrowed too much: errors during terminal pod deletion or post-processing metrics/logging can now escape `_process_pod_event` and stop the monitor loop.</violation>
<violation number="2" location="backend/app/services/pod_monitor/monitor.py:226">
P2: 404 delete races are now logged as warnings, which can create alert noise; keep 404 as debug and warn only on unexpected statuses.</violation>
</file>
<file name="backend/app/services/k8s_worker/worker.py">
<violation number="1" location="backend/app/services/k8s_worker/worker.py:99">
P2: Do not downgrade all pod deletion API failures to warning; keep non-404 failures at error level so real compensation failures are visible.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
backend/workers/run_pod_monitor.py (1)
59-59: Prefer an explicit monitor API over private-attribute mutation.Line 59 reaches into
monitor._last_resource_version; consider a publicreset_resource_version()method onPodMonitor.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/workers/run_pod_monitor.py` at line 59, The code directly mutates the private attribute monitor._last_resource_version; instead add a public PodMonitor.reset_resource_version() method that sets its internal state (clearing last_resource_version) and call monitor.reset_resource_version() here; update the PodMonitor class (the constructor/state and any watchers) to expose that method rather than touching _last_resource_version externally so callers use reset_resource_version() instead of accessing the private attribute.backend/app/services/k8s_worker/worker.py (1)
226-226: Include object identity in ownerReference patch failure log.Line 226 logs only
reason; addingconfig_map_nameandstatuswill make triage much faster.Suggested logging enrichment
- self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason) + self.logger.warning( + "Failed to set ownerReference on ConfigMap", + config_map_name=config_map.metadata.name if config_map.metadata else None, + owner_pod_name=owner_pod.metadata.name if owner_pod.metadata else None, + status=e.status, + reason=e.reason, + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/k8s_worker/worker.py` at line 226, The ownerReference patch failure log currently calls self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason); update that log to include the ConfigMap identity and HTTP status by adding config_map_name (or the variable holding the ConfigMap name) and the status (e.status or e.status_code / response.status if available) to the structured log call so it becomes something like self.logger.warning("Failed to set ownerReference on ConfigMap", config_map_name=config_map_name, status=status, reason=e.reason) — locate this in worker.py where the ownerReference patch is attempted (the self.logger.warning line) and ensure the referenced variable names match the local names used for the ConfigMap name and the exception status.backend/app/services/pod_monitor/monitor.py (1)
176-178: Record processing-duration metric even on early-return/error paths.Line 178 and Line 187 return before Line 200–201, so duration metrics are dropped for ignored/error events.
Move duration recording into a finally block
async def _process_pod_event(self, event: PodEvent) -> None: """Process a pod event.""" start_time = time.time() - - # Update resource version for crash recovery - if event.resource_version: - self._last_resource_version = event.resource_version - - # Skip ignored phases - pod_phase = event.pod.status.phase if event.pod.status else None - if pod_phase in self.config.ignored_pod_phases: - return - - try: - app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type) - for app_event in app_events: - await self._publish_event(app_event, event.pod) - except Exception: - self.logger.error("Error processing pod event", exc_info=True) - self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) - return + try: + # Update resource version for crash recovery + if event.resource_version: + self._last_resource_version = event.resource_version + + # Skip ignored phases + pod_phase = event.pod.status.phase if event.pod.status else None + if pod_phase in self.config.ignored_pod_phases: + return + + try: + app_events = await self._event_mapper.map_pod_event(event.pod, event.event_type) + for app_event in app_events: + await self._publish_event(app_event, event.pod) + except Exception: + self.logger.error("Error processing pod event", exc_info=True) + self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) + return + + if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events): + await self._delete_pod(event.pod) + + if app_events: + pod_name = event.pod.metadata.name if event.pod.metadata else "unknown" + self.logger.info( + "Processed pod event", + watch_event_type=event.event_type, + pod_name=pod_name, + pod_phase=pod_phase or "Unknown", + published_events=len(app_events), + ) + finally: + duration = time.time() - start_time + self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type) - - if any(e.event_type in _TERMINAL_EVENT_TYPES for e in app_events): - await self._delete_pod(event.pod) - - if app_events: - ... - - duration = time.time() - start_time - self._metrics.record_pod_monitor_event_processing_duration(duration, event.event_type)Also applies to: 184-201
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 193-198: The info log currently interpolates values via an
f-string in the monitor class; change self.logger.info(...) to use structured
logging: provide a static message string (e.g., "Processed pod event") and pass
event_type=event.event_type, pod_name=pod_name, pod_phase=pod_phase or
"Unknown", published_count=len(app_events) as keyword args to self.logger
(BoundLogger) instead of embedding them in the message; update the call site
where pod_name is computed and ensure no user-controlled fields are formatted
into the message string.
---
Nitpick comments:
In `@backend/app/services/k8s_worker/worker.py`:
- Line 226: The ownerReference patch failure log currently calls
self.logger.warning("Failed to set ownerReference on ConfigMap",
reason=e.reason); update that log to include the ConfigMap identity and HTTP
status by adding config_map_name (or the variable holding the ConfigMap name)
and the status (e.status or e.status_code / response.status if available) to the
structured log call so it becomes something like self.logger.warning("Failed to
set ownerReference on ConfigMap", config_map_name=config_map_name,
status=status, reason=e.reason) — locate this in worker.py where the
ownerReference patch is attempted (the self.logger.warning line) and ensure the
referenced variable names match the local names used for the ConfigMap name and
the exception status.
In `@backend/workers/run_pod_monitor.py`:
- Line 59: The code directly mutates the private attribute
monitor._last_resource_version; instead add a public
PodMonitor.reset_resource_version() method that sets its internal state
(clearing last_resource_version) and call monitor.reset_resource_version() here;
update the PodMonitor class (the constructor/state and any watchers) to expose
that method rather than touching _last_resource_version externally so callers
use reset_resource_version() instead of accessing the private attribute.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
backend/app/services/k8s_worker/worker.pybackend/app/services/pod_monitor/monitor.pybackend/tests/unit/services/pod_monitor/test_monitor.pybackend/workers/run_pod_monitor.py
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
backend/app/services/pod_monitor/monitor.py (1)
193-197:⚠️ Potential issue | 🟠 MajorUse structured fields instead of interpolated event data in logs.
Line 193 logs user/pod fields via an f-string; move them to keyword fields on the bound logger.
As per coding guidelines, "Use only structured logging via injected structlog.stdlib.BoundLogger (never call logging.getLogger() directly); pass user-controlled data as keyword arguments, never interpolate into log messages".Suggested fix
- if app_events: - pod_name = event.pod.metadata.name if event.pod.metadata else "unknown" - self.logger.info( - f"Processed {event.event_type} event for pod {pod_name} " - f"(phase: {pod_phase or 'Unknown'}), " - f"published {len(app_events)} events" - ) + if app_events: + pod_name = event.pod.metadata.name if event.pod.metadata else "unknown" + self.logger.info( + "Processed pod event", + watch_event_type=event.event_type, + pod_name=pod_name, + pod_phase=pod_phase or "Unknown", + published_events=len(app_events), + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/monitor.py` around lines 193 - 197, The log call in monitor.py currently interpolates user/pod data into the message; change the self.logger.info(...) call inside the pod event processing (the block that references event.event_type, pod_name, pod_phase, and app_events) to use a static message string and pass event_type, pod_name, pod_phase, and published_event_count as keyword arguments to the BoundLogger (e.g., event_type=event.event_type, pod_name=pod_name, pod_phase=pod_phase or "Unknown", published_event_count=len(app_events)); do not interpolate these values into the message and keep using the injected self.logger (structlog.stdlib.BoundLogger).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 170-173: The code advances self._last_resource_version from
event.resource_version before attempting to map/publish the event, so failures
during mapping/publishing can cause skipped events on restart; move the
assignment of self._last_resource_version so it happens only after successful
processing (i.e., after the calls that map the event and publish the message —
the code paths around map_event_to_message and the
publisher.publish/self.publish_message calls), and apply the same change for the
other block covering lines 179-186 (only set _last_resource_version when
mapping/publishing completes without exception).
---
Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 193-197: The log call in monitor.py currently interpolates
user/pod data into the message; change the self.logger.info(...) call inside the
pod event processing (the block that references event.event_type, pod_name,
pod_phase, and app_events) to use a static message string and pass event_type,
pod_name, pod_phase, and published_event_count as keyword arguments to the
BoundLogger (e.g., event_type=event.event_type, pod_name=pod_name,
pod_phase=pod_phase or "Unknown", published_event_count=len(app_events)); do not
interpolate these values into the message and keep using the injected
self.logger (structlog.stdlib.BoundLogger).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
backend/app/services/pod_monitor/__init__.pybackend/app/services/pod_monitor/event_mapper.pybackend/app/services/pod_monitor/monitor.pybackend/tests/unit/services/pod_monitor/test_config_and_init.pybackend/tests/unit/services/pod_monitor/test_monitor.py
There was a problem hiding this comment.
♻️ Duplicate comments (2)
backend/app/services/pod_monitor/monitor.py (2)
162-168:⚠️ Potential issue | 🟠 MajorUse structured fields for the processed-event info log.
Line 165-167 still interpolates event data into message text. Keep a static message and pass fields as kwargs.
Suggested fix
if app_events: pod_name = event.pod.metadata.name if event.pod.metadata else "unknown" self.logger.info( - f"Processed {event.event_type} event for pod {pod_name} " - f"(phase: {pod_phase or 'Unknown'}), " - f"published {len(app_events)} events" + "Processed pod event", + watch_event_type=event.event_type, + pod_name=pod_name, + pod_phase=pod_phase or "Unknown", + published_events=len(app_events), )As per coding guidelines, "Use only structured logging via injected structlog.stdlib.BoundLogger (never call logging.getLogger() directly); pass user-controlled data as keyword arguments, never interpolate into log messages."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/monitor.py` around lines 162 - 168, The log message in monitor.py currently interpolates event data into the message string; change it to a static message and pass all dynamic data as structured fields to the BoundLogger: replace the formatted self.logger.info(...) call around self.logger.info(f"Processed ...") with a static message like "Processed pod event" and supply keyword args for event_type (event.event_type), pod_name (pod_name), pod_phase (pod_phase or "Unknown"), and published (len(app_events)) so all user-controlled data (event, pod_name, pod_phase, app_events) is passed as kwargs to self.logger.info.
115-117:⚠️ Potential issue | 🔴 CriticalDo not swallow processing failures while still advancing the watch cursor.
Line 150-157 returns after map/publish exceptions, while Line 115-117 still commits
_last_resource_versioninfinally. A transient mapper/Kafka failure can be dropped permanently on restart.Suggested fix
async def watch_pod_events(self) -> None: + watch_error: ErrorType | None = None try: async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs): if event["type"] not in WatchEventType: continue - await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"])) + try: + await self._process_pod_event( + PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"]) + ) + except Exception: + watch_error = ErrorType.PROCESSING_ERROR + raise + except ApiException as e: + watch_error = ErrorType.RESOURCE_VERSION_EXPIRED if e.status == 410 else ErrorType.API_ERROR + raise + except Exception: + watch_error = watch_error or ErrorType.UNEXPECTED + raise finally: - if self._watch.resource_version: + if watch_error: + self._metrics.record_pod_monitor_watch_error(watch_error) + elif self._watch.resource_version: self._last_resource_version = self._watch.resource_version @@ - except Exception: + except Exception: self.logger.error("Error processing pod event", exc_info=True) - self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) - return + raiseAs per coding guidelines, "Services in app/services/ must use constructor injection for repositories, UnifiedProducer, structlog logger, Settings, and metrics; orchestrate domain logic only; check None from repos and raise domain-specific NotFoundError subclasses (never return None to routes); record all significant operations via metrics."
Also applies to: 150-157
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/monitor.py` around lines 115 - 117, The finally block currently always advances self._last_resource_version from self._watch.resource_version even when mapping/publishing fails, which risks permanently skipping events; change the logic so that self._last_resource_version is only updated after successful processing: move the assignment out of the finally and into the success path immediately after the map/publish steps in the method (where exceptions are currently caught around map/publish), ensure exceptions are not swallowed (re-raise or handle so the watch cursor is not advanced on failure), and add a metric increment/logging (using the service's injected metrics/logger) for both successful commits and processing failures to record these significant operations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 162-168: The log message in monitor.py currently interpolates
event data into the message string; change it to a static message and pass all
dynamic data as structured fields to the BoundLogger: replace the formatted
self.logger.info(...) call around self.logger.info(f"Processed ...") with a
static message like "Processed pod event" and supply keyword args for event_type
(event.event_type), pod_name (pod_name), pod_phase (pod_phase or "Unknown"), and
published (len(app_events)) so all user-controlled data (event, pod_name,
pod_phase, app_events) is passed as kwargs to self.logger.info.
- Around line 115-117: The finally block currently always advances
self._last_resource_version from self._watch.resource_version even when
mapping/publishing fails, which risks permanently skipping events; change the
logic so that self._last_resource_version is only updated after successful
processing: move the assignment out of the finally and into the success path
immediately after the map/publish steps in the method (where exceptions are
currently caught around map/publish), ensure exceptions are not swallowed
(re-raise or handle so the watch cursor is not advanced on failure), and add a
metric increment/logging (using the service's injected metrics/logger) for both
successful commits and processing failures to record these significant
operations.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
backend/app/services/pod_monitor/monitor.pybackend/tests/unit/services/pod_monitor/test_monitor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/tests/unit/services/pod_monitor/test_monitor.py
There was a problem hiding this comment.
1 issue found across 2 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/services/pod_monitor/monitor.py">
<violation number="1" location="backend/app/services/pod_monitor/monitor.py:112">
P1: `event["type"] not in WatchEventType` raises `TypeError` for string values on StrEnum, so the watch loop will crash on the first event. Use a safe cast with try/except (or check against `__members__`) before continuing.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
backend/app/services/pod_monitor/monitor.py (1)
111-119:⚠️ Potential issue | 🟠 MajorDo not commit watch cursor when event processing fails.
_process_pod_eventswallows failures, but the watch cursor is still committed infinally. That can permanently skip failed events after restart.Suggested fix
async def watch_pod_events(self) -> None: + processing_failed = False try: async for event in self._watch.stream(self._v1.list_namespaced_pod, **kwargs): if event["type"] not in WatchEventType: continue - await self._process_pod_event(PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"])) + ok = await self._process_pod_event( + PodEvent(event_type=WatchEventType(event["type"]), pod=event["object"]) + ) + if not ok: + processing_failed = True + break finally: - if self._watch.resource_version: + if not processing_failed and self._watch.resource_version: self._last_resource_version = self._watch.resource_version -async def _process_pod_event(self, event: PodEvent) -> None: +async def _process_pod_event(self, event: PodEvent) -> bool: ... except Exception: self.logger.error("Error processing pod event", exc_info=True) self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) - return + return False + return TrueAlso applies to: 153-160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/pod_monitor/monitor.py` around lines 111 - 119, The code currently sets self._last_resource_version in the finally block even if processing failed, which can skip events; change the flow so the watch cursor is only committed after successful processing: inside the async for loop, call await self._process_pod_event(...) inside a try/except that only swallows/logs expected non-fatal errors, and on success update self._last_resource_version = self._watch.resource_version (or a resource_version derived from event) — move the commit out of the outer finally and into the successful-path after processing; ensure any unhandled exceptions prevent updating self._last_resource_version so failed events are retried after restart.
🧹 Nitpick comments (3)
backend/app/services/result_processor/processor.py (2)
149-171: Update helper docstrings to Google style after signature changes.Both helpers now take additional arguments, but docstrings are still one-line summaries. Add
ArgsandReturnssections for compliance and maintainability.Proposed docstring update
async def _publish_result_stored( self, result: ExecutionResultDomain, user_id: str, aggregate_id: str | None ) -> None: - """Publish result stored event.""" + """Publish a result stored event. + + Args: + result: Persisted execution result. + user_id: User identifier from event metadata. + aggregate_id: Aggregate identifier propagated from source event. + + Returns: + None + """ @@ async def _publish_result_failed( self, execution_id: str, error_message: str, user_id: str, aggregate_id: str | None ) -> None: - """Publish result processing failed event.""" + """Publish a result processing failed event. + + Args: + execution_id: Execution identifier. + error_message: Error message from processing failure. + user_id: User identifier from event metadata. + aggregate_id: Aggregate identifier propagated from source event. + + Returns: + None + """As per coding guidelines: "Use Google-style docstrings with Args/Returns/Raises sections."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/result_processor/processor.py` around lines 149 - 171, Update the one-line docstrings for _publish_result_stored and _publish_result_failed to Google-style docstrings: include an initial short description, an Args section listing each parameter (result: ExecutionResultDomain, user_id: str, aggregate_id: str | None for _publish_result_stored; execution_id: str, error_message: str, user_id: str, aggregate_id: str | None for _publish_result_failed), a Returns section (None), and a Raises section if the method can raise exceptions; keep descriptions concise and reflect the updated signatures and behavior of these helper methods.
79-82: Add focused tests foraggregate_idpass-through in success and exception branches.The new behavior now depends on propagating
aggregate_idthrough three handlers and two event constructors; please add targeted unit coverage for both stored and failed publish paths.Also applies to: 111-114, 144-147, 156-156, 174-174
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/result_processor/processor.py` around lines 79 - 82, Add unit tests that verify aggregate_id is passed unchanged through the success and exception code paths: write one test that simulates handling an ExecutionCompletedEvent leading to a successful call and asserts that _publish_result_stored was called with the same aggregate_id, and another test that forces an exception in the handler and asserts _publish_result_failed was called with the same aggregate_id. In each test, construct an ExecutionCompletedEvent with a specific aggregate_id, mock/patch the Processor instance methods _publish_result_stored and _publish_result_failed (and any dependent handlers) to capture arguments, invoke the code path that calls self._publish_result_stored or raises to call self._publish_result_failed, and assert the aggregate_id argument matches the event.aggregate_id. Ensure similar tests cover the other affected call sites referenced (_publish_result_stored/_publish_result_failed usages around lines noted) so both stored and failed publish paths are covered.backend/app/services/k8s_worker/worker.py (1)
226-226: Add identifiers to owner-reference warning for operability.This warning only emits
reason. Includeconfig_map_name,pod_name, andstatusso failures can be triaged per object without extra repro steps.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/app/services/k8s_worker/worker.py` at line 226, The warning currently logged by self.logger.warning("Failed to set ownerReference on ConfigMap", reason=e.reason) lacks object identifiers; update the log call in the method that sets the ownerReference (the code invoking self.logger.warning) to include config_map_name, pod_name and status along with reason (e.g., self.logger.warning("Failed to set ownerReference on ConfigMap", config_map_name=config_map_name, pod_name=pod_name, status=getattr(e, "status", None), reason=e.reason)) so each failure includes the ConfigMap name, Pod name, status and the original reason; ensure those local variables (config_map_name, pod_name) are available in scope or derive them from the objects being operated on before logging.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/app/db/repositories/event_repository.py`:
- Around line 86-95: The query in get_events_by_execution_id currently only
matches EventDocument.execution_id and must also match
EventDocument.aggregate_id to preserve existing execution-event semantics;
update the conditions so the base filter is an OR between
Eq(EventDocument.execution_id, execution_id) and Eq(EventDocument.aggregate_id,
execution_id) (e.g., using the repository's Or operator or equivalent), keep the
optional In(EventDocument.event_type, event_types) and the existing sort/limit,
and return the same DomainEventAdapter.validate_python(d) results.
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 153-160: The exception path returns early and skips recording the
pod-watch duration metric; change the try/except so the duration metric is
recorded in a finally block. Specifically, for the block that calls
self._event_mapper.map_pod_event(...) and awaits self._publish_event(...), wrap
the processing in try/except as-is but move any call that records duration (the
pod-watch duration metric) into a finally clause so it always runs, and keep the
existing
self._metrics.record_pod_monitor_watch_error(ErrorType.PROCESSING_ERROR) in the
except. Apply the same try/except/finally pattern to the other similar block
referenced (the block around lines 175-176) so failures there also record
duration.
In `@backend/app/services/result_processor/processor.py`:
- Around line 80-82: The except blocks currently interpolate the exception into
f-strings (e.g., logger.error(f"Failed to handle ExecutionCompletedEvent: {e}",
...)); change them to use structured logging: keep the message static (e.g.,
"Failed to handle ExecutionCompletedEvent"), call self.logger.error with
exc_info=True (or pass exception=e as a keyword like exc=e) and pass
user-controlled values as separate keyword fields
(execution_id=event.execution_id, user_id=meta.user_id,
aggregate_id=event.aggregate_id) rather than interpolating them; do this for all
three handlers that call await self._publish_result_failed(...) so the log uses
structured fields and the exception is passed as metadata instead of embedded in
the message.
---
Duplicate comments:
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 111-119: The code currently sets self._last_resource_version in
the finally block even if processing failed, which can skip events; change the
flow so the watch cursor is only committed after successful processing: inside
the async for loop, call await self._process_pod_event(...) inside a try/except
that only swallows/logs expected non-fatal errors, and on success update
self._last_resource_version = self._watch.resource_version (or a
resource_version derived from event) — move the commit out of the outer finally
and into the successful-path after processing; ensure any unhandled exceptions
prevent updating self._last_resource_version so failed events are retried after
restart.
---
Nitpick comments:
In `@backend/app/services/k8s_worker/worker.py`:
- Line 226: The warning currently logged by self.logger.warning("Failed to set
ownerReference on ConfigMap", reason=e.reason) lacks object identifiers; update
the log call in the method that sets the ownerReference (the code invoking
self.logger.warning) to include config_map_name, pod_name and status along with
reason (e.g., self.logger.warning("Failed to set ownerReference on ConfigMap",
config_map_name=config_map_name, pod_name=pod_name, status=getattr(e, "status",
None), reason=e.reason)) so each failure includes the ConfigMap name, Pod name,
status and the original reason; ensure those local variables (config_map_name,
pod_name) are available in scope or derive them from the objects being operated
on before logging.
In `@backend/app/services/result_processor/processor.py`:
- Around line 149-171: Update the one-line docstrings for _publish_result_stored
and _publish_result_failed to Google-style docstrings: include an initial short
description, an Args section listing each parameter (result:
ExecutionResultDomain, user_id: str, aggregate_id: str | None for
_publish_result_stored; execution_id: str, error_message: str, user_id: str,
aggregate_id: str | None for _publish_result_failed), a Returns section (None),
and a Raises section if the method can raise exceptions; keep descriptions
concise and reflect the updated signatures and behavior of these helper methods.
- Around line 79-82: Add unit tests that verify aggregate_id is passed unchanged
through the success and exception code paths: write one test that simulates
handling an ExecutionCompletedEvent leading to a successful call and asserts
that _publish_result_stored was called with the same aggregate_id, and another
test that forces an exception in the handler and asserts _publish_result_failed
was called with the same aggregate_id. In each test, construct an
ExecutionCompletedEvent with a specific aggregate_id, mock/patch the Processor
instance methods _publish_result_stored and _publish_result_failed (and any
dependent handlers) to capture arguments, invoke the code path that calls
self._publish_result_stored or raises to call self._publish_result_failed, and
assert the aggregate_id argument matches the event.aggregate_id. Ensure similar
tests cover the other affected call sites referenced
(_publish_result_stored/_publish_result_failed usages around lines noted) so
both stored and failed publish paths are covered.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
backend/app/api/routes/execution.pybackend/app/db/repositories/event_repository.pybackend/app/services/event_service.pybackend/app/services/k8s_worker/worker.pybackend/app/services/pod_monitor/event_mapper.pybackend/app/services/pod_monitor/monitor.pybackend/app/services/result_processor/processor.pybackend/app/services/saga/execution_saga.pybackend/app/services/saga/saga_orchestrator.py
|



Summary by cubic
Streamlined the pod monitor by removing the raw event handler, routing watch events directly to processing, and switching to structured logging. Clarified aggregate_id vs execution_id across events and APIs for consistent publishing and querying, and fixed watch error metrics and reconnect counting.
Written for commit 7088dad. Summary will update on new commits.
Summary by CodeRabbit
New Features
Bug Fixes
Tests