diff --git a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java index 626c112a506..a661e34a7e7 100644 --- a/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java +++ b/common/src/main/java/org/apache/celeborn/common/client/MasterClient.java @@ -171,14 +171,16 @@ private T sendMessageInner(Object message, Class clz) throws Throwable { private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) { // It will always throw celeborn exception , so we need to get the cause // 'CelebornException: Exception thrown in awaitResult' - if (e.getCause() instanceof MasterNotLeaderException) { - MasterNotLeaderException exception = (MasterNotLeaderException) e.getCause(); + MasterNotLeaderException exception = findMasterNotLeaderException(e); + if (exception != null) { String leaderAddr = isWorker ? exception.getSuggestedInternalLeaderAddress() : exception.getSuggestedLeaderAddress(); if (!leaderAddr.equals(MasterNotLeaderException.LEADER_NOT_PRESENTED)) { - setRpcEndpointRef(leaderAddr); + if (!setRpcEndpointRef(leaderAddr)) { + resetRpcEndpointRef(oldRef); + } } else { LOG.warn("Master leader is not present currently, please check masters' status!"); resetRpcEndpointRef(oldRef); @@ -191,13 +193,55 @@ private boolean shouldRetry(@Nullable RpcEndpointRef oldRef, Throwable e) { return false; } - private void setRpcEndpointRef(String masterEndpoint) { + @Nullable + private MasterNotLeaderException findMasterNotLeaderException(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof MasterNotLeaderException) { + return (MasterNotLeaderException) current; + } + current = current.getCause(); + } + return null; + } + + private boolean setRpcEndpointRef(String masterEndpoint) { // This method should never care newer or old value, we just set the suggested master endpoint. // If an error occurs when setting the suggested Master, it means that the Master may be down. // At this time, we just set `rpcEndpointRef` to null. Then next time, we will re-select the // Master and get the correct leader. - rpcEndpointRef.set(setupEndpointRef(masterEndpoint)); - LOG.info("Fail over to master {}.", masterEndpoint); + String nextMasterEndpoint = masterEndpoint; + Set triedMasterEndpoints = new HashSet<>(); + while (triedMasterEndpoints.add(nextMasterEndpoint)) { + try { + RpcEndpointRef endpointRef = setupEndpointRef(nextMasterEndpoint); + if (endpointRef != null) { + rpcEndpointRef.set(endpointRef); + LOG.info("Fail over to master {}.", nextMasterEndpoint); + return true; + } + break; + } catch (RuntimeException e) { + MasterNotLeaderException exception = findMasterNotLeaderException(e); + if (exception == null) { + break; + } + + String leaderAddr = + isWorker + ? exception.getSuggestedInternalLeaderAddress() + : exception.getSuggestedLeaderAddress(); + if (MasterNotLeaderException.LEADER_NOT_PRESENTED.equals(leaderAddr)) { + break; + } + nextMasterEndpoint = leaderAddr; + } + } + rpcEndpointRef.set(null); + LOG.info( + "Fail over to master {} failed during endpoint setup; will retry with another master.", + masterEndpoint); + return false; } private void resetRpcEndpointRef(@Nullable RpcEndpointRef oldRef) { @@ -241,7 +285,13 @@ private RpcEndpointRef getOrSetupRpcEndpointRef(AtomicInteger currentIndex) { if (endpointRef == null) { int index = currentIndex.get(); do { - RpcEndpointRef tempEndpointRef = setupEndpointRef(activeMasterEndpoints.get(index)); + RpcEndpointRef tempEndpointRef; + try { + tempEndpointRef = setupEndpointRef(activeMasterEndpoints.get(index)); + } catch (RuntimeException e) { + currentIndex.set((index + 1) % activeMasterEndpoints.size()); + throw e; + } if (rpcEndpointRef.compareAndSet(null, tempEndpointRef)) { index = (index + 1) % activeMasterEndpoints.size(); } @@ -269,6 +319,14 @@ private RpcEndpointRef setupEndpointRef(String endpoint) { rpcEnv.setupEndpointRef( RpcAddress.fromHostAndPort(endpoint), masterEndpointResolver.masterEndpointName()); } catch (Exception e) { + MasterNotLeaderException exception = findMasterNotLeaderException(e); + if (exception != null + && !MasterNotLeaderException.LEADER_NOT_PRESENTED.equals( + isWorker + ? exception.getSuggestedInternalLeaderAddress() + : exception.getSuggestedLeaderAddress())) { + throw new RuntimeException(e); + } // Catch all exceptions. Because we don't care whether this exception is IOException or // TimeoutException or other exceptions, so we just try to connect to host:port, if fail, // we try next address. diff --git a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java index bb74d99b2c7..26ad11c4320 100644 --- a/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java +++ b/common/src/test/java/org/apache/celeborn/common/client/MasterClientSuiteJ.java @@ -245,6 +245,257 @@ public void testOneMasterTimeoutInHA() { checkOneMasterAskFailedInHA(new RpcTimeoutException("test", new TimeoutException("test"))); } + @Test + public void testBootstrapMasterNotLeaderRedirectsToSuggestedLeaderInHA() { + final CelebornConf conf = + conf() + .set(CelebornConf.HA_ENABLED().key(), "true") + .set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host3:9097") + .set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5"); + + final RpcEndpointRef master2 = Mockito.mock(RpcEndpointRef.class); + Mockito.doReturn(Future$.MODULE$.successful(mockResponse)) + .when(master2) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + + Mockito.doAnswer( + invocation -> { + RpcAddress address = invocation.getArgument(0, RpcAddress.class); + switch (address.host()) { + case "host1": + throw new RuntimeException( + new MasterNotLeaderException("host1:9097", "host2:9097", null)); + case "host2": + return master2; + case "host3": + throw new AssertionError("Should follow the suggested leader before host3."); + default: + fail("Should use master host1/host2:" + masterPort + ", but use " + address); + } + return null; + }) + .when(rpcEnv) + .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); + + MasterClient client = new MasterClient(rpcEnv, conf, false); + HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); + + HeartbeatFromWorkerResponse response = null; + try { + response = client.askSync(message, HeartbeatFromWorkerResponse.class); + } catch (Throwable t) { + LOG.error("It should redirect to the suggested leader during bootstrap.", t); + fail("It should redirect to the suggested leader during bootstrap."); + } + + assertEquals(mockResponse, response); + } + + @Test + public void testBootstrapMasterNotLeaderWithoutLeaderContinuesToOtherConfiguredMastersInHA() { + final CelebornConf conf = prepareForCelebornConfWithHA(); + + final RpcEndpointRef master2 = Mockito.mock(RpcEndpointRef.class); + Mockito.doReturn(Future$.MODULE$.successful(mockResponse)) + .when(master2) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + + Mockito.doAnswer( + invocation -> { + RpcAddress address = invocation.getArgument(0, RpcAddress.class); + switch (address.host()) { + case "host1": + throw new RuntimeException( + new MasterNotLeaderException( + "host1:9097", MasterNotLeaderException.LEADER_NOT_PRESENTED, null)); + case "host2": + return master2; + default: + fail("Should use master host1/host2:" + masterPort + ", but use " + address); + } + return null; + }) + .when(rpcEnv) + .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); + + MasterClient client = new MasterClient(rpcEnv, conf, false); + HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); + + HeartbeatFromWorkerResponse response = null; + try { + response = client.askSync(message, HeartbeatFromWorkerResponse.class); + } catch (Throwable t) { + LOG.error("It should continue to another configured master during bootstrap.", t); + fail("It should continue to another configured master during bootstrap."); + } + + assertEquals(mockResponse, response); + } + + @Test + public void testSuggestedLeaderSetupFailureRetriesAnotherMasterInHA() { + final CelebornConf conf = + conf() + .set(CelebornConf.HA_ENABLED().key(), "true") + .set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host4:9097") + .set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5"); + + final RpcEndpointRef master1 = Mockito.mock(RpcEndpointRef.class); + final RpcEndpointRef master3 = Mockito.mock(RpcEndpointRef.class); + + Mockito.doReturn( + Future$.MODULE$.failed(new MasterNotLeaderException("host1:9097", "host2:9097", null))) + .when(master1) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.doReturn(Future$.MODULE$.successful(mockResponse)) + .when(master3) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + + Mockito.doAnswer( + invocation -> { + RpcAddress address = invocation.getArgument(0, RpcAddress.class); + switch (address.host()) { + case "host1": + return master1; + case "host2": + throw new RuntimeException( + new MasterNotLeaderException("host2:9097", "host3:9097", null)); + case "host3": + return master3; + case "host4": + throw new AssertionError("Should follow chained suggested leaders before host4."); + default: + fail( + "Should use master host1/host2/host3:" + masterPort + ", but use " + address); + } + return null; + }) + .when(rpcEnv) + .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); + + MasterClient client = new MasterClient(rpcEnv, conf, false); + HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); + + HeartbeatFromWorkerResponse response = null; + try { + response = client.askSync(message, HeartbeatFromWorkerResponse.class); + } catch (Throwable t) { + LOG.error("It should retry with another master when suggested leader setup races.", t); + fail("It should retry with another master when suggested leader setup races."); + } + + assertEquals(mockResponse, response); + } + + @Test + public void testSuggestedLeaderRedirectCycleRetriesConfiguredMasterInHA() { + final CelebornConf conf = + conf() + .set(CelebornConf.HA_ENABLED().key(), "true") + .set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host4:9097") + .set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5"); + + final RpcEndpointRef master1 = Mockito.mock(RpcEndpointRef.class); + final RpcEndpointRef master4 = Mockito.mock(RpcEndpointRef.class); + + Mockito.doReturn( + Future$.MODULE$.failed(new MasterNotLeaderException("host1:9097", "host2:9097", null))) + .when(master1) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + Mockito.doReturn(Future$.MODULE$.successful(mockResponse)) + .when(master4) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + + Mockito.doAnswer( + invocation -> { + RpcAddress address = invocation.getArgument(0, RpcAddress.class); + switch (address.host()) { + case "host1": + return master1; + case "host2": + throw new RuntimeException( + new MasterNotLeaderException("host2:9097", "host3:9097", null)); + case "host3": + throw new RuntimeException( + new MasterNotLeaderException("host3:9097", "host2:9097", null)); + case "host4": + return master4; + default: + fail( + "Should use master host1/host2/host3/host4:" + + masterPort + + ", but use " + + address); + } + return null; + }) + .when(rpcEnv) + .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); + + MasterClient client = new MasterClient(rpcEnv, conf, false); + HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); + + HeartbeatFromWorkerResponse response = null; + try { + response = client.askSync(message, HeartbeatFromWorkerResponse.class); + } catch (Throwable t) { + LOG.error("It should retry another configured master after a redirect cycle.", t); + fail("It should retry another configured master after a redirect cycle."); + } + + assertEquals(mockResponse, response); + Mockito.verify(rpcEnv, Mockito.times(1)) + .setupEndpointRef( + Mockito.eq(RpcAddress.fromHostAndPort("host2:9097")), Mockito.anyString()); + Mockito.verify(rpcEnv, Mockito.times(1)) + .setupEndpointRef( + Mockito.eq(RpcAddress.fromHostAndPort("host3:9097")), Mockito.anyString()); + } + + @Test + public void testBootstrapSuggestedLeaderConnectionFailureRetriesConfiguredMasterInHA() { + final CelebornConf conf = prepareForCelebornConfWithHA(); + + final RpcEndpointRef master3 = Mockito.mock(RpcEndpointRef.class); + + Mockito.doReturn(Future$.MODULE$.successful(mockResponse)) + .when(master3) + .ask(Mockito.any(), Mockito.any(), Mockito.any()); + + Mockito.doAnswer( + invocation -> { + RpcAddress address = invocation.getArgument(0, RpcAddress.class); + switch (address.host()) { + case "host1": + throw new RuntimeException( + new MasterNotLeaderException("host1:9097", "host2:9097", null)); + case "host2": + throw new IOException("test"); + case "host3": + return master3; + default: + fail( + "Should use master host1/host2/host3:" + masterPort + ", but use " + address); + } + return null; + }) + .when(rpcEnv) + .setupEndpointRef(Mockito.any(RpcAddress.class), Mockito.anyString()); + + MasterClient client = new MasterClient(rpcEnv, conf, false); + HeartbeatFromWorker message = Mockito.mock(HeartbeatFromWorker.class); + + HeartbeatFromWorkerResponse response = null; + try { + response = client.askSync(message, HeartbeatFromWorkerResponse.class); + } catch (Throwable t) { + LOG.error("It should retry another configured master after bootstrap redirect fails.", t); + fail("It should retry another configured master after bootstrap redirect fails."); + } + + assertEquals(mockResponse, response); + } + private void checkOneMasterDownInHA(Exception causedByException) { final CelebornConf conf = prepareForCelebornConfWithHA(); @@ -440,15 +691,19 @@ private void prepareForEndpointRefWithoutRetry(Supplier> supplier) { } private CelebornConf prepareForCelebornConfWithoutHA() { - return conf.clone() + return conf() .set(CelebornConf.HA_ENABLED().key(), "false") .set(CelebornConf.MASTER_ENDPOINTS().key(), masterHost + ":" + masterPort); } private CelebornConf prepareForCelebornConfWithHA() { - return conf.clone() + return conf() .set(CelebornConf.HA_ENABLED().key(), "true") .set(CelebornConf.MASTER_ENDPOINTS().key(), "host1:9097,host2:9097,host3:9097") .set(CelebornConf.MASTER_CLIENT_MAX_RETRIES().key(), "5"); } + + private CelebornConf conf() { + return conf.clone(); + } }