diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcess.java index f95a06cca3908..9e9647fae82a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcess.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.concurrent.FutureUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,18 +153,32 @@ public CompletableFuture closeAsync() { jobId, leaderSessionId); - resultFuture.completeExceptionally(new JobNotFinishedException(jobId)); jobMasterGatewayFuture.completeExceptionally( new FlinkException("Process has been closed.")); jobMasterServiceFuture.whenComplete( (jobMasterService, throwable) -> { if (throwable != null) { - // JobMasterService creation has failed. Nothing to stop then :-) + resultFuture.completeExceptionally( + new JobNotFinishedException(jobId)); terminationFuture.complete(null); } else { - FutureUtils.forward( - jobMasterService.closeAsync(), terminationFuture); + // Defer JobNotFinishedException until after the JobMasterService + // has fully closed, so any in-flight terminal completion from the + // scheduler (jobReachedGloballyTerminalState) can win the race and + // be observed by the runner. See FLINK-39704. + jobMasterService + .closeAsync() + .whenComplete( + (unused, t) -> { + resultFuture.completeExceptionally( + new JobNotFinishedException(jobId)); + if (t != null) { + terminationFuture.completeExceptionally(t); + } else { + terminationFuture.complete(null); + } + }); } }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index 2884ce3b47075..55db9c163ccb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -62,13 +62,21 @@ *

All leadership operations are serialized. This means that granting the leadership has to * complete before the leadership can be revoked and vice versa. * - *

The {@link #resultFuture} can be completed with the following values: * * + *

The {@link #resultFuture} can be completed with the following values: * *

+ * + *

To close the race between a globally terminal result and a leadership revocation that strips + * the forwarded result (see FLINK-39704), terminal results are cached in {@link + * #pendingTerminalResult} the moment they are observed. The cache is populated by {@link + * #rememberGloballyTerminalResultIfCurrentProcess}, drained by either {@link #grantLeadership} (on + * re-grant) or {@link #completeResultFutureAfterClose} (on close), and cleared by {@link + * #onJobCompletion} when forwarding succeeds normally. */ public class JobMasterServiceLeadershipRunner implements JobManagerRunner, LeaderContender { @@ -108,6 +116,12 @@ public class JobMasterServiceLeadershipRunner implements JobManagerRunner, Leade @GuardedBy("lock") private boolean hasCurrentLeaderBeenCancelled = false; + @GuardedBy("lock") + private UUID currentJobMasterServiceProcessLeaderId = null; + + @GuardedBy("lock") + private JobManagerRunnerResult pendingTerminalResult = null; + public JobMasterServiceLeadershipRunner( JobMasterServiceProcessFactory jobMasterServiceProcessFactory, LeaderElection leaderElection, @@ -137,13 +151,12 @@ public CompletableFuture closeAsync() { new FlinkException( "JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership.")); - resultFuture.complete( - JobManagerRunnerResult.forSuccess( - createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED))); - processTerminationFuture = jobMasterServiceProcess.closeAsync(); } + processTerminationFuture.whenComplete( + (ignored, throwable) -> completeResultFutureAfterClose()); + final CompletableFuture serviceTerminationFuture = FutureUtils.runAfterwards( processTerminationFuture, @@ -241,29 +254,83 @@ public boolean isInitialized() { @Override public void grantLeadership(UUID leaderSessionID) { - runIfStateRunning( - () -> startJobMasterServiceProcessAsync(leaderSessionID), - "starting a new JobMasterServiceProcess"); + synchronized (lock) { + if (!isRunning()) { + LOG.debug( + "Ignore 'starting a new JobMasterServiceProcess' because the leadership runner is no longer running."); + return; + } + sequentialOperation = + sequentialOperation.thenCompose( + unused -> flushPendingOrStartNewProcessAsync(leaderSessionID)); + handleAsyncOperationError(sequentialOperation, "Could not start the job manager."); + } + } + + private CompletableFuture flushPendingOrStartNewProcessAsync(UUID leaderSessionId) { + final JobManagerRunnerResult cachedTerminalResult; + synchronized (lock) { + if (!isRunning()) { + return FutureUtils.completedVoidFuture(); + } + cachedTerminalResult = takePendingTerminalResult(); + if (cachedTerminalResult != null) { + state = State.JOB_COMPLETED; + } + } + + if (cachedTerminalResult != null) { + LOG.info( + "Flushing previously observed globally terminal result for job {} on re-grant; not starting a new {}. Job state: {}.", + getJobID(), + JobMasterServiceProcess.class.getSimpleName(), + cachedTerminalResult + .getExecutionGraphInfo() + .getArchivedExecutionGraph() + .getState()); + resultFuture.complete(cachedTerminalResult); + return FutureUtils.completedVoidFuture(); + } + + return jobResultStore + .hasJobResultEntryAsync(getJobID()) + .thenCompose( + hasJobResult -> + hasJobResult + ? handleJobAlreadyDoneIfValidLeader(leaderSessionId) + : createNewJobMasterServiceProcessIfValidLeader( + leaderSessionId)); } @GuardedBy("lock") - private void startJobMasterServiceProcessAsync(UUID leaderSessionId) { - sequentialOperation = - sequentialOperation.thenCompose( - unused -> - jobResultStore - .hasJobResultEntryAsync(getJobID()) - .thenCompose( - hasJobResult -> { - if (hasJobResult) { - return handleJobAlreadyDoneIfValidLeader( - leaderSessionId); - } else { - return createNewJobMasterServiceProcessIfValidLeader( - leaderSessionId); - } - })); - handleAsyncOperationError(sequentialOperation, "Could not start the job manager."); + private JobManagerRunnerResult takePendingTerminalResult() { + final JobManagerRunnerResult terminalResult = pendingTerminalResult; + pendingTerminalResult = null; + if (terminalResult != null) { + currentJobMasterServiceProcessLeaderId = null; + } + return terminalResult; + } + + private void completeResultFutureAfterClose() { + JobManagerRunnerResult closeResult; + synchronized (lock) { + closeResult = takePendingTerminalResult(); + if (closeResult == null) { + closeResult = + JobManagerRunnerResult.forSuccess( + createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)); + currentJobMasterServiceProcessLeaderId = null; + } + } + + if (isGloballyTerminalResult(closeResult)) { + LOG.info( + "Flushing globally terminal result for job {} during close. Job state: {}.", + getJobID(), + closeResult.getExecutionGraphInfo().getArchivedExecutionGraph().getState()); + } + resultFuture.complete(closeResult); } private CompletableFuture handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { @@ -316,6 +383,9 @@ private void jobAlreadyDone(UUID leaderSessionId) { @GuardedBy("lock") private void createNewJobMasterServiceProcess(UUID leaderSessionId) { Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone()); + Preconditions.checkState( + pendingTerminalResult == null, + "No new JobMasterServiceProcess should be created while a terminal result is pending."); LOG.info( "{} for job {} was granted leadership with leader id {}. Creating new {}.", @@ -324,6 +394,7 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) { leaderSessionId, JobMasterServiceProcess.class.getSimpleName()); + currentJobMasterServiceProcessLeaderId = leaderSessionId; jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId); forwardIfValidLeader( @@ -355,17 +426,48 @@ private void confirmLeadership( private void forwardResultFuture( UUID leaderSessionId, CompletableFuture resultFuture) { resultFuture.whenComplete( - (jobManagerRunnerResult, throwable) -> - runIfValidLeader( - leaderSessionId, - () -> onJobCompletion(jobManagerRunnerResult, throwable), - "result future forwarding")); + (jobManagerRunnerResult, throwable) -> { + rememberGloballyTerminalResultIfCurrentProcess( + leaderSessionId, jobManagerRunnerResult); + runIfValidLeader( + leaderSessionId, + () -> onJobCompletion(jobManagerRunnerResult, throwable), + "result future forwarding"); + }); + } + + private void rememberGloballyTerminalResultIfCurrentProcess( + UUID leaderSessionId, JobManagerRunnerResult jobManagerRunnerResult) { + synchronized (lock) { + if (resultFuture.isDone()) { + return; + } + if (leaderSessionId.equals(currentJobMasterServiceProcessLeaderId) + && isGloballyTerminalResult(jobManagerRunnerResult)) { + LOG.debug( + "Caching globally terminal job result for job {} in case leadership is lost before forwarding.", + getJobID()); + pendingTerminalResult = jobManagerRunnerResult; + } + } + } + + private boolean isGloballyTerminalResult(JobManagerRunnerResult jobManagerRunnerResult) { + return jobManagerRunnerResult != null + && jobManagerRunnerResult.isSuccess() + && jobManagerRunnerResult + .getExecutionGraphInfo() + .getArchivedExecutionGraph() + .getState() + .isGloballyTerminalState(); } @GuardedBy("lock") private void onJobCompletion( JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) { state = State.JOB_COMPLETED; + currentJobMasterServiceProcessLeaderId = null; + pendingTerminalResult = null; LOG.debug("Completing the result for job {}.", getJobID()); @@ -423,6 +525,10 @@ private CompletableFuture stopJobMasterServiceProcess() { hasCurrentLeaderBeenCancelled = false; + // Intentionally NOT clearing currentJobMasterServiceProcessLeaderId here: a globally + // terminal result from the closing process can still complete its resultFuture during + // closeAsync, and the forwarding callback needs the matching leader id to cache it + // (see FLINK-39704). return jobMasterServiceProcess.closeAsync(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index 02df484668108..134818c066af3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -268,6 +268,34 @@ void testSuccessOnTerminalState() { == JobStatus.FINISHED); } + @Test + void testTerminalResultPublishedDuringCloseWinsOverJobNotFinished() { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); + final CompletableFuture serviceTerminationFuture = new CompletableFuture<>(); + jobMasterServiceFuture.complete( + new TestingJobMasterService("localhost", serviceTerminationFuture, null)); + + final CompletableFuture processTerminationFuture = serviceProcess.closeAsync(); + + assertThat(serviceProcess.getResultFuture()).isNotDone(); + + serviceProcess.jobReachedGloballyTerminalState( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build())); + + serviceTerminationFuture.complete(null); + processTerminationFuture.join(); + + assertThat(serviceProcess.getResultFuture()) + .isCompletedWithValueMatching(JobManagerRunnerResult::isSuccess) + .isCompletedWithValueMatching( + r -> + r.getExecutionGraphInfo().getArchivedExecutionGraph().getState() + == JobStatus.FAILED); + } + private DefaultJobMasterServiceProcess createTestInstance( CompletableFuture jobMasterServiceFuture) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index f3c367b52ca97..914976d10a0ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -57,6 +57,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import javax.annotation.Nonnull; @@ -68,6 +70,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -246,17 +249,26 @@ void testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationErro @Nonnull private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException testException) { + return createExecutionGraphInfo(JobStatus.FAILED, testException); + } + + @Nonnull + private ExecutionGraphInfo createExecutionGraphInfo(JobStatus jobStatus, Throwable cause) { return new ExecutionGraphInfo( ArchivedExecutionGraph.createSparseArchivedExecutionGraph( jobGraph.getJobID(), jobGraph.getName(), - JobStatus.FAILED, + jobStatus, jobGraph.getJobType(), - testException, + cause, null, 1L)); } + private JobManagerRunnerResult createGloballyTerminalResult(JobStatus jobStatus) { + return JobManagerRunnerResult.forSuccess(createExecutionGraphInfo(jobStatus, null)); + } + @Test void testJobMasterServiceProcessIsTerminatedOnClose() throws Exception { final CompletableFuture terminationFuture = new CompletableFuture<>(); @@ -482,7 +494,7 @@ void runCancellationFailsTest(Consumer } @Test - void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception { + void testInitializationFailureCompletionOfOutdatedLeaderIsIgnored() throws Exception { final CompletableFuture resultFuture = new CompletableFuture<>(); final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() @@ -503,8 +515,9 @@ void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception { .isNotDone(); resultFuture.complete( - JobManagerRunnerResult.forSuccess( - createFailedExecutionGraphInfo(new FlinkException("test exception")))); + JobManagerRunnerResult.forInitializationFailure( + createFailedExecutionGraphInfo(new FlinkException("test exception")), + new FlinkException("test exception"))); assertThat(jobManagerRunner.getResultFuture()) .as("The runner result should not be completed if the leadership is lost.") @@ -527,6 +540,157 @@ void testResultFutureCompletionOfOutdatedLeaderIsIgnored() throws Exception { }); } + @ParameterizedTest(name = "{0}") + @EnumSource( + value = JobStatus.class, + names = {"FAILED", "FINISHED", "CANCELED"}) + void testCloseAsyncCompletesWithGloballyTerminalResultObservedBeforeLeadershipRevoke( + JobStatus terminalStatus) throws Exception { + final CompletableFuture resultFuture = new CompletableFuture<>(); + final ControlledForwardingCheck check = new ControlledForwardingCheck(); + final JobMasterServiceLeadershipRunner jobManagerRunner = + newJobMasterServiceLeadershipRunnerBuilder() + .setLeaderElection(check.election) + .withSingleJobMasterServiceProcess( + TestingJobMasterServiceProcess.newBuilder() + .setGetResultFutureSupplier(() -> resultFuture) + .build()) + .build(); + + jobManagerRunner.start(); + try { + check.election.isLeader(UUID.randomUUID()).join(); + + resultFuture.complete(createGloballyTerminalResult(terminalStatus)); + assertThat(jobManagerRunner.getResultFuture()).isNotDone(); + + check.election.notLeader(); + jobManagerRunner.closeAsync().get(); + + assertRunnerResultHasJobStatus(jobManagerRunner, terminalStatus); + } finally { + check.release(); + } + } + + @Test + void testCloseAsyncFlushesCachedGloballyTerminalResultAfterRevoke() throws Exception { + final CompletableFuture resultFuture = new CompletableFuture<>(); + final ControlledForwardingCheck check = new ControlledForwardingCheck(); + final JobMasterServiceLeadershipRunner jobManagerRunner = + newJobMasterServiceLeadershipRunnerBuilder() + .setLeaderElection(check.election) + .withSingleJobMasterServiceProcess( + TestingJobMasterServiceProcess.newBuilder() + .setGetResultFutureSupplier(() -> resultFuture) + .build()) + .build(); + + jobManagerRunner.start(); + try { + check.election.isLeader(UUID.randomUUID()).join(); + + check.election.notLeader(); + resultFuture.complete(createGloballyTerminalResult(JobStatus.FAILED)); + + assertThat(jobManagerRunner.getResultFuture()).isNotDone(); + + jobManagerRunner.closeAsync().get(); + + assertRunnerResultHasJobStatus(jobManagerRunner, JobStatus.FAILED); + } finally { + check.release(); + } + } + + @Test + void testGrantLeadershipFlushesCachedTerminalResultObservedAfterRevoke() throws Exception { + final CompletableFuture resultFuture = new CompletableFuture<>(); + final AtomicInteger createdProcesses = new AtomicInteger(); + final ControlledForwardingCheck check = new ControlledForwardingCheck(); + final JobMasterServiceLeadershipRunner jobManagerRunner = + newJobMasterServiceLeadershipRunnerBuilder() + .setLeaderElection(check.election) + .setJobMasterServiceProcessFactory( + TestingJobMasterServiceProcessFactory.newBuilder() + .setJobId(jobGraph.getJobID()) + .setJobName(jobGraph.getName()) + .setJobMasterServiceProcessFunction( + ignored -> { + createdProcesses.incrementAndGet(); + return TestingJobMasterServiceProcess + .newBuilder() + .setGetResultFutureSupplier( + () -> resultFuture) + .build(); + }) + .build()) + .build(); + + jobManagerRunner.start(); + try { + check.election.isLeader(UUID.randomUUID()).join(); + assertThat(createdProcesses).hasValue(1); + + check.election.notLeader(); + resultFuture.complete(createGloballyTerminalResult(JobStatus.FAILED)); + + assertThat(jobManagerRunner.getResultFuture()).isNotDone(); + + jobManagerRunner.grantLeadership(UUID.randomUUID()); + + assertThat(createdProcesses) + .as( + "A re-grant after a cached terminal result must flush it instead of starting a zombie process.") + .hasValue(1); + + assertRunnerResultHasJobStatus(jobManagerRunner, JobStatus.FAILED); + } finally { + check.release(); + } + } + + private static void assertRunnerResultHasJobStatus( + JobMasterServiceLeadershipRunner runner, JobStatus expectedStatus) { + assertThatFuture(runner.getResultFuture()) + .eventuallySucceeds() + .satisfies( + result -> + assertThat( + result.getExecutionGraphInfo() + .getArchivedExecutionGraph() + .getState()) + .isEqualTo(expectedStatus)); + } + + private static final class ControlledForwardingCheck { + final CompletableFuture delayedForwardingCheck = new CompletableFuture<>(); + final TestingLeaderElection election; + + ControlledForwardingCheck() { + final Queue> hasLeadershipResults = + new ArrayDeque<>( + Arrays.asList( + CompletableFuture.completedFuture(true), + delayedForwardingCheck)); + this.election = + new TestingLeaderElection() { + @Override + public synchronized CompletableFuture hasLeadershipAsync( + UUID leaderSessionId) { + return hasLeadershipResults.isEmpty() + ? CompletableFuture.completedFuture(false) + : hasLeadershipResults.poll(); + } + }; + } + + void release() throws Exception { + delayedForwardingCheck.complete(false); + election.close(); + } + } + @Test void testJobMasterGatewayIsInvalidatedOnLeadershipChanges() throws Exception { final JobMasterServiceLeadershipRunner jobManagerRunner =