Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: Fix several concurrency bugs in HttpShardHandler / ParallelHttpShardHandler that
could cause search threads to hang in take() or return HTTP 500 instead of honoring
shards.tolerant under thread-pool saturation
type: fixed
authors:
- name: Mark Miller
links:
- name: SOLR-18244
url: https://issues.apache.org/jira/browse/SOLR-18244
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ public class HttpShardHandler extends ShardHandler {
protected final BlockingQueue<ShardResponse> responses;
private final AtomicBoolean canceled = new AtomicBoolean(false);

// Returns the monitor object that guards all cancellation-related state transitions
// (the canceled flag, the responseFutureMap, the responses queue's CANCELLATION_NOTIFICATION).
// Subclasses that track additional cancellable state must synchronize on this monitor when
// reading or transitioning that state so the whole cancellation invariant stays atomic.
protected final Object cancellationLock() {
return canceled;
}

protected final boolean isCanceled() {
return canceled.get();
}
Comment thread
markrmiller marked this conversation as resolved.

private final Map<String, List<String>> shardToURLs;
protected LBHttp2SolrClient lbClient;

Expand Down Expand Up @@ -278,27 +290,47 @@ protected void makeShardRequest(
// on the map already having the future.
future.whenComplete(
(LBSolrClient.Rsp rsp, Throwable throwable) -> {
if (rsp != null) {
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
} else if (throwable != null) {
srsp.setException(throwable);
if (throwable instanceof SolrException) {
srsp.setResponseCode(((SolrException) throwable).code());
try {
if (rsp != null) {
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
} else if (throwable != null) {
srsp.setException(throwable);
if (throwable instanceof SolrException) {
srsp.setResponseCode(((SolrException) throwable).code());
}
}
}
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
// Synchronize on cancelled so this code and cancelAll() cannot happen at the same time
synchronized (canceled) {
// We don't want to add responses after the requests have been canceled
if (responseFutureMap.containsKey(srsp)) {
responses.add(HttpShardHandler.this.transformResponse(sreq, srsp, shard));
ssr.elapsedTime =
TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
enqueueIfTracked(srsp, HttpShardHandler.this.transformResponse(sreq, srsp, shard));
} catch (Throwable t) {
// If anything above throws (subclass transformResponse, malformed Rsp, OOM in the
// lambda) the response would never be enqueued — but responseFutureMap still tracks
// srsp, so take() would park forever. Record the failure on srsp and enqueue it raw
// (bypassing transformResponse, which may be the thrower).
srsp.setException(t);
if (t instanceof SolrException) {
srsp.setResponseCode(((SolrException) t).code());
}
enqueueIfTracked(srsp, srsp);
}
Comment thread
markrmiller marked this conversation as resolved.
});
}

/**
* Enqueue {@code value} into the {@link #responses} queue iff {@code key} is still tracked in
* {@link #responseFutureMap}, holding the cancellation monitor so this stays atomic with {@link
* #cancelAll()}'s clear.
*/
private void enqueueIfTracked(ShardResponse key, ShardResponse value) {
synchronized (canceled) {
if (responseFutureMap.containsKey(key)) {
responses.add(value);
}
}
}

/** Subclasses could modify the request based on the shard */
@SuppressWarnings("unused")
protected QueryRequest createQueryRequest(
Expand Down Expand Up @@ -328,7 +360,12 @@ private ShardResponse take(boolean bailOnError) {
ShardResponse previousResponse = null;
try {
while (responsesPending()) {
ShardResponse rsp = responses.take();
ShardResponse rsp = awaitNextResponse();
if (rsp == null) {
// awaitNextResponse() returned without a response — only happens for subclasses that
// override with a timed poll. Re-evaluate responsesPending() and either re-wait or exit.
continue;
}
if (rsp == CANCELLATION_NOTIFICATION) {
// This is only queued in cancelAll(), so all outstanding futures have already been
// canceled.
Expand Down Expand Up @@ -376,6 +413,23 @@ protected boolean responsesPending() {
return !responseFutureMap.isEmpty() || !responses.isEmpty();
}

/**
* Wait for the next response from the {@link #responses} queue. Defaults to a blocking {@link
* BlockingQueue#take()}.
*
* <p>Subclasses that gate {@link #responsesPending()} on an async tracker outside the {@link
* #responses} queue's lifecycle (e.g. {@link ParallelHttpShardHandler#submitFutures}) MUST
* override this with a timed poll. The cancellation lock can serialize {@link
* #responsesPending()} reads with state mutations, but it cannot signal the queue's internal
* {@code Condition}: if the tracker drains without anything being enqueued to {@link #responses},
* a thread parked in {@link BlockingQueue#take()} would never wake up. Returning {@code null}
* from this method instructs {@link #take(boolean)} to re-check {@link #responsesPending()} and
* either re-wait or exit cleanly.
*/
protected ShardResponse awaitNextResponse() throws InterruptedException {
return responses.take();
}

@Override
public void cancelAll() {
// Canceled must be set to true before calling the cancellation code, to ensure that new tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@
*/
package org.apache.solr.handler.component;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import net.jcip.annotations.NotThreadSafe;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A version of {@link HttpShardHandler} optimized for massively-sharded collections.
Expand All @@ -43,19 +44,20 @@
@NotThreadSafe
public class ParallelHttpShardHandler extends HttpShardHandler {

@SuppressWarnings("unused")
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final ExecutorService commExecutor;

/*
* Unlike the basic HttpShardHandler, this class allows us to exit submit before
* the responseFutureMap is updated. If the runnables that
* do that are slow to execute the calling code could attempt to takeCompleted(),
* while pending is still zero. In this condition, the code would assume that all
* requests are processed (despite the runnables created by this class still
* waiting). Thus, we need to track that there are attempts still in flight.
* Track in-flight async submits so responsesPending() doesn't return false
* while a runnable is still queued or executing.
*
* inFlightSubmits is the authoritative loop-guard counter. AtomicInteger.get()
* is exact; ConcurrentHashMap.size() is documented as an estimate — its
* sumCount() can settle at a non-zero value while the table is empty under
* concurrent put/remove — so we don't use submitFutures.isEmpty() as the guard.
*
* submitFutures is retained only as the iteration target for cancelAll.
*/
private final AtomicInteger inFlightSubmits = new AtomicInteger();
private final ConcurrentMap<ShardResponse, CompletableFuture<Void>> submitFutures;

public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandlerFactory) {
Expand All @@ -66,9 +68,29 @@ public ParallelHttpShardHandler(ParallelHttpShardHandlerFactory httpShardHandler

@Override
protected boolean responsesPending() {
// ensure we can't exit while loop in HttpShardHandler.take(boolean) until we've completed
// submitting all of the shard requests
return super.responsesPending() || !submitFutures.isEmpty();
// Take the lock so the read of inFlightSubmits is serialized with super.responsesPending()'s
// reads of responseFutureMap and responses. Without it, take()'s loop can transiently observe
// both responseFutureMap empty (super.makeShardRequest hasn't put yet) and inFlightSubmits == 0
// (outer.whenComplete already decremented for a different submit) even though those events are
// causally ordered, and the in-flight inner whenComplete's responses.add would be silently
// lost.
synchronized (cancellationLock()) {
return super.responsesPending() || inFlightSubmits.get() > 0;
}
}

/**
* Override the base class's blocking wait with a timed poll. inFlightSubmits is an async tracker
* outside the {@link #responses} queue's lifecycle: when an outer {@code whenComplete} decrements
* the last in-flight submit but the inner {@code whenComplete} hasn't fired yet to enqueue the
* response, a thread parked in {@code responses.take()} would never be woken — the cancellation
* lock cannot signal the queue's internal {@code Condition}. Polling lets the outer {@link
* #take(boolean)} loop re-evaluate {@link #responsesPending()} until the inner callback enqueues
* the real response (or the trackers drain to empty).
*/
@Override
protected ShardResponse awaitNextResponse() throws InterruptedException {
return responses.poll(50, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -80,22 +102,73 @@ protected void makeShardRequest(
SimpleSolrResponse ssr,
ShardResponse srsp,
long startTimeNS) {
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(
() -> super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS),
commExecutor);
submitFutures.put(srsp, completableFuture);
// Holder so the lambda can read its own outer future. We can't capture the variable directly
// (it would have to be effectively final, but we assign it from runAsync). AtomicReference
// gives the lambda volatile-style visibility on the assignment that happens after runAsync
// returns.
AtomicReference<CompletableFuture<Void>> selfRef = new AtomicReference<>();
CompletableFuture<Void> completableFuture;
// Increment BEFORE runAsync so responsesPending() never observes inFlightSubmits == 0 while
// there is a submit in flight. The matching decrement is in the unconditional finally of
// whenComplete below, or in the catch block if runAsync itself rejects.
inFlightSubmits.incrementAndGet();
try {
completableFuture =
CompletableFuture.runAsync(
() -> {
// Skip the work if THIS specific outer future was cancelled (e.g. cancelAll
// cancelled
// it before this runnable got CPU time). Avoids a wasted lbClient.requestAsync that
// super.makeShardRequest would just immediately cancel anyway. selfRef may briefly
// be null if the runnable runs before the assignment below — in that case we fall
// through to super, which has its own canceled-check guard.
CompletableFuture<Void> self = selfRef.get();
if (self != null && self.isCancelled()) {
return;
}
super.makeShardRequest(sreq, shard, params, lbReq, ssr, srsp, startTimeNS);
},
commExecutor);
} catch (RejectedExecutionException ree) {
// Saturation or shutdown of commExecutor would otherwise propagate synchronously,
// crash SearchHandler's distributed loop before cancelAll() runs, abandon any
// already-submitted shard requests, and return HTTP 500 even when shards.tolerant=true.
// Treat it as a shard failure so the responses queue stays consistent and shards.tolerant
// semantics are honored. SERVICE_UNAVAILABLE (503) marks it as transient.
// No future was produced, so whenComplete will never fire — undo the increment here.
inFlightSubmits.decrementAndGet();
recordShardSubmitError(
srsp,
new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"Comm executor thread pool is full, unable to send request to shard: " + shard,
ree));
return;
}
Comment thread
markrmiller marked this conversation as resolved.
// Publish the self-reference BEFORE the cancellation check so that if the cancellation block
// below cancels this future, the runnable (whenever it runs) will see the cancellation via
// selfRef.get().isCancelled(). AtomicReference provides happens-before across threads.
selfRef.set(completableFuture);

// Synchronize registering submitFutures with the same monitor super uses for responseFutureMap.
// If cancelAll has already set canceled=true, don't track this request — cancel the outer
// future and return. The runnable, when it runs, will see self.isCancelled() and short-circuit.
// Mirrors super.makeShardRequest's check-and-put-or-early-return pattern on responseFutureMap.
synchronized (cancellationLock()) {
if (isCanceled()) {
completableFuture.cancel(true);
// whenComplete is never registered on this early-return path, so undo the increment.
inFlightSubmits.decrementAndGet();
return;
}
submitFutures.put(srsp, completableFuture);
}
completableFuture.whenComplete(
(r, t) -> {
try {
if (t != null) {
Throwable failure = t;
if (failure instanceof CompletionException) {
CompletionException completionException = (CompletionException) failure;
if (completionException.getCause() != null) {
failure = completionException.getCause();
}
}
Throwable failure =
(t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
if (!(failure instanceof CancellationException)) {
recordShardSubmitError(
srsp,
Expand All @@ -106,23 +179,35 @@ protected void makeShardRequest(
}
}
} finally {
// Remove so that we keep track of in-flight submits only
submitFutures.remove(srsp);
// Order matters: remove from submitFutures (under the cross-tracker lock so cancelAll
// sees a consistent set), then decrement the loop-guard counter. Decrementing last
// means responsesPending() — which reads under the same lock — never observes
// inFlightSubmits == 0 while submitFutures still contains this entry.
synchronized (cancellationLock()) {
submitFutures.remove(srsp);
}
inFlightSubmits.decrementAndGet();
}
});
}

@Override
public void cancelAll() {
super.cancelAll();
submitFutures
.values()
.forEach(
future -> {
if (!future.isDone()) {
future.cancel(true);
}
});
submitFutures.clear();
// Synchronize the whole cancellation — super.cancelAll plus our submitFutures ops — on the
// same monitor so the invariant matches HttpShardHandler.cancelAll. Without this, a runnable
// entering super.makeShardRequest's synchronized block could observe canceled=true while
// submitFutures is still being walked, leaving the maps mutually inconsistent.
synchronized (cancellationLock()) {
super.cancelAll();
submitFutures
.values()
.forEach(
future -> {
if (!future.isDone()) {
future.cancel(true);
}
});
submitFutures.clear();
}
}
}
Loading
Loading