Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/ActiveState/termtest/xpty v0.6.0
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/charmbracelet/bubbletea v1.3.4
github.com/coder/acp-go-sdk v0.6.3
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225
github.com/coder/quartz v0.1.2
github.com/danielgtaylor/huma/v2 v2.32.0
Expand All @@ -15,6 +16,7 @@ require (
github.com/spf13/viper v1.20.1
github.com/stretchr/testify v1.11.1
github.com/tmaxmax/go-sse v0.10.0
go.uber.org/goleak v1.3.0
golang.org/x/term v0.30.0
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ github.com/ckaznocha/intrange v0.3.1 h1:j1onQyXvHUsPWujDH6WIjhyH26gkRt/txNlV7Lsp
github.com/ckaznocha/intrange v0.3.1/go.mod h1:QVepyz1AkUoFQkpEqksSYpNpUo3c5W7nWh/s6SHIJJk=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ=
github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225 h1:tRIViZ5JRmzdOEo5wUWngaGEFBG8OaE1o2GIHN5ujJ8=
github.com/coder/agentapi-sdk-go v0.0.0-20250505131810-560d1d88d225/go.mod h1:rNLVpYgEVeu1Zk29K64z6Od8RBP9DwqCu9OfCzh8MR4=
github.com/coder/paralleltestctx v0.0.1 h1:eauyehej1XYTGwgzGWMTjeRIVgOpU6XLPNVb2oi6kDs=
Expand Down
262 changes: 262 additions & 0 deletions x/acpio/acp_conversation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package acpio

import (
"context"
"log/slog"
"slices"
"strings"
"sync"

st "github.com/coder/agentapi/lib/screentracker"
"github.com/coder/quartz"
)

// Compile-time assertion that ACPConversation implements st.Conversation
var _ st.Conversation = (*ACPConversation)(nil)

// ChunkableAgentIO extends AgentIO with chunk callback support for streaming responses.
// This interface is what ACPConversation needs from its AgentIO implementation.
type ChunkableAgentIO interface {
st.AgentIO
SetOnChunk(fn func(chunk string))
}

// ACPConversation tracks conversations with ACP-based agents.
// Unlike PTY-based Conversation, ACP has blocking writes where the
// response is complete when Write() returns.
type ACPConversation struct {
mu sync.Mutex
ctx context.Context
cancel context.CancelFunc
agentIO ChunkableAgentIO
messages []st.ConversationMessage
nextID int // monotonically increasing message ID
prompting bool // true while agent is processing
streamingResponse strings.Builder
logger *slog.Logger
emitter st.Emitter
initialPrompt []st.MessagePart
clock quartz.Clock
}

// noopEmitter is a no-op implementation of Emitter for when no emitter is provided.
type noopEmitter struct{}

func (noopEmitter) EmitMessages([]st.ConversationMessage) {}
func (noopEmitter) EmitStatus(st.ConversationStatus) {}
func (noopEmitter) EmitScreen(string) {}

// NewACPConversation creates a new ACPConversation.
// If emitter is provided, it will receive events when messages/status/screen change.
// If clock is nil, a real clock will be used.
func NewACPConversation(ctx context.Context, agentIO ChunkableAgentIO, logger *slog.Logger, initialPrompt []st.MessagePart, emitter st.Emitter, clock quartz.Clock) *ACPConversation {
if logger == nil {
logger = slog.Default()
}
if clock == nil {
clock = quartz.NewReal()
}
if emitter == nil {
emitter = noopEmitter{}
}
ctx, cancel := context.WithCancel(ctx)
c := &ACPConversation{
ctx: ctx,
cancel: cancel,
agentIO: agentIO,
logger: logger,
initialPrompt: initialPrompt,
emitter: emitter,
clock: clock,
}
return c
}

// Messages returns the conversation history.
func (c *ACPConversation) Messages() []st.ConversationMessage {
c.mu.Lock()
defer c.mu.Unlock()
return slices.Clone(c.messages)
}

// Send sends a message to the agent synchronously.
// It blocks until the agent has finished processing and returns any error
// from the underlying write. Returns a validation error immediately if
// the message is invalid or another message is already being processed.
func (c *ACPConversation) Send(messageParts ...st.MessagePart) error {
message := ""
for _, part := range messageParts {
message += part.String()
}

// Validate whitespace BEFORE trimming (match PTY behavior)
if message != strings.TrimSpace(message) {
return st.ErrMessageValidationWhitespace
}

if message == "" {
return st.ErrMessageValidationEmpty
}

// Check if already prompting and set state atomically
c.mu.Lock()
if c.prompting {
c.mu.Unlock()
return st.ErrMessageValidationChanging
}
c.messages = append(c.messages, st.ConversationMessage{
Id: c.nextID,
Role: st.ConversationRoleUser,
Message: message,
Time: c.clock.Now(),
})
c.nextID++
// Add placeholder for streaming agent response
c.messages = append(c.messages, st.ConversationMessage{
Id: c.nextID,
Role: st.ConversationRoleAgent,
Message: "",
Time: c.clock.Now(),
})
c.nextID++
c.streamingResponse.Reset()
c.prompting = true
status := c.statusLocked()
c.mu.Unlock()

// Emit status change to "running" before starting the prompt
c.emitter.EmitStatus(status)

c.logger.Debug("ACPConversation sending message", "message", message)

return c.executePrompt(messageParts)
}

// Start sets up chunk handling and sends the initial prompt if provided.
func (c *ACPConversation) Start(ctx context.Context) {
// Wire up the chunk callback for streaming
c.agentIO.SetOnChunk(c.handleChunk)

// Send initial prompt if provided
if len(c.initialPrompt) > 0 {
// Run in a goroutine because Send blocks until the prompt completes,
// and Start must return immediately per the Conversation interface.
go func() {
err := c.Send(c.initialPrompt...)
if err != nil {
c.logger.Error("ACPConversation failed to send initial prompt", "error", err)
}
}()
} else {
// No initial prompt means we start in stable state
c.emitter.EmitStatus(c.Status())
}
}

// Status returns the current conversation status.
func (c *ACPConversation) Status() st.ConversationStatus {
c.mu.Lock()
defer c.mu.Unlock()
return c.statusLocked()
}

// statusLocked returns the status without acquiring the lock (caller must hold lock).
func (c *ACPConversation) statusLocked() st.ConversationStatus {
if c.prompting {
return st.ConversationStatusChanging // agent is processing
}
return st.ConversationStatusStable
}

// Stop cancels any in-progress operations.
func (c *ACPConversation) Stop() {
c.cancel()
}

// Text returns the current streaming response text.
func (c *ACPConversation) Text() string {
c.mu.Lock()
defer c.mu.Unlock()
return c.streamingResponse.String()
}

// handleChunk is called for each streaming chunk from the agent.
func (c *ACPConversation) handleChunk(chunk string) {
c.mu.Lock()
// Log and discard chunks that arrive after the prompt has completed or errored.
// This should not happen under normal operation — if it does, it indicates a
// bug in the ACP SDK or a race in the connection teardown.
if !c.prompting {
c.mu.Unlock()
c.logger.Error("received chunk while not prompting (late/unexpected chunk discarded)",
"chunkLen", len(chunk))
return
}
c.streamingResponse.WriteString(chunk)
// Only update the last message if it's the agent placeholder (defense-in-depth)
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
c.messages[len(c.messages)-1].Message = c.streamingResponse.String()
}
messages := slices.Clone(c.messages)
status := c.statusLocked()
screen := c.streamingResponse.String()
c.mu.Unlock()

c.emitter.EmitMessages(messages)
c.emitter.EmitStatus(status)
c.emitter.EmitScreen(screen)
}

// executePrompt runs the actual agent request and returns any error.
func (c *ACPConversation) executePrompt(messageParts []st.MessagePart) error {
var err error
for _, part := range messageParts {
if c.ctx.Err() != nil {
err = c.ctx.Err()
break
}
if partErr := part.Do(c.agentIO); partErr != nil {
err = partErr
break
}
}

c.mu.Lock()
c.prompting = false

if err != nil {
c.logger.Error("ACPConversation message failed", "error", err)
// Remove the agent's streaming message on error (may be empty or partial)
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
c.messages = c.messages[:len(c.messages)-1]
}
messages := slices.Clone(c.messages)
status := c.statusLocked()
screen := c.streamingResponse.String()
c.mu.Unlock()

c.emitter.EmitMessages(messages)
c.emitter.EmitStatus(status)
c.emitter.EmitScreen(screen)
return err
}

// Final response should already be in the last message via streaming
// but ensure it's finalized
response := c.streamingResponse.String()
if len(c.messages) > 0 && c.messages[len(c.messages)-1].Role == st.ConversationRoleAgent {
// Intentionally not trimming space here.
c.messages[len(c.messages)-1].Message = response
}
messages := slices.Clone(c.messages)
status := c.statusLocked()
screen := c.streamingResponse.String()
c.mu.Unlock()

c.emitter.EmitMessages(messages)
c.emitter.EmitStatus(status)
c.emitter.EmitScreen(screen)

c.logger.Debug("ACPConversation message complete", "responseLen", len(response))
return nil
}
Loading