[flink] Include lake reader in source metrics and correct max timestamp lag semantics #3356
Open
zuston wants to merge 1 commit into
Pull request overview
This PR updates Fluss’s Flink source metrics to (1) include lake-reader-produced records in event-time lag reporting by preserving LogRecord metadata through bounded scanning, and (2) correct currentFetchEventTimeLag semantics when consuming multiple buckets/partitions by tracking lag per table-bucket and aggregating at the reader level. It also centralizes metrics reporting logic in FlinkRecordsWithSplitIds.
Changes:
- Preserve lake LogRecord metadata (offset/timestamp/change type) through bounded readers via a new LogRecordRowIterator adapter.
- Move fetch event-time lag reporting into FlinkRecordsWithSplitIds and add per-table-bucket currentFetchEventTimeLag gauges.
- Add/extend unit tests for per-split metrics reporting and lake metadata preservation.
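
The per-table-bucket tracking and reader-level aggregation described in the overview can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `BucketLagSketch`, its string bucket ids, and the explicit `nowMs` parameter are all hypothetical, and the reader-level value here is recomputed as the maximum over the latest per-bucket lags, which may differ from how `FlinkSourceReaderMetrics` aggregates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for illustration; not the Fluss FlinkSourceReaderMetrics class.
class BucketLagSketch {
    static final long UNINITIALIZED = -1L;

    // Latest observed lag per bucket, keyed by an illustrative string bucket id.
    // A concurrent map is used on the assumption that gauges may be read from another thread.
    private final Map<String, Long> lagByBucket = new ConcurrentHashMap<>();

    // Record the lag of the most recent record seen for one bucket.
    void reportRecordEventTime(String bucketId, long eventTimeMs, long nowMs) {
        lagByBucket.put(bucketId, nowMs - eventTimeMs);
    }

    // Per-bucket gauge value, or UNINITIALIZED if the bucket has reported nothing yet.
    long bucketLag(String bucketId) {
        return lagByBucket.getOrDefault(bucketId, UNINITIALIZED);
    }

    // Reader-level gauge value: the largest of the current per-bucket lags.
    long readerLag() {
        long max = UNINITIALIZED;
        for (long lag : lagByBucket.values()) {
            max = Math.max(max, lag);
        }
        return max;
    }

    public static void main(String[] args) {
        BucketLagSketch metrics = new BucketLagSketch();
        long now = 10_000L;
        metrics.reportRecordEventTime("bucket-0", 1_000L, now);
        metrics.reportRecordEventTime("bucket-1", 9_000L, now);
        System.out.println("reader lag: " + metrics.readerLag());
        // bucket-0 catches up, so the reader-level value follows the slowest remaining bucket.
        metrics.reportRecordEventTime("bucket-0", 9_500L, now);
        System.out.println("reader lag: " + metrics.readerLag());
    }
}
```

Recomputing the maximum on read lets the reader-level value fall when a lagging bucket catches up. A running maximum that is only ever raised would not do that, so the two designs report different values after a catch-up.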
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIdsTest.java | Adds unit coverage verifying event-time lag reporting from FlinkRecordsWithSplitIds. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/BoundedSplitReaderTest.java | Adds coverage ensuring lake log record metadata is preserved through bounded scanning. |
| fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetricsTest.java | Adds coverage for currentFetchEventTimeLag aggregation behavior and per-bucket gauges. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LogRecordRowIterator.java | Introduces an iterator contract to expose LogRecord metadata while still iterating InternalRow. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java | Removes fetch-level lag calculation/reporting (now handled downstream). |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkRecordsWithSplitIds.java | Reports per-record timestamp to metrics on record emission. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java | Detects LogRecordRowIterator to build ScanRecord with metadata when available. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/metrics/FlinkSourceReaderMetrics.java | Adds per-bucket lag tracking and a reader-level aggregated CURRENT_FETCH_EVENT_TIME_LAG gauge. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotScanner.java | Makes the lake snapshot row iterator expose LogRecord metadata via LogRecordRowIterator. |
| fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java | Exposes LogRecord metadata via LogRecordRowIterator instead of row-only iteration. |
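
The table describes an iterator that still yields rows but also exposes record metadata, with the bounded reader detecting that capability when it is present. The sketch below shows the general pattern only. `Rec`, `RecordRowIterator`, `lastRecord()`, and the `String` row type are hypothetical stand-ins; the actual `LogRecordRowIterator` contract in the PR is not shown on this page and may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: these types are hypothetical stand-ins, not Fluss classes.
class MetadataIteratorSketch {

    // Stand-in for a log record: a row plus offset and timestamp metadata.
    static final class Rec {
        final long offset;
        final long timestamp;
        final String row;

        Rec(long offset, long timestamp, String row) {
            this.offset = offset;
            this.timestamp = timestamp;
            this.row = row;
        }
    }

    // Iterates rows, and also exposes the full record behind the row last returned.
    interface RecordRowIterator extends Iterator<String> {
        Rec lastRecord();
    }

    // Wrap a record iterator so that row-only consumers keep working unchanged.
    static RecordRowIterator adapt(final Iterator<Rec> records) {
        return new RecordRowIterator() {
            private Rec last;

            @Override
            public boolean hasNext() {
                return records.hasNext();
            }

            @Override
            public String next() {
                last = records.next();
                return last.row;
            }

            @Override
            public Rec lastRecord() {
                return last;
            }
        };
    }

    // Consumer side: collect timestamps, using -1 when the iterator carries no metadata.
    static List<Long> timestamps(Iterator<String> rows) {
        List<Long> out = new ArrayList<>();
        while (rows.hasNext()) {
            rows.next();
            if (rows instanceof RecordRowIterator) {
                out.add(((RecordRowIterator) rows).lastRecord().timestamp);
            } else {
                out.add(-1L);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Iterator<Rec> records =
                Arrays.asList(new Rec(0L, 100L, "a"), new Rec(1L, 200L, "b")).iterator();
        System.out.println(timestamps(adapt(records)));
        System.out.println(timestamps(Arrays.asList("a", "b").iterator()));
    }
}
```

Checking the capability with `instanceof` keeps plain row iterators valid, so only sources that actually carry metadata need to implement the extended contract.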
Comment on lines +85 to 91

    if (lag > maxFetchEventTimeLag) {
        if (maxFetchEventTimeLag == UNINITIALIZED) {
            sourceReaderMetricGroup.gauge(
                    MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, () -> maxFetchEventTimeLag);
        }
        maxFetchEventTimeLag = lag;
    }

Comment on lines +110 to +120

    assertThat((long) readerEventTimeLagGauge.get().getValue())
            .isEqualTo(maxReaderEventTimeLag);
    assertThat((long) bucket1EventTimeLagGauge.get().getValue())
            .isLessThan(maxReaderEventTimeLag);

    long updatedBucket0Timestamp = newerTimestamp - 50000L;
    flinkSourceReaderMetrics.reportRecordEventTime(tableBucket0, updatedBucket0Timestamp);
    assertThat((long) readerEventTimeLagGauge.get().getValue())
            .isEqualTo(maxReaderEventTimeLag);
    assertThat((long) bucket0EventTimeLagGauge.get().getValue())
            .isLessThan(maxReaderEventTimeLag);

Comment on lines +68 to +70

    private volatile long maxFetchEventTimeLag = UNINITIALIZED;
    // Map for tracking current fetch event time lag by table bucket
    private final Map<TableBucket, Long> currentFetchEventTimeLags = new HashMap<>();

Purpose
Linked issue: close #3304
Brief change log
FlinkRecordsWithSplitIds
Tests
API and Format
Documentation