Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,16 @@ public class ConfigOptions {
+ KV_SCANNER_MAX_BATCH_SIZE.key()
+ "'.");

/**
 * Client-side opt-in for the server-side KV scan on bounded reads of primary-key tables.
 *
 * <p>Boolean option with key {@code client.scanner.kv.server-side.enabled}; defaults to
 * {@code false}, in which case bounded primary-key reads keep their previous behavior as
 * described in the option's description text.
 */
public static final ConfigOption<Boolean> CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED =
key("client.scanner.kv.server-side.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Master switch for using the server-side KV scan in bounded reads "
+ "of primary-key tables when no KV snapshot file is available. When "
+ "false (default), bounded primary-key reads fall back to the prior "
+ "behavior (log-only when lake is enabled, or fail when lake is disabled).");

public static final ConfigOption<Integer> CLIENT_LOOKUP_QUEUE_SIZE =
key("client.lookup.queue-size")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,9 +435,18 @@ public boolean isBounded() {
+ modificationScanType
+ " statement with conditions on primary key.");
}
if (!isDataLakeEnabled) {
if (!isDataLakeEnabled
&& !(hasPrimaryKey()
&& flussConfig.get(
ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED))) {
throw new UnsupportedOperationException(
"Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
"Batch mode requires either data-lake integration"
+ " (set '"
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key()
+ "' = 'true') or server-side KV scan on a"
+ " primary-key table (set '"
+ ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED.key()
+ "' = 'true').");
}
return source;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void emitRecord(
.asLogSplitState()
.setNextOffset(recordAndPosition.record().logOffset() + 1);
}
} else if (splitState.isKvBatchSplitState()) {
processAndEmitRecord(recordAndPosition.record(), sourceOutput);
} else if (splitState.isLakeSplit()) {
if (lakeRecordRecordEmitter == null) {
lakeRecordRecordEmitter = new LakeRecordRecordEmitter<>(this::processAndEmitRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.fluss.flink.source.event.PartitionsRemovedEvent;
import org.apache.fluss.flink.source.reader.LeaseContext;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
import org.apache.fluss.flink.source.split.KvBatchSplit;
import org.apache.fluss.flink.source.split.LogSplit;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
Expand Down Expand Up @@ -469,6 +470,7 @@ public void start() {
}

private void startInBatchMode() {
boolean kvBatchEnabled = flussConf.get(ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED);
if (lakeEnabled) {
if (lakeSource == null) {
throw new IllegalStateException(
Expand All @@ -477,39 +479,65 @@ private void startInBatchMode() {
context.callAsync(
() -> {
List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
// No lake snapshot exists, fall back to Fluss-only splits
if (splits == null) {
LOG.info(
"No lake snapshot found for table {},"
+ " falling back to Fluss-only splits.",
tablePath);
if (isPartitioned) {
Set<PartitionInfo> partitionInfos = listPartitions();
Collection<Partition> partitions =
partitionInfos.stream()
.map(
p ->
new Partition(
p.getPartitionId(),
p.getPartitionName()))
.collect(Collectors.toList());
// Use log-only splits to avoid generating mixed split
// types (HybridSnapshotLogSplit + LogSplit) for
// primary-key tables, which is not supported.
splits = this.initLogTablePartitionSplits(partitions);
} else {
splits = this.getLogSplit(null, null);
}
splits = generateFlussOnlyBatchSplits(kvBatchEnabled);
}
return splits;
},
this::handleSplitsAdd);
} else if (kvBatchEnabled && hasPrimaryKey) {
context.callAsync(() -> generateFlussOnlyBatchSplits(true), this::handleSplitsAdd);
} else {
throw new UnsupportedOperationException(
String.format(
"Batch only supports when table option '%s' is set to true.",
ConfigOptions.TABLE_DATALAKE_ENABLED));
"Batch mode requires either '%s' = 'true' (data-lake integration) "
+ "or '%s' = 'true' (server-side KV scan, primary-key tables only).",
ConfigOptions.TABLE_DATALAKE_ENABLED,
ConfigOptions.CLIENT_SCANNER_KV_SERVER_SIDE_ENABLED));
Comment thread
polyzos marked this conversation as resolved.
Outdated
}
}

/**
 * Builds the batch-mode splits that read from Fluss only (no lake data).
 *
 * <p>When {@code kvBatchEnabled} is set and the table has a primary key, one
 * {@link KvBatchSplit} per bucket is produced (for every listed partition if the table is
 * partitioned). Otherwise log splits are produced, again per partition for partitioned
 * tables.
 *
 * @param kvBatchEnabled whether the server-side KV scan may be used for primary-key tables
 * @return the generated splits
 */
private List<SourceSplitBase> generateFlussOnlyBatchSplits(boolean kvBatchEnabled) {
    final boolean useKvScan = kvBatchEnabled && hasPrimaryKey;

    // Non-partitioned tables: a single pass over the buckets with no partition info.
    if (!isPartitioned) {
        if (useKvScan) {
            return buildKvBatchSplits(null, null);
        }
        return this.getLogSplit(null, null);
    }

    // Partitioned tables: both paths start from the current partition listing.
    Set<PartitionInfo> listedPartitions = listPartitions();

    if (useKvScan) {
        List<SourceSplitBase> kvSplits = new ArrayList<>();
        for (PartitionInfo info : listedPartitions) {
            kvSplits.addAll(
                    buildKvBatchSplits(info.getPartitionId(), info.getPartitionName()));
        }
        return kvSplits;
    }

    Collection<Partition> logPartitions = new ArrayList<>();
    for (PartitionInfo info : listedPartitions) {
        logPartitions.add(new Partition(info.getPartitionId(), info.getPartitionName()));
    }
    return this.initLogTablePartitionSplits(logPartitions);
}

/**
 * Creates one {@link KvBatchSplit} for each bucket of the table within the given partition.
 *
 * <p>Buckets for which {@code ignoreTableBucket} returns {@code true} are left out.
 *
 * @param partitionId the partition id, or {@code null} when no partition applies
 * @param partitionName the partition name, or {@code null} when no partition applies
 * @return the splits for all non-ignored buckets, in ascending bucket-id order
 */
private List<SourceSplitBase> buildKvBatchSplits(
        @Nullable Long partitionId, @Nullable String partitionName) {
    List<SourceSplitBase> kvSplits = new ArrayList<>();
    for (int bucket = 0; bucket < tableInfo.getNumBuckets(); bucket++) {
        TableBucket candidate = new TableBucket(tableInfo.getTableId(), partitionId, bucket);
        if (!ignoreTableBucket(candidate)) {
            kvSplits.add(new KvBatchSplit(candidate, partitionName));
        }
    }
    return kvSplits;
}

private void startInStreamModeForNonPartitionedTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.flink.source.reader.fetcher.FlinkSourceFetcherManager;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplitState;
import org.apache.fluss.flink.source.split.KvBatchSplitState;
import org.apache.fluss.flink.source.split.LogSplitState;
import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.flink.source.split.SourceSplitState;
Expand Down Expand Up @@ -163,6 +164,8 @@ protected SourceSplitState initializedState(SourceSplitBase split) {
return new HybridSnapshotLogSplitState(split.asHybridSnapshotLogSplit());
} else if (split.isLogSplit()) {
return new LogSplitState(split.asLogSplit());
} else if (split.isKvBatchSplit()) {
return new KvBatchSplitState(split.asKvBatchSplit());
} else if (split.isLakeSplit()) {
return LakeSplitStateInitializer.initializedState(split);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChanges) {
subscribeLog(sourceSplitBase, hybridSnapshotLogSplit.getLogStartingOffset());
} else if (sourceSplitBase.isLogSplit()) {
subscribeLog(sourceSplitBase, sourceSplitBase.asLogSplit().getStartingOffset());
} else if (sourceSplitBase.isKvBatchSplit()) {
boundedSplits.add(sourceSplitBase);
} else if (sourceSplitBase.isLakeSplit()) {
getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits);
if (sourceSplitBase instanceof LakeSnapshotAndFlussLogSplit) {
Expand Down Expand Up @@ -412,6 +414,12 @@ private void checkSnapshotSplitOrStartNext() {
snapshotSplit.getTableBucket(), snapshotSplit.getSnapshotId());
currentBoundedSplitReader =
new BoundedSplitReader(batchScanner, snapshotSplit.recordsToSkip());
} else if (currentBoundedSplit.isKvBatchSplit()) {
BatchScanner batchScanner =
table.newScan()
.project(projectedFields)
.createBatchScanner(currentBoundedSplit.getTableBucket());
currentBoundedSplitReader = new BoundedSplitReader(batchScanner, 0);
} else if (currentBoundedSplit.isLakeSplit()) {
currentBoundedSplitReader =
getLakeSplitReader().getBoundedSplitScanner(currentBoundedSplit);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source.split;

import org.apache.fluss.metadata.TableBucket;

import javax.annotation.Nullable;

/**
 * Bounded split covering the full primary-key state of a single bucket, read through the
 * server-side KV scan (FIP-17). The enumerator emits it in bounded mode for primary-key tables
 * when no KV snapshot file is available for the bucket.
 *
 * <p>The split carries only the bucket and the optional partition name; it has no resumable
 * position. After a Flink task restart the bucket is scanned again from the beginning. The
 * server provides snapshot isolation (a consistent point-in-time view of the RocksDB state taken
 * when {@code ScanKv} opens the scan), but an expired or invalidated scan session cannot be
 * resumed by the client, so no progress is checkpointed.
 *
 * <p>In contrast to {@link HybridSnapshotLogSplit} there is no hand-off to the log afterwards:
 * once the bucket is drained, the split is finished.
 */
public class KvBatchSplit extends SourceSplitBase {

    /** Prefix used when deriving the split id from the table bucket. */
    private static final String KV_BATCH_SPLIT_PREFIX = "kv-batch-";

    /**
     * Creates a split for the given bucket.
     *
     * @param tableBucket the bucket to scan
     * @param partitionName the name of the partition the bucket belongs to, or {@code null}
     */
    public KvBatchSplit(TableBucket tableBucket, @Nullable String partitionName) {
        super(tableBucket, partitionName);
    }

    /** Returns the kind flag identifying this split type. */
    @Override
    protected byte splitKind() {
        return KV_BATCH_SPLIT_FLAG;
    }

    /** Returns the split id, built from the {@code kv-batch-} prefix and the table bucket. */
    @Override
    public String splitId() {
        return toSplitId(KV_BATCH_SPLIT_PREFIX, tableBucket);
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("KvBatchSplit{");
        text.append("tableBucket=").append(tableBucket);
        text.append(", partitionName='").append(partitionName).append('\'');
        text.append('}');
        return text.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.source.split;

/**
 * State of {@link KvBatchSplit}. Has no resumable position.
 *
 * <p>The class adds no fields of its own; it only wraps the split passed to the constructor.
 */
public class KvBatchSplitState extends SourceSplitState {

/**
 * Creates the state for the given split.
 *
 * @param split the KV batch split this state belongs to
 */
public KvBatchSplitState(KvBatchSplit split) {
super(split);
}

/** Returns the wrapped split, cast back to {@link KvBatchSplit}. */
@Override
public KvBatchSplit toSourceSplit() {
return (KvBatchSplit) split;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@

import java.util.Objects;

/** A base source split for {@link SnapshotSplit} and {@link LogSplit}. */
/** A base source split for {@link SnapshotSplit}, {@link LogSplit} and {@link KvBatchSplit}. */
public abstract class SourceSplitBase implements SourceSplit {

public static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1;
public static final byte LOG_SPLIT_FLAG = 2;
public static final byte KV_BATCH_SPLIT_FLAG = 3;

protected final TableBucket tableBucket;

Expand Down Expand Up @@ -81,6 +82,11 @@ public final boolean isHybridSnapshotLogSplit() {
return getClass() == HybridSnapshotLogSplit.class;
}

/**
 * Tells whether the runtime class of this split is exactly {@link KvBatchSplit}.
 *
 * @return {@code true} only for instances whose class is {@link KvBatchSplit} itself
 */
public final boolean isKvBatchSplit() {
    return KvBatchSplit.class == getClass();
}

/** Casts this split into a {@link HybridSnapshotLogSplit}. */
public final HybridSnapshotLogSplit asHybridSnapshotLogSplit() {
return (HybridSnapshotLogSplit) this;
Expand All @@ -91,11 +97,18 @@ public final LogSplit asLogSplit() {
return (LogSplit) this;
}

/**
 * Casts this split into a {@link KvBatchSplit}.
 *
 * <p>This is a plain downcast: callers should check {@link #isKvBatchSplit()} first, since a
 * {@link ClassCastException} is thrown when this split is of another type.
 */
public final KvBatchSplit asKvBatchSplit() {
return (KvBatchSplit) this;
}

protected byte splitKind() {
if (isHybridSnapshotLogSplit()) {
return HYBRID_SNAPSHOT_SPLIT_FLAG;
} else if (isLogSplit()) {
return LOG_SPLIT_FLAG;
} else if (isKvBatchSplit()) {
return KV_BATCH_SPLIT_FLAG;
} else {
throw new IllegalArgumentException("Unsupported split kind for " + getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));

private static final byte HYBRID_SNAPSHOT_SPLIT_FLAG = 1;
private static final byte LOG_SPLIT_FLAG = 2;

private static final int CURRENT_VERSION = VERSION_0;

@Nullable private final LakeSource<LakeSplit> lakeSource;
Expand Down Expand Up @@ -75,12 +72,14 @@ public byte[] serialize(SourceSplitBase split) throws IOException {
out.writeBoolean(hybridSnapshotLogSplit.isSnapshotFinished());
// write log starting offset
out.writeLong(hybridSnapshotLogSplit.getLogStartingOffset());
} else {
} else if (split.isLogSplit()) {
LogSplit logSplit = split.asLogSplit();
// write starting offset
out.writeLong(logSplit.getStartingOffset());
// write stopping offset
out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET));
} else {
// KvBatchSplit has no extra fields to serialize beyond the common header.
}
} else {
LakeSplitSerializer lakeSplitSerializer =
Expand Down Expand Up @@ -128,7 +127,7 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce
int bucketId = in.readInt();
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);

if (splitKind == HYBRID_SNAPSHOT_SPLIT_FLAG) {
if (splitKind == SourceSplitBase.HYBRID_SNAPSHOT_SPLIT_FLAG) {
long snapshotId = in.readLong();
long recordsToSkip = in.readLong();
boolean isSnapshotFinished = in.readBoolean();
Expand All @@ -140,10 +139,12 @@ public SourceSplitBase deserialize(int version, byte[] serialized) throws IOExce
recordsToSkip,
isSnapshotFinished,
logStartingOffset);
} else if (splitKind == LOG_SPLIT_FLAG) {
} else if (splitKind == SourceSplitBase.LOG_SPLIT_FLAG) {
long startingOffset = in.readLong();
long stoppingOffset = in.readLong();
return new LogSplit(tableBucket, partitionName, startingOffset, stoppingOffset);
} else if (splitKind == SourceSplitBase.KV_BATCH_SPLIT_FLAG) {
return new KvBatchSplit(tableBucket, partitionName);
} else {
LakeSplitSerializer lakeSplitSerializer =
new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public final LogSplitState asLogSplitState() {
return (LogSplitState) this;
}

/**
 * Tells whether the runtime class of this state is exactly {@link KvBatchSplitState}.
 *
 * @return {@code true} only for instances whose class is {@link KvBatchSplitState} itself
 */
public final boolean isKvBatchSplitState() {
    return KvBatchSplitState.class == getClass();
}

/**
 * Casts this state into a {@link KvBatchSplitState}.
 *
 * <p>This is a plain downcast: callers should check {@link #isKvBatchSplitState()} first, since
 * a {@link ClassCastException} is thrown when this state is of another type.
 */
public final KvBatchSplitState asKvBatchSplitState() {
return (KvBatchSplitState) this;
}

public abstract SourceSplitBase toSourceSplit();

public boolean isLakeSplit() {
Expand Down
Loading