Skip to content

Commit 6b84225

Browse files
authored
MCP-374 Fix concurrent sendMessage race in ManagedStdioClientTransport (#290)
MCP-374 Fix concurrent sendMessage race in ManagedStdioClientTransport (#290)

* Add test reproducing the concurrent sendMessage race condition.

  When two MCP tool calls arrive in parallel (e.g. get_current_architecture with depth=0 and depth=1), the SQ MCP server dispatches them on separate reactor threads that both call sendMessage() on the same transport. The unicast Reactor sink's SinkManySerialized wrapper uses a CAS-based guard that returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently, causing "Failed to enqueue message for 'sonar-cag'". This test reproduces the race: 19/20 repetitions fail before the fix.

* Fix the concurrent sendMessage race by retrying on FAIL_NON_SERIALIZED.

  Replace tryEmitNext (fail-fast) with emitNext + busyLooping(100ms) in ManagedStdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently. busyLooping retries the CAS spin instead of immediately failing, making concurrent sends safe. The contention window is microseconds (a single CAS operation), so the 100ms duration is just a generous upper bound for pathological cases such as GC pauses.

  Before: 18/20 test repetitions fail. After: 20/20 pass.
1 parent e2e22e5 commit 6b84225

File tree

2 files changed: +130 −3 lines changed

src/main/java/org/sonarsource/sonarqube/mcp/client/ManagedStdioClientTransport.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,15 @@ private void handleIncomingErrors() {
139139

140140
@Override
141141
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
142-
return outboundSink.tryEmitNext(message).isSuccess()
143-
? Mono.empty()
144-
: Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'"));
142+
try {
143+
// busyLooping retries on FAIL_NON_SERIALIZED (concurrent tryEmitNext from another thread)
144+
// instead of failing immediately. The contention window is microseconds (single CAS),
145+
// so the spin resolves almost instantly; the duration is just an upper bound.
146+
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
147+
return Mono.empty();
148+
} catch (Sinks.EmissionException e) {
149+
return Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'", e));
150+
}
145151
}
146152

147153
private void startInboundProcessing() {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SonarQube MCP Server
3+
* Copyright (C) 2025 SonarSource
4+
* mailto:info AT sonarsource DOT com
5+
*
6+
* This program is free software; you can redistribute it and/or
7+
* modify it under the terms of the Sonar Source-Available License Version 1, as published by SonarSource Sàrl.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12+
* See the Sonar Source-Available License for more details.
13+
*
14+
* You should have received a copy of the Sonar Source-Available License
15+
* along with this program; if not, see https://sonarsource.com/license/ssal/
16+
*/
17+
package org.sonarsource.sonarqube.mcp.client;
18+
19+
import io.modelcontextprotocol.client.transport.ServerParameters;
20+
import io.modelcontextprotocol.spec.McpSchema;
21+
import java.time.Duration;
22+
import java.util.Map;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.CyclicBarrier;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.RepeatedTest;
29+
import org.sonarsource.sonarqube.mcp.transport.McpJsonMappers;
30+
import reactor.core.publisher.Mono;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
/**
35+
* Reproduces the concurrent sendMessage bug in ManagedStdioClientTransport.
36+
*
37+
* <p>When two threads call sendMessage simultaneously on the same transport,
38+
* the unicast Reactor sink rejects the second emit with FAIL_NON_SERIALIZED,
39+
* surfacing as "Failed to enqueue message for 'sonar-cag'".
40+
*
41+
* <p>This matches the production failure observed when an MCP client sends two
42+
* parallel tool calls (e.g., get_current_architecture with depth=0 and depth=1),
43+
* which are dispatched on separate boundedElastic threads that race on the same transport.
44+
*/
45+
class ManagedStdioClientTransportConcurrencyTest {
46+
47+
private ManagedStdioClientTransport transport;
48+
49+
@AfterEach
50+
void tearDown() {
51+
if (transport != null) {
52+
transport.closeGracefully().block(Duration.ofSeconds(5));
53+
}
54+
}
55+
56+
/**
57+
* Reproduces the exact production scenario: two concurrent tool calls through
58+
* the same transport. The unicast sink's CAS-based serialization guard causes
59+
* one of the two tryEmitNext calls to return FAIL_NON_SERIALIZED.
60+
*
61+
* Repeated 20 times because the race is timing-dependent.
62+
*/
63+
@RepeatedTest(20)
64+
void concurrent_sendMessage_should_not_fail() throws Exception {
65+
// Use 'cat' as a simple process that keeps stdin/stdout open
66+
var serverParams = ServerParameters.builder("cat")
67+
.env(Map.of())
68+
.build();
69+
transport = new ManagedStdioClientTransport("test-server", serverParams, McpJsonMappers.DEFAULT);
70+
71+
// Connect the transport (starts process, sets up sinks and processing threads)
72+
transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5));
73+
74+
// Two JSON-RPC messages mimicking two concurrent tool calls
75+
var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1",
76+
Map.of("name", "get_current_architecture", "arguments", Map.of("depth", 0)));
77+
var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2",
78+
Map.of("name", "get_current_architecture", "arguments", Map.of("depth", 1)));
79+
80+
// CyclicBarrier maximizes chance of concurrent execution
81+
var barrier = new CyclicBarrier(2);
82+
var errors = new CopyOnWriteArrayList<Throwable>();
83+
var latch = new CountDownLatch(2);
84+
85+
Runnable sender1 = () -> {
86+
try {
87+
barrier.await(2, TimeUnit.SECONDS);
88+
transport.sendMessage(msg1).block(Duration.ofSeconds(2));
89+
} catch (Exception e) {
90+
errors.add(e);
91+
} finally {
92+
latch.countDown();
93+
}
94+
};
95+
96+
Runnable sender2 = () -> {
97+
try {
98+
barrier.await(2, TimeUnit.SECONDS);
99+
transport.sendMessage(msg2).block(Duration.ofSeconds(2));
100+
} catch (Exception e) {
101+
errors.add(e);
102+
} finally {
103+
latch.countDown();
104+
}
105+
};
106+
107+
var t1 = new Thread(sender1, "boundedElastic-5");
108+
var t2 = new Thread(sender2, "boundedElastic-6");
109+
t1.start();
110+
t2.start();
111+
112+
latch.await(5, TimeUnit.SECONDS);
113+
114+
// With the unicast sink, one send fails with "Failed to enqueue message"
115+
// due to FAIL_NON_SERIALIZED when the two tryEmitNext calls overlap.
116+
assertThat(errors)
117+
.as("Both concurrent sendMessage calls should succeed, but the unicast sink " +
118+
"rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext")
119+
.isEmpty();
120+
}
121+
}

0 commit comments

Comments
 (0)