From e82fee00b7298d1579cc2d00fd8f7874a4cbd04e Mon Sep 17 00:00:00 2001 From: JGoP-L <741047428@qq.com> Date: Tue, 24 Feb 2026 11:13:40 +0800 Subject: [PATCH] fix: keep connection open for keep-alive ping in responseStream After sending a response in responseStream(), if no listening stream exists yet (i.e., the client hasn't established a GET SSE connection), promote the current response stream to a listening stream instead of closing it. This allows KeepAliveScheduler to send periodic ping messages through the transport. Clients like Cursor that don't establish a separate GET listening stream would otherwise have the connection closed immediately after each response, causing the MCP server to appear as disconnected after the idle-timeout period. Fixes #681 --- .../spec/McpStreamableServerSession.java | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java index 95f8959f5..1ab6eef09 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java @@ -171,7 +171,8 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr return transport .sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.METHOD_NOT_FOUND, - error.message(), error.data()))); + error.message(), error.data()))) + .then(promoteToListeningStreamOrClose(stream, transport)); } return requestHandler .handle(new McpAsyncServerExchange(this.id, stream, clientCapabilities.get(), clientInfo.get(), @@ -189,7 +190,30 @@ public Mono responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr return Mono.just(errorResponse); }) .flatMap(transport::sendMessage) - .then(transport.closeGracefully()); + .then(promoteToListeningStreamOrClose(stream, transport)); + }); + } + + /** + * Promotes the given response stream to the session's listening stream if no + * listening stream has been established yet. If a listening stream already exists, + * closes the transport gracefully. This allows clients that only use POST (without a + * separate GET SSE stream) to keep the connection alive for keep-alive pings. + * @param stream the response stream to potentially promote + * @param transport the transport to close if promotion is not needed + * @return Mono that completes after either promoting or closing + */ + private Mono promoteToListeningStreamOrClose(McpStreamableServerSessionStream stream, + McpStreamableServerTransport transport) { + return Mono.defer(() -> { + McpLoggableSession currentListeningStream = this.listeningStreamRef.get(); + if (currentListeningStream == this.missingMcpTransportSession) { + if (this.listeningStreamRef.compareAndSet(this.missingMcpTransportSession, stream)) { + logger.debug("Converted response stream to listening stream for session {}", this.id); + return Mono.empty(); + } + } + return transport.closeGracefully(); }); }