Skip to content

Commit 57680e3

Browse files
committed
Fix concurrent sendMessage race by retrying on FAIL_NON_SERIALIZED
Replace tryEmitNext (fail-fast) with emitNext + busyLooping(100ms) in ManagedStdioClientTransport.sendMessage(). The unicast sink's SinkManySerialized wrapper returns FAIL_NON_SERIALIZED when two threads call tryEmitNext concurrently. busyLooping retries the CAS spin instead of immediately failing, making concurrent sends safe. Before: 19/20 test repetitions fail After: 20/20 pass
1 parent fb3633e commit 57680e3

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

src/main/java/org/sonarsource/sonarqube/mcp/client/ManagedStdioClientTransport.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,16 @@ private void handleIncomingErrors() {
137137
errorSink.asFlux().subscribe(stdErrorHandler);
138138
}
139139

140+
private static final Duration EMIT_TIMEOUT = Duration.ofMillis(100);
141+
140142
@Override
141143
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
142-
return outboundSink.tryEmitNext(message).isSuccess()
143-
? Mono.empty()
144-
: Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'"));
144+
try {
145+
outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(EMIT_TIMEOUT));
146+
return Mono.empty();
147+
} catch (Sinks.EmissionException e) {
148+
return Mono.error(new RuntimeException("Failed to enqueue message for '" + serverName + "'", e));
149+
}
145150
}
146151

147152
private void startInboundProcessing() {

0 commit comments

Comments
 (0)