diff --git a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java index ff436b12de..777be9ea88 100644 --- a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java +++ b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java @@ -502,17 +502,22 @@ public T unmarshalFrom(Object data, TypeRef typeRef) { } private Tuple2, Iterable> parse(ServerSentEvent event) { - if (MESSAGE_EVENT_TYPE.equals(event.event())) { + if (isMessageEvent(event.event())) { + String data = event.data(); + if (data == null || data.isEmpty()) { + logger.debug("Ignoring SSE message event with empty data: {}", event); + return Tuples.of(Optional.empty(), List.of()); + } try { // We don't support batching ATM and probably won't since the next version // considers removing it. - McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data()); + McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, data); String eventId = event.id(); Optional idOpt = (eventId != null) ? Optional.of(eventId) : Optional.empty(); return Tuples.of(idOpt, List.of(message)); } catch (IOException ioException) { - throw new McpTransportException("Error parsing JSON-RPC message: " + event.data(), ioException); + throw new McpTransportException("Error parsing JSON-RPC message: " + data, ioException); } } else { @@ -521,6 +526,11 @@ private Tuple2, Iterable> parse(Serve } } + private static boolean isMessageEvent(@Nullable String eventType) { + // Per SSE semantics, missing/blank event type defaults to "message". + return eventType == null || eventType.isEmpty() || MESSAGE_EVENT_TYPE.equals(eventType); + } + /** * Builder for {@link WebClientStreamableHttpTransport}. */ diff --git a/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportSseParsingIT.java b/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportSseParsingIT.java new file mode 100644 index 0000000000..127255e08d --- /dev/null +++ b/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportSseParsingIT.java @@ -0,0 +1,100 @@ +/* + * Copyright 2023-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.ai.mcp.client.webflux.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import com.sun.net.httpserver.HttpServer; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.ProtocolVersions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import reactor.test.StepVerifier; + +import org.springframework.web.reactive.function.client.WebClient; + +import static org.assertj.core.api.Assertions.assertThat; + +@Timeout(15) +class WebClientStreamableHttpTransportSseParsingIT { + + private HttpServer server; + + private WebClientStreamableHttpTransport transport; + + private CountDownLatch handlerLatch; + + private AtomicReference receivedMessage; + + @BeforeEach + void setUp() throws IOException { + this.handlerLatch = new CountDownLatch(1); + this.receivedMessage = new AtomicReference<>(); + this.server = HttpServer.create(new InetSocketAddress(0), 0); + this.server.createContext("/mcp", exchange -> { + exchange.getResponseHeaders().set("Content-Type", "text/event-stream"); + exchange.sendResponseHeaders(200, 0); + String response = "data: {\"jsonrpc\":\"2.0\",\"id\":\"test-id\",\"result\":{}}\n\n"; + exchange.getResponseBody().write(response.getBytes()); + exchange.close(); + }); + this.server.start(); + String host = "http://localhost:" + this.server.getAddress().getPort(); + this.transport = WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build(); + } + + @AfterEach + void tearDown() { + if (this.transport != null) { + StepVerifier.create(this.transport.closeGracefully()).verifyComplete(); + } + if (this.server != null) { + this.server.stop(0); + } + } + + @Test + void shouldParseSseEventWithoutEventFieldAsMessage() throws InterruptedException { + StepVerifier.create(this.transport.connect(inbound -> inbound.doOnNext(message -> { + this.receivedMessage.set(message); + this.handlerLatch.countDown(); + }))).verifyComplete(); + + StepVerifier.create(this.transport.sendMessage(createInitializeMessage())).verifyComplete(); + + assertThat(this.handlerLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(this.receivedMessage.get()).isInstanceOf(McpSchema.JSONRPCResponse.class); + + McpSchema.JSONRPCResponse response = (McpSchema.JSONRPCResponse) this.receivedMessage.get(); + assertThat(response.id()).isEqualTo("test-id"); + } + + private McpSchema.JSONRPCRequest createInitializeMessage() { + var initializeRequest = new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26, + McpSchema.ClientCapabilities.builder().roots(true).build(), + new McpSchema.Implementation("Test Client", "1.0.0")); + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, "test-id", + initializeRequest); + } + +}