Skip to content

Commit fb3633e

Browse files
committed
Add test reproducing concurrent sendMessage race condition
When two MCP tool calls arrive in parallel (e.g. get_current_architecture with depth=0 and depth=1), the SQ MCP server dispatches them on separate reactor threads that both call sendMessage() on the same transport. The unicast Reactor sink's SinkManySerialized wrapper uses a CAS-based guard that returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently, causing "Failed to enqueue message for 'sonar-cag'". This test reproduces the race: 19/20 repetitions fail before the fix.
1 parent e2e22e5 commit fb3633e

File tree

1 file changed

+121
-0
lines changed

1 file changed

+121
-0
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SonarQube MCP Server
3+
* Copyright (C) 2025 SonarSource
4+
* mailto:info AT sonarsource DOT com
5+
*
6+
* This program is free software; you can redistribute it and/or
7+
* modify it under the terms of the Sonar Source-Available License Version 1, as published by SonarSource Sàrl.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
12+
* See the Sonar Source-Available License for more details.
13+
*
14+
* You should have received a copy of the Sonar Source-Available License
15+
* along with this program; if not, see https://sonarsource.com/license/ssal/
16+
*/
17+
package org.sonarsource.sonarqube.mcp.client;
18+
19+
import io.modelcontextprotocol.client.transport.ServerParameters;
20+
import io.modelcontextprotocol.spec.McpSchema;
21+
import java.time.Duration;
22+
import java.util.Map;
23+
import java.util.concurrent.CopyOnWriteArrayList;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.CyclicBarrier;
26+
import java.util.concurrent.TimeUnit;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.RepeatedTest;
29+
import org.sonarsource.sonarqube.mcp.transport.McpJsonMappers;
30+
import reactor.core.publisher.Mono;
31+
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
34+
/**
35+
* Reproduces the concurrent sendMessage bug in ManagedStdioClientTransport.
36+
*
37+
* <p>When two threads call sendMessage simultaneously on the same transport,
38+
* the unicast Reactor sink rejects the second emit with FAIL_NON_SERIALIZED,
39+
* surfacing as "Failed to enqueue message for 'sonar-cag'".
40+
*
41+
* <p>This matches the production failure observed when an MCP client sends two
42+
* parallel tool calls (e.g., get_current_architecture with depth=0 and depth=1),
43+
* which are dispatched on separate boundedElastic threads that race on the same transport.
44+
*/
45+
class ManagedStdioClientTransportConcurrencyTest {
46+
47+
private ManagedStdioClientTransport transport;
48+
49+
@AfterEach
50+
void tearDown() {
51+
if (transport != null) {
52+
transport.closeGracefully().block(Duration.ofSeconds(5));
53+
}
54+
}
55+
56+
/**
57+
* Reproduces the exact production scenario: two concurrent tool calls through
58+
* the same transport. The unicast sink's CAS-based serialization guard causes
59+
* one of the two tryEmitNext calls to return FAIL_NON_SERIALIZED.
60+
*
61+
* Repeated 20 times because the race is timing-dependent.
62+
*/
63+
@RepeatedTest(20)
64+
void concurrent_sendMessage_should_not_fail() throws Exception {
65+
// Use 'cat' as a simple process that keeps stdin/stdout open
66+
var serverParams = ServerParameters.builder("cat")
67+
.env(Map.of())
68+
.build();
69+
transport = new ManagedStdioClientTransport("test-server", serverParams, McpJsonMappers.DEFAULT);
70+
71+
// Connect the transport (starts process, sets up sinks and processing threads)
72+
transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5));
73+
74+
// Two JSON-RPC messages mimicking two concurrent tool calls
75+
var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1",
76+
Map.of("name", "get_current_architecture", "arguments", Map.of("depth", 0)));
77+
var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2",
78+
Map.of("name", "get_current_architecture", "arguments", Map.of("depth", 1)));
79+
80+
// CyclicBarrier maximizes chance of concurrent execution
81+
var barrier = new CyclicBarrier(2);
82+
var errors = new CopyOnWriteArrayList<Throwable>();
83+
var latch = new CountDownLatch(2);
84+
85+
Runnable sender1 = () -> {
86+
try {
87+
barrier.await(2, TimeUnit.SECONDS);
88+
transport.sendMessage(msg1).block(Duration.ofSeconds(2));
89+
} catch (Exception e) {
90+
errors.add(e);
91+
} finally {
92+
latch.countDown();
93+
}
94+
};
95+
96+
Runnable sender2 = () -> {
97+
try {
98+
barrier.await(2, TimeUnit.SECONDS);
99+
transport.sendMessage(msg2).block(Duration.ofSeconds(2));
100+
} catch (Exception e) {
101+
errors.add(e);
102+
} finally {
103+
latch.countDown();
104+
}
105+
};
106+
107+
var t1 = new Thread(sender1, "boundedElastic-5");
108+
var t2 = new Thread(sender2, "boundedElastic-6");
109+
t1.start();
110+
t2.start();
111+
112+
latch.await(5, TimeUnit.SECONDS);
113+
114+
// With the unicast sink, one send fails with "Failed to enqueue message"
115+
// due to FAIL_NON_SERIALIZED when the two tryEmitNext calls overlap.
116+
assertThat(errors)
117+
.as("Both concurrent sendMessage calls should succeed, but the unicast sink " +
118+
"rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext")
119+
.isEmpty();
120+
}
121+
}

0 commit comments

Comments
 (0)