diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java index 95f8959f5..1ab6eef09 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java @@ -171,7 +171,8 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr return transport .sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND, - error.message(), error.data()))); + error.message(), error.data()))) + .then(promoteToListeningStreamOrClose(stream, transport)); } return requestHandler .handle(new McpAsyncServerExchange(this.id, stream, clientCapabilities.get(), clientInfo.get(), @@ -189,7 +190,30 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr return Mono.just(errorResponse); }) .flatMap(transport::sendMessage) - .then(transport.closeGracefully()); + .then(promoteToListeningStreamOrClose(stream, transport)); + }); + } + + /** + * Promotes the given response stream to the session's listening stream if no + * listening stream has been established yet. If a listening stream already exists, + * closes the transport gracefully. This allows clients that only use POST (without a + * separate GET SSE stream) to keep the connection alive for keep-alive pings. + * @param stream the response stream to potentially promote + * @param transport the transport to close if promotion is not needed + * @return Mono that completes after either promoting or closing + */ + private Mono promoteToListeningStreamOrClose(McpStreamableServerSessionStream stream, + McpStreamableServerTransport transport) { + return Mono.defer(() -> { + McpLoggableSession currentListeningStream = this.listeningStreamRef.get(); + if (currentListeningStream == this.missingMcpTransportSession) { + if (this.listeningStreamRef.compareAndSet(this.missingMcpTransportSession, stream)) { + logger.debug("Converted response stream to listening stream for session {}", this.id); + return Mono.empty(); + } + } + return transport.closeGracefully(); }); }