Add agent graceful stop/resume with checkpoint persistence#1023
Add agent graceful stop/resume with checkpoint persistence#1023aureliensibiril wants to merge 15 commits intogetprobo:mainfrom
Conversation
There was a problem hiding this comment.
4 issues found across 20 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="pkg/agent/restore.go">
<violation number="1" location="pkg/agent/restore.go:334">
P2: Awaiting-approval checkpoint restore only reconstructs one nested interruption level, which can lose deeper handoff context during resume.</violation>
</file>
<file name="pkg/probo/agent_run_supervisor.go">
<violation number="1" location="pkg/probo/agent_run_supervisor.go:66">
P1: Unvalidated supervisor durations can cause runtime panics in `time.NewTicker` when interval/lease values are non-positive.</violation>
<violation number="2" location="pkg/probo/agent_run_supervisor.go:145">
P2: Blocking semaphore acquisition in the claim loop can stall supervisor polling (including stop-request checks) when concurrency is saturated.</violation>
</file>
<file name="pkg/agent/run.go">
<violation number="1" location="pkg/agent/run.go:334">
P2: Checkpoint save errors are logged but ignored, so a failed Save still returns SuspendedError without a fallback checkpoint. Restore relies on the stored checkpoint; if Save failed, the run becomes non-recoverable. Consider surfacing the error or attaching the in-memory checkpoint when persistence fails.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
1 issue found across 1 file (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="pkg/probo/agent_run_supervisor_test.go">
<violation number="1" location="pkg/probo/agent_run_supervisor_test.go:130">
P2: Test bootstraps `agent_runs` with duplicated handwritten DDL instead of reusing migration source, creating schema-drift risk between tests and production.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| messagesCopy := make([]llm.Message, len(messages)) | ||
| copy(messagesCopy, messages) | ||
|
|
||
| if cp.Turns >= agent.maxTurns { |
There was a problem hiding this comment.
when this event happen? Seams not normal.
There was a problem hiding this comment.
as discussed, config will be snapshoted and superseed on restore
Message, Part (Text/Image/File), ToolCall, FunctionCall, and Usage now round-trip through JSON. Message uses a type-discriminated envelope for the Part interface. Required for checkpoint persistence. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Introduce Checkpoint, CheckpointStore, SuspendedError, AgentRegistry, and CompletedCall types. Add cooperative stop signal via context. Export CompletedCall (was unexported completedCall) so checkpoints can reference completed tool results. Add JSON tags to ToolResult and ApprovalResult for checkpoint serialization. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
coreLoop now saves incremental checkpoints after each tool-call turn and checks a cooperative stop signal at turn boundaries. SuspendedError is handled in finishRun, executeParallel, and executeSingleTool. Approval-interrupted checkpoints are persisted for both flat and nested interruptions. Introduce RunOption, WithCheckpointStore, RunWithOpts, ResumeWithOpts, and RunStreamedWithOpts so callers can provide checkpoint storage. Add StreamEventSuspended and OnRunRestore hook. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Restore loads a checkpoint from the store, resolves the agent from a registry, and re-enters coreLoop. Handles suspended, nested suspended (concurrent inner restore), and awaiting-approval states. Partial progress is saved when some inner agents complete while others remain suspended. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Create agent_runs table with lease-based concurrency control. AgentRun entity follows standard coredata patterns with Scoper, StrictNamedArgs, and cursor pagination. PGCheckpointStore implements agent.CheckpointStore backed by the checkpoint JSONB column with version validation and 10 MiB size guard. Register AgentRunEntityType as entity type 75. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Poll-based supervisor that claims PENDING agent runs with FOR UPDATE SKIP LOCKED, runs them with lease-based heartbeat, and handles graceful shutdown. On infrastructure stop the row stays RUNNING so stale recovery resets it to PENDING on restart; Restore picks up from the last checkpoint. Heartbeat loss cancels execution without committing a terminal status. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Move suspended and heartbeat-loss early returns before status assignment to avoid confusing dead code paths. Remove AWAITING_APPROVAL status since no API surface exists to provide approvals. Handle json.Marshal error on result. Move maxTurns check before stop signal in coreLoop. Remove duplicate version check in restoreCheckpoint. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Reject non-positive interval and lease durations in functional options to prevent time.NewTicker panics. Attach in-memory checkpoint to SuspendedError when store save fails so the supervisor can still recover from the last completed turn. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Tests the checkpoint persistence and supervisor lifecycle against a real Postgres database: - PGCheckpointStore Save/Load/Delete round-trip - Supervisor claims PENDING run and completes it - Cooperative stop/resume via stop_requested flag - SIGTERM battle test: 3 kill/resume cycles across 10 tool-call turns with parallel calls, long-running tools, thinking text, and progressive checkpoint accumulation Tests skip gracefully when Postgres is unavailable. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Read agent_runs DDL from coredata.Migrations instead of duplicating the schema inline, preventing test/production schema drift. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
JSON marshaling uses field names directly; explicit tags are unnecessary at this level. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The status values describe the agent state, not the checkpoint data state. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The test does not belong in the probo package. Move it alongside its shared helpers in pkg/agentruntest. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
1248afa to
5f66066
Compare
|
@cubic-dev-ai review this PR |
@aureliensibiril I have started the AI code review. It will take a few minutes to complete. |
There was a problem hiding this comment.
7 issues found across 21 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="pkg/agentruntest/agent_run_supervisor_test.go">
<violation number="1" location="pkg/agentruntest/agent_run_supervisor_test.go:613">
P2: SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.</violation>
</file>
<file name="pkg/agent/run.go">
<violation number="1" location="pkg/agent/run.go:453">
P1: Nested suspension propagation drops the constructed checkpoint fallback, so runs may become non-resumable when checkpoint persistence is unavailable or save fails.</violation>
<violation number="2" location="pkg/agent/run.go:857">
P1: `executeParallel` ignores valid `SuspendedError` values when checkpoint is nil, causing suspension to be treated as normal tool error.</violation>
</file>
<file name="pkg/coredata/migrations/20260410T120000Z.sql">
<violation number="1" location="pkg/coredata/migrations/20260410T120000Z.sql:20">
P2: `agent_runs.status` is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.</violation>
</file>
<file name="pkg/agentruntest/agentruntest.go">
<violation number="1" location="pkg/agentruntest/agentruntest.go:102">
P2: `sync.Once` table initialization error is not persisted, so first failure can be silently hidden on later calls.</violation>
</file>
<file name="pkg/agent/restore_test.go">
<violation number="1" location="pkg/agent/restore_test.go:44">
P2: Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.</violation>
</file>
<file name="pkg/coredata/agent_run.go">
<violation number="1" location="pkg/coredata/agent_run.go:676">
P1: Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| return s.pg.WithConn( | ||
| ctx, | ||
| func(ctx context.Context, conn pg.Querier) error { | ||
| q := `UPDATE agent_runs SET checkpoint = @checkpoint, updated_at = now() WHERE id = @id;` |
There was a problem hiding this comment.
P1: Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/coredata/agent_run.go, line 676:
<comment>Checkpoint writes are not lease-guarded, so stale workers can overwrite checkpoint state after lease loss.</comment>
<file context>
@@ -0,0 +1,755 @@
+ return s.pg.WithConn(
+ ctx,
+ func(ctx context.Context, conn pg.Querier) error {
+ q := `UPDATE agent_runs SET checkpoint = @checkpoint, updated_at = now() WHERE id = @id;`
+
+ args := pgx.StrictNamedArgs{
</file context>
| cmd.Env = append(os.Environ(), | ||
| "TEST_SIGTERM_SUBPROCESS=1", | ||
| "TEST_SIGTERM_PROGRESS_FILE="+progressFile, | ||
| "TEST_SIGTERM_SKIP_RESPONSES="+strconv.Itoa(skipResponses), |
There was a problem hiding this comment.
P2: SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/agentruntest/agent_run_supervisor_test.go, line 613:
<comment>SIGTERM subprocess test is not pinned to the created run ID, so it can claim an unrelated pending run from the shared queue and produce flaky assertions.</comment>
<file context>
@@ -0,0 +1,875 @@
+ cmd.Env = append(os.Environ(),
+ "TEST_SIGTERM_SUBPROCESS=1",
+ "TEST_SIGTERM_PROGRESS_FILE="+progressFile,
+ "TEST_SIGTERM_SKIP_RESPONSES="+strconv.Itoa(skipResponses),
+ )
+ cmd.Stdout = os.Stdout
</file context>
| tenant_id TEXT NOT NULL, | ||
| organization_id TEXT NOT NULL, | ||
| start_agent_name TEXT NOT NULL, | ||
| status TEXT NOT NULL DEFAULT 'PENDING', |
There was a problem hiding this comment.
P2: agent_runs.status is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/coredata/migrations/20260410T120000Z.sql, line 20:
<comment>`agent_runs.status` is unconstrained free-form TEXT even though lifecycle logic depends on specific status values, risking stuck/unclaimable runs on invalid data.</comment>
<file context>
@@ -0,0 +1,37 @@
+ tenant_id TEXT NOT NULL,
+ organization_id TEXT NOT NULL,
+ start_agent_name TEXT NOT NULL,
+ status TEXT NOT NULL DEFAULT 'PENDING',
+ checkpoint JSONB,
+ input_messages JSONB NOT NULL,
</file context>
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| clone := *cp |
There was a problem hiding this comment.
P2: Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At pkg/agent/restore_test.go, line 44:
<comment>Test checkpointer performs only shallow checkpoint copies, causing aliasing of nested mutable fields and weakening persistence-boundary fidelity in restore tests.</comment>
<file context>
@@ -0,0 +1,465 @@
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ clone := *cp
+ s.checkpoints[runID] = &clone
+ return nil
</file context>
executeParallel ignored SuspendedError when checkpoint was nil, treating it as a normal tool error. Nested suspension propagation also dropped the in-memory checkpoint when persistence failed, making runs non-resumable. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
The sync.Once error was stored in a local variable, so a failed first attempt would be silently hidden on later calls. Signed-off-by: Aurélien Sibiril <81782+aureliensibiril@users.noreply.github.com>
Summary
Messageand allParttypes now round-trip through JSON for checkpoint serializationcoreLoopsaves incremental checkpoints after every tool-call turn and checks a cooperative stop signal at turn boundariesRestorefunction reconstructs agent state from a persisted checkpoint (suspended, nested suspended, awaiting approval)AgentRunentity andPGCheckpointStorein coredata with lease-based concurrency and 10 MiB size guardTest plan
make test MODULE=./pkg/llm— JSON round-trip tests (40 tests)make test MODULE=./pkg/agent— stop signal, restore, existing agent tests (284 tests)make stack-up && make psqlthen runpkg/coredata/migrations/20260410T120000Z.sqlSummary by cubic
Adds graceful stop/resume to agents with durable JSON checkpoints, a Postgres-backed
AgentRunstore, and a supervisor that safely auto-resumes runs. Includes end-to-end tests for checkpoint persistence and the supervisor lifecycle.New Features
Message,Part,ToolCall,FunctionCall, andUsage; newCheckpoint/CheckpointStore; 10 MiB size guard.WithStopSignal; resume viaRestore; supports nested agents; addsStreamEventSuspendedandOnRunRestore; checks max-turns before stop signal; attaches in-memory checkpoint on save failure.RunWithOpts,RunStreamedWithOpts,ResumeWithOpts, andWithCheckpointerto plug in persistence.AgentRunentity with lease-based concurrency and stop requests;PGCheckpointStorewrites toagent_runs.checkpoint.AgentRunSupervisorclaims PENDING runs with a lease, heartbeats, and early-returns on lease loss or suspension; checkpoints on stop; resets stale RUNNING rows to PENDING for auto-resume; handles result serialization errors; guards non-positive interval/lease durations.pkg/agentruntest; Postgres-backed Save/Load/Delete round-trip, claim-and-complete flow, cooperative stop/resume via flag, and SIGTERM resilience across restarts; uses embeddedcoredata.Migrations(skips if Postgres is unavailable).CheckpointStatustoAgentStatus; removed unnecessary JSON tags from internal structs; fixed suspension checkpoint fallback in nested/parallel execution so resumability is preserved; persisted table init error inagentruntestto avoid silent failures.Migration
pkg/coredata/migrations/20260410T120000Z.sql.AgentRunSupervisorconfigured withPGCheckpointStoreand anAgentRegistry.stop_requested = true; on graceful shutdown, let the worker exit—runs resume from the last checkpoint on restart.Written for commit 734a2ac. Summary will update on new commits.