diff --git a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java index ff436b12de..590f01185e 100644 --- a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java +++ b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransport.java @@ -502,7 +502,8 @@ public T unmarshalFrom(Object data, TypeRef typeRef) { } private Tuple2, Iterable> parse(ServerSentEvent event) { - if (MESSAGE_EVENT_TYPE.equals(event.event())) { + String eventType = event.event(); + if (eventType == null || eventType.isEmpty() || MESSAGE_EVENT_TYPE.equals(eventType)) { try { // We don't support batching ATM and probably won't since the next version // considers removing it. diff --git a/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportErrorHandlingIT.java b/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportErrorHandlingIT.java index a5d5698274..8eb8425420 100644 --- a/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportErrorHandlingIT.java +++ b/mcp/transport/mcp-spring-webflux/src/test/java/org/springframework/ai/mcp/client/webflux/transport/WebClientStreamableHttpTransportErrorHandlingIT.java @@ -396,6 +396,62 @@ else if (status == 404) { StepVerifier.create(transport.closeGracefully()).verifyComplete(); } + /** + * Test that SSE frames with omitted {@code event:} field are treated as default + * {@code message} events per the SSE specification. An SSE frame containing only a + * {@code data:} line (no {@code event:} line) must not be silently dropped. + */ + @Test + void testSseFrameWithoutEventTypeIsAccepted() throws InterruptedException { + CountDownLatch sseLatch = new CountDownLatch(1); + CountDownLatch messageLatch = new CountDownLatch(1); + + // Use a fresh server endpoint for this test + this.server.createContext("/mcp-no-event", exchange -> { + String method = exchange.getRequestMethod(); + if ("POST".equals(method)) { + // Respond with text/event-stream SSE stream that has NO event: line + String sessionId = "sse-no-event-session"; + exchange.getResponseHeaders().set("Content-Type", "text/event-stream"); + exchange.getResponseHeaders().set(HttpHeaders.MCP_SESSION_ID, sessionId); + exchange.sendResponseHeaders(200, 0); + // SSE frame with only data: — event type intentionally omitted + String sseData = "data: {\"jsonrpc\":\"2.0\",\"result\":{\"protocolVersion\":\"2025-03-26\"," + + "\"capabilities\":{},\"serverInfo\":{\"name\":\"test\",\"version\":\"1.0\"}},\"id\":\"test-id\"}\n\n"; + try { + exchange.getResponseBody().write(sseData.getBytes()); + exchange.getResponseBody().flush(); + } + catch (IOException ex) { + // stream already closed + } + sseLatch.countDown(); + } + else { + exchange.sendResponseHeaders(405, 0); + } + exchange.close(); + }); + + var transport = WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(this.host)) + .endpoint("/mcp-no-event") + .build(); + + StepVerifier.create(transport.connect(msg -> { + messageLatch.countDown(); + return Mono.empty(); + })).verifyComplete(); + + var testMessage = createTestMessage(); + transport.sendMessage(testMessage).subscribe(); + + // SSE frame must have been received and dispatched to the message handler + assertThat(sseLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue(); + + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + } + private McpSchema.JSONRPCRequest createTestMessage() { var initializeRequest = new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26, McpSchema.ClientCapabilities.builder().roots(true).build(),