From 2b66b56ea1a8e6713f724566d06b5d8b15d9ec77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Thu, 5 Mar 2026 14:57:17 +0100 Subject: [PATCH] Implement notification logic for MCP server transports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../transport/WebFluxSseServerTransportProvider.java | 12 ++++++++++++ .../WebFluxStreamableServerTransportProvider.java | 12 ++++++++++++ .../transport/WebMvcSseServerTransportProvider.java | 12 ++++++++++++ .../WebMvcStreamableServerTransportProvider.java | 12 ++++++++++++ pom.xml | 2 +- 5 files changed, 49 insertions(+), 1 deletion(-) diff --git a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxSseServerTransportProvider.java b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxSseServerTransportProvider.java index c8063a819b..7ceb8c67a3 100644 --- a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxSseServerTransportProvider.java +++ b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxSseServerTransportProvider.java @@ -254,6 +254,18 @@ public Mono notifyClients(String method, Object params) { // actually // doing that. + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Initiates a graceful shutdown of all the sessions. This method ensures all active * sessions are properly closed and cleaned up. diff --git a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxStreamableServerTransportProvider.java b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxStreamableServerTransportProvider.java index c1860d90ed..e8b7ce1291 100644 --- a/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxStreamableServerTransportProvider.java +++ b/mcp/transport/mcp-spring-webflux/src/main/java/org/springframework/ai/mcp/server/webflux/transport/WebFluxStreamableServerTransportProvider.java @@ -148,6 +148,18 @@ public Mono notifyClients(String method, Object params) { .then(); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpStreamableServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + @Override public Mono closeGracefully() { return Mono.defer(() -> { diff --git a/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcSseServerTransportProvider.java b/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcSseServerTransportProvider.java index f3cbf241b3..fce5951ba8 100644 --- a/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcSseServerTransportProvider.java +++ b/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcSseServerTransportProvider.java @@ -228,6 +228,18 @@ public Mono notifyClients(String method, Object params) { .then(); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Initiates a graceful shutdown of the transport. This method: *
    diff --git a/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcStreamableServerTransportProvider.java b/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcStreamableServerTransportProvider.java index 73cd9519f6..531c134146 100644 --- a/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcStreamableServerTransportProvider.java +++ b/mcp/transport/mcp-spring-webmvc/src/main/java/org/springframework/ai/mcp/server/webmvc/transport/WebMvcStreamableServerTransportProvider.java @@ -207,6 +207,18 @@ public Mono notifyClients(String method, Object params) { }); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpStreamableServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Initiates a graceful shutdown of the transport. * @return A Mono that completes when all cleanup operations are finished diff --git a/pom.xml b/pom.xml index 3b15421846..f4c39ee5ae 100644 --- a/pom.xml +++ b/pom.xml @@ -356,7 +356,7 @@ - 1.0.1-SNAPSHOT + 1.1.0-SNAPSHOT 1.0.0-SNAPSHOT