Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/setup/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,29 @@ nephren.raven.apiclient.configs.<apiClient-name>.headers.<headers-key>=applicati
# not required, error resolver for the api client, default is DefaultErrorResolver
nephren.raven.apiclient.configs.<apiClient-name>.error-resolver=com.nephren.raven.apiclient.
serviceExample.client.errorresolver.DefaultErrorResolver

# not required, opt this client out of shared connection pooling so it gets its own
# dedicated Reactor Netty ConnectionProvider, default is false. By default clients
# sharing a scheme://host:port (and identical read/write timeouts) reuse the same pool.
# Note: this isolates the connection pool only — Reactor Netty's process-global event-loop
# threads are still shared. Replace the RavenHttpClientFactory bean if full thread
# isolation is required.
nephren.raven.apiclient.configs.<apiClient-name>.isolate-pool=true
```

## Connection Pool Tunables

These apply to every `ConnectionProvider` the default factory creates — both shared pools
and isolated ones. Most apps will never need to touch them; defaults match Reactor Netty
conventions and are sized for typical client fan-out.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

```
# not required, max concurrent connections per pool, default is 500
nephren.raven.apiclient.shared-pool.max-connections=500

# not required, max queued acquire attempts when the pool is saturated.
# -1 (default) is unbounded; set a positive value to fail-fast under load.
nephren.raven.apiclient.shared-pool.pending-acquire-max-count=-1
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
```

## Api Scheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import com.nephren.raven.apiclient.aop.fallback.RavenApiClientFallback;
import com.nephren.raven.apiclient.body.ApiBodyResolver;
import com.nephren.raven.apiclient.errorresolver.ApiErrorResolver;
import com.nephren.raven.apiclient.http.RavenHttpClientFactory;
import com.nephren.raven.apiclient.reactor.helper.SchedulerHelper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
Expand All @@ -21,7 +19,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
Expand Down Expand Up @@ -92,9 +89,11 @@ private void prepareBodyResolvers() {
}

private void prepareWebClient() {
RavenHttpClientFactory factory = applicationContext.getBean(RavenHttpClientFactory.class);
HttpClient httpClient = factory.httpClient(name, metadata.getProperties());
WebClient.Builder builder = applicationContext.getBean(WebClient.Builder.class)
.exchangeStrategies(getExchangeStrategies()).baseUrl(metadata.getProperties().getUrl())
.clientConnector(new ReactorClientHttpConnector(getHttpClient()))
.clientConnector(new ReactorClientHttpConnector(httpClient))
Comment on lines 91 to +97
.defaultHeaders(
httpHeaders -> metadata.getProperties().getHeaders().forEach(httpHeaders::add));
webClient = builder.build();
Expand Down Expand Up @@ -149,20 +148,6 @@ private ExchangeStrategies getExchangeStrategies() {
}).build();
}

private HttpClient getHttpClient() {
return HttpClient.create().option(
ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) metadata.getProperties().getConnectTimeout().toMillis())
.doOnConnected(connection -> connection
.addHandlerLast(
new ReadTimeoutHandler(metadata.getProperties().getReadTimeout().toMillis(),
TimeUnit.MILLISECONDS))
.addHandlerLast(
new WriteTimeoutHandler(metadata.getProperties().getWriteTimeout().toMillis(),
TimeUnit.MILLISECONDS))
);
}

@Override
public Object invoke(MethodInvocation invocation) {
Method method = invocation.getMethod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import com.nephren.raven.apiclient.body.JsonBodyResolver;
import com.nephren.raven.apiclient.body.MultipartBodyResolver;
import com.nephren.raven.apiclient.errorresolver.DefaultApiErrorResolver;
import com.nephren.raven.apiclient.http.DefaultRavenHttpClientFactory;
import com.nephren.raven.apiclient.http.RavenHttpClientFactory;
import com.nephren.raven.apiclient.properties.RavenApiClientProperties;
import com.nephren.raven.apiclient.properties.RavenSharedPoolProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -14,7 +17,8 @@
@Configuration
@Import(RavenApiClientRegistrar.class)
@EnableConfigurationProperties({
RavenApiClientProperties.class
RavenApiClientProperties.class,
RavenSharedPoolProperties.class
})
public class RavenApiConfiguration {

Expand Down Expand Up @@ -42,4 +46,11 @@ public DefaultApiErrorResolver defaultApiErrorResolver() {
return new DefaultApiErrorResolver();
}

@Bean
@ConditionalOnMissingBean
public RavenHttpClientFactory ravenHttpClientFactory(
RavenSharedPoolProperties sharedPoolProperties) {
return new DefaultRavenHttpClientFactory(sharedPoolProperties);
Comment on lines +49 to +53
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package com.nephren.raven.apiclient.http;

import com.nephren.raven.apiclient.properties.RavenApiClientProperties.ApiClientConfigProperties;
import com.nephren.raven.apiclient.properties.RavenSharedPoolProperties;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

/**
* Default {@link RavenHttpClientFactory} implementation.
*
* <p>Maintains a {@link ConcurrentHashMap} of base {@link HttpClient} instances keyed by
* pool key. Each base owns a dedicated {@link ConnectionProvider} sized by
* {@link RavenSharedPoolProperties}; isolated keys ({@code "isolated:<clientName>"}) get
* their own provider too — they are isolated from <em>each other</em> as well as from the
* shared pools, which would not be the case if the implementation fell back to
* {@code HttpClient.create()} (that uses Reactor Netty's process-global pool).</p>
*
* <p>The pool key folds the configured timeouts in because the
* {@link ReadTimeoutHandler} / {@link WriteTimeoutHandler} we install attach at the channel
* level. Sharing a pooled channel between clients with different timeouts would mean the
* second client gets the first one's handlers, which is silently incorrect. Pool reuse
* therefore happens only between clients that target the same backend <em>with the same
* timeouts</em> — typically the entire application.</p>
*
* <p>Implements {@link DisposableBean} so the cached {@link ConnectionProvider}s release
* their resources on application context shutdown. Disposal is time-bounded so a stuck
* provider cannot hang the shutdown phase.</p>
*/
@Slf4j
public class DefaultRavenHttpClientFactory implements RavenHttpClientFactory, DisposableBean {

private static final Duration DISPOSE_TIMEOUT = Duration.ofSeconds(30);

private final RavenSharedPoolProperties sharedPoolProperties;
private final Map<String, HttpClient> baseClients = new ConcurrentHashMap<>();
private final Map<String, ConnectionProvider> ownedProviders = new ConcurrentHashMap<>();

public DefaultRavenHttpClientFactory(RavenSharedPoolProperties sharedPoolProperties) {
this.sharedPoolProperties = sharedPoolProperties;
}

@Override
public HttpClient httpClient(String clientName, ApiClientConfigProperties config) {
String poolKey = RavenHttpClientFactory.defaultPoolKey(clientName, config);
HttpClient base = baseClients.computeIfAbsent(poolKey, this::buildBase);
return base
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
(int) config.getConnectTimeout().toMillis())
.doOnConnected(connection -> connection
.addHandlerLast(new ReadTimeoutHandler(
config.getReadTimeout().toMillis(), TimeUnit.MILLISECONDS))
.addHandlerLast(new WriteTimeoutHandler(
config.getWriteTimeout().toMillis(), TimeUnit.MILLISECONDS)));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

private HttpClient buildBase(String poolKey) {
ConnectionProvider provider = ConnectionProvider.builder("raven-" + sanitize(poolKey))
.maxConnections(sharedPoolProperties.getMaxConnections())
.pendingAcquireMaxCount(sharedPoolProperties.getPendingAcquireMaxCount())
.build();
ownedProviders.put(poolKey, provider);
log.debug(
"#RavenHttpClientFactory creating HttpClient for {} (maxConnections={},"
+ " pendingAcquireMaxCount={})",
poolKey,
sharedPoolProperties.getMaxConnections(),
sharedPoolProperties.getPendingAcquireMaxCount());
return HttpClient.create(provider);
Comment on lines +103 to +111
}

/**
* Reactor Netty {@code ConnectionProvider} names cannot contain certain characters; strip
* the URL-flavored ones so the provider name remains valid and human-readable.
*/
private static String sanitize(String poolKey) {
return poolKey
.replace("://", "-")
.replace(":", "-")
.replace("/", "-")
.replace("|", "-")
.replace(",", "-")
.replace("=", "-");
}

/**
* Disposes every cached {@link ConnectionProvider} in parallel and waits at most
* {@link #DISPOSE_TIMEOUT} for the whole batch to complete. Per-provider failures (or a
* provider that exceeds the budget) are logged but do not block the others — disposal of
* one stalled provider must not gate shutdown of the rest of the application.
*/
@Override
public void destroy() {
if (ownedProviders.isEmpty()) {
return;
}
List<Mono<Void>> disposals = ownedProviders.entrySet().stream()
.map(entry -> entry.getValue().disposeLater()
.doOnError(err -> log.warn(
"#RavenHttpClientFactory failed to dispose connection provider for {}",
entry.getKey(), err))
.onErrorResume(err -> Mono.empty()))
.toList();
try {
Mono.when(disposals).block(DISPOSE_TIMEOUT);
} catch (RuntimeException e) {
log.warn("#RavenHttpClientFactory disposal exceeded {} budget; some providers"
+ " may not have shut down cleanly", DISPOSE_TIMEOUT, e);
}
ownedProviders.clear();
baseClients.clear();
}
}
47 changes: 47 additions & 0 deletions src/main/java/com/nephren/raven/apiclient/http/PoolKeys.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.nephren.raven.apiclient.http;

import java.net.URI;
import java.net.URISyntaxException;

/**
* Internal helpers for deriving stable pool keys from configured URLs.
*
* <p>The key shape is intentionally narrow — scheme + host + (resolved) port — so that two
* clients hitting the same logical backend share a pool regardless of differences in path,
* query, or trailing slashes. Inputs without an explicit scheme (e.g. the
* {@code localhost:8080} format used in our setup docs) are normalized as if they were
* {@code http://...} so they collapse onto the same key as their scheme-prefixed siblings.
* Anything still unparseable falls back to a {@code raw:} prefix to preserve isolation
* without throwing.</p>
*/
final class PoolKeys {

private PoolKeys() {}

static String fromUrl(String url) {
if (url == null || url.isBlank()) {
return "default";
}
try {
URI uri = new URI(url);
if (uri.getHost() == null && !url.contains("://")) {
// Inputs like "localhost:8080" or "localhost:8080/api" parse as opaque URIs with no
// host; re-parse with an http:// prefix so they normalize to the same scheme/host/port
// shape as fully-qualified URLs.
uri = new URI("http://" + url);
}
Comment on lines +21 to +32
String scheme = uri.getScheme() == null ? "http" : uri.getScheme().toLowerCase();
String host = uri.getHost();
if (host == null) {
return "raw:" + url;
}
int port = uri.getPort();
if (port == -1) {
port = "https".equals(scheme) ? 443 : 80;
}
return scheme + "://" + host + ":" + port;
} catch (URISyntaxException e) {
return "raw:" + url;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.nephren.raven.apiclient.http;

import com.nephren.raven.apiclient.properties.RavenApiClientProperties.ApiClientConfigProperties;
import reactor.netty.http.client.HttpClient;

/**
* Extension point for materializing the Reactor Netty {@link HttpClient} that backs each
* {@code @RavenApiClient}'s {@code WebClient}.
*
* <p>The default implementation shares connection pools and event loops across all clients
* targeting the same scheme+host:port (with the same configured timeouts) and applies the
* per-client connect/read/write timeouts on top. Replace this bean to take full control of
* pooling — typical reasons include custom TLS configuration, a metric-instrumented
* {@code ConnectionProvider}, sharing pools with other parts of an application, or a
* different keying strategy.</p>
*
* <p>The full per-client configuration is passed to {@link #httpClient(String,
* ApiClientConfigProperties)} so a replacement implementation can make decisions based on
* URL, headers, fallback class, or any other property. Implementations must be thread-safe;
* {@link #httpClient} is called once per declared client at bean initialization, but the
* returned {@link HttpClient} may be invoked concurrently from any thread.</p>
*/
public interface RavenHttpClientFactory {

/**
* Resolve an {@link HttpClient} for the given client. Implementations are free to share
* an underlying {@code ConnectionProvider}/{@code LoopResources} between calls; the
* default implementation does so when {@code config} resolves to the same pool key.
*
* @param clientName the {@code @RavenApiClient(name = ...)} value, used by the default
* implementation when {@code isolate-pool} is set.
* @param config the merged {@link ApiClientConfigProperties} for this client (after
* defaults are applied), giving the implementation access to URL,
* timeouts, headers, etc.
* @return an {@link HttpClient} ready for use; the underlying connection pool may be
* shared with other clients depending on the factory's keying strategy.
*/
HttpClient httpClient(String clientName, ApiClientConfigProperties config);

/**
* The default pool key the bundled implementation uses:
* {@code scheme://host:port|r=<read>,w=<write>} for shared pools, or
* {@code "isolated:" + clientName} when the client is configured with
* {@code isolate-pool: true}.
*
* <p>Read and write timeouts are folded into the shared key because Reactor Netty's
* {@link io.netty.handler.timeout.ReadTimeoutHandler} /
* {@link io.netty.handler.timeout.WriteTimeoutHandler} attach to the channel, so two
* clients with different read/write timeouts cannot safely share a pooled channel. Connect
* timeout is intentionally <em>not</em> part of the key — it only affects opening new
* sockets and is reapplied per call as a channel option, so clients that differ only in
* connect timeout can still share a pool. Custom factories that apply timeouts at the
* request level (e.g. via {@code responseTimeout}) may use a narrower key.</p>
*/
static String defaultPoolKey(String clientName, ApiClientConfigProperties config) {
if (Boolean.TRUE.equals(config.getIsolatePool())) {
return "isolated:" + clientName;
}
return PoolKeys.fromUrl(config.getUrl())
+ "|r=" + config.getReadTimeout().toMillis()
+ ",w=" + config.getWriteTimeout().toMillis();
Comment on lines +60 to +62
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public static void copyConfigPropertiesFromSourceToTarget(
target.setErrorResolver(source.getErrorResolver());
}

if (Objects.nonNull(source.getIsolatePool())) {
target.setIsolatePool(source.getIsolatePool());
}

source.getHeaders().forEach((key, value) -> target.getHeaders().put(key, value));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,27 @@ public static class ApiClientConfigProperties {
private Map<String, String> headers = new HashMap<>();

private Class<? extends ApiErrorResolver> errorResolver;

/**
* When {@code true}, this client uses its own dedicated Reactor Netty
* {@code ConnectionProvider} instead of one keyed by scheme+host:port (and read/write
* timeouts). Treated as {@code false} when unset — most callers benefit from sharing.
* Set to {@code true} when this client has TLS/proxy needs that differ from siblings
* targeting the same host, or when its load profile would otherwise starve them on the
* shared connection pool.
*
* <p><strong>Scope:</strong> the default factory only isolates the
* {@code ConnectionProvider} (i.e. the connection pool); event-loop threads come from
* Reactor Netty's process-global {@code LoopResources} and are still shared across all
* clients in the JVM. A custom {@link com.nephren.raven.apiclient.http.RavenHttpClientFactory}
* implementation can replace that if full thread isolation is required.</p>
*
* <p>Modeled as the wrapper {@link Boolean} so that a value set on
* {@code configs.default.isolate-pool} can be inherited by named configs that omit the
* key — a primitive default would unconditionally overwrite the inherited value back to
* {@code false} during property merging.</p>
*/
private Boolean isolatePool;
Comment on lines +67 to +72
}

}
Loading