From 2c4e096e9a6174e6f7cdc746716354342603b1aa Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Sun, 17 May 2026 17:30:01 -0500 Subject: [PATCH] SOLR-18244: Fix several concurrency bugs in ParallelHttpShardHandler - Catch RejectedExecutionException from commExecutor and route through recordShardSubmitError so saturation/shutdown does not propagate synchronously and abandon already-submitted shard requests; this preserves shards.tolerant semantics and returns a 503 instead of 500. - Synchronize submitFutures registration and removal on the same monitor HttpShardHandler uses for responseFutureMap so cancelAll observes a consistent view of both maps. - Skip work in the runAsync runnable when the outer future has already been cancelled, avoiding a wasted lbClient.requestAsync call. - In HttpShardHandler.takeCompletedIncludingErrors, replace the blocking responses.take() with a 50ms poll so subclasses that gate responsesPending() on an async tracker (e.g. submitFutures) cannot cause a lost-wakeup hang when the tracker drains between the pending check and the queue wait. - Wrap the inner whenComplete body in a try/catch so a thrown transformResponse (or other failure inside the lambda) still enqueues a response and unblocks take() instead of stranding the consumer. - Expose cancellationLock() and isCanceled() on HttpShardHandler so subclasses can synchronize on the same monitor that already guards the canceled flag, responseFutureMap, and the responses queue. - Synchronize ParallelHttpShardHandler.responsesPending() on the cancellation monitor. Without the lock, take()'s loop performs three unsynchronized isEmpty() reads (responseFutureMap, responses, submitFutures) and can transiently observe all three as empty even though super.makeShardRequest's put and outer.whenComplete's submitFutures.remove are causally ordered. The intermediate state is consistent with each individual read but never with the cross-map invariant, so take() exits with null and the pending response (which arrives moments later via the inner whenComplete) is silently dropped by SearchHandler. Acquiring the lock serializes the three reads with every state-mutating operation. - Stop using submitFutures.isEmpty() as the loop guard in responsesPending(); use an exact AtomicInteger (inFlightSubmits) instead. ConcurrentHashMap.size()/isEmpty() are documented estimates: sumCount() = baseCount + sum(counterCells), and under contended put/remove from many threads the per-cell deltas can settle at a non-zero value while the table is physically empty. When that happens, isEmpty() returns false on a logically empty map, responsesPending() stays true, and the parent thread parks in responses.take() forever. AtomicInteger.get() is exact under any concurrency. The counter is incremented before runAsync, decremented unconditionally in the runAsync future's whenComplete finally, and also decremented in the RejectedExecutionException catch (no future) and the canceled-before-track early return (no whenComplete). The submitFutures map is retained only as the iteration target for cancelAll. Adds unit coverage for the rejected-executor path, the cancellation invariant, and a stress reproduction of the async inner-future race. --- ...lel-http-shard-handler-lost-wakeup-fix.yml | 10 + .../handler/component/HttpShardHandler.java | 86 ++++- .../component/ParallelHttpShardHandler.java | 163 +++++++--- .../ParallelHttpShardHandlerTest.java | 305 +++++++++++++++++- 4 files changed, 506 insertions(+), 58 deletions(-) create mode 100644 changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml diff --git a/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml b/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml new file mode 100644 index 000000000000..96706027bc12 --- /dev/null +++ b/changelog/unreleased/SOLR-18244-parallel-http-shard-handler-lost-wakeup-fix.yml @@ -0,0 +1,10 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Fix several concurrency bugs in HttpShardHandler / ParallelHttpShardHandler that + could cause search threads to hang in take() or return HTTP 500 instead of honoring + shards.tolerant under thread-pool saturation +type: fixed +authors: + - name: Mark Miller +links: + - name: SOLR-18244 + url: https://issues.apache.org/jira/browse/SOLR-18244 diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 73b960db91ea..57b1184f2bc9 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -96,6 +96,18 @@ public class HttpShardHandler extends ShardHandler { protected final BlockingQueue responses; private final AtomicBoolean canceled = new AtomicBoolean(false); + // Returns the monitor object that guards all cancellation-related state transitions + // (the canceled flag, the responseFutureMap, the responses queue's CANCELLATION_NOTIFICATION). + // Subclasses that track additional cancellable state must synchronize on this monitor when + // reading or transitioning that state so the whole cancellation invariant stays atomic. + protected final Object cancellationLock() { + return canceled; + } + + protected final boolean isCanceled() { + return canceled.get(); + } + private final Map> shardToURLs; protected LBHttp2SolrClient lbClient; @@ -278,27 +290,47 @@ protected void makeShardRequest( // on the map already having the future. future.whenComplete( (LBSolrClient.Rsp rsp, Throwable throwable) -> { - if (rsp != null) { - ssr.nl = rsp.getResponse(); - srsp.setShardAddress(rsp.getServer()); - } else if (throwable != null) { - srsp.setException(throwable); - if (throwable instanceof SolrException) { - srsp.setResponseCode(((SolrException) throwable).code()); + try { + if (rsp != null) { + ssr.nl = rsp.getResponse(); + srsp.setShardAddress(rsp.getServer()); + } else if (throwable != null) { + srsp.setException(throwable); + if (throwable instanceof SolrException) { + srsp.setResponseCode(((SolrException) throwable).code()); + } } - } - ssr.elapsedTime = - TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); - // Synchronize on cancelled so this code and cancelAll() cannot happen at the same time - synchronized (canceled) { - // We don't want to add responses after the requests have been canceled - if (responseFutureMap.containsKey(srsp)) { - responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard)); + ssr.elapsedTime = + TimeUnit.MILLISECONDS.convert( + System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS); + enqueueIfTracked(srsp, HttpShardHandler.this.transformResponse(sreq, srsp, shard)); + } catch (Throwable t) { + // If anything above throws (subclass transformResponse, malformed Rsp, OOM in the + // lambda) the response would never be enqueued — but responseFutureMap still tracks + // srsp, so take() would park forever. Record the failure on srsp and enqueue it raw + // (bypassing transformResponse, which may be the thrower). + srsp.setException(t); + if (t instanceof SolrException) { + srsp.setResponseCode(((SolrException) t).code()); } + enqueueIfTracked(srsp, srsp); } }); } + /** + * Enqueue {@code value} into the {@link #responses} queue iff {@code key} is still tracked in + * {@link #responseFutureMap}, holding the cancellation monitor so this stays atomic with {@link + * #cancelAll()}'s clear. + */ + private void enqueueIfTracked(ShardResponse key, ShardResponse value) { + synchronized (canceled) { + if (responseFutureMap.containsKey(key)) { + responses.add(value); + } + } + } + /** Subclasses could modify the request based on the shard */ @SuppressWarnings("unused") protected QueryRequest createQueryRequest( @@ -328,7 +360,12 @@ private ShardResponse take(boolean bailOnError) { ShardResponse previousResponse = null; try { while (responsesPending()) { - ShardResponse rsp = responses.take(); + ShardResponse rsp = awaitNextResponse(); + if (rsp == null) { + // awaitNextResponse() returned without a response — only happens for subclasses that + // override with a timed poll. Re-evaluate responsesPending() and either re-wait or exit. + continue; + } if (rsp == CANCELLATION_NOTIFICATION) { // This is only queued in cancelAll(), so all outstanding futures have already been // canceled. @@ -376,6 +413,23 @@ protected boolean responsesPending() { return !responseFutureMap.isEmpty() || !responses.isEmpty(); } + /** + * Wait for the next response from the {@link #responses} queue. Defaults to a blocking {@link + * BlockingQueue#take()}. + * + *

Subclasses that gate {@link #responsesPending()} on an async tracker outside the {@link + * #responses} queue's lifecycle (e.g. {@link ParallelHttpShardHandler#submitFutures}) MUST + * override this with a timed poll. The cancellation lock can serialize {@link + * #responsesPending()} reads with state mutations, but it cannot signal the queue's internal + * {@code Condition}: if the tracker drains without anything being enqueued to {@link #responses}, + * a thread parked in {@link BlockingQueue#take()} would never wake up. Returning {@code null} + * from this method instructs {@link #take(boolean)} to re-check {@link #responsesPending()} and + * either re-wait or exit cleanly. + */ + protected ShardResponse awaitNextResponse() throws InterruptedException { + return responses.take(); + } + @Override public void cancelAll() { // Canceled must be set to true before calling the cancellation code, to ensure that new tasks diff --git a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java index a3e964cf95f9..68557b02164f 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/ParallelHttpShardHandler.java @@ -16,19 +16,20 @@ */ package org.apache.solr.handler.component; -import java.lang.invoke.MethodHandles; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import net.jcip.annotations.NotThreadSafe; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A version of {@link HttpShardHandler} optimized for massively-sharded collections. @@ -43,19 +44,20 @@ @NotThreadSafe public class ParallelHttpShardHandler extends HttpShardHandler { - @SuppressWarnings("unused") - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final ExecutorService commExecutor; /* - * Unlike the basic HttpShardHandler, this class allows us to exit submit before - * the responseFutureMap is updated. If the runnables that - * do that are slow to execute the calling code could attempt to takeCompleted(), - * while pending is still zero. In this condition, the code would assume that all - * requests are processed (despite the runnables created by this class still - * waiting). Thus, we need to track that there are attempts still in flight. + * Track in-flight async submits so responsesPending() doesn't return false + * while a runnable is still queued or executing. + * + * inFlightSubmits is the authoritative loop-guard counter. AtomicInteger.get() + * is exact; ConcurrentHashMap.size() is documented as an estimate — its + * sumCount() can settle at a non-zero value while the table is empty under + * concurrent put/remove — so we don't use submitFutures.isEmpty() as the guard. + * + * submitFutures is retained only as the iteration target for cancelAll. */ + private final AtomicInteger inFlightSubmits = new AtomicInteger(); private final ConcurrentMap> submitFutures; public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) { @@ -66,9 +68,29 @@ public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandler @Override protected boolean responsesPending() { - // ensure we can't exit while loop in HttpShardHandler.take(boolean) until we've completed - // submitting all of the shard requests - return super.responsesPending() || !submitFutures.isEmpty(); + // Take the lock so the read of inFlightSubmits is serialized with super.responsesPending()'s + // reads of responseFutureMap and responses. Without it, take()'s loop can transiently observe + // both responseFutureMap empty (super.makeShardRequest hasn't put yet) and inFlightSubmits == 0 + // (outer.whenComplete already decremented for a different submit) even though those events are + // causally ordered, and the in-flight inner whenComplete's responses.add would be silently + // lost. + synchronized (cancellationLock()) { + return super.responsesPending() || inFlightSubmits.get() > 0; + } + } + + /** + * Override the base class's blocking wait with a timed poll. inFlightSubmits is an async tracker + * outside the {@link #responses} queue's lifecycle: when an outer {@code whenComplete} decrements + * the last in-flight submit but the inner {@code whenComplete} hasn't fired yet to enqueue the + * response, a thread parked in {@code responses.take()} would never be woken — the cancellation + * lock cannot signal the queue's internal {@code Condition}. Polling lets the outer {@link + * #take(boolean)} loop re-evaluate {@link #responsesPending()} until the inner callback enqueues + * the real response (or the trackers drain to empty). + */ + @Override + protected ShardResponse awaitNextResponse() throws InterruptedException { + return responses.poll(50, TimeUnit.MILLISECONDS); } @Override @@ -80,22 +102,73 @@ protected void makeShardRequest( SimpleSolrResponse ssr, ShardResponse srsp, long startTimeNS) { - CompletableFuture completableFuture = - CompletableFuture.runAsync( - () -> super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS), - commExecutor); - submitFutures.put(srsp, completableFuture); + // Holder so the lambda can read its own outer future. We can't capture the variable directly + // (it would have to be effectively final, but we assign it from runAsync). AtomicReference + // gives the lambda volatile-style visibility on the assignment that happens after runAsync + // returns. + AtomicReference> selfRef = new AtomicReference<>(); + CompletableFuture completableFuture; + // Increment BEFORE runAsync so responsesPending() never observes inFlightSubmits == 0 while + // there is a submit in flight. The matching decrement is in the unconditional finally of + // whenComplete below, or in the catch block if runAsync itself rejects. + inFlightSubmits.incrementAndGet(); + try { + completableFuture = + CompletableFuture.runAsync( + () -> { + // Skip the work if THIS specific outer future was cancelled (e.g. cancelAll + // cancelled + // it before this runnable got CPU time). Avoids a wasted lbClient.requestAsync that + // super.makeShardRequest would just immediately cancel anyway. selfRef may briefly + // be null if the runnable runs before the assignment below — in that case we fall + // through to super, which has its own canceled-check guard. + CompletableFuture self = selfRef.get(); + if (self != null && self.isCancelled()) { + return; + } + super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS); + }, + commExecutor); + } catch (RejectedExecutionException ree) { + // Saturation or shutdown of commExecutor would otherwise propagate synchronously, + // crash SearchHandler's distributed loop before cancelAll() runs, abandon any + // already-submitted shard requests, and return HTTP 500 even when shards.tolerant=true. + // Treat it as a shard failure so the responses queue stays consistent and shards.tolerant + // semantics are honored. SERVICE_UNAVAILABLE (503) marks it as transient. + // No future was produced, so whenComplete will never fire — undo the increment here. + inFlightSubmits.decrementAndGet(); + recordShardSubmitError( + srsp, + new SolrException( + SolrException.ErrorCode.SERVICE_UNAVAILABLE, + "Comm executor thread pool is full, unable to send request to shard: " + shard, + ree)); + return; + } + // Publish the self-reference BEFORE the cancellation check so that if the cancellation block + // below cancels this future, the runnable (whenever it runs) will see the cancellation via + // selfRef.get().isCancelled(). AtomicReference provides happens-before across threads. + selfRef.set(completableFuture); + + // Synchronize registering submitFutures with the same monitor super uses for responseFutureMap. + // If cancelAll has already set canceled=true, don't track this request — cancel the outer + // future and return. The runnable, when it runs, will see self.isCancelled() and short-circuit. + // Mirrors super.makeShardRequest's check-and-put-or-early-return pattern on responseFutureMap. + synchronized (cancellationLock()) { + if (isCanceled()) { + completableFuture.cancel(true); + // whenComplete is never registered on this early-return path, so undo the increment. + inFlightSubmits.decrementAndGet(); + return; + } + submitFutures.put(srsp, completableFuture); + } completableFuture.whenComplete( (r, t) -> { try { if (t != null) { - Throwable failure = t; - if (failure instanceof CompletionException) { - CompletionException completionException = (CompletionException) failure; - if (completionException.getCause() != null) { - failure = completionException.getCause(); - } - } + Throwable failure = + (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t; if (!(failure instanceof CancellationException)) { recordShardSubmitError( srsp, @@ -106,23 +179,35 @@ protected void makeShardRequest( } } } finally { - // Remove so that we keep track of in-flight submits only - submitFutures.remove(srsp); + // Order matters: remove from submitFutures (under the cross-tracker lock so cancelAll + // sees a consistent set), then decrement the loop-guard counter. Decrementing last + // means responsesPending() — which reads under the same lock — never observes + // inFlightSubmits == 0 while submitFutures still contains this entry. + synchronized (cancellationLock()) { + submitFutures.remove(srsp); + } + inFlightSubmits.decrementAndGet(); } }); } @Override public void cancelAll() { - super.cancelAll(); - submitFutures - .values() - .forEach( - future -> { - if (!future.isDone()) { - future.cancel(true); - } - }); - submitFutures.clear(); + // Synchronize the whole cancellation — super.cancelAll plus our submitFutures ops — on the + // same monitor so the invariant matches HttpShardHandler.cancelAll. Without this, a runnable + // entering super.makeShardRequest's synchronized block could observe canceled=true while + // submitFutures is still being walked, leaving the maps mutually inconsistent. + synchronized (cancellationLock()) { + super.cancelAll(); + submitFutures + .values() + .forEach( + future -> { + if (!future.isDone()) { + future.cancel(true); + } + }); + submitFutures.clear(); + } } } diff --git a/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java index f98aac8e3c38..3e9a1a78eb17 100644 --- a/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java +++ b/solr/core/src/test/org/apache/solr/handler/component/ParallelHttpShardHandlerTest.java @@ -19,16 +19,35 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.impl.LBHttp2SolrClient; import org.apache.solr.client.solrj.impl.LBSolrClient; import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; public class ParallelHttpShardHandlerTest extends SolrTestCaseJ4 { + @BeforeClass + public static void ensureWorkingMockito() { + assumeWorkingMockito(); + } + private static class DirectExecutorService extends AbstractExecutorService { private volatile boolean shutdown; @@ -73,9 +92,7 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception { // Force super.makeShardRequest to throw before it enqueues the response future. handler.lbClient = null; - ShardRequest shardRequest = new ShardRequest(); - shardRequest.params = new ModifiableSolrParams(); - shardRequest.actualShards = new String[] {"shardA"}; + ShardRequest shardRequest = buildShardRequest("shardA"); ShardResponse shardResponse = new ShardResponse(); shardResponse.setShardRequest(shardRequest); @@ -107,4 +124,286 @@ public void testSubmitFailureIsRecordedWhenSuperThrows() throws Exception { recorded.getException()); assertTrue(recorded.getException() instanceof SolrException); } + + /** + * Verifies the contract that when the commExecutor rejects the runnable, the failure is recorded + * via recordShardSubmitError (i.e., shows up in the responses queue) rather than being propagated + * synchronously to the caller. + * + *

This exercises issue #1 from the ParallelHttpShardHandler review: with a single-thread + * ThreadPoolExecutor backed by a SynchronousQueue, once the worker is busy, the next + * CompletableFuture.runAsync(...) call throws RejectedExecutionException synchronously out of + * makeShardRequest. The expected (post-fix) behavior is that the error is routed through + * recordShardSubmitError instead. + */ + @Test + public void testRejectedExecutorRecordsErrorInsteadOfThrowing() throws Exception { + CountDownLatch holdWorker = new CountDownLatch(1); + CountDownLatch workerStarted = new CountDownLatch(1); + ThreadPoolExecutor busyExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); // default AbortPolicy + try { + // Occupy the single worker thread so the next submission has nowhere to go. + busyExecutor.execute( + () -> { + workerStarted.countDown(); + try { + holdWorker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertTrue("worker did not start within timeout", workerStarted.await(5, TimeUnit.SECONDS)); + + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = busyExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + ShardRequest shardRequest = buildShardRequest("shardA"); + + ShardResponse shardResponse = new ShardResponse(); + shardResponse.setShardRequest(shardRequest); + shardResponse.setShard("shardA"); + + HttpShardHandler.SimpleSolrResponse simpleResponse = + new HttpShardHandler.SimpleSolrResponse(); + shardResponse.setSolrResponse(simpleResponse); + + ModifiableSolrParams params = new ModifiableSolrParams(); + QueryRequest queryRequest = new QueryRequest(params); + LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr"); + LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint)); + + // The desired contract: rejection is captured and surfaced through the responses queue + // (i.e., this call should not throw RejectedExecutionException). + try { + handler.makeShardRequest( + shardRequest, + "shardA", + params, + lbReq, + simpleResponse, + shardResponse, + System.nanoTime()); + } catch (RejectedExecutionException ree) { + fail( + "makeShardRequest should not propagate RejectedExecutionException; the failure " + + "should be recorded via recordShardSubmitError. Got: " + + ree); + } + + ShardResponse recorded = handler.responses.poll(2, TimeUnit.SECONDS); + assertNotNull( + "Expected the executor rejection to be recorded as a shard failure in the responses" + + " queue, but no response arrived", + recorded); + assertSame( + "The recorded shard response should be the same instance passed in", + shardResponse, + recorded); + assertNotNull( + "Expected an exception to be attached to the recorded shard response", + recorded.getException()); + } finally { + holdWorker.countDown(); + busyExecutor.shutdownNow(); + busyExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } + + private ShardRequest buildShardRequest(String shard) { + ShardRequest sreq = new ShardRequest(); + sreq.params = new ModifiableSolrParams(); + sreq.actualShards = new String[] {shard}; + return sreq; + } + + /** + * Runs handler.takeCompletedIncludingErrors() on a worker thread with a timeout. If take() does + * not return within timeoutMs, fails the test with a clear message naming the iteration and phase + * — this is the signal for the lost-wakeup bug. + */ + private ShardResponse runTakeWithTimeout( + ParallelHttpShardHandler handler, + ExecutorService takeExecutor, + int iteration, + String phaseLabel, + long timeoutMs) + throws Exception { + Future future = takeExecutor.submit(handler::takeCompletedIncludingErrors); + try { + return future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (TimeoutException te) { + future.cancel(true); + fail( + "take() hung in iteration " + + iteration + + " " + + phaseLabel + + ": did not return within " + + timeoutMs + + "ms. The worker thread is parked in LinkedBlockingQueue.take() waiting for" + + " an element that will never arrive because the handler's state transitioned" + + " to empty without anything being enqueued on the responses queue."); + throw new AssertionError("unreachable"); + } catch (ExecutionException ee) { + throw new AssertionError( + "take() threw unexpectedly in iteration " + iteration + " " + phaseLabel, ee.getCause()); + } + } + + /** + * More aggressive variant of the lost-wakeup stress test that uses asynchronous inner-future + * completion on a dedicated scheduler. In production the inner future (from {@code + * lbClient.requestAsync}) completes on a Jetty IO thread, not synchronously at the registration + * site. That timing gap between {@code super.makeShardRequest} returning (and the outer {@code + * whenComplete} firing to remove {@code submitFutures}) and the inner {@code whenComplete} firing + * (to add to {@code responses}) is exactly where the observed 930-handler hang lives. This test + * matches that timing. + */ + @Test + public void testTakeDoesNotHangUnderAsyncInnerFutureCompletion() throws Exception { + final int iterations = 1000; + final long perIterationTimeoutMs = 3_000; + + ExecutorService commExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 5L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new SolrNamedThreadFactory("testCommExecutor")); + + // Simulates Jetty IO threads: a small pool that completes the inner future asynchronously + // some tiny amount of time after requestAsync() returns, exposing the race window. + ExecutorService mockIoThreads = + ExecutorUtil.newMDCAwareFixedThreadPool(2, new SolrNamedThreadFactory("testMockIo")); + + ExecutorService takeExecutor = + ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("testTakeRunner")); + + try { + for (int i = 0; i < iterations; i++) { + runAsyncRaceCycle(commExecutor, mockIoThreads, takeExecutor, i, perIterationTimeoutMs); + } + } finally { + takeExecutor.shutdownNow(); + takeExecutor.awaitTermination(5, TimeUnit.SECONDS); + mockIoThreads.shutdownNow(); + mockIoThreads.awaitTermination(5, TimeUnit.SECONDS); + commExecutor.shutdown(); + if (!commExecutor.awaitTermination(15, TimeUnit.SECONDS)) { + commExecutor.shutdownNow(); + commExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } + } + + private void runAsyncRaceCycle( + ExecutorService commExecutor, + ExecutorService mockIoThreads, + ExecutorService takeExecutor, + int iteration, + long timeoutMs) + throws Exception { + + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = commExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + // LB client that returns a future which completes asynchronously on a separate thread — + // mimicking the Jetty IO thread model. This creates a real race between: + // (a) the outer runAsync future completing + its whenComplete removing submitFutures, + // (b) the inner future completing + its whenComplete adding to responses. + LBHttp2SolrClient mockLb = Mockito.mock(LBHttp2SolrClient.class); + Mockito.when(mockLb.requestAsync(Mockito.any(LBSolrClient.Req.class))) + .thenAnswer( + inv -> { + CompletableFuture f = new CompletableFuture<>(); + mockIoThreads.execute(() -> f.complete(new LBSolrClient.Rsp())); + return f; + }); + handler.lbClient = mockLb; + + // Single-shard submit → take. This is the simplest real workload. Under async inner-future + // completion, the outer whenComplete (removing submitFutures) and inner whenComplete + // (adding to responses) race. If there's a window where responsesPending() transitions to + // false without the responses queue getting an entry, take() parks forever. + ShardRequest sreq = buildShardRequest("shard-" + iteration); + handler.submit(sreq, "shard-" + iteration, sreq.params); + + ShardResponse rsp = + runTakeWithTimeout(handler, takeExecutor, iteration, "async-race", timeoutMs); + + assertNotNull( + "async-race iteration " + iteration + " take() returned null — response was never enqueued", + rsp); + } + + /** + * Invariant test for the cancellation synchronization contract in {@link + * ParallelHttpShardHandler}: when {@code makeShardRequest} is invoked while {@code canceled} is + * already {@code true}, the outer future must be cancelled and NOT tracked in {@code + * submitFutures}. This keeps {@code submitFutures} consistent with the cancellation state — + * mirroring {@link HttpShardHandler#makeShardRequest}'s check-and-put pattern on {@code + * responseFutureMap}. + * + *

Without this invariant, a runnable could observe {@code canceled=true} (and early-return in + * super) while {@code submitFutures} still tracks its outer future, leaving the outer + * whenComplete's bookkeeping racing against {@code cancelAll}'s own submitFutures sweep. + */ + @Test + public void testCanceledMakeShardRequestDoesNotTrackSubmitFutures() throws Exception { + ExecutorService commExecutor = + new ExecutorUtil.MDCAwareThreadPoolExecutor( + 0, + Integer.MAX_VALUE, + 5L, + TimeUnit.SECONDS, + new SynchronousQueue<>(), + new SolrNamedThreadFactory("invariantTestComm")); + + try { + ParallelHttpShardHandlerFactory factory = new ParallelHttpShardHandlerFactory(); + factory.commExecutor = commExecutor; + ParallelHttpShardHandler handler = new ParallelHttpShardHandler(factory); + + LBHttp2SolrClient mockLb = Mockito.mock(LBHttp2SolrClient.class); + Mockito.when(mockLb.requestAsync(Mockito.any(LBSolrClient.Req.class))) + .thenAnswer(inv -> new CompletableFuture()); + handler.lbClient = mockLb; + + // Force canceled=true and drain the CANCELLATION_NOTIFICATION so we can observe the + // post-cancel state cleanly. + handler.cancelAll(); + assertNotNull( + "CANCELLATION_NOTIFICATION should be queued by cancelAll", + handler.responses.poll(2, TimeUnit.SECONDS)); + + ShardRequest sreq = buildShardRequest("shardA"); + ShardResponse srsp = new ShardResponse(); + srsp.setShardRequest(sreq); + srsp.setShard("shardA"); + HttpShardHandler.SimpleSolrResponse ssr = new HttpShardHandler.SimpleSolrResponse(); + srsp.setSolrResponse(ssr); + + ModifiableSolrParams params = new ModifiableSolrParams(); + QueryRequest queryRequest = new QueryRequest(params); + LBSolrClient.Endpoint endpoint = new LBSolrClient.Endpoint("http://ignored:8983/solr"); + LBSolrClient.Req lbReq = new LBSolrClient.Req(queryRequest, List.of(endpoint)); + + // Invoke makeShardRequest while canceled=true. Expected: outer is cancelled, nothing is + // tracked in submitFutures, responsesPending() stays false. + handler.makeShardRequest(sreq, "shardA", params, lbReq, ssr, srsp, System.nanoTime()); + + assertFalse( + "submitFutures must not track requests submitted while canceled=true", + handler.responsesPending()); + } finally { + commExecutor.shutdownNow(); + commExecutor.awaitTermination(5, TimeUnit.SECONDS); + } + } }