Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,19 @@ public class FlinkSourceEnumerator
/** Buckets that have been assigned to readers. */
private final Set<TableBucket> assignedTableBuckets;

/**
* Remaining lake snapshot and hybrid lake/Fluss splits that still have to be assigned.
*
* <p>The value of this field encodes one of three states:
*
* <ul>
* <li>{@code null}: lake split initialization has not run yet.
* <li>empty list: either lake split initialization has already run, or this enumerator was
* started in Fluss-only (non-lake) mode and must not initialize lake splits after restore.
* <li>non-empty list: lake split initialization has run and these splits still need to be
* assigned.
* </ul>
*
* <p>{@code null} and the empty list are intentionally distinct: a restored enumerator that has
* a lake source and sees {@code null} treats it as "not initialized yet" and generates lake
* snapshot splits.
*/
@Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;

private final long scanPartitionDiscoveryIntervalMs;
Expand Down Expand Up @@ -1207,11 +1220,16 @@ public void addReader(int subtaskId) {

@Override
public SourceEnumeratorState snapshotState(long checkpointId) {
List<SourceSplitBase> remainingHybridLakeFlussSplits =
// Preserve Fluss-only (non-lake) startup across restore. Otherwise a restored
// enumerator with a non-null lakeSource would treat null as "not initialized yet"
// and generate lake snapshot splits.
lakeSource == null ? Collections.emptyList() : pendingHybridLakeFlussSplits;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand what you mean, but the code seems hard to understand without sufficient context. I've also thought about this:

  • If the enumerator is created by FlinkSource#createEnumerator, it indicates a stateless start (there is no checkpoint to restore from). Therefore, whether to generate lake splits depends only on whether a lake source is configured.

  • If the enumerator is created by FlinkSource#restoreEnumerator, there's no need to generate lake splits again. This is because before the first checkpoint is taken, FlinkSourceEnumerator#start → FlinkSourceEnumerator#generateHybridLakeFlussSplits has already been executed. Thus, upon restoration, the lake splits do not need to be regenerated.

Therefore, according to this logic, even if the job was originally started from a specified timestamp, a stateful restart will not read the lake splits again, as long as at least one checkpoint has been taken.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the original approach in snapshotState() is hard to understand without sufficient context.

I've reworked the fix to use the checkpointTriggeredBefore flag in generateHybridLakeFlussSplits() instead. While the restore-awareness logic still requires some thought, this approach keeps all the complexity contained within a single method rather than spreading it across snapshotState().

Additionally, I changed startInStreamModeForNonPartitionedTable to call generateHybridLakeFlussSplits() synchronously, consistent with the partitioned-table path in start(). This ensures lake split initialization always completes before any checkpoint can be triggered, which is a prerequisite for the checkpointTriggeredBefore guard to work correctly.

final SourceEnumeratorState enumeratorState =
new SourceEnumeratorState(
assignedTableBuckets,
assignedPartitions,
pendingHybridLakeFlussSplits,
remainingHybridLakeFlussSplits,
leaseContext.getKvSnapshotLeaseId());
LOG.debug("Source Checkpoint is {}", enumeratorState);
return enumeratorState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,15 @@ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
if (in.readBoolean()) {
int numSplits = in.readInt();
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
int version = in.readInt();
if (numSplits == 0) {
return splits;
}
SourceSplitSerializer sourceSplitSerializer =
new SourceSplitSerializer(
checkNotNull(
lakeSource,
"lake source must not be null when there are hybrid lake splits."));
int version = in.readInt();
for (int i = 0; i < numSplits; i++) {
int splitSizeInBytes = in.readInt();
byte[] splitBytes = new byte[splitSizeInBytes];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.source.split.SnapshotSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
import org.apache.fluss.flink.utils.FlinkTestBase;
import org.apache.fluss.lake.source.LakeSource;
import org.apache.fluss.lake.source.LakeSplit;
Expand Down Expand Up @@ -210,6 +211,97 @@ void testInvalidSplitAssignmentBatchSize() throws Exception {
}
}

/**
 * Verifies that a source checkpointed in Fluss-only (non-lake) mode does not start generating
 * lake snapshot splits when it is later restored with a lake source configured.
 *
 * <p>Phase 1 snapshots an enumerator created without a lake source and checks that the
 * remaining hybrid lake/Fluss splits are recorded as a non-null, empty list. Phase 2 restores
 * an enumerator from that state with a {@code TestingLakeSource}, runs partition discovery,
 * registers all readers and asserts that only {@link LogSplit}s were assigned.
 */
@Test
void testRestoreFlussOnlySourceWithLakeSourceDoesNotGenerateLakeSplits(@TempDir Path tempDir)
        throws Throwable {
    long tableId =
            createTable(DEFAULT_TABLE_PATH, DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR);
    ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
    Map<Long, String> partitionNameByIds =
            waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH);
    // Pick the smallest partition id; fails with NoSuchElementException if none exist.
    Long partitionId = Collections.min(partitionNameByIds.keySet());
    String partitionName = partitionNameByIds.get(partitionId);

    // Register a lake snapshot so that lake splits *could* be generated on restore.
    LakeTableSnapshot lakeTableSnapshot =
            new LakeTableSnapshot(
                    0,
                    ImmutableMap.of(
                            new TableBucket(tableId, partitionId, 0), 50L,
                            new TableBucket(tableId, partitionId, 1), 50L,
                            new TableBucket(tableId, partitionId, 2), 50L));
    LakeTableHelper lakeTableHelper = new LakeTableHelper(zooKeeperClient, tempDir.toString());
    lakeTableHelper.registerLakeTableSnapshotV1(tableId, lakeTableSnapshot);

    ResolvedPartitionSpec partitionSpec =
            ResolvedPartitionSpec.fromPartitionName(
                    Collections.singletonList("name"), partitionName);
    LakeSource<LakeSplit> lakeSource =
            new TestingLakeSource(
                    DEFAULT_BUCKET_NUM,
                    Collections.singletonList(
                            new PartitionInfo(
                                    partitionId, partitionSpec, DEFAULT_REMOTE_DATA_DIR)));

    // Phase 1: checkpoint an enumerator that was started without a lake source.
    SourceEnumeratorState checkpointState;
    try (MockSplitEnumeratorContext<SourceSplitBase> context =
            new MockSplitEnumeratorContext<>(1)) {
        // Use the same table-shape flags as the restored enumerator below: both describe the
        // same auto-partitioned log table.
        FlinkSourceEnumerator enumerator =
                new FlinkSourceEnumerator(
                        DEFAULT_TABLE_PATH,
                        flussConf,
                        false,
                        true,
                        context,
                        OffsetsInitializer.timestamp(1000L),
                        DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                        streaming,
                        null,
                        null,
                        LeaseContext.DEFAULT,
                        false);

        checkpointState = enumerator.snapshotState(1L);
        // Fluss-only mode must be persisted as "empty", not "null" (= not initialized yet).
        assertThat(checkpointState.getRemainingHybridLakeFlussSplits()).isNotNull().isEmpty();
    }

    // Phase 2: restore from that checkpoint, this time with a lake source available.
    try (MockSplitEnumeratorContext<SourceSplitBase> context =
                    new MockSplitEnumeratorContext<>(DEFAULT_BUCKET_NUM);
            MockWorkExecutor workExecutor = new MockWorkExecutor(context);
            FlinkSourceEnumerator restoredEnumerator =
                    new FlinkSourceEnumerator(
                            DEFAULT_TABLE_PATH,
                            flussConf,
                            false,
                            true,
                            context,
                            checkpointState.getAssignedBuckets(),
                            checkpointState.getAssignedPartitions(),
                            checkpointState.getRemainingHybridLakeFlussSplits(),
                            OffsetsInitializer.full(),
                            DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                            streaming,
                            null,
                            lakeSource,
                            workExecutor,
                            LeaseContext.DEFAULT,
                            true)) {
        restoredEnumerator.start();
        runPeriodicPartitionDiscovery(workExecutor);

        for (int i = 0; i < DEFAULT_BUCKET_NUM; i++) {
            registerReader(context, restoredEnumerator, i);
        }

        List<SourceSplitBase> assignedSplits =
                getReadersAssignments(context).values().stream()
                        .flatMap(List::stream)
                        .collect(Collectors.toList());
        // Splits must have been assigned, and none of them may come from the lake snapshot.
        assertThat(assignedSplits).isNotEmpty();
        assertThat(assignedSplits).allMatch(split -> split instanceof LogSplit);
        assertThat(assignedSplits).noneMatch(split -> split instanceof LakeSnapshotSplit);
    }
}

@Test
void testPkTableWithSnapshotSplits() throws Throwable {
long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,26 @@ void testInconsistentLakeSourceSerde() throws Exception {
serializer.deserialize(serializer.getVersion(), serialized);
assertThat(deserializedSourceEnumeratorState).isEqualTo(sourceEnumeratorState);
}

/**
 * Round-trips an enumerator state whose remaining hybrid lake/Fluss split list is empty through
 * a serializer created without a lake source, and checks that the list is restored as an empty
 * (non-null) list.
 */
@Test
void testEmptyPendingSplitsCheckpointSerdeWithoutLakeSource() throws Exception {
    // State with no assigned buckets/partitions and an explicitly empty pending-split list.
    SourceEnumeratorState expectedState =
            new SourceEnumeratorState(
                    Collections.emptySet(),
                    Collections.emptyMap(),
                    Collections.emptyList(),
                    LeaseContext.DEFAULT.getKvSnapshotLeaseId());

    // No lake source is available to the serializer.
    FlussSourceEnumeratorStateSerializer stateSerializer =
            new FlussSourceEnumeratorStateSerializer(null);

    byte[] checkpointBytes = stateSerializer.serialize(expectedState);
    SourceEnumeratorState restoredState =
            stateSerializer.deserialize(stateSerializer.getVersion(), checkpointBytes);

    assertThat(restoredState).isEqualTo(expectedState);
    assertThat(restoredState.getRemainingHybridLakeFlussSplits()).isNotNull().isEmpty();
}
}