Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@
</ul>
</td>
</tr>
<tr>
<td colspan="2">Query parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>maxExceptions</code> (optional): Comma-separated list of integer values that specifies the upper limit of exceptions to return.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<label>
Expand Down
9 changes: 9 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ paths:
required: true
schema:
$ref: "#/components/schemas/ApplicationID"
- name: maxExceptions
in: query
description: Comma-separated list of integer values that specifies the upper
limit of exceptions to return.
required: false
style: form
schema:
type: integer
format: int32
responses:
"200":
description: The request was successful.
Expand Down
5 changes: 4 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
"queryParameters" : [ {
"key" : "maxExceptions",
"mandatory" : false
} ]
},
"request" : {
"type" : "object",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
Expand All @@ -38,6 +39,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand All @@ -47,17 +49,19 @@ public class ApplicationExceptionsHandler
RestfulGateway,
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters>
ApplicationExceptionsMessageParameters>
implements ApplicationJsonArchivist {

static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation: https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L63 defines the same constant. It may be worth extracting it into a shared location if we revisit the default in the future. This is only an observation, not a blocker.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Since you noted it's not a blocker for this fix, I'd prefer to keep this PR scoped and file a follow-up PR in the future.


public ApplicationExceptionsHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters>
ApplicationExceptionsMessageParameters>
messageHeaders) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
}
Expand All @@ -66,13 +70,20 @@ public ApplicationExceptionsHandler(
public CompletableFuture<ApplicationExceptionsInfoWithHistory> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) {
ApplicationID applicationId = request.getPathParameter(ApplicationIDPathParameter.class);
final List<Integer> exceptionToReportMaxSizes =
request.getQueryParameter(UpperLimitExceptionParameter.class);
final int exceptionToReportMaxSize =
exceptionToReportMaxSizes.size() > 0
? exceptionToReportMaxSizes.get(0)
: MAX_NUMBER_EXCEPTION_TO_REPORT;

return gateway.requestApplication(applicationId, timeout)
.thenApply(
archivedApplication ->
ApplicationExceptionsInfoWithHistory
.fromApplicationExceptionHistory(
archivedApplication.getExceptionHistory()));
archivedApplication.getExceptionHistory(),
exceptionToReportMaxSize));
}

@Override
Expand All @@ -88,6 +99,7 @@ public Collection<ArchivedJson> archiveApplicationWithPath(
new ArchivedJson(
path,
ApplicationExceptionsInfoWithHistory.fromApplicationExceptionHistory(
archivedApplication.getExceptionHistory())));
archivedApplication.getExceptionHistory(),
MAX_NUMBER_EXCEPTION_TO_REPORT)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ public String toString() {
}

public static ApplicationExceptionsInfoWithHistory fromApplicationExceptionHistory(
Collection<ApplicationExceptionHistoryEntry> exceptions) {
Collection<ApplicationExceptionHistoryEntry> exceptions, int maxSize) {
return new ApplicationExceptionsInfoWithHistory(
new ApplicationExceptionHistory(
exceptions.stream()
.limit(maxSize)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that change the existing behavior (the return order)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Stream.limit(n) preserves encounter order and just truncates to the first n elements, same iteration order as before, only shorter

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But which slice of the history does that give us in the result? After this PR, the user of this API will receive the oldest entries (it drops the most recent ones). That looks inconsistent with the JobExceptions history, which reverses first and, as a result, returns the newest entries.
Is this intentional, or am I missing something?

Copy link
Copy Markdown
Author

@jubins jubins May 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that wasn't intentional. I pushed a change that reverses the history before .limit(maxSize) in ApplicationExceptionsInfoWithHistory#fromApplicationExceptionHistory, so the endpoint now returns the newest N entries.

.map(
exception ->
new ApplicationExceptionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.rest.handler.application.ApplicationExceptionsHandler;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

Expand All @@ -33,7 +32,7 @@ public class ApplicationExceptionsHeaders
implements RuntimeMessageHeaders<
EmptyRequestBody,
ApplicationExceptionsInfoWithHistory,
ApplicationMessageParameters> {
ApplicationExceptionsMessageParameters> {

private static final ApplicationExceptionsHeaders INSTANCE = new ApplicationExceptionsHeaders();

Expand All @@ -58,8 +57,8 @@ public HttpResponseStatus getResponseStatusCode() {
}

@Override
public ApplicationMessageParameters getUnresolvedMessageParameters() {
return new ApplicationMessageParameters();
public ApplicationExceptionsMessageParameters getUnresolvedMessageParameters() {
return new ApplicationExceptionsMessageParameters();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
/** {@link MessageParameters} for {@link ApplicationExceptionsHandler}. */
public class ApplicationExceptionsMessageParameters extends ApplicationMessageParameters {

private final UpperLimitExceptionParameter upperLimitExceptionParameter =
public final UpperLimitExceptionParameter upperLimitExceptionParameter =
new UpperLimitExceptionParameter();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.messages.ApplicationExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.application.ApplicationExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand Down Expand Up @@ -57,13 +58,28 @@ class ApplicationExceptionsHandlerTest {

private static HandlerRequest<EmptyRequestBody> createRequest(ApplicationID applicationId)
throws HandlerRequestException {
return createRequest(applicationId, Collections.emptyMap());
}

private static HandlerRequest<EmptyRequestBody> createRequest(
ApplicationID applicationId, int maxExceptions) throws HandlerRequestException {
Map<String, List<String>> queryParameters = new HashMap<>();
queryParameters.put(
UpperLimitExceptionParameter.KEY,
Collections.singletonList(Integer.toString(maxExceptions)));
return createRequest(applicationId, queryParameters);
}

private static HandlerRequest<EmptyRequestBody> createRequest(
ApplicationID applicationId, Map<String, List<String>> queryParameters)
throws HandlerRequestException {
Map<String, String> pathParameters = new HashMap<>();
pathParameters.put(ApplicationIDPathParameter.KEY, applicationId.toString());
return HandlerRequest.resolveParametersAndCreate(
EmptyRequestBody.getInstance(),
new ApplicationMessageParameters(),
new ApplicationExceptionsMessageParameters(),
pathParameters,
Collections.emptyMap(),
queryParameters,
Collections.emptyList());
}

Expand Down Expand Up @@ -143,6 +159,43 @@ void testExceptionWithJobId() throws Exception {
assertThat(exceptionInfo.getJobId()).isEqualTo(jobId);
}

@Test
void testMaxExceptionsLimitsHistorySize() throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please clarify whether there are tests for:

  1. The default-cap path
  2. The case when maxExceptions > historySize

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, they were both missing as explicit cases. Added two tests in ApplicationExceptionsHandlerTest.java:

  1. testDefaultCapAppliedWhenMaxExceptionsNotProvided exercises the default-cap branch in handleRequest. It builds 25 history entries, omits maxExceptions from the request, and asserts that the response is capped at MAX_NUMBER_EXCEPTION_TO_REPORT (20).
  2. testMaxExceptionsLargerThanHistorySizeReturnsAllEntries, builds 3 history entries, requests maxExceptions=10, asserts the response contains all 3 entries (no padding, no overflow).

final List<ApplicationExceptionHistoryEntry> exceptionHistory = new ArrayList<>();
for (int i = 0; i < 5; i++) {
exceptionHistory.add(
new ApplicationExceptionHistoryEntry(
new RuntimeException("exception #" + i),
System.currentTimeMillis(),
null));
}

final ArchivedApplication applicationWithExceptions =
new ArchivedApplication(
archivedApplication.getApplicationId(),
archivedApplication.getApplicationName(),
ApplicationState.FAILED,
new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L},
Collections.emptyMap(),
exceptionHistory);

testingRestfulGateway =
new TestingRestfulGateway.Builder()
.setRequestApplicationFunction(
applicationId ->
CompletableFuture.completedFuture(
applicationWithExceptions))
.build();

final HandlerRequest<EmptyRequestBody> limitedRequest =
createRequest(archivedApplication.getApplicationId(), 2);

final ApplicationExceptionsInfoWithHistory response =
handler.handleRequest(limitedRequest, testingRestfulGateway).get();

assertThat(response.getExceptionHistory().getEntries()).hasSize(2);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we also verify/assert the intended order?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed, updated the test to assign distinct timestamps (baseTimestamp + i) so we can assert exact order via extracting(...::getTimestamp).containsExactly(baseTimestamp, baseTimestamp + 1), which verifies both the count and that the first two entries are returned in insertion order

}

@Test
void testExceptionWithoutJobId() throws Exception {
final RuntimeException rootCause = new RuntimeException("exception #0");
Expand Down