diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java index 02f7017c5a28c..5fe9cedb1c555 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -107,10 +106,11 @@ public InputStatus pollNext(ReaderOutput output) throws Exception { @Override public List snapshotState(long checkpointId) { - List state = - currentReader != null - ? currentReader.snapshotState(checkpointId) - : Collections.emptyList(); + if (currentReader == null) { + return new ArrayList<>(restoredSplits); + } + + List state = currentReader.snapshotState(checkpointId); return HybridSourceSplit.wrapSplits(state, currentSourceIndex, switchedSources); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 5fe62e0dbf771..40a3d8248b385 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -254,7 +254,7 @@ void testReaderRecovery() throws Exception { assertAndClearSourceReaderFinishedEvent(readerContext, -1); reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); - MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); SwitchedSources switchedSources = new SwitchedSources(); switchedSources.put(0, source); @@ -282,6 +282,36 @@ void testReaderRecovery() throws Exception { reader.close(); } + @Test + void testReaderRecoverySnapshotBeforeSwitchSourceEvent() throws Exception { + TestingReaderContext readerContext = new TestingReaderContext(); + MockBaseSource source = new MockBaseSource(1, 1, Boundedness.BOUNDED); + + HybridSourceReader reader = new HybridSourceReader<>(readerContext); + reader.start(); + assertAndClearSourceReaderFinishedEvent(readerContext, -1); + reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); + + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); + SwitchedSources switchedSources = new SwitchedSources(); + switchedSources.put(0, source); + HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources); + reader.addSplits(Collections.singletonList(hybridSplit)); + List snapshot = reader.snapshotState(0); + reader.close(); + + readerContext.clearSentEvents(); + HybridSourceReader recoveredReader = new HybridSourceReader<>(readerContext); + recoveredReader.addSplits(snapshot); + recoveredReader.start(); + assertThat(currentReader(recoveredReader)).isNull(); + + List recoverySnapshot = recoveredReader.snapshotState(1); + assertThat(recoverySnapshot).containsExactly(hybridSplit); + + recoveredReader.close(); + } + @Test void testReaderRecoveryInitializationOrder() throws Exception { TestingReaderContext readerContext = new TestingReaderContext(); @@ -293,7 +323,7 @@ void testReaderRecoveryInitializationOrder() throws Exception { assertAndClearSourceReaderFinishedEvent(readerContext, -1); reader.handleSourceEvents(new SwitchSourceEvent(0, source, false)); - MockSourceSplit mockSplit = new MockSourceSplit(0, 0, 2147483647); + MockSourceSplit mockSplit = new MockSourceSplit(0, 0, Integer.MAX_VALUE); SwitchedSources switchedSources = new SwitchedSources(); switchedSources.put(0, source); HybridSourceSplit hybridSplit = HybridSourceSplit.wrapSplit(mockSplit, 0, switchedSources);