Skip to content

Commit d3743bc

Browse files
committed
Fix concurrent sendMessage race by retrying on FAIL_NON_SERIALIZED
Replace tryEmitNext (fail-fast) with emitNext + busyLooping(10ms) in ManagedStdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently. busyLooping retries the CAS spin instead of immediately failing, making concurrent sends safe. The contention window is microseconds (single CAS operation), so the 10ms duration is just a generous upper bound. Before: 18/20 test repetitions fail After: 20/20 pass
1 parent fb3633e commit d3743bc

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

src/main/java/org/sonarsource/sonarqube/mcp/client/ManagedStdioClientTransport.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,15 @@ private void handleIncomingErrors() {
139139

140140
@Override
141141
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
142-
return outboundSink.tryEmitNext(message).isSuccess()
143-
? Mono.empty()
144-
: Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'"));
142+
try {
143+
// busyLooping retries on FAIL_NON_SERIALIZED (concurrent tryEmitNext from another thread)
144+
// instead of failing immediately. The contention window is microseconds (single CAS),
145+
// so the spin resolves almost instantly; the duration is just an upper bound.
146+
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(10)));
147+
return Mono.empty();
148+
} catch (Sinks.EmissionException e) {
149+
return Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'", e));
150+
}
145151
}
146152

147153
private void startInboundProcessing() {

0 commit comments

Comments
 (0)