Skip to content

Commit 3a90d9e

Browse files
sharpninjaclaude
andcommitted
Fix REPL agent-stdio YAML pipe being echoed instead of executed
Replace the echo stub in AgentStdioHandler with a real dispatch pipeline: multi-line YAML framing, YamlDotNet-based envelope parsing, command dispatcher routing client.<name>.<method> through IGenericClientPassthrough, and error wrapping that keeps the loop alive. Adds 11 acceptance tests covering serializer round-trip, dispatcher routing, and stream framing. FR-MCP-REPL-001, TR-MCP-REPL-001/003/004, TEST-MCP-REPL-001 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 54d3ab8 commit 3a90d9e

7 files changed

Lines changed: 1221 additions & 64 deletions

File tree

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
// FR-MCP-REPL-001: YAML Protocol STDIO REPL Host - Stream-level envelope loop
2+
// FR-MCP-REPL-002: REPL Lifecycle Management - Read/accumulate/dispatch loop
3+
// TR-MCP-REPL-003: Command Loop Lifecycle - Multi-line YAML framing
4+
// TEST-MCP-REPL-001: REPL host processes well-formed multi-line YAML envelopes end-to-end
5+
6+
using System.Text;
7+
8+
namespace McpServer.Repl.Core;
9+
10+
/// <summary>
11+
/// Runs the REPL agent-stdio read/write loop: reads envelope lines from a <see cref="TextReader"/>,
12+
/// accumulates them into complete YAML documents (terminated by a blank line or the
13+
/// <c>---</c> document separator), dispatches each via <see cref="IReplCommandDispatcher"/>,
14+
/// and writes the response envelope back to the <see cref="TextWriter"/>.
15+
/// </summary>
16+
public interface IAgentStdioProtocol
17+
{
18+
/// <summary>
19+
/// Runs the agent-stdio loop until the reader reports end-of-stream or cancellation.
20+
/// </summary>
21+
/// <param name="reader">Inbound envelope stream.</param>
22+
/// <param name="writer">Outbound response stream. The implementation flushes after each envelope.</param>
23+
/// <param name="cancellationToken">Cancellation token.</param>
24+
Task RunAsync(TextReader reader, TextWriter writer, CancellationToken cancellationToken);
25+
}
26+
27+
/// <summary>
28+
/// Default <see cref="IAgentStdioProtocol"/> implementation. Accumulates inbound lines into
29+
/// YAML documents using the blank-line and <c>---</c> conventions, delegates parsing to
30+
/// <see cref="IYamlSerializer"/>, and routes each envelope through
31+
/// <see cref="IReplCommandDispatcher"/>. Malformed documents are reported as
32+
/// <c>invalid_envelope</c> errors and the loop continues.
33+
/// </summary>
34+
public sealed class AgentStdioProtocol : IAgentStdioProtocol
35+
{
36+
private readonly IYamlSerializer _serializer;
37+
private readonly IReplCommandDispatcher _dispatcher;
38+
39+
/// <summary>
40+
/// Initializes a new <see cref="AgentStdioProtocol"/>.
41+
/// </summary>
42+
/// <param name="serializer">YAML serializer used to parse and emit envelopes.</param>
43+
/// <param name="dispatcher">Command dispatcher invoked once per complete inbound envelope.</param>
44+
public AgentStdioProtocol(IYamlSerializer serializer, IReplCommandDispatcher dispatcher)
45+
{
46+
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
47+
_dispatcher = dispatcher ?? throw new ArgumentNullException(nameof(dispatcher));
48+
}
49+
50+
/// <inheritdoc />
51+
public async Task RunAsync(TextReader reader, TextWriter writer, CancellationToken cancellationToken)
52+
{
53+
ArgumentNullException.ThrowIfNull(reader);
54+
ArgumentNullException.ThrowIfNull(writer);
55+
56+
var buffer = new StringBuilder();
57+
58+
while (!cancellationToken.IsCancellationRequested)
59+
{
60+
var line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
61+
if (line is null)
62+
{
63+
// End of stream — flush any accumulated envelope before exiting.
64+
await FlushAsync(buffer, writer, cancellationToken).ConfigureAwait(false);
65+
return;
66+
}
67+
68+
if (IsDocumentBoundary(line))
69+
{
70+
await FlushAsync(buffer, writer, cancellationToken).ConfigureAwait(false);
71+
continue;
72+
}
73+
74+
buffer.AppendLine(line);
75+
}
76+
77+
// Cancellation requested — attempt a final flush so callers don't lose the last envelope.
78+
await FlushAsync(buffer, writer, cancellationToken).ConfigureAwait(false);
79+
}
80+
81+
private static bool IsDocumentBoundary(string line)
82+
{
83+
if (string.IsNullOrWhiteSpace(line))
84+
{
85+
return true;
86+
}
87+
88+
var trimmed = line.TrimEnd();
89+
return trimmed == "---";
90+
}
91+
92+
private async Task FlushAsync(StringBuilder buffer, TextWriter writer, CancellationToken cancellationToken)
93+
{
94+
if (buffer.Length == 0)
95+
{
96+
return;
97+
}
98+
99+
var document = buffer.ToString();
100+
buffer.Clear();
101+
102+
if (string.IsNullOrWhiteSpace(document))
103+
{
104+
return;
105+
}
106+
107+
IYamlEnvelope envelope;
108+
try
109+
{
110+
envelope = _serializer.Deserialize(document);
111+
}
112+
catch (Exception ex) when (ex is FormatException or InvalidOperationException)
113+
{
114+
var errorEnvelope = new YamlEnvelope
115+
{
116+
Type = "error",
117+
Payload = new ErrorPayload
118+
{
119+
RequestId = TryExtractRequestId(document),
120+
Code = "invalid_envelope",
121+
Message = ex.Message,
122+
},
123+
};
124+
await WriteEnvelopeAsync(errorEnvelope, writer, cancellationToken).ConfigureAwait(false);
125+
return;
126+
}
127+
128+
IYamlEnvelope response;
129+
try
130+
{
131+
response = await _dispatcher.DispatchAsync(envelope, cancellationToken).ConfigureAwait(false);
132+
}
133+
catch (OperationCanceledException)
134+
{
135+
throw;
136+
}
137+
catch (Exception ex)
138+
{
139+
response = new YamlEnvelope
140+
{
141+
Type = "error",
142+
Payload = new ErrorPayload
143+
{
144+
RequestId = (envelope.Payload as IRequestPayload)?.RequestId ?? "unknown",
145+
Code = "dispatch_error",
146+
Message = ex.Message,
147+
},
148+
};
149+
}
150+
151+
await WriteEnvelopeAsync(response, writer, cancellationToken).ConfigureAwait(false);
152+
}
153+
154+
private async Task WriteEnvelopeAsync(IYamlEnvelope envelope, TextWriter writer, CancellationToken cancellationToken)
155+
{
156+
var yaml = _serializer.Serialize(envelope);
157+
// Envelopes are framed by a blank line, matching the inbound convention.
158+
await writer.WriteAsync(yaml.AsMemory(), cancellationToken).ConfigureAwait(false);
159+
if (!yaml.EndsWith('\n'))
160+
{
161+
await writer.WriteLineAsync().ConfigureAwait(false);
162+
}
163+
await writer.WriteLineAsync().ConfigureAwait(false);
164+
await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
165+
}
166+
167+
private static string TryExtractRequestId(string document)
168+
{
169+
// Best-effort recovery of a requestId from a malformed document so callers can correlate.
170+
foreach (var rawLine in document.Split('\n'))
171+
{
172+
var line = rawLine.Trim();
173+
if (line.StartsWith("requestId:", StringComparison.OrdinalIgnoreCase))
174+
{
175+
return line["requestId:".Length..].Trim().Trim('"');
176+
}
177+
}
178+
179+
return "unknown";
180+
}
181+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
// FR-MCP-REPL-001: YAML Protocol STDIO REPL Host - Server-side command dispatcher
2+
// FR-MCP-REPL-003: Command Namespace Parity - Request routing to client passthrough
3+
// TR-MCP-REPL-004: Command Registry and Dispatcher - Envelope-to-handler routing
4+
// TEST-MCP-REPL-001: REPL host processes well-formed YAML command envelopes
5+
6+
namespace McpServer.Repl.Core;
7+
8+
/// <summary>
9+
/// Dispatches parsed YAML envelopes to the appropriate handler and returns the response
10+
/// envelope. Responsible for routing <c>hello</c> handshakes and <c>request</c> envelopes
11+
/// by method namespace (currently <c>client.*.*</c> via <see cref="IGenericClientPassthrough"/>).
12+
/// Unknown namespaces produce a <c>method_not_found</c> error envelope so the agent loop
13+
/// can respond and continue instead of crashing.
14+
/// </summary>
15+
public interface IReplCommandDispatcher
16+
{
17+
/// <summary>
18+
/// Dispatches a parsed YAML envelope and returns the response envelope (result or error).
19+
/// Never throws for recoverable dispatch failures — unexpected exceptions are caught and
20+
/// wrapped in an error envelope so the caller's read/write loop can remain alive.
21+
/// </summary>
22+
/// <param name="envelope">The inbound envelope to dispatch. Must have a non-null payload.</param>
23+
/// <param name="cancellationToken">Cancellation token propagated to handlers.</param>
24+
/// <returns>The response envelope to emit back to the caller.</returns>
25+
Task<IYamlEnvelope> DispatchAsync(IYamlEnvelope envelope, CancellationToken cancellationToken);
26+
}
27+
28+
/// <summary>
29+
/// Default <see cref="IReplCommandDispatcher"/> implementation. Routes <c>hello</c> envelopes
30+
/// to a handshake response and <c>request</c> envelopes with the <c>client.&lt;clientName&gt;.&lt;methodName&gt;</c>
31+
/// method shape to <see cref="IGenericClientPassthrough.InvokeAsync"/>. All other method
32+
/// namespaces produce a <c>method_not_found</c> error envelope.
33+
/// </summary>
34+
public sealed class ReplCommandDispatcher : IReplCommandDispatcher
35+
{
36+
private const string ServerProtocolVersion = "1.0";
37+
private readonly IGenericClientPassthrough _passthrough;
38+
39+
/// <summary>
40+
/// Initializes a new <see cref="ReplCommandDispatcher"/>.
41+
/// </summary>
42+
/// <param name="passthrough">The generic client passthrough used to invoke <c>client.*.*</c> methods.</param>
43+
public ReplCommandDispatcher(IGenericClientPassthrough passthrough)
44+
{
45+
_passthrough = passthrough ?? throw new ArgumentNullException(nameof(passthrough));
46+
}
47+
48+
/// <inheritdoc />
49+
public async Task<IYamlEnvelope> DispatchAsync(IYamlEnvelope envelope, CancellationToken cancellationToken)
50+
{
51+
ArgumentNullException.ThrowIfNull(envelope);
52+
53+
switch (envelope.Type)
54+
{
55+
case "hello":
56+
return BuildHelloResponse(envelope.Payload as IHelloPayload);
57+
58+
case "request":
59+
if (envelope.Payload is not IRequestPayload request)
60+
{
61+
return BuildError(
62+
requestId: "unknown",
63+
code: "invalid_envelope",
64+
message: "Request envelope is missing a request payload.");
65+
}
66+
return await DispatchRequestAsync(request, cancellationToken).ConfigureAwait(false);
67+
68+
default:
69+
return BuildError(
70+
requestId: "unknown",
71+
code: "invalid_envelope",
72+
message: $"Unsupported envelope type: {envelope.Type}");
73+
}
74+
}
75+
76+
private async Task<IYamlEnvelope> DispatchRequestAsync(IRequestPayload request, CancellationToken cancellationToken)
77+
{
78+
var method = request.Method ?? "";
79+
80+
if (method.StartsWith("client.", StringComparison.Ordinal))
81+
{
82+
return await DispatchClientRequestAsync(request, cancellationToken).ConfigureAwait(false);
83+
}
84+
85+
return BuildError(
86+
requestId: request.RequestId,
87+
code: "method_not_found",
88+
message: $"Method '{method}' is not routed by this dispatcher. " +
89+
"Supported namespaces: client.<clientName>.<methodName>.");
90+
}
91+
92+
private async Task<IYamlEnvelope> DispatchClientRequestAsync(IRequestPayload request, CancellationToken cancellationToken)
93+
{
94+
// method shape: client.<clientName>.<methodName>
95+
var parts = request.Method.Split('.', 3);
96+
if (parts.Length != 3 || parts[0] != "client" ||
97+
string.IsNullOrEmpty(parts[1]) || string.IsNullOrEmpty(parts[2]))
98+
{
99+
return BuildError(
100+
requestId: request.RequestId,
101+
code: "method_not_found",
102+
message: $"Method '{request.Method}' does not match the expected 'client.<clientName>.<methodName>' shape.");
103+
}
104+
105+
var clientName = parts[1];
106+
var methodName = parts[2];
107+
var args = request.Params is null
108+
? new Dictionary<string, object?>()
109+
: new Dictionary<string, object?>(request.Params, StringComparer.OrdinalIgnoreCase);
110+
111+
try
112+
{
113+
var result = await _passthrough.InvokeAsync(clientName, methodName, args, cancellationToken).ConfigureAwait(false);
114+
return new YamlEnvelope
115+
{
116+
Type = "result",
117+
Payload = new ResultPayload
118+
{
119+
RequestId = request.RequestId,
120+
Result = result,
121+
},
122+
};
123+
}
124+
catch (OperationCanceledException)
125+
{
126+
throw;
127+
}
128+
catch (Exception ex)
129+
{
130+
return BuildError(
131+
requestId: request.RequestId,
132+
code: "method_invocation_error",
133+
message: ex.Message,
134+
details: new Dictionary<string, object?>
135+
{
136+
["clientName"] = clientName,
137+
["methodName"] = methodName,
138+
["exceptionType"] = ex.GetType().FullName,
139+
});
140+
}
141+
}
142+
143+
private static IYamlEnvelope BuildHelloResponse(IHelloPayload? request)
144+
{
145+
var capabilities = new List<string> { "client-passthrough" };
146+
if (request?.Capabilities is not null)
147+
{
148+
capabilities.AddRange(request.Capabilities);
149+
}
150+
151+
return new YamlEnvelope
152+
{
153+
Type = "hello",
154+
Payload = new HelloPayload
155+
{
156+
ProtocolVersion = ServerProtocolVersion,
157+
Capabilities = capabilities,
158+
},
159+
};
160+
}
161+
162+
private static IYamlEnvelope BuildError(
163+
string requestId,
164+
string code,
165+
string message,
166+
IReadOnlyDictionary<string, object?>? details = null)
167+
{
168+
return new YamlEnvelope
169+
{
170+
Type = "error",
171+
Payload = new ErrorPayload
172+
{
173+
RequestId = requestId,
174+
Code = code,
175+
Message = message,
176+
Details = details,
177+
},
178+
};
179+
}
180+
}

0 commit comments

Comments
 (0)