From 0ff03dc8972984886f52691e152f21d9077d45a2 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Thu, 19 Feb 2026 09:44:03 +0000 Subject: [PATCH 1/8] feat: add /x/acpio package --- go.mod | 1 + go.sum | 2 + x/acpio/acp_conversation.go | 247 ++++++++++++++++ x/acpio/acp_conversation_test.go | 474 +++++++++++++++++++++++++++++++ x/acpio/acpio.go | 239 ++++++++++++++++ 5 files changed, 963 insertions(+) create mode 100644 x/acpio/acp_conversation.go create mode 100644 x/acpio/acp_conversation_test.go create mode 100644 x/acpio/acpio.go diff --git a/go.mod b/go.mod index 35aee152..5c1a486f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/ActiveState/termtest/xpty v0.6.0 github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d github.com/charmbracelet/bubbletea v1.3.4 + github.com/coder/acp-go-sdk v0.6.3 github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 github.com/coder/quartz v0.1.2 github.com/danielgtaylor/huma/v2 v2.32.0 diff --git a/go.sum b/go.sum index a98ca56d..1d70abb9 100644 --- a/go.sum +++ b/go.sum @@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ= +github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko= github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8= github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4= github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs= diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go new file mode 100644 index 00000000..f58e7ff3 --- /dev/null +++ b/x/acpio/acp_conversation.go @@ -0,0 +1,247 @@ +package acpio + +import ( + "context" + "log/slog" + "slices" + "strings" + "sync" + + st "github.com/coder/agentapi/lib/screentracker" + "github.com/coder/quartz" +) + +// Compile-time assertion that ACPConversation implements st.Conversation +var _ st.Conversation = (*ACPConversation)(nil) + +// ChunkableAgentIO extends AgentIO with chunk callback support for streaming responses. +// This interface is what ACPConversation needs from its AgentIO implementation. +type ChunkableAgentIO interface { + st.AgentIO + SetOnChunk(fn func(chunk string)) +} + +// ACPConversation tracks conversations with ACP-based agents. +// Unlike PTY-based Conversation, ACP has blocking writes where the +// response is complete when Write() returns. +type ACPConversation struct { + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + agentIO ChunkableAgentIO + messages []st.ConversationMessage + prompting bool // true while agent is processing + streamingResponse strings.Builder + logger *slog.Logger + emitter st.Emitter + initialPrompt []st.MessagePart + clock quartz.Clock +} + +// noopEmitter is a no-op implementation of Emitter for when no emitter is provided. +type noopEmitter struct{} + +func (noopEmitter) EmitMessages([]st.ConversationMessage) {} +func (noopEmitter) EmitStatus(st.ConversationStatus) {} +func (noopEmitter) EmitScreen(string) {} + +// NewACPConversation creates a new ACPConversation. +// If emitter is provided, it will receive events when messages/status/screen change. +// If clock is nil, a real clock will be used. +func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *slog.Logger, initialPrompt []st.MessagePart, emitter st.Emitter, clock quartz.Clock) *ACPConversation { + if logger == nil { + logger = slog.Default() + } + if clock == nil { + clock = quartz.NewReal() + } + if emitter == nil { + emitter = noopEmitter{} + } + ctx, cancel := context.WithCancel(ctx) + c := &ACPConversation{ + ctx: ctx, + cancel: cancel, + agentIO: agentIO, + logger: logger, + initialPrompt: initialPrompt, + emitter: emitter, + clock: clock, + } + return c +} + +// Messages returns the conversation history. +func (c *ACPConversation) Messages() []st.ConversationMessage { + c.mu.Lock() + defer c.mu.Unlock() + return slices.Clone(c.messages) +} + +// Send sends a message to the agent asynchronously. +// It returns immediately after recording the user message and starts +// the agent request in a background goroutine. Returns an error if +// a message is already being processed. +func (c *ACPConversation) Send(messageParts ...st.MessagePart) error { + message := "" + for _, part := range messageParts { + message += part.String() + } + + // Validate whitespace BEFORE trimming (match PTY behavior) + if message != strings.TrimSpace(message) { + return st.ErrMessageValidationWhitespace + } + + if message == "" { + return st.ErrMessageValidationEmpty + } + + // Check if already prompting and set state atomically + c.mu.Lock() + if c.prompting { + c.mu.Unlock() + return st.ErrMessageValidationChanging + } + c.messages = append(c.messages, st.ConversationMessage{ + Id: len(c.messages), + Role: st.ConversationRoleUser, + Message: message, + Time: c.clock.Now(), + }) + // Add placeholder for streaming agent response + c.messages = append(c.messages, st.ConversationMessage{ + Id: len(c.messages), + Role: st.ConversationRoleAgent, + Message: "", + Time: c.clock.Now(), + }) + c.streamingResponse.Reset() + c.prompting = true + status := c.statusLocked() + c.mu.Unlock() + + // Emit status change to "running" before starting the prompt + c.emitter.EmitStatus(status) + + c.logger.Debug("ACPConversation sending message", "message", message) + + // Run the blocking write in a goroutine so HTTP returns immediately + go c.executePrompt(messageParts) + + return nil +} + +// Start sets up chunk handling and sends the initial prompt if provided. +func (c *ACPConversation) Start(ctx context.Context) { + // Wire up the chunk callback for streaming + c.agentIO.SetOnChunk(c.handleChunk) + + // Send initial prompt if provided + if len(c.initialPrompt) > 0 { + err := c.Send(c.initialPrompt...) + if err != nil { + c.logger.Error("ACPConversation failed to send initial prompt", "error", err) + } + } else { + // No initial prompt means we start in stable state + c.emitter.EmitStatus(c.Status()) + } +} + +// Status returns the current conversation status. +func (c *ACPConversation) Status() st.ConversationStatus { + c.mu.Lock() + defer c.mu.Unlock() + return c.statusLocked() +} + +// statusLocked returns the status without acquiring the lock (caller must hold lock). +func (c *ACPConversation) statusLocked() st.ConversationStatus { + if c.prompting { + return st.ConversationStatusChanging // agent is processing + } + return st.ConversationStatusStable +} + +// Stop cancels any in-progress operations. +func (c *ACPConversation) Stop() { + c.cancel() +} + +// Text returns the current streaming response text. +func (c *ACPConversation) Text() string { + c.mu.Lock() + defer c.mu.Unlock() + return c.streamingResponse.String() +} + +// handleChunk is called for each streaming chunk from the agent. +func (c *ACPConversation) handleChunk(chunk string) { + c.mu.Lock() + c.streamingResponse.WriteString(chunk) + // Update the last message (the streaming agent response) + if len(c.messages) > 0 { + c.messages[len(c.messages)-1].Message = c.streamingResponse.String() + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) +} + +// executePrompt runs the actual agent request in background +func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) { + var err error + for _, part := range messageParts { + if c.ctx.Err() != nil { + err = c.ctx.Err() + break + } + if partErr := part.Do(c.agentIO); partErr != nil { + err = partErr + break + } + } + + c.mu.Lock() + c.prompting = false + + if err != nil { + c.logger.Error("ACPConversation message failed", "error", err) + // Remove the agent's streaming message on error (may be empty or partial) + if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { + c.messages = c.messages[:len(c.messages)-1] + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) + return + } + + // Final response should already be in the last message via streaming + // but ensure it's finalized + response := c.streamingResponse.String() + if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { + c.messages[len(c.messages)-1].Message = strings.TrimSpace(response) + } + messages := slices.Clone(c.messages) + status := c.statusLocked() + screen := c.streamingResponse.String() + c.mu.Unlock() + + c.emitter.EmitMessages(messages) + c.emitter.EmitStatus(status) + c.emitter.EmitScreen(screen) + + c.logger.Debug("ACPConversation message complete", "responseLen", len(response)) +} diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go new file mode 100644 index 00000000..0811bc3b --- /dev/null +++ b/x/acpio/acp_conversation_test.go @@ -0,0 +1,474 @@ +package acpio_test + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/coder/quartz" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/agentapi/lib/screentracker" + "github.com/coder/agentapi/x/acpio" +) + +// mockAgentIO implements acpio.ChunkableAgentIO for testing. +// It provides a channel-based synchronization mechanism to test ACPConversation +// without relying on time.Sleep. +type mockAgentIO struct { + mu sync.Mutex + written []byte + screenContent string + onChunkFn func(chunk string) + + // Control behavior + writeErr error + // writeBlock is a channel that, if non-nil, will cause Write to block until closed. + // This allows tests to control when the write completes. + writeBlock chan struct{} + // writeStarted is closed when Write begins (before blocking on writeBlock). + // This allows tests to synchronize on the write starting. + writeStarted chan struct{} +} + +// mockEmitter implements screentracker.Emitter for testing. +type mockEmitter struct { + mu sync.Mutex + messagesCalls int + statusCalls int + screenCalls int + lastMessages []screentracker.ConversationMessage + lastStatus screentracker.ConversationStatus + lastScreen string +} + +func newMockEmitter() *mockEmitter { + return &mockEmitter{} +} + +func (m *mockEmitter) EmitMessages(messages []screentracker.ConversationMessage) { + m.mu.Lock() + defer m.mu.Unlock() + m.messagesCalls++ + m.lastMessages = messages +} + +func (m *mockEmitter) EmitStatus(status screentracker.ConversationStatus) { + m.mu.Lock() + defer m.mu.Unlock() + m.statusCalls++ + m.lastStatus = status +} + +func (m *mockEmitter) EmitScreen(screen string) { + m.mu.Lock() + defer m.mu.Unlock() + m.screenCalls++ + m.lastScreen = screen +} + +func (m *mockEmitter) TotalCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.messagesCalls + m.statusCalls + m.screenCalls +} + +func newMockAgentIO() *mockAgentIO { + return &mockAgentIO{} +} + +func (m *mockAgentIO) Write(data []byte) (int, error) { + // Signal that write has started + m.mu.Lock() + started := m.writeStarted + block := m.writeBlock + m.mu.Unlock() + + if started != nil { + close(started) + } + + // Block if configured to do so (for testing concurrent sends) + if block != nil { + <-block + } + + m.mu.Lock() + defer m.mu.Unlock() + if m.writeErr != nil { + return 0, m.writeErr + } + m.written = append(m.written, data...) + return len(data), nil +} + +func (m *mockAgentIO) ReadScreen() string { + m.mu.Lock() + defer m.mu.Unlock() + return m.screenContent +} + +func (m *mockAgentIO) SetOnChunk(fn func(chunk string)) { + m.mu.Lock() + defer m.mu.Unlock() + m.onChunkFn = fn +} + +// SimulateChunks simulates the agent sending streaming chunks. +// This triggers the onChunk callback as if the agent was responding. +func (m *mockAgentIO) SimulateChunks(chunks ...string) { + m.mu.Lock() + fn := m.onChunkFn + m.mu.Unlock() + for _, chunk := range chunks { + if fn != nil { + fn(chunk) + } + } +} + +// GetWritten returns all data written to the agent. +func (m *mockAgentIO) GetWritten() []byte { + m.mu.Lock() + defer m.mu.Unlock() + return append([]byte(nil), m.written...) +} + +// BlockWrite sets up blocking for the next Write call and returns: +// - started: a channel that is closed when Write begins +// - done: a channel to close to unblock the Write +func (m *mockAgentIO) BlockWrite() (started chan struct{}, done chan struct{}) { + m.mu.Lock() + defer m.mu.Unlock() + m.writeStarted = make(chan struct{}) + m.writeBlock = make(chan struct{}) + return m.writeStarted, m.writeBlock +} + +func Test_NewACPConversation(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + require.NotNil(t, conv) +} + +func Test_Messages_InitiallyEmpty(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + messages := conv.Messages() + + assert.Empty(t, messages) +} + +func Test_Status_InitiallyStable(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + status := conv.Status() + + assert.Equal(t, screentracker.ConversationStatusStable, status) +} + +func Test_Send_AddsUserMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize with the goroutine + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + err := conv.Send(screentracker.MessagePartText{Content: "hello"}) + require.NoError(t, err) + + // Wait for the write goroutine to start + <-started + + messages := conv.Messages() + require.Len(t, messages, 2) // user message + placeholder agent message + + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "hello", messages[0].Message) + assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Send_RejectsEmptyMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + err := conv.Send(screentracker.MessagePartText{Content: ""}) + + assert.ErrorIs(t, err, screentracker.ErrMessageValidationEmpty) +} + +func Test_Send_RejectsWhitespace(t *testing.T) { + tests := []struct { + name string + content string + }{ + {"leading space", " hello"}, + {"trailing space", "hello "}, + {"leading newline", "\nhello"}, + {"trailing newline", "hello\n"}, + {"both sides", " hello "}, + {"newlines both sides", "\nhello\n"}, + {"leading tab", "\thello"}, + {"trailing tab", "hello\t"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + + err := conv.Send(screentracker.MessagePartText{Content: tt.content}) + + assert.ErrorIs(t, err, screentracker.ErrMessageValidationWhitespace) + }) + } +} + +func Test_Send_RejectsDuplicateSend(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so it doesn't complete immediately + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // First send should succeed + err := conv.Send(screentracker.MessagePartText{Content: "first"}) + require.NoError(t, err) + + // Wait for the write to start (ensuring we're in "prompting" state) + <-started + + // Second send while first is processing should fail + err = conv.Send(screentracker.MessagePartText{Content: "second"}) + assert.ErrorIs(t, err, screentracker.ErrMessageValidationChanging) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Status_ChangesWhileProcessing(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can observe status changes + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Initially stable + assert.Equal(t, screentracker.ConversationStatusStable, conv.Status()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Status should be changing while processing + assert.Equal(t, screentracker.ConversationStatusChanging, conv.Status()) + + // Unblock the write + close(done) + + // Give the goroutine a chance to complete (status update happens after Write returns) + require.Eventually(t, func() bool { + return conv.Status() == screentracker.ConversationStatusStable + }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") +} + +func Test_Text_ReturnsStreamingContent(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate streaming during processing + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Initially empty + assert.Equal(t, "", conv.Text()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "question"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Simulate streaming chunks from agent + mock.SimulateChunks("Hello", " ", "world!") + + // Text should contain the streamed content + assert.Equal(t, "Hello world!", conv.Text()) + + // The last message should also be updated + messages := conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, "Hello world!", messages[1].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Emitter_CalledOnChanges(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate chunks during processing + started, done := mock.BlockWrite() + + emitter := newMockEmitter() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, emitter, mClock) + conv.Start(context.Background()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Simulate chunks - each should trigger emitter calls + mock.SimulateChunks("chunk1") + mock.SimulateChunks("chunk2") + + emitter.mu.Lock() + messagesCallsBeforeComplete := emitter.messagesCalls + emitter.mu.Unlock() + + // Should have emit calls from chunks (each chunk emits messages, status, and screen) + assert.Equal(t, 2, messagesCallsBeforeComplete) + + // Unblock the write to complete processing + close(done) + + // Wait for completion emit + require.Eventually(t, func() bool { + emitter.mu.Lock() + c := emitter.messagesCalls + emitter.mu.Unlock() + return c >= 3 // 2 from chunks + 1 from completion + }, 100*time.Millisecond, 5*time.Millisecond, "should receive completion emit") +} + +func Test_InitialPrompt_SentOnStart(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize with the initial prompt send + started, done := mock.BlockWrite() + + initialPrompt := []screentracker.MessagePart{ + screentracker.MessagePartText{Content: "initial prompt"}, + } + + conv := acpio.NewACPConversation(context.Background(), mock, nil, initialPrompt, nil, mClock) + conv.Start(context.Background()) + + // Wait for write to start (initial prompt is being sent) + <-started + + // Should have user message from initial prompt + messages := conv.Messages() + require.GreaterOrEqual(t, len(messages), 1) + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "initial prompt", messages[0].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_Messages_AreCopied(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Set up blocking to synchronize + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Get messages and modify + messages := conv.Messages() + require.Len(t, messages, 2) + messages[0].Message = "modified" + + // Original should be unchanged + originalMessages := conv.Messages() + assert.Equal(t, "test", originalMessages[0].Message) + + // Unblock the write to let the test complete cleanly + close(done) +} + +func Test_ErrorRemovesPartialMessage(t *testing.T) { + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + // Block the write so we can simulate partial content before error + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) + + // Send a message + err := conv.Send(screentracker.MessagePartText{Content: "test"}) + require.NoError(t, err) + + // Wait for write to start + <-started + + // Should have user message + placeholder agent message + messages := conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) + + // Simulate the agent streaming partial content before the error + mock.SimulateChunks("partial ", "response ", "content") + + // Verify partial content is in the agent message + messages = conv.Messages() + require.Len(t, messages, 2) + assert.Equal(t, "partial response content", messages[1].Message) + + // Now configure the mock to return an error and unblock + mock.mu.Lock() + mock.writeErr = assert.AnError + mock.mu.Unlock() + close(done) + + // Wait for the conversation to stabilize after the error + require.Eventually(t, func() bool { + return conv.Status() == screentracker.ConversationStatusStable + }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") + + // The partial agent message should be removed on error. + // Only the user message should remain. + messages = conv.Messages() + require.Len(t, messages, 1, "partial agent message should be removed on error") + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) + assert.Equal(t, "test", messages[0].Message) +} diff --git a/x/acpio/acpio.go b/x/acpio/acpio.go new file mode 100644 index 00000000..77db963e --- /dev/null +++ b/x/acpio/acpio.go @@ -0,0 +1,239 @@ +package acpio + +import ( + "context" + "fmt" + "io" + "log/slog" + "strings" + "sync" + "time" + + acp "github.com/coder/acp-go-sdk" + st "github.com/coder/agentapi/lib/screentracker" +) + +// Compile-time assertion that ACPAgentIO implements st.AgentIO +var _ st.AgentIO = (*ACPAgentIO)(nil) + +// DefaultPromptTimeout is the maximum time to wait for an agent response. +const DefaultPromptTimeout = 5 * time.Minute + +// ACPAgentIO implements screentracker.AgentIO using the ACP protocol +type ACPAgentIO struct { + ctx context.Context + conn *acp.ClientSideConnection + sessionID acp.SessionId + mu sync.RWMutex + response strings.Builder + logger *slog.Logger + onChunk func(chunk string) // called on each streaming chunk +} + +// acpClient implements acp.Client to handle callbacks from the agent +type acpClient struct { + agentIO *ACPAgentIO +} + +var _ acp.Client = (*acpClient)(nil) + +func (c *acpClient) SessionUpdate(ctx context.Context, params acp.SessionNotification) error { + c.agentIO.logger.Debug("SessionUpdate received", + "sessionId", params.SessionId, + "hasAgentMessageChunk", params.Update.AgentMessageChunk != nil) + + if params.Update.AgentMessageChunk != nil { + if text := params.Update.AgentMessageChunk.Content.Text; text != nil { + c.agentIO.logger.Debug("AgentMessageChunk text", + "text", text.Text, + "textLen", len(text.Text)) + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(text.Text) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(text.Text) + } + } + } + + // Handle tool calls - format as text and append to response + if params.Update.ToolCall != nil { + tc := params.Update.ToolCall + formatted := fmt.Sprintf("\n[Tool: %s] %s\n", tc.Kind, tc.Title) + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(formatted) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(formatted) + } + } + + if params.Update.ToolCallUpdate != nil { + tcu := params.Update.ToolCallUpdate + var formatted string + if tcu.Status != nil { + formatted = fmt.Sprintf("[Tool Status: %s]\n", *tcu.Status) + } + if formatted != "" { + c.agentIO.mu.Lock() + c.agentIO.response.WriteString(formatted) + onChunk := c.agentIO.onChunk + c.agentIO.mu.Unlock() + if onChunk != nil { + onChunk(formatted) + } + } + } + + return nil +} + +func (c *acpClient) RequestPermission(ctx context.Context, params acp.RequestPermissionRequest) (acp.RequestPermissionResponse, error) { + // Auto-approve all permissions for Phase 1 + return acp.RequestPermissionResponse{ + Outcome: acp.RequestPermissionOutcome{ + Selected: &acp.RequestPermissionOutcomeSelected{OptionId: "allow"}, + }, + }, nil +} + +func (c *acpClient) ReadTextFile(ctx context.Context, params acp.ReadTextFileRequest) (acp.ReadTextFileResponse, error) { + return acp.ReadTextFileResponse{}, nil +} + +func (c *acpClient) WriteTextFile(ctx context.Context, params acp.WriteTextFileRequest) (acp.WriteTextFileResponse, error) { + return acp.WriteTextFileResponse{}, nil +} + +func (c *acpClient) CreateTerminal(ctx context.Context, params acp.CreateTerminalRequest) (acp.CreateTerminalResponse, error) { + return acp.CreateTerminalResponse{}, nil +} + +func (c *acpClient) KillTerminalCommand(ctx context.Context, params acp.KillTerminalCommandRequest) (acp.KillTerminalCommandResponse, error) { + return acp.KillTerminalCommandResponse{}, nil +} + +func (c *acpClient) TerminalOutput(ctx context.Context, params acp.TerminalOutputRequest) (acp.TerminalOutputResponse, error) { + return acp.TerminalOutputResponse{}, nil +} + +func (c *acpClient) ReleaseTerminal(ctx context.Context, params acp.ReleaseTerminalRequest) (acp.ReleaseTerminalResponse, error) { + return acp.ReleaseTerminalResponse{}, nil +} + +func (c *acpClient) WaitForTerminalExit(ctx context.Context, params acp.WaitForTerminalExitRequest) (acp.WaitForTerminalExitResponse, error) { + return acp.WaitForTerminalExitResponse{}, nil +} + +// SetOnChunk sets a callback that will be called for each streaming chunk. +func (a *ACPAgentIO) SetOnChunk(fn func(chunk string)) { + a.mu.Lock() + defer a.mu.Unlock() + a.onChunk = fn +} + +// NewWithPipes creates an ACPAgentIO connected via the provided pipes +func NewWithPipes(ctx context.Context, toAgent io.Writer, fromAgent io.Reader, logger *slog.Logger, getwd func() (string, error)) (*ACPAgentIO, error) { + if logger == nil { + logger = slog.Default() + } + agentIO := &ACPAgentIO{ctx: ctx, logger: logger} + client := &acpClient{agentIO: agentIO} + + conn := acp.NewClientSideConnection(client, toAgent, fromAgent) + agentIO.conn = conn + + logger.Debug("Initializing ACP connection") + + // Initialize the connection + initResp, err := conn.Initialize(ctx, acp.InitializeRequest{ + ProtocolVersion: acp.ProtocolVersionNumber, + ClientCapabilities: acp.ClientCapabilities{}, + }) + if err != nil { + logger.Error("Failed to initialize ACP connection", "error", err) + return nil, err + } + logger.Debug("ACP initialized", "protocolVersion", initResp.ProtocolVersion) + + // Create a session + cwd, err := getwd() + if err != nil { + logger.Error("Failed to get working directory", "error", err) + return nil, err + } + sessResp, err := conn.NewSession(ctx, acp.NewSessionRequest{ + Cwd: cwd, + McpServers: []acp.McpServer{}, + }) + if err != nil { + logger.Error("Failed to create ACP session", "error", err) + return nil, err + } + agentIO.sessionID = sessResp.SessionId + logger.Debug("ACP session created", "sessionId", sessResp.SessionId) + + return agentIO, nil +} + +// Write sends a message to the agent via ACP prompt +func (a *ACPAgentIO) Write(data []byte) (int, error) { + text := string(data) + + // Strip bracketed paste escape sequences if present + text = strings.TrimPrefix(text, "\x1b[200~") + text = strings.TrimSuffix(text, "\x1b[201~") + + // Strip terminal hack sequences (x\b pattern used for Claude Code compatibility) + text = strings.TrimPrefix(text, "x\b") + + text = strings.TrimSpace(text) + + // Don't send empty prompts + if text == "" { + a.logger.Debug("Ignoring empty prompt", "rawDataLen", len(data)) + return len(data), nil + } + + // Clear previous response + a.mu.Lock() + a.response.Reset() + a.mu.Unlock() + + a.logger.Debug("Sending prompt", + "sessionId", a.sessionID, + "text", text, + "textLen", len(text), + "rawDataLen", len(data)) + + // Ensure the context has not been cancelled before writing prompt + if err := a.ctx.Err(); err != nil { + a.logger.Debug("Aborting write", "error", err) + return 0, err + } + // Use a timeout to prevent hanging indefinitely + promptCtx, cancel := context.WithTimeout(a.ctx, DefaultPromptTimeout) + defer cancel() + + resp, err := a.conn.Prompt(promptCtx, acp.PromptRequest{ + SessionId: a.sessionID, + Prompt: []acp.ContentBlock{acp.TextBlock(text)}, + }) + if err != nil { + a.logger.Error("Prompt failed", "error", err) + return 0, err + } + + a.logger.Debug("Prompt completed", "stopReason", resp.StopReason) + + return len(data), nil +} + +// ReadScreen returns the accumulated agent response +func (a *ACPAgentIO) ReadScreen() string { + a.mu.RLock() + defer a.mu.RUnlock() + return a.response.String() +} From 19b3a8352a911974c0adfe918971476052f8ae88 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 12:09:29 +0000 Subject: [PATCH 2/8] chore: add MockEmitter.WaitForStatus --- x/acpio/acp_conversation_test.go | 92 +++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 30 deletions(-) diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go index 0811bc3b..fc6f0459 100644 --- a/x/acpio/acp_conversation_test.go +++ b/x/acpio/acp_conversation_test.go @@ -35,17 +35,20 @@ type mockAgentIO struct { // mockEmitter implements screentracker.Emitter for testing. type mockEmitter struct { - mu sync.Mutex - messagesCalls int - statusCalls int - screenCalls int - lastMessages []screentracker.ConversationMessage - lastStatus screentracker.ConversationStatus - lastScreen string + mu sync.Mutex + cond *sync.Cond + messagesCalls int + statusCalls int + screenCalls int + lastMessages []screentracker.ConversationMessage + lastStatus screentracker.ConversationStatus + lastScreen string } func newMockEmitter() *mockEmitter { - return &mockEmitter{} + m := &mockEmitter{} + m.cond = sync.NewCond(&m.mu) + return m } func (m *mockEmitter) EmitMessages(messages []screentracker.ConversationMessage) { @@ -60,6 +63,7 @@ func (m *mockEmitter) EmitStatus(status screentracker.ConversationStatus) { defer m.mu.Unlock() m.statusCalls++ m.lastStatus = status + m.cond.Broadcast() } func (m *mockEmitter) EmitScreen(screen string) { @@ -75,6 +79,30 @@ func (m *mockEmitter) TotalCalls() int { return m.messagesCalls + m.statusCalls + m.screenCalls } +// WaitForStatus blocks until the emitter's last status matches target. +// Must be called with a context that has a deadline to avoid hanging tests. +func (m *mockEmitter) WaitForStatus(ctx context.Context, t *testing.T, target screentracker.ConversationStatus) { + t.Helper() + if _, ok := ctx.Deadline(); !ok { + t.Fatal("must set a deadline to avoid hanging tests") + } + done := make(chan struct{}) + go func() { + m.mu.Lock() + defer m.mu.Unlock() + for m.lastStatus != target { + m.cond.Wait() + } + close(done) + }() + select { + case <-done: + case <-ctx.Done(): + m.cond.Broadcast() // unblock the goroutine + t.Fatalf("timed out waiting for %q status", target) + } +} + func newMockAgentIO() *mockAgentIO { return &mockAgentIO{} } @@ -265,16 +293,17 @@ func Test_Send_RejectsDuplicateSend(t *testing.T) { } func Test_Status_ChangesWhileProcessing(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + mClock := quartz.NewMock(t) mock := newMockAgentIO() + emitter := newMockEmitter() // Block the write so we can observe status changes started, done := mock.BlockWrite() - conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) - conv.Start(context.Background()) - - // Initially stable - assert.Equal(t, screentracker.ConversationStatusStable, conv.Status()) + conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) + conv.Start(ctx) // Send a message err := conv.Send(screentracker.MessagePartText{Content: "test"}) @@ -289,10 +318,8 @@ func Test_Status_ChangesWhileProcessing(t *testing.T) { // Unblock the write close(done) - // Give the goroutine a chance to complete (status update happens after Write returns) - require.Eventually(t, func() bool { - return conv.Status() == screentracker.ConversationStatusStable - }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") + // Wait for the goroutine to complete - status should then be stable. + emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) } func Test_Text_ReturnsStreamingContent(t *testing.T) { @@ -337,8 +364,11 @@ func Test_Emitter_CalledOnChanges(t *testing.T) { emitter := newMockEmitter() - conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, emitter, mClock) - conv.Start(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) + conv.Start(ctx) // Send a message err := conv.Send(screentracker.MessagePartText{Content: "test"}) @@ -362,12 +392,12 @@ func Test_Emitter_CalledOnChanges(t *testing.T) { close(done) // Wait for completion emit - require.Eventually(t, func() bool { - emitter.mu.Lock() - c := emitter.messagesCalls - emitter.mu.Unlock() - return c >= 3 // 2 from chunks + 1 from completion - }, 100*time.Millisecond, 5*time.Millisecond, "should receive completion emit") + emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) + + emitter.mu.Lock() + finalMessagesCalls := emitter.messagesCalls + emitter.mu.Unlock() + assert.GreaterOrEqual(t, finalMessagesCalls, 3, "2 from chunks + 1 from completion") } func Test_InitialPrompt_SentOnStart(t *testing.T) { @@ -425,13 +455,17 @@ func Test_Messages_AreCopied(t *testing.T) { } func Test_ErrorRemovesPartialMessage(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + mClock := quartz.NewMock(t) mock := newMockAgentIO() + emitter := newMockEmitter() // Block the write so we can simulate partial content before error started, done := mock.BlockWrite() - conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) - conv.Start(context.Background()) + conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) + conv.Start(ctx) // Send a message err := conv.Send(screentracker.MessagePartText{Content: "test"}) @@ -461,9 +495,7 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { close(done) // Wait for the conversation to stabilize after the error - require.Eventually(t, func() bool { - return conv.Status() == screentracker.ConversationStatusStable - }, 100*time.Millisecond, 5*time.Millisecond, "status should return to stable") + emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) // The partial agent message should be removed on error. // Only the user message should remain. From dda3717cade21306abf5204deea04912f62a6256 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 12:09:58 +0000 Subject: [PATCH 3/8] fix: do not corrupt user message if late chunk arrives after error --- x/acpio/acp_conversation.go | 13 +++++++++-- x/acpio/acp_conversation_test.go | 38 ++++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index f58e7ff3..80f72aba 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -179,9 +179,18 @@ func (c *ACPConversation) Text() string { // handleChunk is called for each streaming chunk from the agent. func (c *ACPConversation) handleChunk(chunk string) { c.mu.Lock() + // Log and discard chunks that arrive after the prompt has completed or errored. + // This should not happen under normal operation — if it does, it indicates a + // bug in the ACP SDK or a race in the connection teardown. + if !c.prompting { + c.mu.Unlock() + c.logger.Error("received chunk while not prompting (late/unexpected chunk discarded)", + "chunkLen", len(chunk)) + return + } c.streamingResponse.WriteString(chunk) - // Update the last message (the streaming agent response) - if len(c.messages) > 0 { + // Only update the last message if it's the agent placeholder (defense-in-depth) + if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { c.messages[len(c.messages)-1].Message = c.streamingResponse.String() } messages := slices.Clone(c.messages) diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go index fc6f0459..eeb58b0d 100644 --- a/x/acpio/acp_conversation_test.go +++ b/x/acpio/acp_conversation_test.go @@ -504,3 +504,41 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) assert.Equal(t, "test", messages[0].Message) } + +func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + mClock := quartz.NewMock(t) + mock := newMockAgentIO() + emitter := newMockEmitter() + started, done := mock.BlockWrite() + + conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) + conv.Start(ctx) + + // Given: a send that fails with an error, removing the agent placeholder + err := conv.Send(screentracker.MessagePartText{Content: "hello"}) + require.NoError(t, err) + <-started + + mock.mu.Lock() + mock.writeErr = assert.AnError + mock.mu.Unlock() + close(done) + + emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) + + messages := conv.Messages() + require.Len(t, messages, 1, "agent placeholder should be removed after error") + assert.Equal(t, "hello", messages[0].Message) + + // When: a late chunk arrives after the prompt has already errored + mock.SimulateChunks("late response data") + + // Then: the user message is not corrupted + messages = conv.Messages() + require.Len(t, messages, 1, "no new messages should appear from a late chunk") + assert.Equal(t, "hello", messages[0].Message) + assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) +} From 49ebb86d0f6ff2df0f72e151999fc099fe8787a3 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 13:10:14 +0000 Subject: [PATCH 4/8] make ACPConversation.Send() sync to match PTYConversation behaviour --- go.mod | 1 + x/acpio/acp_conversation.go | 32 +++++++------ x/acpio/acp_conversation_test.go | 82 +++++++++++++++++--------------- x/acpio/main_test.go | 11 +++++ 4 files changed, 72 insertions(+), 54 deletions(-) create mode 100644 x/acpio/main_test.go diff --git a/go.mod b/go.mod index 5c1a486f..e43f5ea3 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/spf13/viper v1.20.1 github.com/stretchr/testify v1.11.1 github.com/tmaxmax/go-sse v0.10.0 + go.uber.org/goleak v1.3.0 golang.org/x/term v0.30.0 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da ) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index 80f72aba..962f661f 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -78,10 +78,10 @@ func (c *ACPConversation) Messages() []st.ConversationMessage { return slices.Clone(c.messages) } -// Send sends a message to the agent asynchronously. -// It returns immediately after recording the user message and starts -// the agent request in a background goroutine. Returns an error if -// a message is already being processed. +// Send sends a message to the agent synchronously. +// It blocks until the agent has finished processing and returns any error +// from the underlying write. Returns a validation error immediately if +// the message is invalid or another message is already being processed. func (c *ACPConversation) Send(messageParts ...st.MessagePart) error { message := "" for _, part := range messageParts { @@ -126,10 +126,7 @@ func (c *ACPConversation) Send(messageParts ...st.MessagePart) error { c.logger.Debug("ACPConversation sending message", "message", message) - // Run the blocking write in a goroutine so HTTP returns immediately - go c.executePrompt(messageParts) - - return nil + return c.executePrompt(messageParts) } // Start sets up chunk handling and sends the initial prompt if provided. @@ -139,10 +136,14 @@ func (c *ACPConversation) Start(ctx context.Context) { // Send initial prompt if provided if len(c.initialPrompt) > 0 { - err := c.Send(c.initialPrompt...) - if err != nil { - c.logger.Error("ACPConversation failed to send initial prompt", "error", err) - } + // Run in a goroutine because Send blocks until the prompt completes, + // and Start must return immediately per the Conversation interface. + go func() { + err := c.Send(c.initialPrompt...) + if err != nil { + c.logger.Error("ACPConversation failed to send initial prompt", "error", err) + } + }() } else { // No initial prompt means we start in stable state c.emitter.EmitStatus(c.Status()) @@ -203,8 +204,8 @@ func (c *ACPConversation) handleChunk(chunk string) { c.emitter.EmitScreen(screen) } -// executePrompt runs the actual agent request in background -func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) { +// executePrompt runs the actual agent request and returns any error. +func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error { var err error for _, part := range messageParts { if c.ctx.Err() != nil { @@ -234,7 +235,7 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) { c.emitter.EmitMessages(messages) c.emitter.EmitStatus(status) c.emitter.EmitScreen(screen) - return + return err } // Final response should already be in the last message via streaming @@ -253,4 +254,5 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) { c.emitter.EmitScreen(screen) c.logger.Debug("ACPConversation message complete", "responseLen", len(response)) + return nil } diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go index eeb58b0d..eb0b4fb6 100644 --- a/x/acpio/acp_conversation_test.go +++ b/x/acpio/acp_conversation_test.go @@ -207,16 +207,17 @@ func Test_Status_InitiallyStable(t *testing.T) { func Test_Send_AddsUserMessage(t *testing.T) { mClock := quartz.NewMock(t) mock := newMockAgentIO() - // Set up blocking to synchronize with the goroutine + // Set up blocking so we can inspect state mid-flight started, done := mock.BlockWrite() conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) conv.Start(context.Background()) - err := conv.Send(screentracker.MessagePartText{Content: "hello"}) - require.NoError(t, err) + // Send blocks until completion, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "hello"}) }() - // Wait for the write goroutine to start + // Wait for the write to start <-started messages := conv.Messages() @@ -226,8 +227,9 @@ func Test_Send_AddsUserMessage(t *testing.T) { assert.Equal(t, "hello", messages[0].Message) assert.Equal(t, screentracker.ConversationRoleAgent, messages[1].Role) - // Unblock the write to let the test complete cleanly + // Unblock the write to let Send complete close(done) + require.NoError(t, <-errCh) } func Test_Send_RejectsEmptyMessage(t *testing.T) { @@ -277,19 +279,20 @@ func Test_Send_RejectsDuplicateSend(t *testing.T) { conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) conv.Start(context.Background()) - // First send should succeed - err := conv.Send(screentracker.MessagePartText{Content: "first"}) - require.NoError(t, err) + // First send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "first"}) }() // Wait for the write to start (ensuring we're in "prompting" state) <-started // Second send while first is processing should fail - err = conv.Send(screentracker.MessagePartText{Content: "second"}) + err := conv.Send(screentracker.MessagePartText{Content: "second"}) assert.ErrorIs(t, err, screentracker.ErrMessageValidationChanging) // Unblock the write to let the test complete cleanly close(done) + require.NoError(t, <-errCh) } func Test_Status_ChangesWhileProcessing(t *testing.T) { @@ -305,9 +308,9 @@ func Test_Status_ChangesWhileProcessing(t *testing.T) { conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) conv.Start(ctx) - // Send a message - err := conv.Send(screentracker.MessagePartText{Content: "test"}) - require.NoError(t, err) + // Send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "test"}) }() // Wait for write to start <-started @@ -318,7 +321,8 @@ func Test_Status_ChangesWhileProcessing(t *testing.T) { // Unblock the write close(done) - // Wait for the goroutine to complete - status should then be stable. + // Wait for Send to complete - status should then be stable. + require.NoError(t, <-errCh) emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) } @@ -334,9 +338,9 @@ func Test_Text_ReturnsStreamingContent(t *testing.T) { // Initially empty assert.Equal(t, "", conv.Text()) - // Send a message - err := conv.Send(screentracker.MessagePartText{Content: "question"}) - require.NoError(t, err) + // Send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "question"}) }() // Wait for write to start <-started @@ -352,8 +356,9 @@ func Test_Text_ReturnsStreamingContent(t *testing.T) { require.Len(t, messages, 2) assert.Equal(t, "Hello world!", messages[1].Message) - // Unblock the write to let the test complete cleanly + // Unblock the write to let Send complete close(done) + require.NoError(t, <-errCh) } func Test_Emitter_CalledOnChanges(t *testing.T) { @@ -370,9 +375,9 @@ func Test_Emitter_CalledOnChanges(t *testing.T) { conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) conv.Start(ctx) - // Send a message - err := conv.Send(screentracker.MessagePartText{Content: "test"}) - require.NoError(t, err) + // Send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "test"}) }() // Wait for write to start <-started @@ -390,6 +395,7 @@ func Test_Emitter_CalledOnChanges(t *testing.T) { // Unblock the write to complete processing close(done) + require.NoError(t, <-errCh) // Wait for completion emit emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) @@ -413,7 +419,7 @@ func Test_InitialPrompt_SentOnStart(t *testing.T) { conv := acpio.NewACPConversation(context.Background(), mock, nil, initialPrompt, nil, mClock) conv.Start(context.Background()) - // Wait for write to start (initial prompt is being sent) + // Wait for write to start (initial prompt is being sent via Start's goroutine) <-started // Should have user message from initial prompt @@ -429,14 +435,15 @@ func Test_InitialPrompt_SentOnStart(t *testing.T) { func Test_Messages_AreCopied(t *testing.T) { mClock := quartz.NewMock(t) mock := newMockAgentIO() - // Set up blocking to synchronize + // Set up blocking so we can inspect state mid-flight started, done := mock.BlockWrite() conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) conv.Start(context.Background()) - err := conv.Send(screentracker.MessagePartText{Content: "test"}) - require.NoError(t, err) + // Send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "test"}) }() // Wait for write to start <-started @@ -450,8 +457,9 @@ func Test_Messages_AreCopied(t *testing.T) { originalMessages := conv.Messages() assert.Equal(t, "test", originalMessages[0].Message) - // Unblock the write to let the test complete cleanly + // Unblock the write to let Send complete close(done) + require.NoError(t, <-errCh) } func Test_ErrorRemovesPartialMessage(t *testing.T) { @@ -467,9 +475,9 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) conv.Start(ctx) - // Send a message - err := conv.Send(screentracker.MessagePartText{Content: "test"}) - require.NoError(t, err) + // Send blocks, so run in a goroutine + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "test"}) }() // Wait for write to start <-started @@ -494,8 +502,8 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { mock.mu.Unlock() close(done) - // Wait for the conversation to stabilize after the error - emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) + // Send should return the error + require.ErrorIs(t, <-errCh, assert.AnError) // The partial agent message should be removed on error. // Only the user message should remain. @@ -506,20 +514,16 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { } func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - mClock := quartz.NewMock(t) mock := newMockAgentIO() - emitter := newMockEmitter() started, done := mock.BlockWrite() - conv := acpio.NewACPConversation(ctx, mock, nil, nil, emitter, mClock) - conv.Start(ctx) + conv := acpio.NewACPConversation(context.Background(), mock, nil, nil, nil, mClock) + conv.Start(context.Background()) // Given: a send that fails with an error, removing the agent placeholder - err := conv.Send(screentracker.MessagePartText{Content: "hello"}) - require.NoError(t, err) + errCh := make(chan error, 1) + go func() { errCh <- conv.Send(screentracker.MessagePartText{Content: "hello"}) }() <-started mock.mu.Lock() @@ -527,7 +531,7 @@ func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) { mock.mu.Unlock() close(done) - emitter.WaitForStatus(ctx, t, screentracker.ConversationStatusStable) + require.ErrorIs(t, <-errCh, assert.AnError) messages := conv.Messages() require.Len(t, messages, 1, "agent placeholder should be removed after error") diff --git a/x/acpio/main_test.go b/x/acpio/main_test.go new file mode 100644 index 00000000..ee8b314a --- /dev/null +++ b/x/acpio/main_test.go @@ -0,0 +1,11 @@ +package acpio_test + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} From 5bb81350a82f2983c31e3a475dcef42e1e218874 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 13:14:58 +0000 Subject: [PATCH 5/8] fix divergence between Text() and Messages() --- x/acpio/acp_conversation.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index 962f661f..975a2b6a 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -242,7 +242,8 @@ func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error { // but ensure it's finalized response := c.streamingResponse.String() if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent { - c.messages[len(c.messages)-1].Message = strings.TrimSpace(response) + // Intentionally not trimming space here. + c.messages[len(c.messages)-1].Message = response } messages := slices.Clone(c.messages) status := c.statusLocked() From f9a6afc83f0f032766232b2446c0c214ba268e69 Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 13:23:49 +0000 Subject: [PATCH 6/8] remove default prompt timeout in acpio --- x/acpio/acpio.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/x/acpio/acpio.go b/x/acpio/acpio.go index 77db963e..1a440132 100644 --- a/x/acpio/acpio.go +++ b/x/acpio/acpio.go @@ -7,7 +7,6 @@ import ( "log/slog" "strings" "sync" - "time" acp "github.com/coder/acp-go-sdk" st "github.com/coder/agentapi/lib/screentracker" @@ -16,9 +15,6 @@ import ( // Compile-time assertion that ACPAgentIO implements st.AgentIO var _ st.AgentIO = (*ACPAgentIO)(nil) -// DefaultPromptTimeout is the maximum time to wait for an agent response. -const DefaultPromptTimeout = 5 * time.Minute - // ACPAgentIO implements screentracker.AgentIO using the ACP protocol type ACPAgentIO struct { ctx context.Context @@ -213,11 +209,8 @@ func (a *ACPAgentIO) Write(data []byte) (int, error) { a.logger.Debug("Aborting write", "error", err) return 0, err } - // Use a timeout to prevent hanging indefinitely - promptCtx, cancel := context.WithTimeout(a.ctx, DefaultPromptTimeout) - defer cancel() - resp, err := a.conn.Prompt(promptCtx, acp.PromptRequest{ + resp, err := a.conn.Prompt(a.ctx, acp.PromptRequest{ SessionId: a.sessionID, Prompt: []acp.ContentBlock{acp.TextBlock(text)}, }) From 16c5d22dfbbd38579dacc32faba63dea86a0771b Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 14:50:09 +0000 Subject: [PATCH 7/8] chore: add acpio tests --- x/acpio/acpio_test.go | 251 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 x/acpio/acpio_test.go diff --git a/x/acpio/acpio_test.go b/x/acpio/acpio_test.go new file mode 100644 index 00000000..2e91c547 --- /dev/null +++ b/x/acpio/acpio_test.go @@ -0,0 +1,251 @@ +package acpio_test + +import ( + "context" + "io" + "os" + "sync" + "testing" + + acp "github.com/coder/acp-go-sdk" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/coder/agentapi/x/acpio" +) + +// testAgent implements acp.Agent for testing. +type testAgent struct { + conn *acp.AgentSideConnection + onPrompt func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) +} + +var _ acp.Agent = (*testAgent)(nil) + +func (a *testAgent) SetAgentConnection(c *acp.AgentSideConnection) { a.conn = c } + +func (a *testAgent) Authenticate(context.Context, acp.AuthenticateRequest) (acp.AuthenticateResponse, error) { + return acp.AuthenticateResponse{}, nil +} + +func (a *testAgent) Initialize(context.Context, acp.InitializeRequest) (acp.InitializeResponse, error) { + return acp.InitializeResponse{ + ProtocolVersion: acp.ProtocolVersionNumber, + AgentCapabilities: acp.AgentCapabilities{}, + }, nil +} + +func (a *testAgent) Cancel(context.Context, acp.CancelNotification) error { return nil } + +func (a *testAgent) NewSession(context.Context, acp.NewSessionRequest) (acp.NewSessionResponse, error) { + return acp.NewSessionResponse{SessionId: "test-session"}, nil +} + +func (a *testAgent) SetSessionMode(context.Context, acp.SetSessionModeRequest) (acp.SetSessionModeResponse, error) { + return acp.SetSessionModeResponse{}, nil +} + +func (a *testAgent) Prompt(ctx context.Context, p acp.PromptRequest) (acp.PromptResponse, error) { + if a.onPrompt != nil { + return a.onPrompt(ctx, a.conn, p) + } + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil +} + +// newTestPair creates an ACPAgentIO connected to a testAgent via pipes. +func newTestPair(t *testing.T, agent *testAgent) *acpio.ACPAgentIO { + t.Helper() + + // Two pipe pairs: client writes → agent reads, agent writes → client reads. + clientToAgentR, clientToAgentW := io.Pipe() + agentToClientR, agentToClientW := io.Pipe() + + // Client side: peerInput=clientToAgentW (writes to agent), peerOutput=agentToClientR (reads from agent) + // Agent side: peerInput=agentToClientW (writes to client), peerOutput=clientToAgentR (reads from client) + asc := acp.NewAgentSideConnection(agent, agentToClientW, clientToAgentR) + agent.SetAgentConnection(asc) + + agentIO, err := acpio.NewWithPipes( + context.Background(), + clientToAgentW, agentToClientR, + nil, + func() (string, error) { return os.TempDir(), nil }, + ) + require.NoError(t, err) + + t.Cleanup(func() { + _ = clientToAgentW.Close() + _ = agentToClientW.Close() + }) + + return agentIO +} + +// chunkCollector collects chunks from SetOnChunk in a thread-safe way +// and provides a method to wait for a specific number of chunks. +type chunkCollector struct { + mu sync.Mutex + cond *sync.Cond + chunks []string +} + +func newChunkCollector() *chunkCollector { + c := &chunkCollector{} + c.cond = sync.NewCond(&c.mu) + return c +} + +func (c *chunkCollector) callback(chunk string) { + c.mu.Lock() + defer c.mu.Unlock() + c.chunks = append(c.chunks, chunk) + c.cond.Broadcast() +} + +func (c *chunkCollector) waitForN(t *testing.T, n int) []string { + t.Helper() + c.mu.Lock() + defer c.mu.Unlock() + for len(c.chunks) < n { + c.cond.Wait() + } + return append([]string(nil), c.chunks...) +} + +func Test_ACPAgentIO_WriteAndReadScreen(t *testing.T) { + collector := newChunkCollector() + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + _ = conn.SessionUpdate(ctx, acp.SessionNotification{ + SessionId: p.SessionId, + Update: acp.UpdateAgentMessageText("Hello from agent!"), + }) + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + agentIO.SetOnChunk(collector.callback) + + n, err := agentIO.Write([]byte("test prompt")) + require.NoError(t, err) + assert.Equal(t, len("test prompt"), n) + + // SessionUpdate notifications are async — wait for the chunk to arrive. + collector.waitForN(t, 1) + assert.Equal(t, "Hello from agent!", agentIO.ReadScreen()) +} + +func Test_ACPAgentIO_StreamingChunks(t *testing.T) { + collector := newChunkCollector() + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + for _, text := range []string{"Hello", " ", "world!"} { + _ = conn.SessionUpdate(ctx, acp.SessionNotification{ + SessionId: p.SessionId, + Update: acp.UpdateAgentMessageText(text), + }) + } + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + agentIO.SetOnChunk(collector.callback) + + _, err := agentIO.Write([]byte("test")) + require.NoError(t, err) + + // All three chunks should arrive (order may vary due to async notification handling). + chunks := collector.waitForN(t, 3) + assert.Len(t, chunks, 3) + assert.ElementsMatch(t, []string{"Hello", " ", "world!"}, chunks) +} + +func Test_ACPAgentIO_StripsEscapeSequences(t *testing.T) { + received := make(chan string, 1) + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + defer close(received) + for _, block := range p.Prompt { + if block.Text != nil { + received <- block.Text.Text + } + } + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + + // Bracketed paste sequences should be stripped + _, err := agentIO.Write([]byte("\x1b[200~hello world\x1b[201~")) + require.NoError(t, err) + assert.Equal(t, "hello world", <-received) +} + +func Test_ACPAgentIO_IgnoresEmptyPrompt(t *testing.T) { + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + assert.Fail(t, "empty prompt should not reach the agent") + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + + // Empty after stripping should be a no-op + n, err := agentIO.Write([]byte(" \t\n ")) + require.NoError(t, err) + assert.Equal(t, len(" \t\n "), n) +} + +func Test_ACPAgentIO_ToolCallFormattedAsText(t *testing.T) { + collector := newChunkCollector() + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + _ = conn.SessionUpdate(ctx, acp.SessionNotification{ + SessionId: p.SessionId, + Update: acp.StartToolCall( + "call_1", + "Reading file", + acp.WithStartKind(acp.ToolKindRead), + ), + }) + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + agentIO.SetOnChunk(collector.callback) + + _, err := agentIO.Write([]byte("do something")) + require.NoError(t, err) + + collector.waitForN(t, 1) + assert.Contains(t, agentIO.ReadScreen(), "[Tool: read]") + assert.Contains(t, agentIO.ReadScreen(), "Reading file") +} + +func Test_ACPAgentIO_ResetsResponseBetweenWrites(t *testing.T) { + collector := newChunkCollector() + callCount := 0 + agent := &testAgent{ + onPrompt: func(ctx context.Context, conn *acp.AgentSideConnection, p acp.PromptRequest) (acp.PromptResponse, error) { + callCount++ + _ = conn.SessionUpdate(ctx, acp.SessionNotification{ + SessionId: p.SessionId, + Update: acp.UpdateAgentMessageText("response " + string(rune('0'+callCount))), + }) + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + }, + } + agentIO := newTestPair(t, agent) + agentIO.SetOnChunk(collector.callback) + + _, err := agentIO.Write([]byte("first")) + require.NoError(t, err) + collector.waitForN(t, 1) + assert.Equal(t, "response 1", agentIO.ReadScreen()) + + _, err = agentIO.Write([]byte("second")) + require.NoError(t, err) + collector.waitForN(t, 2) + // Response should be reset, not accumulated + assert.Equal(t, "response 2", agentIO.ReadScreen()) +} From d8d37f95711fcddfc746c64b6414e770b6d13dce Mon Sep 17 00:00:00 2001 From: Cian Johnston Date: Tue, 24 Feb 2026 14:56:53 +0000 Subject: [PATCH 8/8] fix: do not reuse message IDs --- x/acpio/acp_conversation.go | 7 +++++-- x/acpio/acp_conversation_test.go | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/x/acpio/acp_conversation.go b/x/acpio/acp_conversation.go index 975a2b6a..3d514f2f 100644 --- a/x/acpio/acp_conversation.go +++ b/x/acpio/acp_conversation.go @@ -30,6 +30,7 @@ type ACPConversation struct { cancel context.CancelFunc agentIO ChunkableAgentIO messages []st.ConversationMessage + nextID int // monotonically increasing message ID prompting bool // true while agent is processing streamingResponse strings.Builder logger *slog.Logger @@ -104,18 +105,20 @@ func (c *ACPConversation) Send(messageParts ...st.MessagePart) error { return st.ErrMessageValidationChanging } c.messages = append(c.messages, st.ConversationMessage{ - Id: len(c.messages), + Id: c.nextID, Role: st.ConversationRoleUser, Message: message, Time: c.clock.Now(), }) + c.nextID++ // Add placeholder for streaming agent response c.messages = append(c.messages, st.ConversationMessage{ - Id: len(c.messages), + Id: c.nextID, Role: st.ConversationRoleAgent, Message: "", Time: c.clock.Now(), }) + c.nextID++ c.streamingResponse.Reset() c.prompting = true status := c.statusLocked() diff --git a/x/acpio/acp_conversation_test.go b/x/acpio/acp_conversation_test.go index eb0b4fb6..0632dd17 100644 --- a/x/acpio/acp_conversation_test.go +++ b/x/acpio/acp_conversation_test.go @@ -511,6 +511,25 @@ func Test_ErrorRemovesPartialMessage(t *testing.T) { require.Len(t, messages, 1, "partial agent message should be removed on error") assert.Equal(t, screentracker.ConversationRoleUser, messages[0].Role) assert.Equal(t, "test", messages[0].Message) + + // First exchange allocated IDs 0 (user) and 1 (agent, now removed). + assert.Equal(t, 0, messages[0].Id) + + // Send a second message — IDs must not reuse the removed agent message's ID (1). + mock.mu.Lock() + mock.writeErr = nil + mock.writeBlock = nil + mock.writeStarted = nil + mock.mu.Unlock() + + err := conv.Send(screentracker.MessagePartText{Content: "retry"}) + require.NoError(t, err) + + messages = conv.Messages() + require.Len(t, messages, 3, "first user + second user + second agent") + assert.Equal(t, 0, messages[0].Id, "original user message keeps its ID") + assert.Equal(t, 2, messages[1].Id, "new user message skips removed ID 1") + assert.Equal(t, 3, messages[2].Id, "new agent message continues sequence") } func Test_LateChunkAfterError_DoesNotCorruptUserMessage(t *testing.T) {