Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,16 @@ private <T> T sendMessageInner(Object message, Class<T> clz) throws Throwable {
private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) {
// It will always throw celeborn exception , so we need to get the cause
// 'CelebornException: Exception thrown in awaitResult'
if (e.getCause() instanceof MasterNotLeaderException) {
MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause();
MasterNotLeaderException exception = findMasterNotLeaderException(e);
if (exception != null) {
String leaderAddr =
isWorker
? exception.getSuggestedInternalLeaderAddress()
: exception.getSuggestedLeaderAddress();
if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) {
setRpcEndpointRef(leaderAddr);
if (!setRpcEndpointRef(leaderAddr)) {
resetRpcEndpointRef(oldRef);
}
} else {
LOG.warn("Master leader is not present currently, please check masters' status!");
resetRpcEndpointRef(oldRef);
Expand All @@ -191,13 +193,55 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) {
return false;
}

private void setRpcEndpointRef(String masterEndpoint) {
@Nullable
private MasterNotLeaderException findMasterNotLeaderException(Throwable throwable) {
Throwable current = throwable;
while (current != null) {
if (current instanceof MasterNotLeaderException) {
return (MasterNotLeaderException) current;
}
current = current.getCause();
}
return null;
}

private boolean setRpcEndpointRef(String masterEndpoint) {
// This method should never care newer or old value, we just set the suggested master endpoint.
// If an error occurs when setting the suggested Master, it means that the Master may be down.
// At this time, we just set `rpcEndpointRef` to null. Then next time, we will re-select the
// Master and get the correct leader.
rpcEndpointRef.set(setupEndpointRef(masterEndpoint));
LOG.info("Fail over to master {}.", masterEndpoint);
String nextMasterEndpoint = masterEndpoint;
Set<String> triedMasterEndpoints = new HashSet<>();
while (triedMasterEndpoints.add(nextMasterEndpoint)) {
try {
RpcEndpointRef endpointRef = setupEndpointRef(nextMasterEndpoint);
if (endpointRef != null) {
rpcEndpointRef.set(endpointRef);
LOG.info("Fail over to master {}.", nextMasterEndpoint);
return true;
}
break;
} catch (RuntimeException e) {
MasterNotLeaderException exception = findMasterNotLeaderException(e);
if (exception == null) {
break;
}

String leaderAddr =
isWorker
? exception.getSuggestedInternalLeaderAddress()
: exception.getSuggestedLeaderAddress();
if (MasterNotLeaderException.LEADER_NOT_PRESENTED.equals(leaderAddr)) {
break;
}
nextMasterEndpoint = leaderAddr;
}
}
rpcEndpointRef.set(null);
LOG.info(
"Fail over to master {} failed during endpoint setup; will retry with another master.",
masterEndpoint);
return false;
}

private void resetRpcEndpointRef(@Nullable RpcEndpointRef oldRef) {
Expand Down Expand Up @@ -241,7 +285,13 @@ private RpcEndpointRef getOrSetupRpcEndpointRef(AtomicInteger currentIndex) {
if (endpointRef == null) {
int index = currentIndex.get();
do {
RpcEndpointRef tempEndpointRef = setupEndpointRef(activeMasterEndpoints.get(index));
RpcEndpointRef tempEndpointRef;
try {
tempEndpointRef = setupEndpointRef(activeMasterEndpoints.get(index));
} catch (RuntimeException e) {
currentIndex.set((index + 1) % activeMasterEndpoints.size());
throw e;
}
if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) {
index = (index + 1) % activeMasterEndpoints.size();
}
Expand Down Expand Up @@ -269,6 +319,14 @@ private RpcEndpointRef setupEndpointRef(String endpoint) {
rpcEnv.setupEndpointRef(
RpcAddress.fromHostAndPort(endpoint), masterEndpointResolver.masterEndpointName());
} catch (Exception e) {
MasterNotLeaderException exception = findMasterNotLeaderException(e);
if (exception != null
&& !MasterNotLeaderException.LEADER_NOT_PRESENTED.equals(
isWorker
? exception.getSuggestedInternalLeaderAddress()
: exception.getSuggestedLeaderAddress())) {
throw new RuntimeException(e);
}
// Catch all exceptions. Because we don't care whether this exception is IOException or
// TimeoutException or other exceptions, so we just try to connect to host:port, if fail,
// we try next address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,257 @@ public void testOneMasterTimeoutInHA() {
checkOneMasterAskFailedInHA(new RpcTimeoutException("test", new TimeoutException("test")));
}

@Test
public void testBootstrapMasterNotLeaderRedirectsToSuggestedLeaderInHA() {
final CelebornConf conf =
conf()
.set(CelebornConf.HA_ENABLED().key(), "true")
.set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host3:9097")
.set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5");

final RpcEndpointRef master2 = Mockito.mock(RpcEndpointRef.class);
Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master2)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
invocation -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
throw new RuntimeException(
new MasterNotLeaderException("host1:9097", "host2:9097", null));
case "host2":
return master2;
case "host3":
throw new AssertionError("Should follow the suggested leader before host3.");
default:
fail("Should use master host1/host2:" + masterPort + ", but use " + address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should redirect to the suggested leader during bootstrap.", t);
fail("It should redirect to the suggested leader during bootstrap.");
}

assertEquals(mockResponse, response);
}

@Test
public void testBootstrapMasterNotLeaderWithoutLeaderContinuesToOtherConfiguredMastersInHA() {
final CelebornConf conf = prepareForCelebornConfWithHA();

final RpcEndpointRef master2 = Mockito.mock(RpcEndpointRef.class);
Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master2)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
invocation -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
throw new RuntimeException(
new MasterNotLeaderException(
"host1:9097", MasterNotLeaderException.LEADER_NOT_PRESENTED, null));
case "host2":
return master2;
default:
fail("Should use master host1/host2:" + masterPort + ", but use " + address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should continue to another configured master during bootstrap.", t);
fail("It should continue to another configured master during bootstrap.");
}

assertEquals(mockResponse, response);
}

@Test
public void testSuggestedLeaderSetupFailureRetriesAnotherMasterInHA() {
final CelebornConf conf =
conf()
.set(CelebornConf.HA_ENABLED().key(), "true")
.set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host4:9097")
.set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5");

final RpcEndpointRef master1 = Mockito.mock(RpcEndpointRef.class);
final RpcEndpointRef master3 = Mockito.mock(RpcEndpointRef.class);

Mockito.doReturn(
Future$.MODULE$.failed(new MasterNotLeaderException("host1:9097", "host2:9097", null)))
.when(master1)
.ask(Mockito.any(), Mockito.any(), Mockito.any());
Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master3)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
invocation -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
return master1;
case "host2":
throw new RuntimeException(
new MasterNotLeaderException("host2:9097", "host3:9097", null));
case "host3":
return master3;
case "host4":
throw new AssertionError("Should follow chained suggested leaders before host4.");
default:
fail(
"Should use master host1/host2/host3:" + masterPort + ", but use " + address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should retry with another master when suggested leader setup races.", t);
fail("It should retry with another master when suggested leader setup races.");
}

assertEquals(mockResponse, response);
}

@Test
public void testSuggestedLeaderRedirectCycleRetriesConfiguredMasterInHA() {
final CelebornConf conf =
conf()
.set(CelebornConf.HA_ENABLED().key(), "true")
.set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host4:9097")
.set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5");

final RpcEndpointRef master1 = Mockito.mock(RpcEndpointRef.class);
final RpcEndpointRef master4 = Mockito.mock(RpcEndpointRef.class);

Mockito.doReturn(
Future$.MODULE$.failed(new MasterNotLeaderException("host1:9097", "host2:9097", null)))
.when(master1)
.ask(Mockito.any(), Mockito.any(), Mockito.any());
Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master4)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
invocation -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
return master1;
case "host2":
throw new RuntimeException(
new MasterNotLeaderException("host2:9097", "host3:9097", null));
case "host3":
throw new RuntimeException(
new MasterNotLeaderException("host3:9097", "host2:9097", null));
case "host4":
return master4;
default:
fail(
"Should use master host1/host2/host3/host4:"
+ masterPort
+ ", but use "
+ address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should retry another configured master after a redirect cycle.", t);
fail("It should retry another configured master after a redirect cycle.");
}

assertEquals(mockResponse, response);
Mockito.verify(rpcEnv, Mockito.times(1))
.setupEndpointRef(
Mockito.eq(RpcAddress.fromHostAndPort("host2:9097")), Mockito.anyString());
Mockito.verify(rpcEnv, Mockito.times(1))
.setupEndpointRef(
Mockito.eq(RpcAddress.fromHostAndPort("host3:9097")), Mockito.anyString());
}

@Test
public void testBootstrapSuggestedLeaderConnectionFailureRetriesConfiguredMasterInHA() {
final CelebornConf conf = prepareForCelebornConfWithHA();

final RpcEndpointRef master3 = Mockito.mock(RpcEndpointRef.class);

Mockito.doReturn(Future$.MODULE$.successful(mockResponse))
.when(master3)
.ask(Mockito.any(), Mockito.any(), Mockito.any());

Mockito.doAnswer(
invocation -> {
RpcAddress address = invocation.getArgument(0, RpcAddress.class);
switch (address.host()) {
case "host1":
throw new RuntimeException(
new MasterNotLeaderException("host1:9097", "host2:9097", null));
case "host2":
throw new IOException("test");
case "host3":
return master3;
default:
fail(
"Should use master host1/host2/host3:" + masterPort + ", but use " + address);
}
return null;
})
.when(rpcEnv)
.setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString());

MasterClient client = new MasterClient(rpcEnv, conf, false);
HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class);

HeartbeatFromWorkerResponse response = null;
try {
response = client.askSync(message, HeartbeatFromWorkerResponse.class);
} catch (Throwable t) {
LOG.error("It should retry another configured master after bootstrap redirect fails.", t);
fail("It should retry another configured master after bootstrap redirect fails.");
}

assertEquals(mockResponse, response);
}

private void checkOneMasterDownInHA(Exception causedByException) {
final CelebornConf conf = prepareForCelebornConfWithHA();

Expand Down Expand Up @@ -440,15 +691,19 @@ private void prepareForEndpointRefWithoutRetry(Supplier<Future<?>> supplier) {
}

private CelebornConf prepareForCelebornConfWithoutHA() {
return conf.clone()
return conf()
.set(CelebornConf.HA_ENABLED().key(), "false")
.set(CelebornConf.MASTER_ENDPOINTS().key(), masterHost + ":" + masterPort);
}

private CelebornConf prepareForCelebornConfWithHA() {
return conf.clone()
return conf()
.set(CelebornConf.HA_ENABLED().key(), "true")
.set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host2:9097,host3:9097")
.set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5");
}

private CelebornConf conf() {
return conf.clone();
}
}
Loading