From 6ae52938cd015321fa364c949b1faabad388f0af Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Tue, 23 Sep 2025 18:29:05 +0200 Subject: [PATCH 01/14] refactor: make AbstractRetryingClient private --- .../linecorp/armeria/client/retry/AbstractRetryingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index e8e5cc277e3..acf55d9738b 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -48,7 +48,7 @@ * @param the {@link Request} type * @param the {@link Response} type */ -public abstract class AbstractRetryingClient +abstract class AbstractRetryingClient extends SimpleDecoratingClient { private static final Logger logger = LoggerFactory.getLogger(AbstractRetryingClient.class); From ef95d6bba3c786dd50781e637862774f1288cd91 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Wed, 24 Sep 2025 10:26:41 +0200 Subject: [PATCH 02/14] refactor: extract AbstractRetryingClient.State into RetryContext and RetryCounter --- .../client/retry/AbstractRetryingClient.java | 179 +--------- .../armeria/client/retry/RetryAttempt.java | 43 +++ .../armeria/client/retry/RetryContext.java | 137 +++++++ .../armeria/client/retry/RetryCounter.java | 86 +++++ .../armeria/client/retry/RetryingClient.java | 335 ++++++++++-------- .../client/retry/RetryingRpcClient.java | 126 ++++--- .../armeria/internal/client/ClientUtil.java | 9 + .../RetryingClientLoadBalancingTest.java | 8 +- .../client/retry/RetryingClientTest.java | 20 +- .../client/retry/RetryingRpcClientTest.java | 2 +- 10 files changed, 550 insertions(+), 395 deletions(-) create mode 100644 core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java create mode 100644 core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index acf55d9738b..b4380c94a29 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -29,17 +29,13 @@ import com.linecorp.armeria.client.ClientRequestContext; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.SimpleDecoratingClient; -import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.Response; import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.util.TimeoutMode; import com.linecorp.armeria.internal.client.ClientUtil; -import io.netty.util.AsciiString; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.ScheduledFuture; /** @@ -53,15 +49,6 @@ abstract class AbstractRetryingClient private static final Logger logger = LoggerFactory.getLogger(AbstractRetryingClient.class); - /** - * The header which indicates the retry count of a {@link Request}. - * The server might use this value to reject excessive retries, etc. - */ - public static final AsciiString ARMERIA_RETRY_COUNT = HttpHeaderNames.of("armeria-retry-count"); - - private static final AttributeKey STATE = - AttributeKey.valueOf(AbstractRetryingClient.class, "STATE"); - private final RetryConfigMapping mapping; @Nullable @@ -81,10 +68,7 @@ abstract class AbstractRetryingClient public final O execute(ClientRequestContext ctx, I req) throws Exception { final RetryConfig config = mapping.get(ctx, req); requireNonNull(config, "mapping.get() returned null"); - - final State state = new State(config, ctx.responseTimeoutMillis()); - ctx.setAttr(STATE, state); - return doExecute(ctx, req); + return doExecute(ctx, req, config); } /** @@ -98,7 +82,7 @@ protected final RetryConfigMapping mapping() { * Invoked by {@link #execute(ClientRequestContext, Request)} * after the deadline for response timeout is set. */ - protected abstract O doExecute(ClientRequestContext ctx, I req) throws Exception; + protected abstract O doExecute(ClientRequestContext ctx, I req, RetryConfig config) throws Exception; /** * This should be called when retrying is finished. @@ -119,28 +103,6 @@ protected final RetryRule retryRule() { return retryRule; } - /** - * Fetches the {@link RetryConfig} that was mapped by the configured {@link RetryConfigMapping} for a given - * logical request. - */ - final RetryConfig mappedRetryConfig(ClientRequestContext ctx) { - @SuppressWarnings("unchecked") - final RetryConfig config = (RetryConfig) state(ctx).config; - return config; - } - - /** - * Returns the {@link RetryRuleWithContent}. - * - * @throws IllegalStateException if the {@link RetryRuleWithContent} is not set - */ - protected final RetryRuleWithContent retryRuleWithContent() { - checkState(retryConfig != null, "No retryRuleWithContent set. Are you using RetryConfigMapping?"); - final RetryRuleWithContent retryRuleWithContent = retryConfig.retryRuleWithContent(); - checkState(retryRuleWithContent != null, "retryRuleWithContent is not set."); - return retryRuleWithContent; - } - /** * Schedules next retry. */ @@ -170,39 +132,6 @@ protected static void scheduleNextRetry(ClientRequestContext ctx, } } - /** - * Resets the {@link ClientRequestContext#responseTimeoutMillis()}. - * - * @return {@code true} if the response timeout is set, {@code false} if it can't be set due to the timeout - */ - @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. - protected final boolean setResponseTimeout(ClientRequestContext ctx) { - requireNonNull(ctx, "ctx"); - final long responseTimeoutMillis = state(ctx).responseTimeoutMillis(); - if (responseTimeoutMillis < 0) { - return false; - } else if (responseTimeoutMillis == 0) { - ctx.clearResponseTimeout(); - return true; - } else { - ctx.setResponseTimeoutMillis(TimeoutMode.SET_FROM_NOW, responseTimeoutMillis); - return true; - } - } - - /** - * Returns the next delay which retry will be made after. The delay will be: - * - *

{@code Math.min(responseTimeoutMillis, Backoff.nextDelayMillis(int))} - * - * @return the number of milliseconds to wait for before attempting a retry. -1 if the - * {@code currentAttemptNo} exceeds the {@code maxAttempts} or the {@code nextDelay} is after - * the moment which timeout happens. - */ - protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { - return getNextDelay(ctx, backoff, -1); - } - /** * Returns the next delay which retry will be made after. The delay will be: * @@ -214,25 +143,21 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { * the moment which timeout happens. */ @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. - protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, long millisAfterFromServer) { - requireNonNull(ctx, "ctx"); - requireNonNull(backoff, "backoff"); - final State state = state(ctx); - final int currentAttemptNo = state.currentAttemptNoWith(backoff); - - if (currentAttemptNo < 0) { - logger.debug("Exceeded the default number of max attempt: {}", state.config.maxTotalAttempts()); + protected final long getNextDelay(RetryContext rctx, Backoff backoff, long millisAfterFromServer) { + if (rctx.counter().hasReachedMaxAttempts()) { + logger.debug("Exceeded the default number of max attempt: {}", rctx.config().maxTotalAttempts()); return -1; } - long nextDelay = backoff.nextDelayMillis(currentAttemptNo); + rctx.counter().consumeAttemptFrom(backoff); + long nextDelay = backoff.nextDelayMillis(rctx.counter().attemptsSoFarWithBackoff(backoff)); if (nextDelay < 0) { logger.debug("Exceeded the number of max attempts in the backoff: {}", backoff); return -1; } nextDelay = Math.max(nextDelay, millisAfterFromServer); - if (state.timeoutForWholeRetryEnabled() && nextDelay > state.actualResponseTimeoutMillis()) { + if (rctx.timeoutForWholeRetryEnabled() && nextDelay > rctx.actualResponseTimeoutMillis()) { // The nextDelay will be after the moment which timeout will happen. So return just -1. return -1; } @@ -240,18 +165,6 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, lon return nextDelay; } - /** - * Returns the total number of attempts of the current request represented by the specified - * {@link ClientRequestContext}. - */ - protected static int getTotalAttempts(ClientRequestContext ctx) { - final State state = ctx.attr(STATE); - if (state == null) { - return 0; - } - return state.totalAttemptNo; - } - /** * Creates a new derived {@link ClientRequestContext}, replacing the requests. * If {@link ClientRequestContext#endpointGroup()} exists, a new {@link Endpoint} will be selected. @@ -262,80 +175,4 @@ protected static ClientRequestContext newDerivedContext(ClientRequestContext ctx boolean initialAttempt) { return ClientUtil.newDerivedContext(ctx, req, rpcReq, initialAttempt); } - - private static State state(ClientRequestContext ctx) { - final State state = ctx.attr(STATE); - assert state != null; - return state; - } - - private static final class State { - - private final RetryConfig config; - private final long deadlineNanos; - private final boolean isTimeoutEnabled; - - @Nullable - private Backoff lastBackoff; - private int currentAttemptNoWithLastBackoff; - private int totalAttemptNo; - - State(RetryConfig config, long responseTimeoutMillis) { - this.config = config; - - if (responseTimeoutMillis <= 0 || responseTimeoutMillis == Long.MAX_VALUE) { - deadlineNanos = 0; - isTimeoutEnabled = false; - } else { - deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis); - isTimeoutEnabled = true; - } - totalAttemptNo = 1; - } - - /** - * Returns the smaller value between {@link RetryConfig#responseTimeoutMillisForEachAttempt()} and - * remaining {@link #responseTimeoutMillis}. - * - * @return 0 if the response timeout for both of each request and whole retry is disabled or - * -1 if the elapsed time from the first request has passed {@code responseTimeoutMillis} - */ - long responseTimeoutMillis() { - if (!timeoutForWholeRetryEnabled()) { - return config.responseTimeoutMillisForEachAttempt(); - } - - final long actualResponseTimeoutMillis = actualResponseTimeoutMillis(); - - // Consider 0 or less than 0 of actualResponseTimeoutMillis as timed out. - if (actualResponseTimeoutMillis <= 0) { - return -1; - } - - if (config.responseTimeoutMillisForEachAttempt() > 0) { - return Math.min(config.responseTimeoutMillisForEachAttempt(), actualResponseTimeoutMillis); - } - - return actualResponseTimeoutMillis; - } - - boolean timeoutForWholeRetryEnabled() { - return isTimeoutEnabled; - } - - long actualResponseTimeoutMillis() { - return TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()); - } - - int currentAttemptNoWith(Backoff backoff) { - if (totalAttemptNo++ >= config.maxTotalAttempts()) { - return -1; - } - if (lastBackoff != backoff) { - lastBackoff = backoff; - currentAttemptNoWithLastBackoff = 1; - } - return currentAttemptNoWithLastBackoff++; - } - } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java new file mode 100644 index 00000000000..c5778e204ef --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linecorp.armeria.client.retry; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.Response; + +class RetryAttempt { + private final ClientRequestContext ctx; + private final O res; + + RetryAttempt(ClientRequestContext ctx, O res) { + this.ctx = ctx; + this.res = res; + } + + ClientRequestContext ctx() { + return ctx; + } + + O res() { + return res; + } + + RetryAttempt setRes(O res) { + return new RetryAttempt<>( + ctx, res + ); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java new file mode 100644 index 00000000000..a3bb894ddb3 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java @@ -0,0 +1,137 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linecorp.armeria.client.retry; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.Response; +import com.linecorp.armeria.common.util.TimeoutMode; + +class RetryContext { + private final ClientRequestContext ctx; + private final I req; + private final O res; + private final CompletableFuture resFuture; + private final RetryCounter counter; + private final RetryConfig config; + private final long deadlineNanos; + private final boolean isTimeoutEnabled; + + RetryContext( + ClientRequestContext ctx, I req, O res, + CompletableFuture resFuture, + RetryConfig config, long responseTimeoutMillis + ) { + this(ctx, req, res, resFuture, config, responseTimeoutMillis, + new RetryCounter(config.maxTotalAttempts())); + } + + RetryContext( + ClientRequestContext ctx, I req, O res, + CompletableFuture resFuture, + RetryConfig config, long responseTimeoutMillis, + RetryCounter counter + ) { + this.ctx = ctx; + this.req = req; + this.res = res; + this.resFuture = resFuture; + this.config = config; + this.counter = counter; + + if (responseTimeoutMillis <= 0 || responseTimeoutMillis == Long.MAX_VALUE) { + deadlineNanos = 0; + isTimeoutEnabled = false; + } else { + deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis); + isTimeoutEnabled = true; + } + } + + long responseTimeoutMillis() { + if (!isTimeoutEnabled) { + return config.responseTimeoutMillisForEachAttempt(); + } + + final long actualResponseTimeoutMillis = actualResponseTimeoutMillis(); + + // Consider 0 or less than 0 of actualResponseTimeoutMillis as timed out. + if (actualResponseTimeoutMillis <= 0) { + return -1; + } + + if (config.responseTimeoutMillisForEachAttempt() > 0) { + return Math.min(config.responseTimeoutMillisForEachAttempt(), actualResponseTimeoutMillis); + } + + return actualResponseTimeoutMillis; + } + + public boolean timeoutForWholeRetryEnabled() { + return isTimeoutEnabled; + } + + public long actualResponseTimeoutMillis() { + return TimeUnit.NANOSECONDS.toMillis( + deadlineNanos - System.nanoTime()); + } + + /** + * Resets the {@link ClientRequestContext#responseTimeoutMillis()}. + * + * @return {@code true} if the response timeout is set, {@code false} if it can't be set due to the timeout + */ + boolean setResponseTimeout() { + final long responseTimeoutMillis = responseTimeoutMillis(); + if (responseTimeoutMillis < 0) { + return false; + } else if (responseTimeoutMillis == 0) { + ctx.clearResponseTimeout(); + return true; + } else { + ctx.setResponseTimeoutMillis(TimeoutMode.SET_FROM_NOW, responseTimeoutMillis); + return true; + } + } + + ClientRequestContext ctx() { + return ctx; + } + + I req() { + return req; + } + + O res() { + return res; + } + + RetryConfig config() { + return config; + } + + CompletableFuture resFuture() { + return resFuture; + } + + RetryCounter counter() { + return counter; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java new file mode 100644 index 00000000000..13a4e67c0ca --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java @@ -0,0 +1,86 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.common.annotation.Nullable; + +final class RetryCounter { + private final int maxAttempts; + + private int numberAttemptsSoFar; + @Nullable + private Backoff lastBackoff; + private int numberAttemptsSoFarForLastBackoff; + + RetryCounter(int maxAttempts) { + checkArgument(maxAttempts > 0, "maxAttempts: %s (expected: > 0)", maxAttempts); + this.maxAttempts = maxAttempts; + numberAttemptsSoFar = 0; + lastBackoff = null; + numberAttemptsSoFarForLastBackoff = 0; + } + + public void consumeAttemptFrom(@Nullable Backoff backoff) { + checkState(!hasReachedMaxAttempts(), "Exceeded the maximum number of attempts: %s", maxAttempts); + + ++numberAttemptsSoFar; + + if (backoff != null) { + if (lastBackoff != backoff) { + lastBackoff = backoff; + numberAttemptsSoFarForLastBackoff = 0; + } + numberAttemptsSoFarForLastBackoff++; + } else { + assert lastBackoff == null; + } + } + + public int attemptsSoFarWithBackoff(Backoff backoff) { + requireNonNull(backoff, "backoff"); + if (lastBackoff != backoff) { + return 0; + } else { + return numberAttemptsSoFarForLastBackoff; + } + } + + public boolean hasReachedMaxAttempts() { + return numberAttemptsSoFar >= maxAttempts; + } + + @Override + public String toString() { + return MoreObjects + .toStringHelper(this) + .add("maxAttempts", maxAttempts) + .add("numberAttemptsSoFar", numberAttemptsSoFar) + .add("lastBackoff", lastBackoff) + .add("numberAttemptsSoFarForLastBackoff", numberAttemptsSoFarForLastBackoff) + .toString(); + } + + public int numberAttemptsSoFar() { + return numberAttemptsSoFar; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index d914bfece68..a37a45b2e07 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -23,7 +23,6 @@ import java.time.Duration; import java.util.Date; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; import java.util.function.Function; import org.slf4j.Logger; @@ -53,6 +52,7 @@ import com.linecorp.armeria.internal.client.AggregatedHttpRequestDuplicator; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; +import com.linecorp.armeria.internal.client.ClientUtil; import com.linecorp.armeria.internal.client.TruncatingHttpResponse; import io.netty.handler.codec.DateFormatter; @@ -241,12 +241,18 @@ public static Function newDecorator(RetryRul } @Override - protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req) throws Exception { + protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, + RetryConfig config) + throws Exception { final CompletableFuture responseFuture = new CompletableFuture<>(); final HttpResponse res = HttpResponse.of(responseFuture, ctx.eventLoop()); if (ctx.exchangeType().isRequestStreaming()) { final HttpRequestDuplicator reqDuplicator = req.toDuplicator(ctx.eventLoop().withoutContext(), 0); - doExecute0(ctx, reqDuplicator, req, res, responseFuture); + final HttpRetryContext rctx = new HttpRetryContext( + ctx, req, reqDuplicator, res, responseFuture, + config, ctx.responseTimeoutMillis()); + rctx.counter().consumeAttemptFrom(null); + doExecute0(rctx); } else { req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop())) .handle((agg, cause) -> { @@ -254,7 +260,12 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req) thro handleException(ctx, null, responseFuture, cause, true); } else { final HttpRequestDuplicator reqDuplicator = new AggregatedHttpRequestDuplicator(agg); - doExecute0(ctx, reqDuplicator, req, res, responseFuture); + final HttpRetryContext rctx = + new HttpRetryContext( + ctx, req, reqDuplicator, res, responseFuture, + config, ctx.responseTimeoutMillis()); + rctx.counter().consumeAttemptFrom(null); + doExecute0(rctx); } return null; }); @@ -262,234 +273,231 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req) thro return res; } - private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future) { - final int totalAttempts = getTotalAttempts(ctx); - final boolean initialAttempt = totalAttempts <= 1; - // The request or response has been aborted by the client before it receives a response, + private void doExecute0(HttpRetryContext rctx) { + // We already claimed the attempt so this attempt is included in rctx.counter().numberAttemptsSoFar(). + final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; + // The request or attemptRes has been aborted by the client before it receives a attemptRes, // so stop retrying. - if (originalReq.whenComplete().isCompletedExceptionally()) { - originalReq.whenComplete().handle((unused, cause) -> { - handleException(ctx, rootReqDuplicator, future, cause, initialAttempt); + if (rctx.req().whenComplete().isCompletedExceptionally()) { + rctx.req().whenComplete().handle((unused, cause) -> { + handleException(rctx, cause, isInitialAttempt); return null; }); return; } - if (returnedRes.isComplete()) { - returnedRes.whenComplete().handle((result, cause) -> { + if (rctx.res().isComplete()) { + rctx.res().whenComplete().handle((result, cause) -> { final Throwable abortCause; if (cause != null) { abortCause = cause; } else { abortCause = AbortedStreamException.get(); } - handleException(ctx, rootReqDuplicator, future, abortCause, initialAttempt); + handleException(rctx, abortCause, isInitialAttempt); return null; }); return; } - if (!setResponseTimeout(ctx)) { - handleException(ctx, rootReqDuplicator, future, ResponseTimeoutException.get(), initialAttempt); + if (!rctx.setResponseTimeout()) { + handleException(rctx, ResponseTimeoutException.get(), isInitialAttempt); return; } + final RetryAttempt attempt = executeAttempt(rctx); + + if (attempt == null) { + return; + } + + if (!rctx.ctx().exchangeType().isResponseStreaming() || rctx.config().requiresResponseTrailers()) { + attempt.res().aggregate().handle((aggregated, cause) -> { + if (cause != null) { + attempt.ctx().logBuilder().endRequest(cause); + attempt.ctx().logBuilder().endResponse(cause); + handleResponseWithoutContent(rctx, + attempt.setRes(HttpResponse.ofFailure(cause)), cause); + } else { + completeLogIfBytesNotTransferred(attempt.ctx(), aggregated); + final RetryAttempt aggregatedAttempt = attempt.setRes( + aggregated.toHttpResponse()); + attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { + handleAggregatedResponse(rctx, aggregatedAttempt); + }); + } + return null; + }); + } else { + handleStreamingResponse(rctx, attempt); + } + } + + @Nullable + private RetryAttempt executeAttempt(HttpRetryContext rctx) { + final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; final HttpRequest duplicateReq; - if (initialAttempt) { - duplicateReq = rootReqDuplicator.duplicate(); + if (isInitialAttempt) { + duplicateReq = rctx.requestDuplicator().duplicate(); } else { - final RequestHeadersBuilder newHeaders = originalReq.headers().toBuilder(); - newHeaders.setInt(ARMERIA_RETRY_COUNT, totalAttempts - 1); - duplicateReq = rootReqDuplicator.duplicate(newHeaders.build()); + final RequestHeadersBuilder newHeaders = rctx.req().headers().toBuilder(); + newHeaders.setInt(ClientUtil.ARMERIA_RETRY_COUNT, rctx.counter().numberAttemptsSoFar() - 1); + duplicateReq = rctx.requestDuplicator().duplicate(newHeaders.build()); } - final ClientRequestContext derivedCtx; + final ClientRequestContext attemptCtx; try { - derivedCtx = newDerivedContext(ctx, duplicateReq, ctx.rpcRequest(), initialAttempt); - } catch (Throwable t) { - handleException(ctx, rootReqDuplicator, future, t, initialAttempt); - return; + attemptCtx = newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), isInitialAttempt); + } catch (Throwable cause) { + handleException(rctx, cause, isInitialAttempt); + return null; } - final HttpRequest ctxReq = derivedCtx.request(); - assert ctxReq != null; - final HttpResponse response; - final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class); - if (!initialAttempt && ctxExtension != null && derivedCtx.endpoint() == null) { + final HttpRequest attemptReq = attemptCtx.request(); + assert attemptReq != null; + final HttpResponse attemptRes; + final ClientRequestContextExtension ctxExtension = attemptCtx.as(ClientRequestContextExtension.class); + if (!isInitialAttempt && ctxExtension != null && attemptCtx.endpoint() == null) { // clear the pending throwable to retry endpoint selection - ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); + ClientPendingThrowableUtil.removePendingThrowable(attemptCtx); // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop - response = initContextAndExecuteWithFallback( + attemptRes = initContextAndExecuteWithFallback( unwrap(), ctxExtension, HttpResponse::of, - (context, cause) -> HttpResponse.ofFailure(cause), ctxReq, false); + (context, cause) -> HttpResponse.ofFailure(cause), attemptReq, false); } else { - response = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> HttpResponse.ofFailure(cause), ctxReq, false); + attemptRes = executeWithFallback(unwrap(), attemptCtx, + (context, cause) -> HttpResponse.ofFailure(cause), attemptReq, + false); } - final RetryConfig config = mappedRetryConfig(ctx); - if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) { - response.aggregate().handle((aggregated, cause) -> { - if (cause != null) { - derivedCtx.logBuilder().endRequest(cause); - derivedCtx.logBuilder().endResponse(cause); - handleResponseWithoutContent(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, HttpResponse.ofFailure(cause), cause); - } else { - completeLogIfBytesNotTransferred(aggregated, derivedCtx); - derivedCtx.log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { - handleAggregatedResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, aggregated); - }); - } - return null; - }); - } else { - handleStreamingResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, response); - } + return new RetryAttempt<>(attemptCtx, attemptRes); } - // TODO(ikhoon): Add a request-scope class such as RetryRequestContext to avoid passing too many parameters. - private void handleResponseWithoutContent(RetryConfig config, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq, - HttpResponse returnedRes, CompletableFuture future, - ClientRequestContext derivedCtx, HttpResponse response, + private void handleResponseWithoutContent(HttpRetryContext rctx, + RetryAttempt attempt, @Nullable Throwable responseCause) { if (responseCause != null) { responseCause = Exceptions.peel(responseCause); } try { - final RetryRule retryRule = retryRule(config); - final CompletionStage f = retryRule.shouldRetry(derivedCtx, responseCause); - f.handle((decision, shouldRetryCause) -> { - warnIfExceptionIsRaised(retryRule, shouldRetryCause); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, response); - return null; - }); + final RetryRule retryRule = retryRule(rctx.config()); + retryRule.shouldRetry(attempt.ctx(), responseCause) + .handle((decision, shouldRetryCause) -> { + warnIfExceptionIsRaised(retryRule, shouldRetryCause); + handleRetryDecision(decision, rctx, attempt); + return null; + }); } catch (Throwable cause) { - response.abort(); - handleException(ctx, rootReqDuplicator, future, cause, false); + // TODO: abortAttempt(attempt); + attempt.res().abort(); + handleException(rctx, cause, false); } } - private void handleStreamingResponse(RetryConfig retryConfig, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, - ClientRequestContext derivedCtx, - HttpResponse response) { - final SplitHttpResponse splitResponse = response.split(); - splitResponse.headers().handle((headers, headersCause) -> { + private void handleStreamingResponse(HttpRetryContext rctx, + RetryAttempt attempt) { + final SplitHttpResponse splitAttemptRes = attempt.res().split(); + splitAttemptRes.headers().handle((headers, headersCause) -> { final Throwable responseCause; if (headersCause == null) { - final RequestLog log = derivedCtx.log().getIfAvailable(RequestLogProperty.RESPONSE_CAUSE); + final RequestLog log = attempt.ctx().log().getIfAvailable(RequestLogProperty.RESPONSE_CAUSE); responseCause = log != null ? log.responseCause() : null; } else { responseCause = Exceptions.peel(headersCause); } - completeLogIfBytesNotTransferred(response, headers, derivedCtx, responseCause); - - derivedCtx.log().whenAvailable(RequestLogProperty.RESPONSE_HEADERS).thenRun(() -> { - if (retryConfig.needsContentInRule() && responseCause == null) { - final HttpResponse response0 = splitResponse.unsplit(); - final HttpResponseDuplicator duplicator = - response0.toDuplicator(derivedCtx.eventLoop().withoutContext(), - derivedCtx.maxResponseLength()); + completeLogIfBytesNotTransferred(attempt.ctx(), headers, responseCause, attempt.res()); + + attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_HEADERS).thenRun(() -> { + if (rctx.config().needsContentInRule() && responseCause == null) { + final HttpResponse unsplitAttemptRes = splitAttemptRes.unsplit(); + final HttpResponseDuplicator attemptResDuplicator = + unsplitAttemptRes.toDuplicator(attempt.ctx().eventLoop().withoutContext(), + attempt.ctx().maxResponseLength()); try { - final TruncatingHttpResponse truncatingHttpResponse = - new TruncatingHttpResponse(duplicator.duplicate(), - retryConfig.maxContentLength()); - final HttpResponse duplicated = duplicator.duplicate(); - duplicator.close(); + final TruncatingHttpResponse truncatingAttemptRes = + new TruncatingHttpResponse(attemptResDuplicator.duplicate(), + rctx.config().maxContentLength()); + final RetryAttempt attemptWithResHeaders = attempt.setRes( + attemptResDuplicator.duplicate()); + attemptResDuplicator.close(); final RetryRuleWithContent ruleWithContent = - retryConfig.retryRuleWithContent(); + rctx.config().retryRuleWithContent(); assert ruleWithContent != null; - ruleWithContent.shouldRetry(derivedCtx, truncatingHttpResponse, null) + ruleWithContent.shouldRetry(attemptWithResHeaders.ctx(), truncatingAttemptRes, null) .handle((decision, cause) -> { warnIfExceptionIsRaised(ruleWithContent, cause); - truncatingHttpResponse.abort(); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, duplicated); + truncatingAttemptRes.abort(); + handleRetryDecision(decision, rctx, attemptWithResHeaders); return null; }); } catch (Throwable cause) { - duplicator.abort(cause); - handleException(ctx, rootReqDuplicator, future, cause, false); + attemptResDuplicator.abort(cause); + handleException(rctx, cause, false); } } else { - final HttpResponse response0; if (responseCause != null) { - splitResponse.body().abort(responseCause); - response0 = HttpResponse.ofFailure(responseCause); + splitAttemptRes.body().abort(responseCause); + handleResponseWithoutContent(rctx, + attempt.setRes(HttpResponse.ofFailure(responseCause)), + responseCause); } else { - response0 = splitResponse.unsplit(); + handleResponseWithoutContent(rctx, + attempt.setRes(splitAttemptRes.unsplit()), + responseCause); } - handleResponseWithoutContent(retryConfig, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, response0, responseCause); } }); return null; }); } - private void handleAggregatedResponse(RetryConfig retryConfig, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, - ClientRequestContext derivedCtx, - AggregatedHttpResponse aggregatedRes) { - if (retryConfig.needsContentInRule()) { - final RetryRuleWithContent ruleWithContent = retryConfig.retryRuleWithContent(); + private void handleAggregatedResponse(HttpRetryContext rctx, + RetryAttempt attempt) { + if (rctx.config().needsContentInRule()) { + final RetryRuleWithContent ruleWithContent = rctx.config().retryRuleWithContent(); assert ruleWithContent != null; try { - ruleWithContent.shouldRetry(derivedCtx, aggregatedRes.toHttpResponse(), null) + ruleWithContent.shouldRetry(attempt.ctx(), attempt.res(), null) .handle((decision, cause) -> { warnIfExceptionIsRaised(ruleWithContent, cause); - handleRetryDecision( - decision, ctx, derivedCtx, rootReqDuplicator, originalReq, - returnedRes, future, aggregatedRes.toHttpResponse()); + handleRetryDecision(decision, rctx, attempt); return null; }); } catch (Throwable cause) { - handleException(ctx, rootReqDuplicator, future, cause, false); + handleException(rctx, cause, false); } return; } - handleResponseWithoutContent(retryConfig, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, aggregatedRes.toHttpResponse(), null); + handleResponseWithoutContent(rctx, attempt, null); } - private static void completeLogIfBytesNotTransferred(AggregatedHttpResponse response, - ClientRequestContext ctx) { + private static void completeLogIfBytesNotTransferred( + ClientRequestContext ctx, AggregatedHttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { final RequestLogBuilder logBuilder = ctx.logBuilder(); logBuilder.endRequest(); - logBuilder.responseHeaders(response.headers()); - if (!response.trailers().isEmpty()) { - logBuilder.responseTrailers(response.trailers()); + logBuilder.responseHeaders(res.headers()); + if (!res.trailers().isEmpty()) { + logBuilder.responseTrailers(res.trailers()); } logBuilder.endResponse(); } } private static void completeLogIfBytesNotTransferred( - HttpResponse response, @Nullable ResponseHeaders headers, ClientRequestContext ctx, - @Nullable Throwable responseCause) { + ClientRequestContext ctx, + @Nullable ResponseHeaders headers, @Nullable Throwable resCause, HttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { final RequestLogBuilder logBuilder = ctx.logBuilder(); - if (responseCause != null) { - logBuilder.endRequest(responseCause); - logBuilder.endResponse(responseCause); + if (resCause != null) { + logBuilder.endRequest(resCause); + logBuilder.endResponse(resCause); } else { logBuilder.endRequest(); if (headers != null) { logBuilder.responseHeaders(headers); } - response.whenComplete().handle((unused, cause) -> { + res.whenComplete().handle((unused, cause) -> { if (cause != null) { logBuilder.endResponse(cause); } else { @@ -507,13 +515,19 @@ private static void warnIfExceptionIsRaised(Object retryRule, @Nullable Throwabl } } + private static void handleException(HttpRetryContext rctx, Throwable cause, + boolean endRequestLog) { + handleException(rctx.ctx(), rctx.requestDuplicator(), rctx.resFuture(), + cause, endRequestLog); + } + private static void handleException(ClientRequestContext ctx, - @Nullable HttpRequestDuplicator rootReqDuplicator, + @Nullable HttpRequestDuplicator reqDuplicator, CompletableFuture future, Throwable cause, boolean endRequestLog) { future.completeExceptionally(cause); - if (rootReqDuplicator != null) { - rootReqDuplicator.abort(cause); + if (reqDuplicator != null) { + reqDuplicator.abort(cause); } if (endRequestLog) { ctx.logBuilder().endRequest(cause); @@ -521,34 +535,33 @@ private static void handleException(ClientRequestContext ctx, ctx.logBuilder().endResponse(cause); } - private void handleRetryDecision(@Nullable RetryDecision decision, ClientRequestContext ctx, - ClientRequestContext derivedCtx, HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, HttpResponse originalRes) { + private void handleRetryDecision(@Nullable RetryDecision decision, + HttpRetryContext rctx, + RetryAttempt attempt) { final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { - final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1; - final long nextDelay = getNextDelay(ctx, backoff, millisAfter); + final long millisAfter = useRetryAfter ? getRetryAfterMillis(attempt.ctx()) : -1; + final long nextDelay = getNextDelay(rctx, backoff, millisAfter); if (nextDelay >= 0) { - abortResponse(originalRes, derivedCtx); + abortAttempt(attempt); scheduleNextRetry( - ctx, cause -> handleException(ctx, rootReqDuplicator, future, cause, false), - () -> doExecute0(ctx, rootReqDuplicator, originalReq, returnedRes, future), + rctx.ctx(), cause -> handleException(rctx, cause, false), + () -> doExecute0(rctx), nextDelay); return; } } - onRetryingComplete(ctx); - future.complete(originalRes); - rootReqDuplicator.close(); + onRetryingComplete(rctx.ctx()); + rctx.resFuture().complete(attempt.res()); + rctx.requestDuplicator().close(); } - private static void abortResponse(HttpResponse originalRes, ClientRequestContext derivedCtx) { + private static void abortAttempt(RetryAttempt attempt) { // Set response content with null to make sure that the log is complete. - final RequestLogBuilder logBuilder = derivedCtx.logBuilder(); + final RequestLogBuilder logBuilder = attempt.ctx().logBuilder(); logBuilder.responseContent(null, null); logBuilder.responseContentPreview(null); - originalRes.abort(); + attempt.res().abort(); } private static long getRetryAfterMillis(ClientRequestContext ctx) { @@ -591,4 +604,20 @@ private static RetryRule retryRule(RetryConfig retryConfig) { return rule; } } + + private static final class HttpRetryContext extends RetryContext { + private final HttpRequestDuplicator requestDuplicator; + + HttpRetryContext(ClientRequestContext ctx, HttpRequest req, + HttpRequestDuplicator requestDuplicator, + HttpResponse res, CompletableFuture resFuture, + RetryConfig config, long responseTimeoutMillis) { + super(ctx, req, res, resFuture, config, responseTimeoutMillis); + this.requestDuplicator = requestDuplicator; + } + + HttpRequestDuplicator requestDuplicator() { + return requestDuplicator; + } + } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index 32146db4c68..f7558eace9e 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -32,6 +32,7 @@ import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; +import com.linecorp.armeria.internal.client.ClientUtil; import com.linecorp.armeria.internal.common.util.StringUtil; /** @@ -141,99 +142,112 @@ public static RetryingRpcClientBuilder builder(RetryConfigMapping m } @Override - protected RpcResponse doExecute(ClientRequestContext ctx, RpcRequest req) throws Exception { - final CompletableFuture future = new CompletableFuture<>(); - final RpcResponse res = RpcResponse.from(future); - doExecute0(ctx, req, res, future); + protected RpcResponse doExecute(ClientRequestContext ctx, RpcRequest req, RetryConfig config) + throws Exception { + final CompletableFuture resFuture = new CompletableFuture<>(); + final RpcResponse res = RpcResponse.from(resFuture); + final RetryContext rctx = + new RetryContext<>(ctx, req, res, resFuture, config, ctx.responseTimeoutMillis()); + rctx.counter().consumeAttemptFrom(null); + doExecute0(rctx); return res; } - private void doExecute0(ClientRequestContext ctx, RpcRequest req, - RpcResponse returnedRes, CompletableFuture future) { - final int totalAttempts = getTotalAttempts(ctx); + private void doExecute0(RetryContext rctx) { + final int totalAttempts = rctx.counter().numberAttemptsSoFar(); final boolean initialAttempt = totalAttempts <= 1; - if (returnedRes.isDone()) { + if (rctx.res().isDone()) { // The response has been cancelled by the client before it receives a response, so stop retrying. - handleException(ctx, future, new CancellationException( + handleException(rctx, new CancellationException( "the response returned to the client has been cancelled"), initialAttempt); return; } - if (!setResponseTimeout(ctx)) { - handleException(ctx, future, ResponseTimeoutException.get(), initialAttempt); + if (!rctx.setResponseTimeout()) { + handleException(rctx, ResponseTimeoutException.get(), initialAttempt); return; } - final ClientRequestContext derivedCtx = newDerivedContext(ctx, null, req, initialAttempt); + final RetryAttempt attempt = executeAttempt(rctx); - if (!initialAttempt) { - derivedCtx.mutateAdditionalRequestHeaders( - mutator -> mutator.add(ARMERIA_RETRY_COUNT, StringUtil.toString(totalAttempts - 1))); - } - - final RpcResponse res; - - final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class); - final EndpointGroup endpointGroup = derivedCtx.endpointGroup(); - if (!initialAttempt && ctxExtension != null && - endpointGroup != null && derivedCtx.endpoint() == null) { - // clear the pending throwable to retry endpoint selection - ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); - // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop - res = initContextAndExecuteWithFallback(unwrap(), ctxExtension, RpcResponse::from, - (context, cause) -> RpcResponse.ofFailure(cause), - req, true); - } else { - res = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> RpcResponse.ofFailure(cause), - req, true); - } - - final RetryConfig retryConfig = mappedRetryConfig(ctx); final RetryRuleWithContent retryRule = - retryConfig.needsContentInRule() ? - retryConfig.retryRuleWithContent() : retryConfig.fromRetryRule(); - res.handle((unused1, cause) -> { + rctx.config().needsContentInRule() ? + rctx.config().retryRuleWithContent() : rctx.config().fromRetryRule(); + attempt.res().handle((unused1, cause) -> { try { assert retryRule != null; - retryRule.shouldRetry(derivedCtx, res, cause).handle((decision, unused3) -> { + retryRule.shouldRetry(attempt.ctx(), attempt.res(), cause).handle((decision, unused3) -> { final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { - final long nextDelay = getNextDelay(derivedCtx, backoff); + final long nextDelay = getNextDelay(rctx, backoff, -1); if (nextDelay < 0) { - onRetryComplete(ctx, derivedCtx, res, future); + onRetryComplete(rctx, attempt); return null; } - scheduleNextRetry(ctx, cause0 -> handleException(ctx, future, cause0, false), - () -> doExecute0(ctx, req, returnedRes, future), nextDelay); + scheduleNextRetry(rctx.ctx(), cause0 -> handleException(rctx, cause0, false), + () -> doExecute0(rctx), nextDelay); } else { - onRetryComplete(ctx, derivedCtx, res, future); + onRetryComplete(rctx, attempt); } return null; }); } catch (Throwable t) { - handleException(ctx, future, t, false); + handleException(rctx, t, false); } return null; }); } - private static void onRetryComplete(ClientRequestContext ctx, ClientRequestContext derivedCtx, - RpcResponse res, CompletableFuture future) { - onRetryingComplete(ctx); - final HttpRequest actualHttpReq = derivedCtx.request(); + private RetryAttempt executeAttempt(RetryContext rctx) { + final int totalAttempts = rctx.counter().numberAttemptsSoFar(); + final boolean initialAttempt = totalAttempts <= 1; + final ClientRequestContext attemptCtx = newDerivedContext(rctx.ctx(), null, rctx.req(), initialAttempt); + + if (!initialAttempt) { + attemptCtx.mutateAdditionalRequestHeaders( + mutator -> mutator.add(ClientUtil.ARMERIA_RETRY_COUNT, + StringUtil.toString(totalAttempts - 1))); + } + + final RpcResponse attemptRes; + final ClientRequestContextExtension attemptCtxExtension = attemptCtx.as( + ClientRequestContextExtension.class); + final EndpointGroup endpointGroup = attemptCtx.endpointGroup(); + if (!initialAttempt && attemptCtxExtension != null && + endpointGroup != null && attemptCtx.endpoint() == null) { + // clear the pending throwable to retry endpoint selection + ClientPendingThrowableUtil.removePendingThrowable(attemptCtx); + // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop + attemptRes = initContextAndExecuteWithFallback(unwrap(), attemptCtxExtension, RpcResponse::from, + (context, cause) -> + RpcResponse.ofFailure(cause), + rctx.req(), true); + } else { + attemptRes = executeWithFallback(unwrap(), attemptCtx, + (context, cause) -> + RpcResponse.ofFailure(cause), + rctx.req(), true); + } + + return new RetryAttempt<>(attemptCtx, attemptRes); + } + + private static void onRetryComplete(RetryContext rctx, + RetryAttempt attempt) { + onRetryingComplete(rctx.ctx()); + final HttpRequest actualHttpReq = attempt.ctx().request(); if (actualHttpReq != null) { - ctx.updateRequest(actualHttpReq); + rctx.ctx().updateRequest(actualHttpReq); } - future.complete(res); + rctx.resFuture().complete(attempt.res()); } - private static void handleException(ClientRequestContext ctx, CompletableFuture future, + private static void handleException(RetryContext rctx, Throwable cause, boolean endRequestLog) { - future.completeExceptionally(cause); + rctx.resFuture().completeExceptionally(cause); if (endRequestLog) { - ctx.logBuilder().endRequest(cause); + rctx.ctx().logBuilder().endRequest(cause); } - ctx.logBuilder().endResponse(cause); + rctx.ctx().logBuilder().endResponse(cause); } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java index a4f65d598fc..15e10f59863 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java @@ -30,6 +30,7 @@ import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.RequestId; @@ -45,6 +46,8 @@ import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.common.util.SafeCloseable; +import io.netty.util.AsciiString; + public final class ClientUtil { /** @@ -53,6 +56,12 @@ public final class ClientUtil { public static final URI UNDEFINED_URI = URI.create("http://" + ClientBuilderParamsUtil.UNDEFINED_URI_AUTHORITY); + /** + * The header which indicates the retry count of a {@link Request}. + * The server might use this value to reject excessive retries, etc. + */ + public static final AsciiString ARMERIA_RETRY_COUNT = HttpHeaderNames.of("armeria-retry-count"); + public static > O initContextAndExecuteWithFallback( U delegate, diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java index a2b52679da6..b30d19582ef 100644 --- a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java @@ -105,7 +105,7 @@ void test(TestMode mode) { } // Retry only once on failure. - if (!HttpStatus.OK.equals(status) && AbstractRetryingClient.getTotalAttempts(ctx) <= 1) { + if (!HttpStatus.OK.equals(status) && ctx.log().partial().currentAttempt() <= 1) { return UnmodifiableFuture.completedFuture(RetryDecision.retry(Backoff.withoutDelay())); } else { return UnmodifiableFuture.completedFuture(RetryDecision.noRetry()); @@ -127,9 +127,9 @@ void test(TestMode mode) { case FAILURE: final List expectedPortsWhenRetried = ImmutableList.builder() - .addAll(expectedPorts) - .addAll(expectedPorts) - .build(); + .addAll(expectedPorts) + .addAll(expectedPorts) + .build(); assertThat(accessedPorts).isEqualTo(expectedPortsWhenRetried); break; } diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java index c68446ad7e3..f00e5aa175b 100644 --- a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java @@ -16,8 +16,8 @@ package com.linecorp.armeria.client.retry; -import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; import static com.linecorp.armeria.common.util.Exceptions.peel; +import static com.linecorp.armeria.internal.client.ClientUtil.ARMERIA_RETRY_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; @@ -526,15 +526,15 @@ void honorRetryMapping() { void evaluatesMappingOnce() { final AtomicInteger evaluations = new AtomicInteger(0); final RetryConfigMapping mapping = - (ctx, req) -> { - evaluations.incrementAndGet(); - return RetryConfig - .builder0(RetryRule.builder() - .onStatus(HttpStatus.valueOf(500)) - .thenBackoff()) - .maxTotalAttempts(2) - .build(); - }; + (ctx, req) -> { + evaluations.incrementAndGet(); + return RetryConfig + .builder0(RetryRule.builder() + .onStatus(HttpStatus.valueOf(500)) + .thenBackoff()) + .maxTotalAttempts(2) + .build(); + }; final WebClient client = client(mapping); diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java index 99ec07c9752..376906c5267 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java @@ -15,7 +15,7 @@ */ package com.linecorp.armeria.it.client.retry; -import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; +import static com.linecorp.armeria.internal.client.ClientUtil.ARMERIA_RETRY_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; From d2d03d23fefe601d91123aa5e59efc8a9e3bc4bf Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Wed, 24 Sep 2025 20:36:44 +0200 Subject: [PATCH 03/14] fix: GrpcWebRetryTest --- .../armeria/client/retry/RetryingClient.java | 85 +++++++++---------- 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index a37a45b2e07..e835c03d6b5 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -310,28 +310,48 @@ private void doExecute0(HttpRetryContext rctx) { return; } - if (!rctx.ctx().exchangeType().isResponseStreaming() || rctx.config().requiresResponseTrailers()) { - attempt.res().aggregate().handle((aggregated, cause) -> { - if (cause != null) { - attempt.ctx().logBuilder().endRequest(cause); - attempt.ctx().logBuilder().endResponse(cause); - handleResponseWithoutContent(rctx, - attempt.setRes(HttpResponse.ofFailure(cause)), cause); - } else { - completeLogIfBytesNotTransferred(attempt.ctx(), aggregated); - final RetryAttempt aggregatedAttempt = attempt.setRes( - aggregated.toHttpResponse()); - attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { - handleAggregatedResponse(rctx, aggregatedAttempt); - }); - } - return null; - }); + if (rctx.ctx().exchangeType().isResponseStreaming() && !rctx.config().requiresResponseTrailers()) { + handleStreamingAttemptResponse(rctx, attempt); } else { - handleStreamingResponse(rctx, attempt); + handleAggregatedAttemptResponse(rctx, attempt); } } + private void handleAggregatedAttemptResponse(HttpRetryContext rctx, RetryAttempt attempt) { + attempt.res().aggregate().handle((aggAttemptRes, cause) -> { + if (cause != null) { + attempt.ctx().logBuilder().endRequest(cause); + attempt.ctx().logBuilder().endResponse(cause); + handleResponseWithoutContent(rctx, + attempt.setRes(HttpResponse.ofFailure(cause)), cause); + } else { + completeLogIfBytesNotTransferred(attempt.ctx(), aggAttemptRes); + attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { + if (rctx.config().needsContentInRule()) { + final RetryRuleWithContent ruleWithContent = + rctx.config().retryRuleWithContent(); + assert ruleWithContent != null; + try { + ruleWithContent.shouldRetry(attempt.ctx(), aggAttemptRes.toHttpResponse(), null) + .handle((decision, cause3) -> { + warnIfExceptionIsRaised(ruleWithContent, cause3); + handleRetryDecision(decision, rctx, attempt.setRes( + aggAttemptRes.toHttpResponse())); + return null; + }); + } catch (Throwable cause2) { + handleException(rctx, cause2, false); + } + return; + } + handleResponseWithoutContent(rctx, attempt.setRes( + aggAttemptRes.toHttpResponse()), null); + }); + } + return null; + }); + } + @Nullable private RetryAttempt executeAttempt(HttpRetryContext rctx) { final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; @@ -387,14 +407,13 @@ private void handleResponseWithoutContent(HttpRetryContext rctx, return null; }); } catch (Throwable cause) { - // TODO: abortAttempt(attempt); - attempt.res().abort(); + abortAttempt(attempt); handleException(rctx, cause, false); } } - private void handleStreamingResponse(HttpRetryContext rctx, - RetryAttempt attempt) { + private void handleStreamingAttemptResponse(HttpRetryContext rctx, + RetryAttempt attempt) { final SplitHttpResponse splitAttemptRes = attempt.res().split(); splitAttemptRes.headers().handle((headers, headersCause) -> { final Throwable responseCause; @@ -443,7 +462,7 @@ private void handleStreamingResponse(HttpRetryContext rctx, } else { handleResponseWithoutContent(rctx, attempt.setRes(splitAttemptRes.unsplit()), - responseCause); + null); } } }); @@ -451,26 +470,6 @@ private void handleStreamingResponse(HttpRetryContext rctx, }); } - private void handleAggregatedResponse(HttpRetryContext rctx, - RetryAttempt attempt) { - if (rctx.config().needsContentInRule()) { - final RetryRuleWithContent ruleWithContent = rctx.config().retryRuleWithContent(); - assert ruleWithContent != null; - try { - ruleWithContent.shouldRetry(attempt.ctx(), attempt.res(), null) - .handle((decision, cause) -> { - warnIfExceptionIsRaised(ruleWithContent, cause); - handleRetryDecision(decision, rctx, attempt); - return null; - }); - } catch (Throwable cause) { - handleException(rctx, cause, false); - } - return; - } - handleResponseWithoutContent(rctx, attempt, null); - } - private static void completeLogIfBytesNotTransferred( ClientRequestContext ctx, AggregatedHttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { From 809cee2c6307bbab73d111318c61e2e0f9de9601 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Fri, 26 Sep 2025 09:03:35 +0200 Subject: [PATCH 04/14] refactor: move out rctx construction --- .../armeria/client/retry/RetryingClient.java | 118 ++++++++++-------- 1 file changed, 67 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index e835c03d6b5..fd19a74c756 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -49,6 +49,7 @@ import com.linecorp.armeria.common.logging.RequestLogProperty; import com.linecorp.armeria.common.stream.AbortedStreamException; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.internal.client.AggregatedHttpRequestDuplicator; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; @@ -244,63 +245,52 @@ public static Function newDecorator(RetryRul protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, RetryConfig config) throws Exception { - final CompletableFuture responseFuture = new CompletableFuture<>(); - final HttpResponse res = HttpResponse.of(responseFuture, ctx.eventLoop()); - if (ctx.exchangeType().isRequestStreaming()) { - final HttpRequestDuplicator reqDuplicator = req.toDuplicator(ctx.eventLoop().withoutContext(), 0); - final HttpRetryContext rctx = new HttpRetryContext( - ctx, req, reqDuplicator, res, responseFuture, - config, ctx.responseTimeoutMillis()); - rctx.counter().consumeAttemptFrom(null); - doExecute0(rctx); - } else { - req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop())) - .handle((agg, cause) -> { - if (cause != null) { - handleException(ctx, null, responseFuture, cause, true); - } else { - final HttpRequestDuplicator reqDuplicator = new AggregatedHttpRequestDuplicator(agg); - final HttpRetryContext rctx = - new HttpRetryContext( - ctx, req, reqDuplicator, res, responseFuture, - config, ctx.responseTimeoutMillis()); - rctx.counter().consumeAttemptFrom(null); - doExecute0(rctx); - } - return null; - }); - } + final CompletableFuture resFuture = new CompletableFuture<>(); + final HttpResponse res = HttpResponse.of(resFuture, ctx.eventLoop()); + + retryContext(ctx, req, res, resFuture, config) + .handle((rctx, cause) -> { + if (cause != null) { + handleException(ctx, null, resFuture, cause, true); + return null; + } + + // The request or attemptRes has been aborted by the client before it receives a attemptRes, + // so stop retrying. + rctx.req().whenComplete().handle((unused, reqCause) -> { + if (reqCause != null) { + handleException(rctx, reqCause, true); + } + return null; + }); + + rctx.res().whenComplete().handle((result, resCause) -> { + final Throwable abortCause; + if (resCause != null) { + abortCause = resCause; + } else { + abortCause = AbortedStreamException.get(); + } + handleException(rctx, abortCause, true); + return null; + }); + + rctx.counter().consumeAttemptFrom(null); + retry(rctx); + + return null; + }); + return res; } - private void doExecute0(HttpRetryContext rctx) { - // We already claimed the attempt so this attempt is included in rctx.counter().numberAttemptsSoFar(). - final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; - // The request or attemptRes has been aborted by the client before it receives a attemptRes, - // so stop retrying. - if (rctx.req().whenComplete().isCompletedExceptionally()) { - rctx.req().whenComplete().handle((unused, cause) -> { - handleException(rctx, cause, isInitialAttempt); - return null; - }); - return; - } - if (rctx.res().isComplete()) { - rctx.res().whenComplete().handle((result, cause) -> { - final Throwable abortCause; - if (cause != null) { - abortCause = cause; - } else { - abortCause = AbortedStreamException.get(); - } - handleException(rctx, abortCause, isInitialAttempt); - return null; - }); + private void retry(HttpRetryContext rctx) { + if (isRetryCompleted(rctx)) { return; } if (!rctx.setResponseTimeout()) { - handleException(rctx, ResponseTimeoutException.get(), isInitialAttempt); + handleException(rctx, ResponseTimeoutException.get(), rctx.counter().numberAttemptsSoFar() <= 1); return; } @@ -470,6 +460,32 @@ private void handleStreamingAttemptResponse(HttpRetryContext rctx, }); } + private CompletableFuture retryContext(ClientRequestContext ctx, HttpRequest req, + HttpResponse res, + CompletableFuture resFuture, + RetryConfig config) { + + if (ctx.exchangeType().isRequestStreaming()) { + return UnmodifiableFuture.completedFuture(new HttpRetryContext( + ctx, req, req.toDuplicator(ctx.eventLoop().withoutContext(), 0), res, resFuture, config, + ctx.responseTimeoutMillis() + )); + } else { + return req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop())) + .thenApply(AggregatedHttpRequestDuplicator::new) + .thenApply(reqDuplicator -> new HttpRetryContext( + ctx, req, reqDuplicator, res, resFuture, + config, ctx.responseTimeoutMillis() + )); + } + } + + private boolean isRetryCompleted(HttpRetryContext rctx) { + return rctx.ctx().isCancelled() || rctx.req().whenComplete().isCompletedExceptionally() || rctx.res() + .whenComplete() + .isDone(); + } + private static void completeLogIfBytesNotTransferred( ClientRequestContext ctx, AggregatedHttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { @@ -545,7 +561,7 @@ private void handleRetryDecision(@Nullable RetryDecision decision, abortAttempt(attempt); scheduleNextRetry( rctx.ctx(), cause -> handleException(rctx, cause, false), - () -> doExecute0(rctx), + () -> retry(rctx), nextDelay); return; } From 65d646ae8fc4a113404df5bbd935dd2b83ec87fd Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Fri, 26 Sep 2025 09:08:04 +0200 Subject: [PATCH 05/14] refactor: make req and res completion handlers more compact --- .../armeria/client/retry/RetryingClient.java | 34 ++++++++----------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index fd19a74c756..1846f70eef6 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -257,23 +257,18 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, // The request or attemptRes has been aborted by the client before it receives a attemptRes, // so stop retrying. - rctx.req().whenComplete().handle((unused, reqCause) -> { - if (reqCause != null) { + rctx.req().whenComplete() + .exceptionally(reqCause -> { handleException(rctx, reqCause, true); - } - return null; - }); - - rctx.res().whenComplete().handle((result, resCause) -> { - final Throwable abortCause; - if (resCause != null) { - abortCause = resCause; - } else { - abortCause = AbortedStreamException.get(); - } - handleException(rctx, abortCause, true); - return null; - }); + return null; + }); + + rctx.res().whenComplete() + .handle((result, resCause) -> { + handleException(rctx, resCause == null ? AbortedStreamException.get() : resCause, + true); + return null; + }); rctx.counter().consumeAttemptFrom(null); retry(rctx); @@ -481,9 +476,10 @@ private CompletableFuture retryContext(ClientRequestContext ct } private boolean isRetryCompleted(HttpRetryContext rctx) { - return rctx.ctx().isCancelled() || rctx.req().whenComplete().isCompletedExceptionally() || rctx.res() - .whenComplete() - .isDone(); + return rctx.ctx().isCancelled() || rctx.req().whenComplete().isCompletedExceptionally() || + rctx.res() + .whenComplete() + .isDone(); } private static void completeLogIfBytesNotTransferred( From cd443c050ee7bed8a253d01bb0dee40d6ee77608 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 17:25:04 +0200 Subject: [PATCH 06/14] refactor: centralize retry decision-making --- .../armeria/client/retry/RetryingClient.java | 224 +++++++++--------- 1 file changed, 111 insertions(+), 113 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 1846f70eef6..9a2bc84d950 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.util.Date; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; import org.slf4j.Logger; @@ -251,7 +252,9 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, retryContext(ctx, req, res, resFuture, config) .handle((rctx, cause) -> { if (cause != null) { - handleException(ctx, null, resFuture, cause, true); + resFuture.completeExceptionally(cause); + ctx.logBuilder().endRequest(cause); + ctx.logBuilder().endResponse(cause); return null; } @@ -259,14 +262,13 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, // so stop retrying. rctx.req().whenComplete() .exceptionally(reqCause -> { - handleException(rctx, reqCause, true); + handleException(rctx, reqCause); return null; }); rctx.res().whenComplete() .handle((result, resCause) -> { - handleException(rctx, resCause == null ? AbortedStreamException.get() : resCause, - true); + handleException(rctx, resCause == null ? AbortedStreamException.get() : resCause); return null; }); @@ -285,13 +287,15 @@ private void retry(HttpRetryContext rctx) { } if (!rctx.setResponseTimeout()) { - handleException(rctx, ResponseTimeoutException.get(), rctx.counter().numberAttemptsSoFar() <= 1); + handleException(rctx, ResponseTimeoutException.get()); return; } - final RetryAttempt attempt = executeAttempt(rctx); - - if (attempt == null) { + final RetryAttempt attempt; + try { + attempt = executeAttempt(rctx); + } catch (Throwable cause) { + handleException(rctx, cause); return; } @@ -307,37 +311,94 @@ private void handleAggregatedAttemptResponse(HttpRetryContext rctx, RetryAttempt if (cause != null) { attempt.ctx().logBuilder().endRequest(cause); attempt.ctx().logBuilder().endResponse(cause); - handleResponseWithoutContent(rctx, - attempt.setRes(HttpResponse.ofFailure(cause)), cause); + decideAndHandleDecision(rctx, attempt.setRes(HttpResponse.ofFailure(cause)), + HttpResponse.ofFailure(cause), null); } else { completeLogIfBytesNotTransferred(attempt.ctx(), aggAttemptRes); attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { - if (rctx.config().needsContentInRule()) { - final RetryRuleWithContent ruleWithContent = - rctx.config().retryRuleWithContent(); - assert ruleWithContent != null; - try { - ruleWithContent.shouldRetry(attempt.ctx(), aggAttemptRes.toHttpResponse(), null) - .handle((decision, cause3) -> { - warnIfExceptionIsRaised(ruleWithContent, cause3); - handleRetryDecision(decision, rctx, attempt.setRes( - aggAttemptRes.toHttpResponse())); - return null; - }); - } catch (Throwable cause2) { - handleException(rctx, cause2, false); - } - return; - } - handleResponseWithoutContent(rctx, attempt.setRes( - aggAttemptRes.toHttpResponse()), null); + decideAndHandleDecision(rctx, attempt.setRes(aggAttemptRes.toHttpResponse()), + aggAttemptRes.toHttpResponse(), null); }); } return null; }); } - @Nullable + private void decideAndHandleDecision(HttpRetryContext rctx, + RetryAttempt attempt, + @Nullable HttpResponse resToDecide, + @Nullable Throwable causeToDecide) { + decide(rctx, attempt, resToDecide, causeToDecide) + .handle((decision, decisionCause) -> { + if (resToDecide != null) { + // resToDecide.abort(); + } + + if (decisionCause != null) { + abortAttempt(attempt); + handleException(rctx, decisionCause); + } else { + handleDecision(rctx, attempt, decision); + } + return null; + }); + } + + private CompletionStage decide(HttpRetryContext rctx, + RetryAttempt attempt, + @Nullable HttpResponse resToDecide, + @Nullable Throwable causeToDecide) { + if (causeToDecide != null) { + causeToDecide = Exceptions.peel(causeToDecide); + } + + try { + if (rctx.config().needsContentInRule()) { + assert resToDecide != null ^ causeToDecide != null; + final RetryRuleWithContent retryRuleWithContent = + rctx.config().retryRuleWithContent(); + assert retryRuleWithContent != null; + return retryRuleWithContent + .shouldRetry(attempt.ctx(), resToDecide, causeToDecide) + .handle((decision, cause) -> { + warnIfExceptionIsRaised(retryRuleWithContent, cause); + return decision; + }); + } else { + final RetryRule retryRuleWithoutContent = rctx.config().retryRule(); + assert retryRuleWithoutContent != null; + return retryRuleWithoutContent + .shouldRetry(attempt.ctx(), causeToDecide) + .handle((decision, cause) -> { + warnIfExceptionIsRaised(retryRuleWithoutContent, cause); + return decision; + }); + } + } catch (Throwable ruleCause) { + return UnmodifiableFuture.exceptionallyCompletedFuture(ruleCause); + } + } + + private void handleDecision(HttpRetryContext rctx, RetryAttempt attempt, + @Nullable RetryDecision decision) { + final Backoff backoff = decision != null ? decision.backoff() : null; + if (backoff != null) { + final long millisAfter = useRetryAfter ? getRetryAfterMillis(attempt.ctx()) : -1; + final long nextDelay = getNextDelay(rctx, backoff, millisAfter); + if (nextDelay >= 0) { + abortAttempt(attempt); + scheduleNextRetry( + rctx.ctx(), scheduleCause -> handleException(rctx, scheduleCause), + () -> retry(rctx), + nextDelay); + return; + } + } + onRetryingComplete(rctx.ctx()); + rctx.resFuture().complete(attempt.res()); + rctx.requestDuplicator().close(); + } + private RetryAttempt executeAttempt(HttpRetryContext rctx) { final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; final HttpRequest duplicateReq; @@ -350,12 +411,8 @@ private RetryAttempt executeAttempt(HttpRetryContext rctx) { } final ClientRequestContext attemptCtx; - try { - attemptCtx = newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), isInitialAttempt); - } catch (Throwable cause) { - handleException(rctx, cause, isInitialAttempt); - return null; - } + + attemptCtx = newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), isInitialAttempt); final HttpRequest attemptReq = attemptCtx.request(); assert attemptReq != null; @@ -377,26 +434,6 @@ private RetryAttempt executeAttempt(HttpRetryContext rctx) { return new RetryAttempt<>(attemptCtx, attemptRes); } - private void handleResponseWithoutContent(HttpRetryContext rctx, - RetryAttempt attempt, - @Nullable Throwable responseCause) { - if (responseCause != null) { - responseCause = Exceptions.peel(responseCause); - } - try { - final RetryRule retryRule = retryRule(rctx.config()); - retryRule.shouldRetry(attempt.ctx(), responseCause) - .handle((decision, shouldRetryCause) -> { - warnIfExceptionIsRaised(retryRule, shouldRetryCause); - handleRetryDecision(decision, rctx, attempt); - return null; - }); - } catch (Throwable cause) { - abortAttempt(attempt); - handleException(rctx, cause, false); - } - } - private void handleStreamingAttemptResponse(HttpRetryContext rctx, RetryAttempt attempt) { final SplitHttpResponse splitAttemptRes = attempt.res().split(); @@ -417,37 +454,30 @@ private void handleStreamingAttemptResponse(HttpRetryContext rctx, unsplitAttemptRes.toDuplicator(attempt.ctx().eventLoop().withoutContext(), attempt.ctx().maxResponseLength()); try { + final RetryAttempt attemptWithResHeaders = attempt.setRes( + attemptResDuplicator.duplicate()); final TruncatingHttpResponse truncatingAttemptRes = new TruncatingHttpResponse(attemptResDuplicator.duplicate(), rctx.config().maxContentLength()); - final RetryAttempt attemptWithResHeaders = attempt.setRes( - attemptResDuplicator.duplicate()); attemptResDuplicator.close(); - final RetryRuleWithContent ruleWithContent = - rctx.config().retryRuleWithContent(); - assert ruleWithContent != null; - ruleWithContent.shouldRetry(attemptWithResHeaders.ctx(), truncatingAttemptRes, null) - .handle((decision, cause) -> { - warnIfExceptionIsRaised(ruleWithContent, cause); - truncatingAttemptRes.abort(); - handleRetryDecision(decision, rctx, attemptWithResHeaders); - return null; - }); + decideAndHandleDecision(rctx, attemptWithResHeaders, truncatingAttemptRes, + null); } catch (Throwable cause) { attemptResDuplicator.abort(cause); - handleException(rctx, cause, false); } } else { if (responseCause != null) { splitAttemptRes.body().abort(responseCause); - handleResponseWithoutContent(rctx, - attempt.setRes(HttpResponse.ofFailure(responseCause)), - responseCause); + decideAndHandleDecision( + rctx, attempt.setRes(HttpResponse.ofFailure(responseCause)), null, + responseCause + ); } else { - handleResponseWithoutContent(rctx, - attempt.setRes(splitAttemptRes.unsplit()), - null); + decideAndHandleDecision(rctx, + attempt.setRes(splitAttemptRes.unsplit()), + null, + null); } } }); @@ -526,45 +556,13 @@ private static void warnIfExceptionIsRaised(Object retryRule, @Nullable Throwabl } } - private static void handleException(HttpRetryContext rctx, Throwable cause, - boolean endRequestLog) { - handleException(rctx.ctx(), rctx.requestDuplicator(), rctx.resFuture(), - cause, endRequestLog); - } - - private static void handleException(ClientRequestContext ctx, - @Nullable HttpRequestDuplicator reqDuplicator, - CompletableFuture future, Throwable cause, - boolean endRequestLog) { - future.completeExceptionally(cause); - if (reqDuplicator != null) { - reqDuplicator.abort(cause); + private static void handleException(HttpRetryContext rctx, Throwable cause) { + rctx.resFuture().completeExceptionally(cause); + rctx.requestDuplicator().abort(cause); + if (!rctx.ctx().logBuilder().isRequestComplete()) { + rctx.ctx().logBuilder().endRequest(cause); } - if (endRequestLog) { - ctx.logBuilder().endRequest(cause); - } - ctx.logBuilder().endResponse(cause); - } - - private void handleRetryDecision(@Nullable RetryDecision decision, - HttpRetryContext rctx, - RetryAttempt attempt) { - final Backoff backoff = decision != null ? decision.backoff() : null; - if (backoff != null) { - final long millisAfter = useRetryAfter ? getRetryAfterMillis(attempt.ctx()) : -1; - final long nextDelay = getNextDelay(rctx, backoff, millisAfter); - if (nextDelay >= 0) { - abortAttempt(attempt); - scheduleNextRetry( - rctx.ctx(), cause -> handleException(rctx, cause, false), - () -> retry(rctx), - nextDelay); - return; - } - } - onRetryingComplete(rctx.ctx()); - rctx.resFuture().complete(attempt.res()); - rctx.requestDuplicator().close(); + rctx.ctx().logBuilder().endResponse(cause); } private static void abortAttempt(RetryAttempt attempt) { From af0c3a4851db940fbbf6a61147f1d49a17dfd704 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 17:54:53 +0200 Subject: [PATCH 07/14] fix: only treat res completion as exception when the res was completed before retrying completed --- .../armeria/client/retry/RetryingClient.java | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 9a2bc84d950..6f74be38f19 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -268,7 +268,11 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, rctx.res().whenComplete() .handle((result, resCause) -> { - handleException(rctx, resCause == null ? AbortedStreamException.get() : resCause); + if (!rctx.resFuture().isDone()) { + // We are still retrying: we were not the ones completing rctx.res(). + handleException(rctx, + resCause == null ? AbortedStreamException.get() : resCause); + } return null; }); @@ -282,7 +286,7 @@ protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, } private void retry(HttpRetryContext rctx) { - if (isRetryCompleted(rctx)) { + if (isRequestCompletedExternally(rctx)) { return; } @@ -396,18 +400,18 @@ private void handleDecision(HttpRetryContext rctx, RetryAttempt at } onRetryingComplete(rctx.ctx()); rctx.resFuture().complete(attempt.res()); - rctx.requestDuplicator().close(); + rctx.reqDuplicator().close(); } private RetryAttempt executeAttempt(HttpRetryContext rctx) { final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; final HttpRequest duplicateReq; if (isInitialAttempt) { - duplicateReq = rctx.requestDuplicator().duplicate(); + duplicateReq = rctx.reqDuplicator().duplicate(); } else { final RequestHeadersBuilder newHeaders = rctx.req().headers().toBuilder(); newHeaders.setInt(ClientUtil.ARMERIA_RETRY_COUNT, rctx.counter().numberAttemptsSoFar() - 1); - duplicateReq = rctx.requestDuplicator().duplicate(newHeaders.build()); + duplicateReq = rctx.reqDuplicator().duplicate(newHeaders.build()); } final ClientRequestContext attemptCtx; @@ -505,7 +509,7 @@ private CompletableFuture retryContext(ClientRequestContext ct } } - private boolean isRetryCompleted(HttpRetryContext rctx) { + private boolean isRequestCompletedExternally(HttpRetryContext rctx) { return rctx.ctx().isCancelled() || rctx.req().whenComplete().isCompletedExceptionally() || rctx.res() .whenComplete() @@ -558,7 +562,7 @@ private static void warnIfExceptionIsRaised(Object retryRule, @Nullable Throwabl private static void handleException(HttpRetryContext rctx, Throwable cause) { rctx.resFuture().completeExceptionally(cause); - rctx.requestDuplicator().abort(cause); + rctx.reqDuplicator().abort(cause); if (!rctx.ctx().logBuilder().isRequestComplete()) { rctx.ctx().logBuilder().endRequest(cause); } @@ -615,18 +619,18 @@ private static RetryRule retryRule(RetryConfig retryConfig) { } private static final class HttpRetryContext extends RetryContext { - private final HttpRequestDuplicator requestDuplicator; + private final HttpRequestDuplicator reqDuplicator; HttpRetryContext(ClientRequestContext ctx, HttpRequest req, - HttpRequestDuplicator requestDuplicator, + HttpRequestDuplicator reqDuplicator, HttpResponse res, CompletableFuture resFuture, RetryConfig config, long responseTimeoutMillis) { super(ctx, req, res, resFuture, config, responseTimeoutMillis); - this.requestDuplicator = requestDuplicator; + this.reqDuplicator = reqDuplicator; } - HttpRequestDuplicator requestDuplicator() { - return requestDuplicator; + HttpRequestDuplicator reqDuplicator() { + return reqDuplicator; } } } From 8fc6d84c76c307ddcd7ca06d1d22988790a28a8f Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:02:39 +0200 Subject: [PATCH 08/14] refactor: remove newDerivedContext from AbstractRetryingClient --- .../client/retry/AbstractRetryingClient.java | 15 --------------- .../armeria/client/retry/RetryingClient.java | 3 ++- .../armeria/client/retry/RetryingRpcClient.java | 3 ++- 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index b4380c94a29..6e29cd1ceb4 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -27,14 +27,10 @@ import com.linecorp.armeria.client.Client; import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ClientRequestContext; -import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.SimpleDecoratingClient; -import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.Response; -import com.linecorp.armeria.common.RpcRequest; import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.internal.client.ClientUtil; import io.netty.util.concurrent.ScheduledFuture; @@ -164,15 +160,4 @@ protected final long getNextDelay(RetryContext rctx, Backoff backoff, long return nextDelay; } - - /** - * Creates a new derived {@link ClientRequestContext}, replacing the requests. - * If {@link ClientRequestContext#endpointGroup()} exists, a new {@link Endpoint} will be selected. - */ - protected static ClientRequestContext newDerivedContext(ClientRequestContext ctx, - @Nullable HttpRequest req, - @Nullable RpcRequest rpcReq, - boolean initialAttempt) { - return ClientUtil.newDerivedContext(ctx, req, rpcReq, initialAttempt); - } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 6f74be38f19..1f47c29bd79 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -416,7 +416,8 @@ private RetryAttempt executeAttempt(HttpRetryContext rctx) { final ClientRequestContext attemptCtx; - attemptCtx = newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), isInitialAttempt); + attemptCtx = ClientUtil.newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), + isInitialAttempt); final HttpRequest attemptReq = attemptCtx.request(); assert attemptReq != null; diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index f7558eace9e..d025185098c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -201,7 +201,8 @@ private void doExecute0(RetryContext rctx) { private RetryAttempt executeAttempt(RetryContext rctx) { final int totalAttempts = rctx.counter().numberAttemptsSoFar(); final boolean initialAttempt = totalAttempts <= 1; - final ClientRequestContext attemptCtx = newDerivedContext(rctx.ctx(), null, rctx.req(), initialAttempt); + final ClientRequestContext attemptCtx = ClientUtil.newDerivedContext(rctx.ctx(), null, rctx.req(), + initialAttempt); if (!initialAttempt) { attemptCtx.mutateAdditionalRequestHeaders( From ec4b6ead0096cd32a03ed47936dd2ed2e1ea6a71 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:04:01 +0200 Subject: [PATCH 09/14] refactor: remove onRetryingComplete from AbstractRetryingClient --- .../armeria/client/retry/AbstractRetryingClient.java | 7 ------- .../com/linecorp/armeria/client/retry/RetryingClient.java | 2 +- .../linecorp/armeria/client/retry/RetryingRpcClient.java | 2 +- 3 files changed, 2 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index 6e29cd1ceb4..2b72ffdd951 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -80,13 +80,6 @@ protected final RetryConfigMapping mapping() { */ protected abstract O doExecute(ClientRequestContext ctx, I req, RetryConfig config) throws Exception; - /** - * This should be called when retrying is finished. - */ - protected static void onRetryingComplete(ClientRequestContext ctx) { - ctx.logBuilder().endResponseWithLastChild(); - } - /** * Returns the {@link RetryRule}. * diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 1f47c29bd79..788664a7d98 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -398,7 +398,7 @@ private void handleDecision(HttpRetryContext rctx, RetryAttempt at return; } } - onRetryingComplete(rctx.ctx()); + rctx.ctx().logBuilder().endResponseWithLastChild(); rctx.resFuture().complete(attempt.res()); rctx.reqDuplicator().close(); } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index d025185098c..627722174c7 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -235,7 +235,7 @@ private RetryAttempt executeAttempt(RetryContext rctx, RetryAttempt attempt) { - onRetryingComplete(rctx.ctx()); + rctx.ctx().logBuilder().endResponseWithLastChild(); final HttpRequest actualHttpReq = attempt.ctx().request(); if (actualHttpReq != null) { rctx.ctx().updateRequest(actualHttpReq); From cf2f828d99ef4cedd0fc1e49825d804ce3e2ed5c Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:04:44 +0200 Subject: [PATCH 10/14] refactor: remove mapping from AbstractRetryingClient --- .../armeria/client/retry/AbstractRetryingClient.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index 2b72ffdd951..0f605bc7034 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -67,13 +67,6 @@ public final O execute(ClientRequestContext ctx, I req) throws Exception { return doExecute(ctx, req, config); } - /** - * Returns the current {@link RetryConfigMapping} set for this client. - */ - protected final RetryConfigMapping mapping() { - return mapping; - } - /** * Invoked by {@link #execute(ClientRequestContext, Request)} * after the deadline for response timeout is set. From 83e82093312a9beb1b9c4544175af160552d0a84 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:05:04 +0200 Subject: [PATCH 11/14] refactor: remove retryRule from AbstractRetryingClient --- .../client/retry/AbstractRetryingClient.java | 26 +++---------------- .../armeria/client/retry/RetryingClient.java | 2 +- .../client/retry/RetryingRpcClient.java | 2 +- 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index 0f605bc7034..24ebffdf850 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -15,7 +15,6 @@ */ package com.linecorp.armeria.client.retry; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import java.util.concurrent.TimeUnit; @@ -30,7 +29,6 @@ import com.linecorp.armeria.client.SimpleDecoratingClient; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.Response; -import com.linecorp.armeria.common.annotation.Nullable; import io.netty.util.concurrent.ScheduledFuture; @@ -45,24 +43,20 @@ abstract class AbstractRetryingClient private static final Logger logger = LoggerFactory.getLogger(AbstractRetryingClient.class); - private final RetryConfigMapping mapping; - - @Nullable - private final RetryConfig retryConfig; + private final RetryConfigMapping retryConfigMapping; /** * Creates a new instance that decorates the specified {@link Client}. */ AbstractRetryingClient( - Client delegate, RetryConfigMapping mapping, @Nullable RetryConfig retryConfig) { + Client delegate, RetryConfigMapping retryConfigMapping) { super(delegate); - this.mapping = requireNonNull(mapping, "mapping"); - this.retryConfig = retryConfig; + this.retryConfigMapping = requireNonNull(retryConfigMapping, "mapping"); } @Override public final O execute(ClientRequestContext ctx, I req) throws Exception { - final RetryConfig config = mapping.get(ctx, req); + final RetryConfig config = retryConfigMapping.get(ctx, req); requireNonNull(config, "mapping.get() returned null"); return doExecute(ctx, req, config); } @@ -73,18 +67,6 @@ public final O execute(ClientRequestContext ctx, I req) throws Exception { */ protected abstract O doExecute(ClientRequestContext ctx, I req, RetryConfig config) throws Exception; - /** - * Returns the {@link RetryRule}. - * - * @throws IllegalStateException if the {@link RetryRule} is not set - */ - protected final RetryRule retryRule() { - checkState(retryConfig != null, "No retryRule set. Are you using RetryConfigMapping?"); - final RetryRule retryRule = retryConfig.retryRule(); - checkState(retryRule != null, "retryRule is not set."); - return retryRule; - } - /** * Schedules next retry. */ diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 788664a7d98..d55e2e5799a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -238,7 +238,7 @@ public static Function newDecorator(RetryRul RetryConfigMapping mapping, @Nullable RetryConfig retryConfig, boolean useRetryAfter) { - super(delegate, mapping, retryConfig); + super(delegate, retryConfig != null ? (ctx, req) -> retryConfig : mapping); this.useRetryAfter = useRetryAfter; } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index 627722174c7..11ebeb789f6 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -138,7 +138,7 @@ public static RetryingRpcClientBuilder builder(RetryConfigMapping m * Creates a new instance that decorates the specified {@link RpcClient}. */ RetryingRpcClient(RpcClient delegate, RetryConfigMapping mapping) { - super(delegate, mapping, null); + super(delegate, mapping); } @Override From 5b309b24d9dff5abe979a2cf1a5afa442decec56 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:24:25 +0200 Subject: [PATCH 12/14] revert: remove second constructor We do not offer retry customization at the moment. In a later PR we will add an API to pass in a custom counter (and scheduler). --- .../linecorp/armeria/client/retry/RetryContext.java | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java index a3bb894ddb3..eb67ddd5ba4 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java @@ -38,23 +38,13 @@ class RetryContext { ClientRequestContext ctx, I req, O res, CompletableFuture resFuture, RetryConfig config, long responseTimeoutMillis - ) { - this(ctx, req, res, resFuture, config, responseTimeoutMillis, - new RetryCounter(config.maxTotalAttempts())); - } - - RetryContext( - ClientRequestContext ctx, I req, O res, - CompletableFuture resFuture, - RetryConfig config, long responseTimeoutMillis, - RetryCounter counter ) { this.ctx = ctx; this.req = req; this.res = res; this.resFuture = resFuture; this.config = config; - this.counter = counter; + counter = new RetryCounter(config.maxTotalAttempts()); if (responseTimeoutMillis <= 0 || responseTimeoutMillis == Long.MAX_VALUE) { deadlineNanos = 0; From f6ce24c3205532474f8371966a9dd3d9e19e2e8b Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:47:33 +0200 Subject: [PATCH 13/14] fix: abort res to decide and hand over cause in handleAggregatedAttemptResponse --- .../com/linecorp/armeria/client/retry/RetryingClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index d55e2e5799a..4c53994dffc 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -316,7 +316,7 @@ private void handleAggregatedAttemptResponse(HttpRetryContext rctx, RetryAttempt attempt.ctx().logBuilder().endRequest(cause); attempt.ctx().logBuilder().endResponse(cause); decideAndHandleDecision(rctx, attempt.setRes(HttpResponse.ofFailure(cause)), - HttpResponse.ofFailure(cause), null); + HttpResponse.ofFailure(cause), cause); } else { completeLogIfBytesNotTransferred(attempt.ctx(), aggAttemptRes); attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { @@ -335,7 +335,7 @@ private void decideAndHandleDecision(HttpRetryContext rctx, decide(rctx, attempt, resToDecide, causeToDecide) .handle((decision, decisionCause) -> { if (resToDecide != null) { - // resToDecide.abort(); + resToDecide.abort(); } if (decisionCause != null) { From d2690813b3fc0d660a2f8ed85c4b70f2c62ff682 Mon Sep 17 00:00:00 2001 From: "szymon.habrainski" Date: Sun, 28 Sep 2025 18:54:52 +0200 Subject: [PATCH 14/14] fix: RetryingRpcClientTest.doNotRetryWhenResponseIsCancelled --- .../com/linecorp/armeria/client/retry/RetryingClient.java | 2 +- .../linecorp/armeria/client/retry/RetryingRpcClient.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index 4c53994dffc..146c856b30e 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -511,7 +511,7 @@ private CompletableFuture retryContext(ClientRequestContext ct } private boolean isRequestCompletedExternally(HttpRetryContext rctx) { - return rctx.ctx().isCancelled() || rctx.req().whenComplete().isCompletedExceptionally() || + return rctx.req().whenComplete().isCompletedExceptionally() || rctx.res() .whenComplete() .isDone(); diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index 11ebeb789f6..a8f8adb7dd8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -149,14 +149,14 @@ protected RpcResponse doExecute(ClientRequestContext ctx, RpcRequest req, RetryC final RetryContext rctx = new RetryContext<>(ctx, req, res, resFuture, config, ctx.responseTimeoutMillis()); rctx.counter().consumeAttemptFrom(null); - doExecute0(rctx); + retry(rctx); return res; } - private void doExecute0(RetryContext rctx) { + private void retry(RetryContext rctx) { final int totalAttempts = rctx.counter().numberAttemptsSoFar(); final boolean initialAttempt = totalAttempts <= 1; - if (rctx.res().isDone()) { + if (rctx.ctx().isCancelled() || rctx.res().isDone()) { // The response has been cancelled by the client before it receives a response, so stop retrying. handleException(rctx, new CancellationException( "the response returned to the client has been cancelled"), initialAttempt); @@ -185,7 +185,7 @@ private void doExecute0(RetryContext rctx) { } scheduleNextRetry(rctx.ctx(), cause0 -> handleException(rctx, cause0, false), - () -> doExecute0(rctx), nextDelay); + () -> retry(rctx), nextDelay); } else { onRetryComplete(rctx, attempt); }