fix(agent-runtime): persist thread messages when a run is aborted #439

jerryliang64 merged 2 commits into master from
Conversation
When a run is aborted mid-stream, the executor (e.g. Claude CLI) has already written the user turn and any partial assistant output to its own session state. Previously AgentRuntime returned early before calling appendMessages, leaving the thread empty while the executor session was not — so the next request hit isResume=true with a thread that diverged from the session, and resume could fail at executor startup. This change persists the user input plus any streamMessages collected so far in all abort paths of syncRun / asyncRun / executeStreamBackground so the thread stays consistent with the executor's view, allowing the next turn to resume cleanly. Store errors during cleanup are swallowed to avoid masking the abort or blocking cancelRun from finalising state.
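As a minimal sketch of the contract described above, assuming simplified `Store` and `Message` shapes (the real AgentRuntime types and its storage filtering differ), the persist-and-swallow helper could look like this:

```typescript
// Illustrative stand-ins for the real thread store and message types.
type Message = { role: string; content: string };

interface Store {
  appendMessages(threadId: string, messages: Message[]): Promise<void>;
}

// Append the user input plus any partial stream output collected so far.
// Store errors are swallowed deliberately: a failing store must not mask
// the abort or block run-status finalisation.
async function persistMessagesOnAbort(
  store: Store,
  threadId: string,
  input: Message[],
  streamMessages: Message[],
): Promise<void> {
  try {
    // Keep only non-empty partials, standing in for the real storage filter.
    const partials = streamMessages.filter((m) => m.content.length > 0);
    await store.appendMessages(threadId, [...input, ...partials]);
  } catch (err) {
    console.error('failed to persist messages on abort:', err);
  }
}
```

The key design choice is that the helper never rethrows, so every abort path can call it unconditionally without changing its own control flow.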
📝 Walkthrough

The changes add message persistence when agent runs are aborted or cancelled: a new `persistMessagesOnAbort` helper centralises appending the user input plus any collected stream messages across the abort paths.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Client
    participant AgentRuntime
    participant Executor
    participant StreamBuffer as Stream Buffer<br/>(streamMessages)
    participant Store
    Client->>AgentRuntime: syncRun(messages) with AbortSignal
    AgentRuntime->>StreamBuffer: Initialize accumulator
    AgentRuntime->>Executor: Execute with abort signal
    Executor->>StreamBuffer: Yield chunk 1 (partial message)
    Executor->>StreamBuffer: Yield chunk 2 (partial message)
    Client->>AgentRuntime: Abort signal triggered
    AgentRuntime->>AgentRuntime: persistMessagesOnAbort()
    AgentRuntime->>Store: appendMessages([input] + filtered[partial])
    Store-->>AgentRuntime: Success/Error
    AgentRuntime-->>Client: Return with run_* id
```
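The in-loop abort path in the diagram can be sketched as follows; `runWithAbort`, the string chunks, and the `persist` callback are simplified stand-ins for the real executor stream and helper, not the actual AgentRuntime API:

```typescript
// Accumulate chunks, and on abort persist what the executor has already
// produced so the thread matches its session state, then resolve with a
// snapshot instead of rejecting.
async function runWithAbort(
  stream: AsyncIterable<string>,
  signal: AbortSignal,
  persist: (partial: string[]) => Promise<void>,
): Promise<string[]> {
  const streamMessages: string[] = [];
  for await (const chunk of stream) {
    streamMessages.push(chunk);
    if (signal.aborted) {
      await persist(streamMessages); // thread now reflects the partial turn
      return streamMessages;         // resolve with a snapshot, don't reject
    }
  }
  return streamMessages;
}
```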
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Code Review
This pull request introduces a mechanism to persist input and partial stream messages to the thread when an agent run is aborted or cancelled. This ensures that the thread state remains synchronized with the executor's internal state, allowing for consistent resumes. The changes include the addition of a persistMessagesOnAbort helper and its integration into syncRun, asyncRun, and executeStreamBackground. Comprehensive tests were added to verify persistence across different run types and error conditions. Feedback suggests including the current message chunk in the persistence call within loops to ensure the most recent state is captured before the abort is processed.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/agent-runtime/src/AgentRuntime.ts (1)
204-208: ⚠️ Potential issue | 🟡 Minor

Cross-worker cancel path in `asyncRun` skips `persistMessagesOnAbort` — inconsistent with `executeStreamBackground` and reintroduces the divergence this PR is fixing.

In `executeStreamBackground` (lines 354-359) the post-loop check for an externally-set `Cancelling`/`Cancelled` status persists collected messages before returning. The analogous path here just returns, so in the worker-A-finishes-exec / worker-B-cancels scenario the thread gets no user or assistant messages even though the executor completed and (for executors with session files) already recorded the full turn. A subsequent resume then sees `isResume=false` (or out-of-sync history) — the exact bug class the PR targets, just on a different path.

🛠️ Proposed fix

```diff
 // Check if another worker has cancelled this run before writing final state
 const currentRun = await this.store.getRun(run.id);
 if (currentRun.status === RunStatus.Cancelling || currentRun.status === RunStatus.Cancelled) {
+  await this.persistMessagesOnAbort(threadId, input, streamMessages);
   return;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/AgentRuntime.ts` around lines 204 - 208, In asyncRun, after reloading the run via this.store.getRun(run.id) and detecting currentRun.status is RunStatus.Cancelling or RunStatus.Cancelled, call the same persistMessagesOnAbort(...) flow used in executeStreamBackground to flush any collected user/assistant messages before returning; keep the RunStatus check and return but ensure persistMessagesOnAbort is invoked with the same arguments/state that asyncRun has (so messages are persisted on cross-worker cancel just like in executeStreamBackground).
🧹 Nitpick comments (2)
core/agent-runtime/test/AgentRuntime.test.ts (1)
956-982: Consider asserting the documented post-condition and restoring the stub in a `finally`.

The comment on line 980 states "state is empty, but the abort path completed cleanly", but only the latter is actually asserted. Adding a thread-length assertion pins down the swallow-on-failure contract, and restoring the stub in a `finally` keeps `afterEach` → `runtime.destroy()` from tripping over the patched `appendMessages` if an assertion above throws.

```diff
-      const ac = new AbortController();
-      const syncPromise = runtime.syncRun(
-        { threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } },
-        ac.signal,
-      );
-      await waitUntil(() => yielded);
-      ac.abort();
-      // Should resolve with a snapshot instead of rejecting
-      const result = await syncPromise;
-      assert(result.id.startsWith('run_'));
-
-      // Thread append failed — state is empty, but the abort path completed cleanly
-      store.appendMessages = origAppend;
+      try {
+        const ac = new AbortController();
+        const syncPromise = runtime.syncRun(
+          { threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } },
+          ac.signal,
+        );
+        await waitUntil(() => yielded);
+        ac.abort();
+        // Should resolve with a snapshot instead of rejecting
+        const result = await syncPromise;
+        assert(result.id.startsWith('run_'));
+      } finally {
+        store.appendMessages = origAppend;
+      }
+
+      // Thread append failed — state remained empty, but the abort path completed cleanly
+      const updated = await runtime.getThread(thread.id);
+      assert.equal(updated.messages.length, 0);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/test/AgentRuntime.test.ts` around lines 956 - 982, Update the test "should swallow store.appendMessages errors during abort cleanup" to assert the documented post-condition by checking the thread's messages/state is empty after the aborted syncRun (use runtime.createThread/thread.id and fetch the thread or thread.messages length) and ensure the original store.appendMessages is restored in a finally block so the stub is always reverted even if an assertion fails; keep the existing flow (createSlowExecRun, runtime.syncRun, AbortController, waitUntil/yielded) but wrap the stub replacement and assertions in try/finally where finally resets store.appendMessages = origAppend.

core/agent-runtime/src/AgentRuntime.ts (1)
392-415: Helper looks good; one small observability note.

The helper correctly centralizes the append + error-swallow contract so abort finalization (`cancelRun` → `rb.cancel()`) can't be masked by a store failure. Single invocation per run is guaranteed across all call sites (in-loop abort returns immediately, catch branch is mutually exclusive with the loop completing, cross-worker check only fires on normal loop exit), so there's no double-append risk.

Minor: on failure, the thread silently diverges from the executor's session — which is the very condition this helper exists to prevent. Consider attaching `threadId` and the intended message count to the error log to make post-mortem triage easier.

```diff
       } catch (err) {
-        this.logger.error('[AgentRuntime] failed to persist messages on abort:', err);
+        this.logger.error(
+          '[AgentRuntime] failed to persist messages on abort (threadId=%s, streamMessages=%d):',
+          threadId,
+          streamMessages.length,
+          err,
+        );
       }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/agent-runtime/src/AgentRuntime.ts` around lines 392 - 415, persistMessagesOnAbort swallows store failures but the error log lacks context; modify persistMessagesOnAbort to compute the intended append message count (e.g. const msgs = [...MessageConverter.toAgentMessages(input.input.messages), ...MessageConverter.filterForStorage(streamMessages)]; const count = msgs.length) and include threadId, count and the caught error in the logger.error call (reference persistMessagesOnAbort, MessageConverter.toAgentMessages, MessageConverter.filterForStorage, this.store.appendMessages and this.logger.error) so post-mortem triage can see which thread and how many messages failed to persist.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 204-208: In asyncRun, after reloading the run via
this.store.getRun(run.id) and detecting currentRun.status is
RunStatus.Cancelling or RunStatus.Cancelled, call the same
persistMessagesOnAbort(...) flow used in executeStreamBackground to flush any
collected user/assistant messages before returning; keep the RunStatus check and
return but ensure persistMessagesOnAbort is invoked with the same
arguments/state that asyncRun has (so messages are persisted on cross-worker
cancel just like in executeStreamBackground).
---
Nitpick comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 392-415: persistMessagesOnAbort swallows store failures but the
error log lacks context; modify persistMessagesOnAbort to compute the intended
append message count (e.g. const msgs =
[...MessageConverter.toAgentMessages(input.input.messages),
...MessageConverter.filterForStorage(streamMessages)]; const count =
msgs.length) and include threadId, count and the caught error in the
logger.error call (reference persistMessagesOnAbort,
MessageConverter.toAgentMessages, MessageConverter.filterForStorage,
this.store.appendMessages and this.logger.error) so post-mortem triage can see
which thread and how many messages failed to persist.
In `@core/agent-runtime/test/AgentRuntime.test.ts`:
- Around line 956-982: Update the test "should swallow store.appendMessages
errors during abort cleanup" to assert the documented post-condition by checking
the thread's messages/state is empty after the aborted syncRun (use
runtime.createThread/thread.id and fetch the thread or thread.messages length)
and ensure the original store.appendMessages is restored in a finally block so
the stub is always reverted even if an assertion fails; keep the existing flow
(createSlowExecRun, runtime.syncRun, AbortController, waitUntil/yielded) but
wrap the stub replacement and assertions in try/finally where finally resets
store.appendMessages = origAppend.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8e81987f-6f7c-4523-973f-69b32d896a13
📒 Files selected for processing (2)
core/agent-runtime/src/AgentRuntime.ts
core/agent-runtime/test/AgentRuntime.test.ts
Previously, a run aborted via an external AbortSignal (on syncRun) or
via destroy() on in-flight asyncRun/streamRun tasks would return early
without transitioning the run status, leaving it stuck in `in_progress`
forever. Only cancelRun drove the in_progress → cancelling → cancelled
transition.
Add a private finaliseAbortedRun helper that inspects the current store
status and:
- leaves terminal runs untouched;
- leaves `cancelling` alone so cancelRun can finish its own transition;
- pushes `in_progress` / `queued` through cancelling → cancelled when
nobody else will.
Call it in every signal-driven abort branch across syncRun, asyncRun
and executeStreamBackground. Store errors are swallowed so cleanup
cannot mask the abort.
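The transition rules above might be sketched as follows, assuming plain string statuses and a simple get/set store API (the real `RunStatus` enum and store interface differ):

```typescript
// Illustrative status set; 'completed'/'failed'/'cancelled' are terminal.
type RunStatus = 'queued' | 'in_progress' | 'cancelling' | 'cancelled' | 'completed' | 'failed';

interface RunStore {
  getRunStatus(runId: string): Promise<RunStatus>;
  setRunStatus(runId: string, status: RunStatus): Promise<void>;
}

// Push a signal-aborted run to a terminal state only when nobody else
// (i.e. cancelRun) is already driving the transition. Store errors are
// swallowed so cleanup cannot mask the abort.
async function finaliseAbortedRun(store: RunStore, runId: string): Promise<void> {
  try {
    const status = await store.getRunStatus(runId);
    if (status === 'cancelled' || status === 'completed' || status === 'failed') {
      return; // terminal: leave untouched
    }
    if (status === 'cancelling') {
      return; // cancelRun owns the rest of this transition
    }
    // queued / in_progress: drive cancelling -> cancelled ourselves
    await store.setRunStatus(runId, 'cancelling');
    await store.setRunStatus(runId, 'cancelled');
  } catch {
    // swallowed: a failing store must not mask the abort
  }
}
```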
Also tighten the existing abort+resume regression test to assert the
second turn's prompt reaches the executor and the run reaches Completed,
not just that isResume=true was observed.
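A sketch of that tightened assertion shape, with a hypothetical recording fake executor standing in for the real test harness (the actual test drives AgentRuntime end to end):

```typescript
// Record what the executor actually receives so the test can assert the
// second turn's prompt arrived and the run terminated, not just isResume.
type ExecCall = { prompt: string; isResume: boolean };

class RecordingExecutor {
  calls: ExecCall[] = [];
  async run(prompt: string, isResume: boolean): Promise<'completed'> {
    this.calls.push({ prompt, isResume });
    return 'completed';
  }
}

async function assertSecondTurnResumes(exec: RecordingExecutor): Promise<void> {
  await exec.run('first turn', false);                // aborted in the real test
  const status = await exec.run('second turn', true); // the resume turn
  const second = exec.calls[1];
  if (second.prompt !== 'second turn') throw new Error('prompt never reached the executor');
  if (!second.isResume) throw new Error('expected isResume=true');
  if (status !== 'completed') throw new Error('expected the run to reach Completed');
}
```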
Summary
When a run is aborted mid-stream, `AgentRuntime` returns early without persisting the user input or any partial executor output to the thread. For executors that keep their own session state (e.g. the Claude CLI session file), this leaves the thread diverged from the executor's view of history — the executor has `[user_N, partial_assistant_N]` written, but the thread only has turns `1..N-1`.

On the next request, `ensureThread` still reports `isResume=true` (thread has older turns), the executor is invoked with `resume=<sessionId>`, and startup can fail because the two histories no longer match.

This change calls `appendMessages` in every abort path so the thread reflects the same user + streamMessages the executor has already observed. Subsequent resume requests start from a consistent history and can continue the conversation — matching the local Claude Code ESC-then-continue behaviour.

Changes
- `syncRun`: persist on both the `signal.aborted` early-return inside the loop and the `catch` branch where the executor throws after abort.
- `asyncRun`: same two paths inside the background IIFE.
- `executeStreamBackground`: same two paths, plus the cross-worker `Cancelling`/`Cancelled` check after the loop completes normally.
- `persistMessagesOnAbort` centralises the append and swallows store errors so a failing store cannot mask the abort or block `cancelRun` from finalising the run status.

Test plan
- New tests in `AgentRuntime.test.ts` under `abort message persistence`:
  - `syncRun`/`asyncRun`/`streamRun`: persist user + partial assistant on abort
  - `syncRun`: persist only the user message when aborted before any chunk is yielded
  - `isResume=true` on the next run after an abort
  - `store.appendMessages` failure during abort cleanup does not throw
- `tsc` and eslint clean.

Summary by CodeRabbit
Bug Fixes
Tests