Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,27 @@ public enum DefaultDriverOption implements DriverOption {
*
* <p>Value type: boolean
*/
CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled");
CLIENT_ROUTES_SHARD_AWARENESS_ENABLED("advanced.client-routes.shard-awarness-enabled"),

/**
* Whether the driver may fall back to a direct connection when no client route is available for a
* node.
*
* <p>When {@code true} (the default), nodes that have no matching entry in {@code
* system.client_routes} are contacted directly using their broadcast address. This preserves
* backward-compatible mixed proxy/direct topologies where some nodes are behind the private
* endpoint and others are not. Note that the fallback applies only when no route is found: a
* route whose address fails DNS resolution is reported as an error rather than bypassed.
*
* <p>When {@code false}, the driver never falls back to a direct connection. Any node without a
* reachable route is treated as unreachable: it stays DOWN and the reconnection loop retries
* until a {@code CLIENT_ROUTES_CHANGE} event publishes the route. Note that setting this to
* {@code false} does <em>not</em> actively close existing direct connections; connection pools
* that were established before the flag was applied may continue to operate until naturally
* recycled.
*
* <p>Value type: boolean
*/
CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK("advanced.client-routes.direct-connection-fallback");

private final String path;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) {
// values) with no sensible scalar default, analogous to how CONFIG_RELOAD_INTERVAL is omitted.
map.put(TypedDriverOption.CLIENT_ROUTES_NATIVE_TRANSPORT_PORT, 9042);
map.put(TypedDriverOption.CLIENT_ROUTES_SHARD_AWARENESS_ENABLED, false);
map.put(TypedDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK, true);
}

@Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,15 @@ public String toString() {
new TypedDriverOption<>(
DefaultDriverOption.CLIENT_ROUTES_SHARD_AWARENESS_ENABLED, GenericType.BOOLEAN);

/**
* Whether the driver may fall back to a direct connection when no client route is available for a
* node. Typed counterpart of {@link DefaultDriverOption#CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK}.
*
* <p>When {@code true} (default), nodes without a reachable route are contacted directly via
* their broadcast address. When {@code false}, no fallback is attempted and the node stays DOWN.
*
* <p>Value type: boolean
*/
public static final TypedDriverOption<Boolean> CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK =
new TypedDriverOption<>(
DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK, GenericType.BOOLEAN);

private static Iterable<TypedDriverOption<?>> introspectBuiltInValues() {
try {
ImmutableList.Builder<TypedDriverOption<?>> result = ImmutableList.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,42 @@
import java.net.SocketAddress;
import java.util.Objects;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientRoutesEndPoint implements EndPoint {
private static final Logger LOG = LoggerFactory.getLogger(ClientRoutesEndPoint.class);

private final UUID hostId;
private final ClientRoutesTopologyMonitor topologyMonitor;
private final String metricPrefix;
@NonNull private final EndPoint fallbackEndPoint;
private final boolean directConnectionFallback;

/**
* @param topologyMonitor the topology monitor used to resolve the endpoint address on demand.
* @param hostId the host UUID identifying this node in the cluster.
* @param broadcastInetAddress the node's broadcast address (from system.peers or system.local),
* used to build a stable metric prefix. May be {@code null} if the address could not be
* determined, in which case the hostId is used as the metric prefix instead.
* @param fallbackEndPoint the default endpoint to fall back to when {@code
* topologyMonitor.resolve()} returns {@code null}, i.e. when this node is not accessed via a
* cloud private endpoint. Must not be {@code null}.
* @param fallbackEndPoint the endpoint to use when {@code topologyMonitor.resolve()} returns
* {@code null} and {@code directConnectionFallback} is {@code true}. Must not be {@code null},
* even when {@code directConnectionFallback} is {@code false}.
* @param directConnectionFallback when {@code true}, {@link #resolve()} falls back to {@code
* fallbackEndPoint} if no client route is found. When {@code false}, {@link #resolve()} throws
* {@link IllegalStateException} instead, keeping the node DOWN until a route is published.
*/
public ClientRoutesEndPoint(
@NonNull ClientRoutesTopologyMonitor topologyMonitor,
@NonNull UUID hostId,
@Nullable InetAddress broadcastInetAddress,
@NonNull EndPoint fallbackEndPoint) {
@NonNull EndPoint fallbackEndPoint,
boolean directConnectionFallback) {
this.topologyMonitor =
Objects.requireNonNull(topologyMonitor, "Topology monitor cannot be null");
this.hostId = Objects.requireNonNull(hostId, "HOST uuid cannot be null");
this.fallbackEndPoint =
Objects.requireNonNull(fallbackEndPoint, "Fallback endpoint cannot be null");
this.directConnectionFallback = directConnectionFallback;
this.metricPrefix = buildMetricPrefix(broadcastInetAddress, hostId);
}

Expand All @@ -73,7 +82,24 @@ public SocketAddress resolve() {
} catch (IOException e) {
throw new UncheckedIOException("DNS resolution failed for host_id=" + hostId, e);
}
return fallbackEndPoint.resolve();
Comment thread
nikagra marked this conversation as resolved.
if (directConnectionFallback) {
// Default (backward-compatible) mode: fall back to the node's broadcast address.
// This supports mixed proxy/direct topologies where some nodes are behind the private
// endpoint and others are reached directly.
return fallbackEndPoint.resolve();
}
// direct-connection-fallback=false: the driver must not bypass the proxy infrastructure.
// The node will remain DOWN and the reconnection loop will retry until a
// CLIENT_ROUTES_CHANGE event populates the route.
LOG.warn(
"No client route entry found for host_id={}. "
+ "The node will remain DOWN until a route is published via CLIENT_ROUTES_CHANGE.",
hostId);
throw new IllegalStateException(
"No client route entry found for host_id="
+ hostId
+ ". Direct connection fallback is disabled"
+ " (advanced.client-routes.direct-connection-fallback = false).");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class ClientRoutesTopologyMonitor extends DefaultTopologyMonitor {
private final String logPrefix;
private final AtomicReference<Map<UUID, ClientRouteRecord>> resolvedRoutesCache;
private final boolean useSSL;
private final boolean directConnectionFallback;
private volatile boolean closed = false;
private final AtomicInteger consecutiveEmptyResults = new AtomicInteger(0);

Expand Down Expand Up @@ -147,6 +148,11 @@ public ClientRoutesTopologyMonitor(
this.logPrefix = context.getSessionName();
this.resolvedRoutesCache = new AtomicReference<>(Collections.emptyMap());
this.useSSL = context.getSslEngineFactory().isPresent();
this.directConnectionFallback =
context
.getConfig()
.getDefaultProfile()
.getBoolean(DefaultDriverOption.CLIENT_ROUTES_DIRECT_CONNECTION_FALLBACK);
}

@Override
Expand Down Expand Up @@ -459,14 +465,15 @@ protected EndPoint buildNodeEndPoint(
UUID hostId = row.getUuid("host_id");
if (hostId == null) {
LOG.warn(
"[{}] host_id is null in system row for address {} — cannot assign a client route. "
+ "This may indicate corrupted system tables. "
+ "Falling back to default endpoint resolution.",
"[{}] host_id is null in system row for address {} — cannot build a client-routes"
+ " endpoint. This may indicate corrupted system tables. The node will be ignored.",
logPrefix,
broadcastRpcAddress);
return super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
throw new IllegalStateException(
"host_id is null in system row for address "
+ broadcastRpcAddress
+ "; cannot build a ClientRoutesEndPoint without a host_id");
}
EndPoint fallback = super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
InetAddress broadcastInetAddress = null;
if (broadcastRpcAddress != null) {
broadcastInetAddress = broadcastRpcAddress.getAddress();
Expand All @@ -477,7 +484,9 @@ protected EndPoint buildNodeEndPoint(
if (broadcastInetAddress == null) {
broadcastInetAddress = row.getInetAddress("peer");
}
return new ClientRoutesEndPoint(this, hostId, broadcastInetAddress, fallback);
EndPoint fallback = super.buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);
return new ClientRoutesEndPoint(
this, hostId, broadcastInetAddress, fallback, directConnectionFallback);
}

/**
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,21 @@ datastax-java-driver {
# Default: false
shard-awarness-enabled = false

# When true (default), nodes that have no matching entry in system.client_routes are contacted
# directly using their broadcast address. This preserves backward-compatible mixed proxy/direct
# topologies where some nodes are behind the private endpoint and others are not. The fallback
# applies only when no route is found: a route whose address fails DNS resolution is reported
# as an error rather than bypassed.
#
# When false, the driver never falls back to a direct connection. Any node without a reachable
# route stays DOWN and the reconnection loop retries until a CLIENT_ROUTES_CHANGE event
# publishes the route. Note that setting this to false does NOT actively close existing direct
# connections; pools established before this flag was applied may continue operating until
# naturally recycled.
#
# Required: no
# Default: true
direct-connection-fallback = true

}

# Whether to resolve the addresses passed to `basic.contact-points`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,39 @@ public void should_resolve_via_topology_monitor() throws UnknownHostException {
when(topologyMonitor.resolve(hostId)).thenReturn(expected);

ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep.resolve()).isEqualTo(expected);
}

@Test
public void should_fallback_when_resolve_returns_null() throws UnknownHostException {
public void should_throw_when_direct_connection_fallback_disabled_and_no_route()
throws UnknownHostException {
UUID hostId = UUID.randomUUID();
InetSocketAddress fallbackAddr = new InetSocketAddress("10.0.0.1", 9042);
when(topologyMonitor.resolve(hostId)).thenReturn(null);
when(fallbackEndPoint.resolve()).thenReturn(fallbackAddr);

ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, false);

assertThat(ep.resolve()).isEqualTo(fallbackAddr);
assertThatThrownBy(ep::resolve)
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("No client route entry found")
.hasMessageContaining(hostId.toString())
.hasMessageContaining("direct-connection-fallback");
}

@Test
public void should_fall_back_to_broadcast_when_direct_connection_fallback_enabled()
    throws UnknownHostException {
  // Arrange: the topology monitor knows no route for this node, while the fallback endpoint
  // resolves to the node's broadcast address.
  UUID nodeId = UUID.randomUUID();
  InetSocketAddress broadcastAddress = new InetSocketAddress("10.0.0.1", 9042);
  when(fallbackEndPoint.resolve()).thenReturn(broadcastAddress);
  when(topologyMonitor.resolve(nodeId)).thenReturn(null);

  // Act: build the endpoint with direct-connection fallback enabled.
  ClientRoutesEndPoint endPoint =
      new ClientRoutesEndPoint(topologyMonitor, nodeId, null, fallbackEndPoint, true);

  // Assert: with no client route available, resolution yields the fallback address.
  assertThat(endPoint.resolve()).isEqualTo(broadcastAddress);
}

@Test
Expand All @@ -71,7 +88,7 @@ public void should_wrap_io_exceptions_in_unchecked_io_exception() throws Unknown
when(topologyMonitor.resolve(hostId)).thenThrow(new UnknownHostException("no-such-host"));

ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThatThrownBy(ep::resolve)
.isInstanceOf(UncheckedIOException.class)
Expand All @@ -86,7 +103,7 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
when(topologyMonitor.resolve(hostId)).thenReturn(addr1);

ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep.resolve()).isEqualTo(addr1);

Expand All @@ -102,9 +119,9 @@ public void should_reflect_route_changes_on_subsequent_resolve() throws UnknownH
public void should_be_equal_when_same_host_id() {
UUID hostId = UUID.randomUUID();
ClientRoutesEndPoint ep1 =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);
ClientRoutesEndPoint ep2 =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep1).isEqualTo(ep2);
assertThat(ep1.hashCode()).isEqualTo(ep2.hashCode());
Expand All @@ -113,9 +130,9 @@ public void should_be_equal_when_same_host_id() {
@Test
public void should_not_be_equal_when_different_host_id() {
ClientRoutesEndPoint ep1 =
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint, true);
ClientRoutesEndPoint ep2 =
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, UUID.randomUUID(), null, fallbackEndPoint, true);

assertThat(ep1).isNotEqualTo(ep2);
}
Expand All @@ -124,7 +141,7 @@ public void should_not_be_equal_when_different_host_id() {
public void should_not_be_equal_to_non_client_routes_endpoint() {
UUID hostId = UUID.randomUUID();
ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep).isNotEqualTo("not an endpoint");
assertThat(ep).isNotEqualTo(null);
Expand All @@ -136,7 +153,7 @@ public void should_not_be_equal_to_non_client_routes_endpoint() {
public void should_use_host_id_as_metric_prefix_when_address_is_null() {
UUID hostId = UUID.randomUUID();
ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep.asMetricPrefix()).isEqualTo(hostId.toString());
}
Expand All @@ -146,7 +163,7 @@ public void should_format_ipv4_metric_prefix() throws Exception {
UUID hostId = UUID.randomUUID();
InetAddress ipv4 = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv4, fallbackEndPoint, true);

assertThat(ep.asMetricPrefix()).isEqualTo("10_0_0_1_" + hostId);
}
Expand All @@ -157,7 +174,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
InetAddress ipv6 =
InetAddress.getByAddress(new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1});
ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, ipv6, fallbackEndPoint, true);

// IPv6 keeps colons (consistent with DefaultEndPoint), dots replaced by underscores
assertThat(ep.asMetricPrefix()).isEqualTo("0:0:0:0:0:0:0:1_" + hostId);
Expand All @@ -169,7 +186,7 @@ public void should_format_ipv6_metric_prefix() throws Exception {
public void should_return_host_id_as_string() {
UUID hostId = UUID.randomUUID();
ClientRoutesEndPoint ep =
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint);
new ClientRoutesEndPoint(topologyMonitor, hostId, null, fallbackEndPoint, true);

assertThat(ep.toString()).isEqualTo("ClientRoutesEndPoint(" + hostId + ")");
}
Expand Down
Loading
Loading