Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -154,18 +153,32 @@ public CompletableFuture<Void> closeAsync() {
jobId,
leaderSessionId);

resultFuture.completeExceptionally(new JobNotFinishedException(jobId));
jobMasterGatewayFuture.completeExceptionally(
new FlinkException("Process has been closed."));

jobMasterServiceFuture.whenComplete(
(jobMasterService, throwable) -> {
if (throwable != null) {
// JobMasterService creation has failed. Nothing to stop then :-)
resultFuture.completeExceptionally(
new JobNotFinishedException(jobId));
terminationFuture.complete(null);
} else {
FutureUtils.forward(
jobMasterService.closeAsync(), terminationFuture);
// Defer JobNotFinishedException until after the JobMasterService
// has fully closed, so any in-flight terminal completion from the
// scheduler (jobReachedGloballyTerminalState) can win the race and
// be observed by the runner. See FLINK-39704.
jobMasterService
.closeAsync()
.whenComplete(
(unused, t) -> {
resultFuture.completeExceptionally(
new JobNotFinishedException(jobId));
if (t != null) {
terminationFuture.completeExceptionally(t);
} else {
terminationFuture.complete(null);
}
});
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,21 @@
* <p>All leadership operations are serialized. This means that granting the leadership has to
* complete before the leadership can be revoked and vice versa.
*
* <p>The {@link #resultFuture} can be completed with the following values: * *
* <p>The {@link #resultFuture} can be completed with the following values:
*
* <ul>
* <li>{@link JobManagerRunnerResult} to signal an initialization failure of the {@link
* JobMasterService} or the completion of a job
* JobMasterService}, the completion of a job, or a globally terminal result observed before
* leadership revocation could be forwarded
* <li>{@link Exception} to signal an unexpected failure
* </ul>
*
* <p>To close the race between a globally terminal result and a leadership revocation that strips
* the forwarded result (see FLINK-39704), terminal results are cached in {@link
* #pendingTerminalResult} the moment they are observed. The cache is populated by {@link
* #rememberGloballyTerminalResultIfCurrentProcess}, drained by either {@link #grantLeadership} (on
* re-grant) or {@link #completeResultFutureAfterClose} (on close), and cleared by {@link
* #onJobCompletion} when forwarding succeeds normally.
*/
public class JobMasterServiceLeadershipRunner implements JobManagerRunner, LeaderContender {

Expand Down Expand Up @@ -108,6 +116,12 @@ public class JobMasterServiceLeadershipRunner implements JobManagerRunner, Leade
@GuardedBy("lock")
private boolean hasCurrentLeaderBeenCancelled = false;

@GuardedBy("lock")
private UUID currentJobMasterServiceProcessLeaderId = null;

@GuardedBy("lock")
private JobManagerRunnerResult pendingTerminalResult = null;

public JobMasterServiceLeadershipRunner(
JobMasterServiceProcessFactory jobMasterServiceProcessFactory,
LeaderElection leaderElection,
Expand Down Expand Up @@ -137,13 +151,12 @@ public CompletableFuture<Void> closeAsync() {
new FlinkException(
"JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership."));

resultFuture.complete(
JobManagerRunnerResult.forSuccess(
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));

processTerminationFuture = jobMasterServiceProcess.closeAsync();
}

processTerminationFuture.whenComplete(
(ignored, throwable) -> completeResultFutureAfterClose());

final CompletableFuture<Void> serviceTerminationFuture =
FutureUtils.runAfterwards(
processTerminationFuture,
Expand Down Expand Up @@ -241,29 +254,83 @@ public boolean isInitialized() {

@Override
public void grantLeadership(UUID leaderSessionID) {
runIfStateRunning(
() -> startJobMasterServiceProcessAsync(leaderSessionID),
"starting a new JobMasterServiceProcess");
synchronized (lock) {
if (!isRunning()) {
LOG.debug(
"Ignore 'starting a new JobMasterServiceProcess' because the leadership runner is no longer running.");
return;
}
sequentialOperation =
sequentialOperation.thenCompose(
unused -> flushPendingOrStartNewProcessAsync(leaderSessionID));
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
}
}

/**
 * Leadership-grant step that either publishes a cached terminal result or proceeds with the
 * regular start-up path.
 *
 * <p>Under {@code lock}, the method returns an already-completed future if the runner is no
 * longer running. Otherwise it drains the cached terminal result via {@link
 * #takePendingTerminalResult()}; when one is present, the state is switched to {@code
 * JOB_COMPLETED} and, after the lock is released, the runner's {@code resultFuture} is
 * completed with it. When nothing is cached, the job result store is queried and the call is
 * routed to {@link #handleJobAlreadyDoneIfValidLeader} or {@link
 * #createNewJobMasterServiceProcessIfValidLeader}.
 *
 * @param leaderSessionId leader session id of the leadership grant being processed
 * @return future of the asynchronous start-up work; already completed when the runner is not
 *     running or a cached terminal result was flushed
 */
private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID leaderSessionId) {
    final JobManagerRunnerResult drainedResult;
    synchronized (lock) {
        if (!isRunning()) {
            return FutureUtils.completedVoidFuture();
        }
        drainedResult = takePendingTerminalResult();
        if (drainedResult != null) {
            state = State.JOB_COMPLETED;
        }
    }

    if (drainedResult == null) {
        // Nothing cached: decide based on whether the job result store already has an entry.
        return jobResultStore
                .hasJobResultEntryAsync(getJobID())
                .thenCompose(
                        hasJobResult -> {
                            if (hasJobResult) {
                                return handleJobAlreadyDoneIfValidLeader(leaderSessionId);
                            }
                            return createNewJobMasterServiceProcessIfValidLeader(
                                    leaderSessionId);
                        });
    }

    // A terminal result was cached earlier: publish it instead of starting a new process.
    LOG.info(
            "Flushing previously observed globally terminal result for job {} on re-grant; not starting a new {}. Job state: {}.",
            getJobID(),
            JobMasterServiceProcess.class.getSimpleName(),
            drainedResult.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
    resultFuture.complete(drainedResult);
    return FutureUtils.completedVoidFuture();
}

@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
sequentialOperation.thenCompose(
unused ->
jobResultStore
.hasJobResultEntryAsync(getJobID())
.thenCompose(
hasJobResult -> {
if (hasJobResult) {
return handleJobAlreadyDoneIfValidLeader(
leaderSessionId);
} else {
return createNewJobMasterServiceProcessIfValidLeader(
leaderSessionId);
}
}));
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
private JobManagerRunnerResult takePendingTerminalResult() {
    // Nothing cached: leave all state untouched and report the absence with null.
    if (pendingTerminalResult == null) {
        return null;
    }

    // Hand the cached result to the caller exactly once and forget the leader id of the
    // process that produced it.
    final JobManagerRunnerResult drained = pendingTerminalResult;
    pendingTerminalResult = null;
    currentJobMasterServiceProcessLeaderId = null;
    return drained;
}

/**
 * Completes the runner's {@code resultFuture} as part of closing.
 *
 * <p>Under {@code lock}, a cached terminal result is drained via {@link
 * #takePendingTerminalResult()}. If none is cached, a successful result carrying an execution
 * graph info with status {@code SUSPENDED} is created instead and the tracked process leader
 * id is reset. The future is completed after the lock has been released.
 */
private void completeResultFutureAfterClose() {
    final JobManagerRunnerResult resultToPublish;
    synchronized (lock) {
        final JobManagerRunnerResult cachedTerminalResult = takePendingTerminalResult();
        if (cachedTerminalResult != null) {
            resultToPublish = cachedTerminalResult;
        } else {
            resultToPublish =
                    JobManagerRunnerResult.forSuccess(
                            createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED));
            currentJobMasterServiceProcessLeaderId = null;
        }
    }

    // Only log when the published result is a globally terminal one.
    if (isGloballyTerminalResult(resultToPublish)) {
        LOG.info(
                "Flushing globally terminal result for job {} during close. Job state: {}.",
                getJobID(),
                resultToPublish.getExecutionGraphInfo().getArchivedExecutionGraph().getState());
    }
    resultFuture.complete(resultToPublish);
}

private CompletableFuture<Void> handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
Expand Down Expand Up @@ -316,6 +383,9 @@ private void jobAlreadyDone(UUID leaderSessionId) {
@GuardedBy("lock")
private void createNewJobMasterServiceProcess(UUID leaderSessionId) {
Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
Preconditions.checkState(
pendingTerminalResult == null,
"No new JobMasterServiceProcess should be created while a terminal result is pending.");

LOG.info(
"{} for job {} was granted leadership with leader id {}. Creating new {}.",
Expand All @@ -324,6 +394,7 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) {
leaderSessionId,
JobMasterServiceProcess.class.getSimpleName());

currentJobMasterServiceProcessLeaderId = leaderSessionId;
jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);

forwardIfValidLeader(
Expand Down Expand Up @@ -355,17 +426,48 @@ private void confirmLeadership(
private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> resultFuture) {
resultFuture.whenComplete(
(jobManagerRunnerResult, throwable) ->
runIfValidLeader(
leaderSessionId,
() -> onJobCompletion(jobManagerRunnerResult, throwable),
"result future forwarding"));
(jobManagerRunnerResult, throwable) -> {
rememberGloballyTerminalResultIfCurrentProcess(
leaderSessionId, jobManagerRunnerResult);
runIfValidLeader(
leaderSessionId,
() -> onJobCompletion(jobManagerRunnerResult, throwable),
"result future forwarding");
});
}

/**
 * Caches a globally terminal result in {@code pendingTerminalResult}.
 *
 * <p>The result is stored only if, while holding {@code lock}, the runner's {@code
 * resultFuture} is not yet done, the given leader session id equals {@code
 * currentJobMasterServiceProcessLeaderId}, and {@link #isGloballyTerminalResult} accepts the
 * result. In every other case the call has no effect.
 *
 * @param leaderSessionId leader session id of the process that produced the result
 * @param jobManagerRunnerResult result to inspect; {@code null} is never cached
 */
private void rememberGloballyTerminalResultIfCurrentProcess(
        UUID leaderSessionId, JobManagerRunnerResult jobManagerRunnerResult) {
    synchronized (lock) {
        final boolean shouldCache =
                !resultFuture.isDone()
                        && leaderSessionId.equals(currentJobMasterServiceProcessLeaderId)
                        && isGloballyTerminalResult(jobManagerRunnerResult);
        if (!shouldCache) {
            return;
        }

        LOG.debug(
                "Caching globally terminal job result for job {} in case leadership is lost before forwarding.",
                getJobID());
        pendingTerminalResult = jobManagerRunnerResult;
    }
}

/**
 * Tells whether the given result is a successful one whose archived execution graph is in a
 * globally terminal state.
 *
 * @param jobManagerRunnerResult result to inspect, may be {@code null}
 * @return {@code true} only for a non-null, successful result whose job state reports {@code
 *     isGloballyTerminalState()}; {@code false} otherwise
 */
private boolean isGloballyTerminalResult(JobManagerRunnerResult jobManagerRunnerResult) {
    if (jobManagerRunnerResult == null) {
        return false;
    }
    if (!jobManagerRunnerResult.isSuccess()) {
        return false;
    }
    return jobManagerRunnerResult
            .getExecutionGraphInfo()
            .getArchivedExecutionGraph()
            .getState()
            .isGloballyTerminalState();
}

@GuardedBy("lock")
private void onJobCompletion(
JobManagerRunnerResult jobManagerRunnerResult, Throwable throwable) {
state = State.JOB_COMPLETED;
currentJobMasterServiceProcessLeaderId = null;
pendingTerminalResult = null;

LOG.debug("Completing the result for job {}.", getJobID());

Expand Down Expand Up @@ -423,6 +525,10 @@ private CompletableFuture<Void> stopJobMasterServiceProcess() {

hasCurrentLeaderBeenCancelled = false;

// Intentionally NOT clearing currentJobMasterServiceProcessLeaderId here: a globally
// terminal result from the closing process can still complete its resultFuture during
// closeAsync, and the forwarding callback needs the matching leader id to cache it
// (see FLINK-39704).
return jobMasterServiceProcess.closeAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,34 @@ void testSuccessOnTerminalState() {
== JobStatus.FINISHED);
}

/**
 * Closes the process while the {@code JobMasterService} termination is still pending, reports
 * a {@code FAILED} terminal state in between, and verifies that the result future ends up
 * holding that successful terminal result once the service has terminated.
 */
@Test
void testTerminalResultPublishedDuringCloseWinsOverJobNotFinished() {
    final CompletableFuture<Void> jobMasterServiceClosed = new CompletableFuture<>();
    final CompletableFuture<JobMasterService> jobMasterServiceFuture =
            new CompletableFuture<>();
    final DefaultJobMasterServiceProcess serviceProcess =
            createTestInstance(jobMasterServiceFuture);
    jobMasterServiceFuture.complete(
            new TestingJobMasterService("localhost", jobMasterServiceClosed, null));

    final CompletableFuture<Void> processClosed = serviceProcess.closeAsync();

    // The service has not terminated yet, so no result may have been published so far.
    assertThat(serviceProcess.getResultFuture()).isNotDone();

    // Terminal state arrives while the close is still in flight.
    final ExecutionGraphInfo failedGraphInfo =
            new ExecutionGraphInfo(
                    new ArchivedExecutionGraphBuilder().setState(JobStatus.FAILED).build());
    serviceProcess.jobReachedGloballyTerminalState(failedGraphInfo);

    jobMasterServiceClosed.complete(null);
    processClosed.join();

    assertThat(serviceProcess.getResultFuture())
            .isCompletedWithValueMatching(JobManagerRunnerResult::isSuccess)
            .isCompletedWithValueMatching(
                    result ->
                            result.getExecutionGraphInfo()
                                            .getArchivedExecutionGraph()
                                            .getState()
                                    == JobStatus.FAILED);
}

private DefaultJobMasterServiceProcess createTestInstance(
CompletableFuture<JobMasterService> jobMasterServiceFuture) {

Expand Down
Loading