Skip to content

fix(agent-runtime): persist thread messages when a run is aborted#439

Merged
jerryliang64 merged 2 commits intomasterfrom
fix/agent-runtime-persist-on-abort
Apr 21, 2026
Merged

fix(agent-runtime): persist thread messages when a run is aborted#439
jerryliang64 merged 2 commits intomasterfrom
fix/agent-runtime-persist-on-abort

Conversation

@jerryliang64
Copy link
Copy Markdown
Contributor

@jerryliang64 jerryliang64 commented Apr 21, 2026

Summary

When a run is aborted mid-stream, AgentRuntime returns early without persisting the user input or any partial executor output to the thread. For executors that keep their own session state (e.g. the Claude CLI session file), this leaves the thread diverged from the executor's view of history — the executor has [user_N, partial_assistant_N] written, but the thread only has turns 1..N-1.

On the next request, ensureThread still reports isResume=true (thread has older turns), the executor is invoked with resume=<sessionId>, and startup can fail because the two histories no longer match.

This change calls appendMessages in every abort path so the thread reflects the same user + streamMessages the executor has already observed. Subsequent resume requests start from a consistent history and can continue the conversation — matching the local Claude Code ESC-then-continue behaviour.

Changes

  • syncRun: persist on both the signal.aborted early-return inside the loop and the catch branch where the executor throws after abort.
  • asyncRun: same two paths inside the background IIFE.
  • executeStreamBackground: same two paths, plus the cross-worker Cancelling/Cancelled check after the loop completes normally.
  • New private helper persistMessagesOnAbort centralises the append and swallows store errors so a failing store cannot mask the abort or block cancelRun from finalising the run status.

Test plan

  • 6 new cases in AgentRuntime.test.ts under abort message persistence:
    • syncRun / asyncRun / streamRun: persist user + partial assistant on abort
    • syncRun: persist only user message when aborted before any chunk is yielded
    • regression: isResume=true on the next run after an abort
    • store.appendMessages failure during abort cleanup does not throw
  • Full suite: 122 passing (116 existing + 6 new), no regressions.
  • tsc and eslint clean.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of interrupted runs: User and assistant messages are now reliably persisted when runs are aborted or cancelled, preventing message loss.
    • Partial assistant responses generated before interruption are preserved to maintain accurate conversation history.
  • Tests

    • Added comprehensive tests for message persistence across abort and cancellation scenarios.

When a run is aborted mid-stream, the executor (e.g. Claude CLI) has
already written the user turn and any partial assistant output to its
own session state. Previously AgentRuntime returned early before
calling appendMessages, leaving the thread empty while the executor
session was not — so the next request hit isResume=true with a thread
that diverged from the session, and resume could fail at executor
startup.

This change persists the user input plus any streamMessages collected
so far in all abort paths of syncRun / asyncRun / executeStreamBackground
so the thread stays consistent with the executor's view, allowing the
next turn to resume cleanly. Store errors during cleanup are swallowed
to avoid masking the abort or blocking cancelRun from finalising state.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Apr 21, 2026

Warning

Rate limit exceeded

@jerryliang64 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 27 minutes and 29 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 27 minutes and 29 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4cc486b4-bb5c-446f-b4d8-aa725124c202

📥 Commits

Reviewing files that changed from the base of the PR and between bd556d6 and 28a2e20.

📒 Files selected for processing (2)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/test/AgentRuntime.test.ts
📝 Walkthrough

Walkthrough

The changes add message persistence when agent runs are aborted or cancelled. A streamMessages accumulator collects stream-based messages during execution, and a new persistMessagesOnAbort() helper ensures collected messages plus original input are appended to the thread when runs are interrupted, with graceful error handling for storage failures.

Changes

Cohort / File(s) Summary
Runtime Abort Message Persistence
core/agent-runtime/src/AgentRuntime.ts
Added streamMessages accumulator initialization in syncRun, asyncRun, and executeStreamBackground methods. Introduced persistMessagesOnAbort() helper method that appends original input messages and filtered stream messages to the thread store on abort/cancellation paths, with error swallowing and logging.
Abort Persistence Test Suite
core/agent-runtime/test/AgentRuntime.test.ts
Added comprehensive abort message persistence test suite with 5 test cases covering: syncRun abort via AbortController with partial executor output, asyncRun cancellation with partial messages, streamRun cancellation with SSE writer closure, isResume flag regression after abort, and storage failure error handling during abort cleanup.

Sequence Diagram

sequenceDiagram
    actor Client
    participant AgentRuntime
    participant Executor
    participant StreamBuffer as Stream Buffer<br/>(streamMessages)
    participant Store

    Client->>AgentRuntime: syncRun(messages) with AbortSignal
    AgentRuntime->>StreamBuffer: Initialize accumulator
    AgentRuntime->>Executor: Execute with abort signal
    Executor->>StreamBuffer: Yield chunk 1 (partial message)
    Executor->>StreamBuffer: Yield chunk 2 (partial message)
    Client->>AgentRuntime: Abort signal triggered
    AgentRuntime->>AgentRuntime: persistMessagesOnAbort()
    AgentRuntime->>Store: appendMessages([input] + filtered[partial])
    Store-->>AgentRuntime: Success/Error
    AgentRuntime-->>Client: Return with run_* id
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

Suggested reviewers

  • akitaSummer

Poem

🐰 When threads get cut short mid-thought,
Our messages are safely caught!
Stream buffers dance, then safely store—
No partial words lost anymore! ✨
Abort or cancel, don't you fret,
Your conversations won't be forget!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and clearly summarizes the main change: persisting thread messages when a run is aborted, which is the core bug fix implemented across multiple abort paths.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/agent-runtime-persist-on-abort

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a mechanism to persist input and partial stream messages to the thread when an agent run is aborted or cancelled. This ensures that the thread state remains synchronized with the executor's internal state, allowing for consistent resumes. The changes include the addition of a persistMessagesOnAbort helper and its integration into syncRun, asyncRun, and executeStreamBackground. Comprehensive tests were added to verify persistence across different run types and error conditions. Feedback suggests including the current message chunk in the persistence call within loops to ensure the most recent state is captured before the abort is processed.

Comment thread core/agent-runtime/src/AgentRuntime.ts
Comment thread core/agent-runtime/src/AgentRuntime.ts
Comment thread core/agent-runtime/src/AgentRuntime.ts
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/agent-runtime/src/AgentRuntime.ts (1)

204-208: ⚠️ Potential issue | 🟡 Minor

Cross-worker cancel path in asyncRun skips persistMessagesOnAbort — inconsistent with executeStreamBackground and reintroduces the divergence this PR is fixing.

In executeStreamBackground (lines 354-359) the post-loop check for an externally-set Cancelling/Cancelled status persists collected messages before returning. The analogous path here just returns, so in the worker-A-finishes-exec / worker-B-cancels scenario the thread gets no user or assistant messages even though the executor completed and (for executors with session files) already recorded the full turn. A subsequent resume then sees isResume=false (or out-of-sync history) — the exact bug class the PR targets, just on a different path.

🛠️ Proposed fix
         // Check if another worker has cancelled this run before writing final state
         const currentRun = await this.store.getRun(run.id);
         if (currentRun.status === RunStatus.Cancelling || currentRun.status === RunStatus.Cancelled) {
+          await this.persistMessagesOnAbort(threadId, input, streamMessages);
           return;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/src/AgentRuntime.ts` around lines 204 - 208, In asyncRun,
after reloading the run via this.store.getRun(run.id) and detecting
currentRun.status is RunStatus.Cancelling or RunStatus.Cancelled, call the same
persistMessagesOnAbort(...) flow used in executeStreamBackground to flush any
collected user/assistant messages before returning; keep the RunStatus check and
return but ensure persistMessagesOnAbort is invoked with the same
arguments/state that asyncRun has (so messages are persisted on cross-worker
cancel just like in executeStreamBackground).
🧹 Nitpick comments (2)
core/agent-runtime/test/AgentRuntime.test.ts (1)

956-982: Consider asserting the documented post-condition and restoring the stub in a finally.

The comment on line 980 states "state is empty, but the abort path completed cleanly", but only the latter is actually asserted. Adding a thread-length assertion pins down the swallow-on-failure contract, and restoring the stub in finally keeps afterEachruntime.destroy() from tripping over the patched appendMessages if an assertion above throws.

-      const ac = new AbortController();
-      const syncPromise = runtime.syncRun(
-        { threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } },
-        ac.signal,
-      );
-      await waitUntil(() => yielded);
-      ac.abort();
-      // Should resolve with a snapshot instead of rejecting
-      const result = await syncPromise;
-      assert(result.id.startsWith('run_'));
-
-      // Thread append failed — state is empty, but the abort path completed cleanly
-      store.appendMessages = origAppend;
+      try {
+        const ac = new AbortController();
+        const syncPromise = runtime.syncRun(
+          { threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } },
+          ac.signal,
+        );
+        await waitUntil(() => yielded);
+        ac.abort();
+        // Should resolve with a snapshot instead of rejecting
+        const result = await syncPromise;
+        assert(result.id.startsWith('run_'));
+      } finally {
+        store.appendMessages = origAppend;
+      }
+
+      // Thread append failed — state remained empty, but the abort path completed cleanly
+      const updated = await runtime.getThread(thread.id);
+      assert.equal(updated.messages.length, 0);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/test/AgentRuntime.test.ts` around lines 956 - 982, Update
the test "should swallow store.appendMessages errors during abort cleanup" to
assert the documented post-condition by checking the thread's messages/state is
empty after the aborted syncRun (use runtime.createThread/thread.id and fetch
the thread or thread.messages length) and ensure the original
store.appendMessages is restored in a finally block so the stub is always
reverted even if an assertion fails; keep the existing flow (createSlowExecRun,
runtime.syncRun, AbortController, waitUntil/yielded) but wrap the stub
replacement and assertions in try/finally where finally resets
store.appendMessages = origAppend.
core/agent-runtime/src/AgentRuntime.ts (1)

392-415: Helper looks good; one small observability note.

The helper correctly centralizes the append + error-swallow contract so abort finalization (cancelRunrb.cancel()) can't be masked by a store failure. Single invocation per run is guaranteed across all call sites (in-loop abort returns immediately, catch branch is mutually exclusive with the loop completing, cross-worker check only fires on normal loop exit), so there's no double-append risk.

Minor: on failure, the thread silently diverges from the executor's session — which is the very condition this helper exists to prevent. Consider attaching threadId and the intended message count to the error log to make post-mortem triage easier.

     } catch (err) {
-      this.logger.error('[AgentRuntime] failed to persist messages on abort:', err);
+      this.logger.error(
+        '[AgentRuntime] failed to persist messages on abort (threadId=%s, streamMessages=%d):',
+        threadId,
+        streamMessages.length,
+        err,
+      );
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/agent-runtime/src/AgentRuntime.ts` around lines 392 - 415,
persistMessagesOnAbort swallows store failures but the error log lacks context;
modify persistMessagesOnAbort to compute the intended append message count (e.g.
const msgs = [...MessageConverter.toAgentMessages(input.input.messages),
...MessageConverter.filterForStorage(streamMessages)]; const count =
msgs.length) and include threadId, count and the caught error in the
logger.error call (reference persistMessagesOnAbort,
MessageConverter.toAgentMessages, MessageConverter.filterForStorage,
this.store.appendMessages and this.logger.error) so post-mortem triage can see
which thread and how many messages failed to persist.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 204-208: In asyncRun, after reloading the run via
this.store.getRun(run.id) and detecting currentRun.status is
RunStatus.Cancelling or RunStatus.Cancelled, call the same
persistMessagesOnAbort(...) flow used in executeStreamBackground to flush any
collected user/assistant messages before returning; keep the RunStatus check and
return but ensure persistMessagesOnAbort is invoked with the same
arguments/state that asyncRun has (so messages are persisted on cross-worker
cancel just like in executeStreamBackground).

---

Nitpick comments:
In `@core/agent-runtime/src/AgentRuntime.ts`:
- Around line 392-415: persistMessagesOnAbort swallows store failures but the
error log lacks context; modify persistMessagesOnAbort to compute the intended
append message count (e.g. const msgs =
[...MessageConverter.toAgentMessages(input.input.messages),
...MessageConverter.filterForStorage(streamMessages)]; const count =
msgs.length) and include threadId, count and the caught error in the
logger.error call (reference persistMessagesOnAbort,
MessageConverter.toAgentMessages, MessageConverter.filterForStorage,
this.store.appendMessages and this.logger.error) so post-mortem triage can see
which thread and how many messages failed to persist.

In `@core/agent-runtime/test/AgentRuntime.test.ts`:
- Around line 956-982: Update the test "should swallow store.appendMessages
errors during abort cleanup" to assert the documented post-condition by checking
the thread's messages/state is empty after the aborted syncRun (use
runtime.createThread/thread.id and fetch the thread or thread.messages length)
and ensure the original store.appendMessages is restored in a finally block so
the stub is always reverted even if an assertion fails; keep the existing flow
(createSlowExecRun, runtime.syncRun, AbortController, waitUntil/yielded) but
wrap the stub replacement and assertions in try/finally where finally resets
store.appendMessages = origAppend.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8e81987f-6f7c-4523-973f-69b32d896a13

📥 Commits

Reviewing files that changed from the base of the PR and between 1ca4099 and bd556d6.

📒 Files selected for processing (2)
  • core/agent-runtime/src/AgentRuntime.ts
  • core/agent-runtime/test/AgentRuntime.test.ts

Previously, a run aborted via an external AbortSignal (on syncRun) or
via destroy() on in-flight asyncRun/streamRun tasks would return early
without transitioning the run status, leaving it stuck in_progress
forever. Only cancelRun drove the in_progress → cancelling → cancelled
transition.

Add a private finaliseAbortedRun helper that inspects the current store
status and:
  - leaves terminal runs untouched;
  - leaves `cancelling` alone so cancelRun can finish its own transition;
  - pushes `in_progress` / `queued` through cancelling → cancelled when
    nobody else will.

Call it in every signal-driven abort branch across syncRun, asyncRun
and executeStreamBackground. Store errors are swallowed so cleanup
cannot mask the abort.

Also tighten the existing abort+resume regression test to assert the
second turn's prompt reaches the executor and the run reaches Completed,
not just that isResume=true was observed.
@jerryliang64 jerryliang64 merged commit 384ab1b into master Apr 21, 2026
12 checks passed
@jerryliang64 jerryliang64 deleted the fix/agent-runtime-persist-on-abort branch April 21, 2026 07:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant