From 49684f86159513167a8e8668a3aee3b98f6b205c Mon Sep 17 00:00:00 2001 From: YuanHanzhong Date: Tue, 19 May 2026 00:03:03 +0800 Subject: [PATCH 1/2] [FLINK-39645][connector-base] Preserve restored HybridSource splits during snapshot --- .../source/hybrid/HybridSourceReader.java | 10 +++---- .../source/hybrid/HybridSourceReaderTest.java | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 02f7017c5a28c..5fe9cedb1c555 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -107,10 +106,11 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { @Override public List snapshotState(long checkpointId) { - List state = - currentReader != null - ? currentReader.snapshotState(checkpointId) - : Collections.emptyList(); + if (currentReader == null) { + return new ArrayList<>(restoredSplits); + } + + List state = currentReader.snapshotState(checkpointId); return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 5fe62e0dbf771..d4226cff2fba4 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -282,6 +282,36 @@ void testReaderRecovery() throws Exception { reader.close(); } + @Test + void testReaderRecoverySnapshotBeforeSwitchSourceEvent() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + HybridSourceReader reader = new HybridSourceReader<>(readerContext); + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + SwitchedSources switchedSources = new SwitchedSources(); + switchedSources.put(0, source); + HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources); + reader.addSplits(Collections.singletonList(hybridSplit)); + List snapshot = reader.snapshotState(0); + reader.close(); + + readerContext.clearSentEvents(); + HybridSourceReader recoveredReader = new HybridSourceReader<>(readerContext); + recoveredReader.addSplits(snapshot); + recoveredReader.start(); + assertThat(currentReader(recoveredReader)).isNull(); + + List recoverySnapshot = recoveredReader.snapshotState(1); + assertThat(recoverySnapshot).containsExactly(hybridSplit); + + recoveredReader.close(); + } + @Test void testReaderRecoveryInitializationOrder() throws Exception { TestingReaderContext readerContext = new TestingReaderContext(); From d505f902e4bb16141303d75e63ef049486b2f291 Mon Sep 17 00:00:00 2001 From: YuanHanzhong Date: Tue, 19 May 2026 08:04:05 +0800 Subject: [PATCH 2/2] [hotfix][test] Use named max int constant in HybridSourceReaderTest --- .../base/source/hybrid/HybridSourceReaderTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index d4226cff2fba4..40a3d8248b385 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -254,7 +254,7 @@ void testReaderRecovery() throws Exception { assertAndClearSourceReaderFinishedEvent(readerContext, -1); reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); - MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); SwitchedSources switchedSources = new SwitchedSources(); switchedSources.put(0, source); @@ -292,7 +292,7 @@ void testReaderRecoverySnapshotBeforeSwitchSourceEvent() throws Exception { assertAndClearSourceReaderFinishedEvent(readerContext, -1); reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); - MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); SwitchedSources switchedSources = new SwitchedSources(); switchedSources.put(0, source); HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources); @@ -323,7 +323,7 @@ void testReaderRecoveryInitializationOrder() throws Exception { assertAndClearSourceReaderFinishedEvent(readerContext, -1); reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); - MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); SwitchedSources switchedSources = new SwitchedSources(); switchedSources.put(0, source); HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);