Skip to content

Add agent graceful stop/resume with checkpoint persistence#1023

Open
aureliensibiril wants to merge 15 commits intogetprobo:mainfrom
aureliensibiril:aureliensibiril/checkpoint-agents
Open

Add agent graceful stop/resume with checkpoint persistence#1023
aureliensibiril wants to merge 15 commits intogetprobo:mainfrom
aureliensibiril:aureliensibiril/checkpoint-agents

Conversation

@aureliensibiril
Copy link
Copy Markdown
Contributor

@aureliensibiril aureliensibiril commented Apr 10, 2026

Summary

  • LLM Message and all Part types now round-trip through JSON for checkpoint serialization
  • coreLoop saves incremental checkpoints after every tool-call turn and checks a cooperative stop signal at turn boundaries
  • Restore function reconstructs agent state from a persisted checkpoint (suspended, nested suspended, awaiting approval)
  • AgentRun entity and PGCheckpointStore in coredata with lease-based concurrency and 10 MiB size guard
  • Poll-based supervisor worker that claims, runs, heartbeats, and recovers agent runs; on graceful shutdown rows stay RUNNING so stale recovery auto-resumes on restart

Test plan

  • make test MODULE=./pkg/llm — JSON round-trip tests (40 tests)
  • make test MODULE=./pkg/agent — stop signal, restore, existing agent tests (284 tests)
  • Verify migration applies cleanly: make stack-up && make psql then run pkg/coredata/migrations/20260410T120000Z.sql
  • Manual: start an agent run, kill the process mid-turn, restart, verify it resumes from checkpoint

Summary by cubic

Adds graceful stop/resume to agents with durable JSON checkpoints, a Postgres-backed AgentRun store, and a supervisor that safely auto-resumes runs. Includes end-to-end tests for checkpoint persistence and the supervisor lifecycle.

  • New Features

    • Checkpoint persistence: JSON round-trip for LLM Message, Part, ToolCall, FunctionCall, and Usage; new Checkpoint/CheckpointStore; 10 MiB size guard.
    • Agent loop: incremental checkpoints each tool-call turn; cooperative stop via WithStopSignal; resume via Restore; supports nested agents; adds StreamEventSuspended and OnRunRestore; checks max-turns before stop signal; attaches in-memory checkpoint on save failure.
    • Run APIs: RunWithOpts, RunStreamedWithOpts, ResumeWithOpts, and WithCheckpointer to plug in persistence.
    • Core data: AgentRun entity with lease-based concurrency and stop requests; PGCheckpointStore writes to agent_runs.checkpoint.
    • Supervisor: AgentRunSupervisor claims PENDING runs with a lease, heartbeats, and early-returns on lease loss or suspension; checkpoints on stop; resets stale RUNNING rows to PENDING for auto-resume; handles result serialization errors; guards non-positive interval/lease durations.
    • Supervisor integration tests: now in pkg/agentruntest; Postgres-backed Save/Load/Delete round-trip, claim-and-complete flow, cooperative stop/resume via flag, and SIGTERM resilience across restarts; uses embedded coredata.Migrations (skips if Postgres is unavailable).
    • Polish: renamed CheckpointStatus to AgentStatus; removed unnecessary JSON tags from internal structs; fixed suspension checkpoint fallback in nested/parallel execution so resumability is preserved; persisted table init error in agentruntest to avoid silent failures.
  • Migration

    • Apply DB migration: pkg/coredata/migrations/20260410T120000Z.sql.
    • Deploy the AgentRunSupervisor configured with PGCheckpointStore and an AgentRegistry.
    • To request a cooperative stop, set stop_requested = true; on graceful shutdown, let the worker exit—runs resume from the last checkpoint on restart.

Written for commit 734a2ac. Summary will update on new commits.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 issues found across 20 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="pkg/agent/restore.go">

<violation number="1" location="pkg/agent/restore.go:334">
P2: Awaiting-approval checkpoint restore only reconstructs one nested interruption level, which can lose deeper handoff context during resume.</violation>
</file>

<file name="pkg/probo/agent_run_supervisor.go">

<violation number="1" location="pkg/probo/agent_run_supervisor.go:66">
P1: Unvalidated supervisor durations can cause runtime panics in `time.NewTicker` when interval/lease values are non-positive.</violation>

<violation number="2" location="pkg/probo/agent_run_supervisor.go:145">
P2: Blocking semaphore acquisition in the claim loop can stall supervisor polling (including stop-request checks) when concurrency is saturated.</violation>
</file>

<file name="pkg/agent/run.go">

<violation number="1" location="pkg/agent/run.go:334">
P2: Checkpoint save errors are logged but ignored, so a failed Save still returns SuspendedError without a fallback checkpoint. Restore relies on the stored checkpoint; if Save failed, the run becomes non-recoverable. Consider surfacing the error or attaching the in-memory checkpoint when persistence fails.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 1 file (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="pkg/probo/agent_run_supervisor_test.go">

<violation number="1" location="pkg/probo/agent_run_supervisor_test.go:130">
P2: Test bootstraps `agent_runs` with duplicated handwritten DDL instead of reusing migration source, creating schema-drift risk between tests and production.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

messagesCopy := make([]llm.Message, len(messages))
copy(messagesCopy, messages)

if cp.Turns >= agent.maxTurns {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when this event happen? Seams not normal.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, config will be snapshoted and superseed on restore

Message, Part (Text/Image/File), ToolCall, FunctionCall, and Usage
now round-trip through JSON. Message uses a type-discriminated
envelope for the Part interface. Required for checkpoint persistence.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Introduce Checkpoint, CheckpointStore, SuspendedError, AgentRegistry,
and CompletedCall types. Add cooperative stop signal via context.
Export CompletedCall (was unexported completedCall) so checkpoints
can reference completed tool results. Add JSON tags to ToolResult
and ApprovalResult for checkpoint serialization.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
coreLoop now saves incremental checkpoints after each tool-call turn
and checks a cooperative stop signal at turn boundaries. SuspendedError
is handled in finishRun, executeParallel, and executeSingleTool.
Approval-interrupted checkpoints are persisted for both flat and
nested interruptions.

Introduce RunOption, WithCheckpointStore, RunWithOpts, ResumeWithOpts,
and RunStreamedWithOpts so callers can provide checkpoint storage.
Add StreamEventSuspended and OnRunRestore hook.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Restore loads a checkpoint from the store, resolves the agent from
a registry, and re-enters coreLoop. Handles suspended, nested
suspended (concurrent inner restore), and awaiting-approval states.
Partial progress is saved when some inner agents complete while
others remain suspended.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Create agent_runs table with lease-based concurrency control.
AgentRun entity follows standard coredata patterns with Scoper,
StrictNamedArgs, and cursor pagination. PGCheckpointStore implements
agent.CheckpointStore backed by the checkpoint JSONB column with
version validation and 10 MiB size guard. Register AgentRunEntityType
as entity type 75.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Poll-based supervisor that claims PENDING agent runs with FOR UPDATE
SKIP LOCKED, runs them with lease-based heartbeat, and handles
graceful shutdown. On infrastructure stop the row stays RUNNING so
stale recovery resets it to PENDING on restart; Restore picks up
from the last checkpoint. Heartbeat loss cancels execution without
committing a terminal status.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Move suspended and heartbeat-loss early returns before status
assignment to avoid confusing dead code paths. Remove AWAITING_APPROVAL
status since no API surface exists to provide approvals. Handle
json.Marshal error on result. Move maxTurns check before stop signal
in coreLoop. Remove duplicate version check in restoreCheckpoint.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Reject non-positive interval and lease durations in functional
options to prevent time.NewTicker panics. Attach in-memory
checkpoint to SuspendedError when store save fails so the
supervisor can still recover from the last completed turn.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Tests the checkpoint persistence and supervisor lifecycle against a
real Postgres database:

- PGCheckpointStore Save/Load/Delete round-trip
- Supervisor claims PENDING run and completes it
- Cooperative stop/resume via stop_requested flag
- SIGTERM battle test: 3 kill/resume cycles across 10 tool-call
  turns with parallel calls, long-running tools, thinking text,
  and progressive checkpoint accumulation

Tests skip gracefully when Postgres is unavailable.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Read agent_runs DDL from coredata.Migrations instead of
duplicating the schema inline, preventing test/production
schema drift.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
JSON marshaling uses field names directly; explicit tags
are unnecessary at this level.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The status values describe the agent state, not the
checkpoint data state.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The test does not belong in the probo package. Move it
alongside its shared helpers in pkg/agentruntest.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
@aureliensibiril aureliensibiril force-pushed the aureliensibiril/checkpoint-agents branch from 1248afa to 5f66066 Compare April 13, 2026 21:03
@aureliensibiril
Copy link
Copy Markdown
Contributor Author

@cubic-dev-ai review this PR

@cubic-dev-ai
Copy link
Copy Markdown

cubic-dev-ai bot commented Apr 13, 2026

@cubic-dev-ai review this PR

@aureliensibiril I have started the AI code review. It will take a few minutes to complete.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7 issues found across 21 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="pkg/agentruntest/agent_run_supervisor_test.go">

<violation number="1" location="pkg/agentruntest/agent_run_supervisor_test.go:613">
P2: SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.</violation>
</file>

<file name="pkg/agent/run.go">

<violation number="1" location="pkg/agent/run.go:453">
P1: Nested suspension propagation drops the constructed checkpoint fallback, so runs may become non-resumable when checkpoint persistence is unavailable or save fails.</violation>

<violation number="2" location="pkg/agent/run.go:857">
P1: `executeParallel` ignores valid `SuspendedError` values when checkpoint is nil, causing suspension to be treated as normal tool error.</violation>
</file>

<file name="pkg/coredata/migrations/20260410T120000Z.sql">

<violation number="1" location="pkg/coredata/migrations/20260410T120000Z.sql:20">
P2: `agent_runs.status` is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.</violation>
</file>

<file name="pkg/agentruntest/agentruntest.go">

<violation number="1" location="pkg/agentruntest/agentruntest.go:102">
P2: `sync.Once` table initialization error is not persisted, so first failure can be silently hidden on later calls.</violation>
</file>

<file name="pkg/agent/restore_test.go">

<violation number="1" location="pkg/agent/restore_test.go:44">
P2: Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.</violation>
</file>

<file name="pkg/coredata/agent_run.go">

<violation number="1" location="pkg/coredata/agent_run.go:676">
P1: Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

return s.pg.WithConn(
ctx,
func(ctx context.Context, conn pg.Querier) error {
q := `UPDATE agent_runs SET checkpoint = @checkpoint, updated_at = now() WHERE id = @id;`
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/coredata/agent_run.go, line 676:

<comment>Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.</comment>

<file context>
@@ -0,0 +1,755 @@
+	return s.pg.WithConn(
+		ctx,
+		func(ctx context.Context, conn pg.Querier) error {
+			q := `UPDATE agent_runs SET checkpoint = @checkpoint, updated_at = now() WHERE id = @id;`
+
+			args := pgx.StrictNamedArgs{
</file context>
Fix with Cubic

cmd.Env = append(os.Environ(),
"TEST_SIGTERM_SUBPROCESS=1",
"TEST_SIGTERM_PROGRESS_FILE="+progressFile,
"TEST_SIGTERM_SKIP_RESPONSES="+strconv.Itoa(skipResponses),
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/agentruntest/agent_run_supervisor_test.go, line 613:

<comment>SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.</comment>

<file context>
@@ -0,0 +1,875 @@
+		cmd.Env = append(os.Environ(),
+			"TEST_SIGTERM_SUBPROCESS=1",
+			"TEST_SIGTERM_PROGRESS_FILE="+progressFile,
+			"TEST_SIGTERM_SKIP_RESPONSES="+strconv.Itoa(skipResponses),
+		)
+		cmd.Stdout = os.Stdout
</file context>
Fix with Cubic

tenant_id TEXT NOT NULL,
organization_id TEXT NOT NULL,
start_agent_name TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'PENDING',
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: agent_runs.status is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/coredata/migrations/20260410T120000Z.sql, line 20:

<comment>`agent_runs.status` is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.</comment>

<file context>
@@ -0,0 +1,37 @@
+    tenant_id       TEXT NOT NULL,
+    organization_id TEXT NOT NULL,
+    start_agent_name TEXT NOT NULL,
+    status          TEXT NOT NULL DEFAULT 'PENDING',
+    checkpoint      JSONB,
+    input_messages  JSONB NOT NULL,
</file context>
Fix with Cubic

s.mu.Lock()
defer s.mu.Unlock()

clone := *cp
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Apr 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/agent/restore_test.go, line 44:

<comment>Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.</comment>

<file context>
@@ -0,0 +1,465 @@
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	clone := *cp
+	s.checkpoints[runID] = &clone
+	return nil
</file context>
Fix with Cubic

executeParallel ignored SuspendedError when checkpoint was nil,
treating it as a normal tool error. Nested suspension propagation
also dropped the in-memory checkpoint when persistence failed,
making runs non-resumable.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The sync.Once error was stored in a local variable, so a
failed first attempt would be silently hidden on later calls.

Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants