Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
}

private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(ServerSentEvent<String> event) {
if (MESSAGE_EVENT_TYPE.equals(event.event())) {
String eventType = event.event();
if (eventType == null || eventType.isEmpty() || MESSAGE_EVENT_TYPE.equals(eventType)) {
try {
// We don't support batching ATM and probably won't since the next version
// considers removing it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,62 @@ else if (status == 404) {
StepVerifier.create(transport.closeGracefully()).verifyComplete();
}

/**
* Test that SSE frames with omitted {@code event:} field are treated as default
* {@code message} events per the SSE specification. An SSE frame containing only a
* {@code data:} line (no {@code event:} line) must not be silently dropped.
*/
@Test
void testSseFrameWithoutEventTypeIsAccepted() throws InterruptedException {
CountDownLatch sseLatch = new CountDownLatch(1);
CountDownLatch messageLatch = new CountDownLatch(1);

// Use a fresh server endpoint for this test
this.server.createContext("/mcp-no-event", exchange -> {
String method = exchange.getRequestMethod();
if ("POST".equals(method)) {
// Respond with text/event-stream SSE stream that has NO event: line
String sessionId = "sse-no-event-session";
exchange.getResponseHeaders().set("Content-Type", "text/event-stream");
exchange.getResponseHeaders().set(HttpHeaders.MCP_SESSION_ID, sessionId);
exchange.sendResponseHeaders(200, 0);
// SSE frame with only data: — event type intentionally omitted
String sseData = "data: {\"jsonrpc\":\"2.0\",\"result\":{\"protocolVersion\":\"2025-03-26\","
+ "\"capabilities\":{},\"serverInfo\":{\"name\":\"test\",\"version\":\"1.0\"}},\"id\":\"test-id\"}\n\n";
try {
exchange.getResponseBody().write(sseData.getBytes());
exchange.getResponseBody().flush();
}
catch (IOException ex) {
// stream already closed
}
sseLatch.countDown();
}
else {
exchange.sendResponseHeaders(405, 0);
}
exchange.close();
});

var transport = WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(this.host))
.endpoint("/mcp-no-event")
.build();

StepVerifier.create(transport.connect(msg -> {
messageLatch.countDown();
return Mono.empty();
})).verifyComplete();

var testMessage = createTestMessage();
transport.sendMessage(testMessage).subscribe();

// SSE frame must have been received and dispatched to the message handler
assertThat(sseLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(messageLatch.await(5, TimeUnit.SECONDS)).isTrue();

StepVerifier.create(transport.closeGracefully()).verifyComplete();
}

private McpSchema.JSONRPCRequest createTestMessage() {
var initializeRequest = new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_03_26,
McpSchema.ClientCapabilities.builder().roots(true).build(),
Expand Down