Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,51 @@
package com.datastax.oss.driver.api.core.metadata;

import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
* Encapsulates the information needed to open connections to a node.
*
* <p>By default, the driver assumes plain TCP connections, and this is just a wrapper around an
* {@link InetSocketAddress}. However, more complex deployment scenarios might use a custom
* {@link java.net.InetSocketAddress}. However, more complex deployment scenarios might use a custom
* implementation that contains additional information; for example, if the nodes are accessed
* through a proxy with SNI routing, an SNI server name is needed in addition to the proxy address.
*/
public interface EndPoint {

/**
* Resolves this instance to a socket address.
* Resolves this instance to a single socket address.
*
* <p>This will be called each time the driver opens a new connection to the node. The returned
* address cannot be null.
*
* @deprecated Use {@link #resolveAll()} instead. When a hostname maps to multiple IPs (e.g. in
* dynamic DNS environments) only one address is returned here, causing the driver to miss
* fallback IPs when the first one is unreachable. {@code resolveAll()} returns the full set.
*/
@Deprecated
Comment on lines +39 to +43
@NonNull
SocketAddress resolve();

/**
* Resolves this instance to all known socket addresses.
*
* <p>This is called each time the driver opens a new connection to the node. For endpoints backed
* by a plain IP address the array contains exactly one element. For endpoints whose hostname
* resolves to multiple IPs (e.g. a DNS round-robin entry) all addresses are returned so that the
* driver can try each one in sequence and fall back gracefully when individual IPs are
* unreachable.
*
* <p>The default implementation wraps {@link #resolve()} and returns a single-element array.
* Implementations that can supply multiple addresses should override this method.
*
* <p>The returned array must not be null and must contain at least one element.
*/
@NonNull
default SocketAddress[] resolveAll() {
return new SocketAddress[] {resolve()};
}

/**
* Returns an alternate string representation for use in node-level metric names.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,108 @@ private void connect(
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> resultFuture) {

SocketAddress resolvedAddress;
SocketAddress[] candidates;
try {
resolvedAddress = endPoint.resolve();
candidates = endPoint.resolveAll();
} catch (Exception e) {
resultFuture.completeExceptionally(e);
return;
}

tryNextCandidate(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture,
candidates,
0);
}
Comment on lines +222 to +242

/**
* Iterates through the candidate addresses from {@link EndPoint#resolveAll()}. Tries each one in
* sequence; if an address fails for a reason other than protocol-version negotiation exhaustion,
* the next candidate is tried. Only when all candidates are exhausted is the overall {@code
* resultFuture} failed.
*/
private void tryNextCandidate(
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> resultFuture,
SocketAddress[] candidates,
int index) {

SocketAddress candidate = candidates[index];
CompletableFuture<DriverChannel> perAddressFuture = new CompletableFuture<>();
connectToAddress(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
perAddressFuture,
candidate);

perAddressFuture.whenComplete(
(channel, error) -> {
if (error == null) {
resultFuture.complete(channel);
} else if (index + 1 < candidates.length) {
LOG.debug(
"[{}] Failed to connect to {} ({}), trying next address",
logPrefix,
candidate,
error.getMessage());
tryNextCandidate(
endPoint,
shardingInfo,
shardId,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture,
candidates,
index + 1);
} else {
// Note: might be completed already if the failure happened in initializer()
resultFuture.completeExceptionally(error);
}
});
}

/**
* Performs a Netty bootstrap connect to a single, already-resolved address. Handles
* protocol-version negotiation (downgrade retries) internally, staying on the same address. Uses
* {@code perAddressFuture} so {@link #tryNextCandidate} can distinguish a per-address TCP failure
* (try the next IP) from a successful protocol handshake.
*/
private void connectToAddress(
EndPoint endPoint,
NodeShardingInfo shardingInfo,
Integer shardId,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> perAddressFuture,
SocketAddress resolvedAddress) {

NettyOptions nettyOptions = context.getNettyOptions();

Bootstrap bootstrap =
Expand All @@ -235,7 +329,8 @@ private void connect(
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(
initializer(endPoint, currentVersion, options, nodeMetricUpdater, resultFuture));
initializer(
endPoint, currentVersion, options, nodeMetricUpdater, perAddressFuture));

nettyOptions.afterBootstrapInitialized(bootstrap);

Expand Down Expand Up @@ -294,7 +389,7 @@ private void connect(
ConsistencyLevel.LOCAL_QUORUM.name()));
}
}
resultFuture.complete(driverChannel);
perAddressFuture.complete(driverChannel);
} else {
Throwable error = connectFuture.cause();
if (error instanceof UnsupportedProtocolVersionException && isNegotiating) {
Expand All @@ -307,7 +402,8 @@ private void connect(
logPrefix,
currentVersion,
downgraded.get());
connect(
// Stay on the same address for protocol-version downgrade retries.
connectToAddress(
endPoint,
shardingInfo,
shardId,
Expand All @@ -316,16 +412,17 @@ private void connect(
downgraded.get(),
true,
attemptedVersions,
resultFuture);
perAddressFuture,
resolvedAddress);
} else {
resultFuture.completeExceptionally(
perAddressFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(
endPoint, attemptedVersions));
}
} else {
// Note: might be completed already if the failure happened in initializer(), this is
// fine
resultFuture.completeExceptionally(error);
perAddressFuture.completeExceptionally(error);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ public SocketAddress resolve() {
return fallbackEndPoint.resolve();
}

/**
* Returns all socket addresses for this endpoint.
*
* <p>Delegates to {@link #resolve()} to obtain the single address provided by the topology
* monitor (or the fallback endpoint), then returns it as a one-element array. The topology
* monitor resolves each node to exactly one address by design (via a per-host-id lookup), so
* multi-address expansion is not applicable here.
*/
@NonNull
@Override
public SocketAddress[] resolveAll() {
return new SocketAddress[] {resolve()};
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Objects;

public class DefaultEndPoint implements EndPoint, Serializable {
Expand All @@ -41,6 +44,38 @@ public InetSocketAddress resolve() {
return address;
}

/**
* Returns all socket addresses for this endpoint.
*
* <p>If the stored address is unresolved (i.e. the driver was configured with {@code
* RESOLVE_CONTACT_POINTS=false} and the hostname has not been looked up yet), this method calls
* {@link InetAddress#getAllByName(String)} to expand the hostname to every IP it resolves to.
* Each resolved IP is returned as an {@link InetSocketAddress} with the same port as the
* original. If the hostname resolves to only one IP, or if the address is already resolved, a
* single-element array is returned.
*
* <p>If DNS resolution fails, falls back to a single-element array containing {@link #resolve()}.
*/
@NonNull
@Override
public SocketAddress[] resolveAll() {
if (!address.isUnresolved()) {
return new SocketAddress[] {address};
}
try {
InetAddress[] all = InetAddress.getAllByName(address.getHostString());
SocketAddress[] result = new SocketAddress[all.length];
for (int i = 0; i < all.length; i++) {
result[i] = new InetSocketAddress(all[i], address.getPort());
}
return result;
} catch (UnknownHostException e) {
// Fallback: return the single unresolved address; the connect attempt will fail with a
// descriptive error rather than silently returning an empty array.
return new SocketAddress[] {address};
}
}
Comment on lines +61 to +77

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Comparator;
Expand Down Expand Up @@ -72,6 +73,33 @@ public InetSocketAddress resolve() {
}
}

/**
* Returns all socket addresses for this SNI proxy endpoint.
*
* <p>Re-resolves the proxy hostname on each call and returns one {@link InetSocketAddress} per
* A-record, so that the driver can try every proxy IP in sequence if one is unreachable.
*/
@NonNull
@Override
public SocketAddress[] resolveAll() {
try {
InetAddress[] aRecords = InetAddress.getAllByName(proxyAddress.getHostName());
if (aRecords.length == 0) {
throw new IllegalArgumentException(
"Could not resolve proxy address " + proxyAddress.getHostName());
}
Arrays.sort(aRecords, IP_COMPARATOR);
SocketAddress[] result = new SocketAddress[aRecords.length];
for (int i = 0; i < aRecords.length; i++) {
result[i] = new InetSocketAddress(aRecords[i], proxyAddress.getPort());
}
return result;
} catch (UnknownHostException e) {
throw new IllegalArgumentException(
"Could not resolve proxy address " + proxyAddress.getHostName(), e);
}
}

@Override
public boolean equals(Object other) {
if (other == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import org.junit.Test;

public class DefaultEndPointTest {
Expand Down Expand Up @@ -57,4 +58,40 @@ public void should_reject_null_address() {
.isInstanceOf(NullPointerException.class)
.hasMessage("address can't be null");
}

@Test
public void resolve_all_returns_single_element_for_already_resolved_address() {
DefaultEndPoint endPoint = new DefaultEndPoint(new InetSocketAddress("127.0.0.1", 9042));
SocketAddress[] all = endPoint.resolveAll();
assertThat(all).hasSize(1);
assertThat(((InetSocketAddress) all[0]).isUnresolved()).isFalse();
assertThat(((InetSocketAddress) all[0]).getHostString()).isEqualTo("127.0.0.1");
}

@Test
public void resolve_all_expands_unresolved_hostname_to_at_least_one_address() {
// localhost reliably resolves to at least 127.0.0.1
DefaultEndPoint endPoint =
new DefaultEndPoint(InetSocketAddress.createUnresolved("localhost", 9042));
SocketAddress[] all = endPoint.resolveAll();
assertThat(all).isNotEmpty();
for (SocketAddress addr : all) {
InetSocketAddress inet = (InetSocketAddress) addr;
assertThat(inet.isUnresolved()).isFalse();
assertThat(inet.getPort()).isEqualTo(9042);
}
}

@Test
public void resolve_all_falls_back_to_single_element_when_hostname_is_unresolvable() {
// Unresolvable hostname: resolveAll() must not throw; it returns the unresolved address.
DefaultEndPoint endPoint =
new DefaultEndPoint(
InetSocketAddress.createUnresolved("this-host-does-not-exist.invalid", 9042));
SocketAddress[] all = endPoint.resolveAll();
assertThat(all).hasSize(1);
// The fallback address is the original unresolved one.
assertThat(((InetSocketAddress) all[0]).getHostString())
.isEqualTo("this-host-does-not-exist.invalid");
}
}
Loading
Loading