Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class FlussConfigUtils {

static {
TABLE_OPTIONS = extractConfigOptions("table.");
TABLE_OPTIONS.put(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could log.index.file-size also be supported?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. this is the first phase for this PR

ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), ConfigOptions.LOG_SEGMENT_FILE_SIZE);
CLIENT_OPTIONS = extractConfigOptions("client.");
ALTERABLE_TABLE_OPTIONS =
Arrays.asList(
Expand All @@ -54,7 +56,7 @@ public class FlussConfigUtils {
}

public static boolean isTableStorageConfig(String key) {
return key.startsWith(TABLE_PREFIX);
return TABLE_OPTIONS.containsKey(key);
}

public static boolean isAlterableTableOption(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public TableConfig(Configuration config) {
this.config = config;
}

/** Gets the log segment file size of the table. */
public MemorySize getLogSegmentSize() {
return config.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE);
}

/** Gets the replication factor of the table. */
public int getReplicationFactor() {
return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.config;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.config.TableConfig;

import javax.annotation.Nullable;

/** Server-side table config that falls back to server configuration for runtime options. */
public final class ResolvedTableConfig extends TableConfig {

private final Configuration tableConfig;
private final Configuration serverConfig;

/**
* Creates a resolved table config.
*
* @param tableConfig the table properties configuration
* @param serverConfig the server configuration to fall back to for runtime options
*/
public ResolvedTableConfig(Configuration tableConfig, @Nullable Configuration serverConfig) {
super(tableConfig);
this.tableConfig = tableConfig;
this.serverConfig = serverConfig == null ? new Configuration() : serverConfig;
}

/** Gets the log segment file size resolved from table properties or server configuration. */
@Override
public MemorySize getLogSegmentSize() {
if (tableConfig.contains(ConfigOptions.LOG_SEGMENT_FILE_SIZE)) {
return tableConfig.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE);
}
return serverConfig.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.exception.FlussException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.config.ResolvedTableConfig;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.storage.LocalDiskManager;
Expand Down Expand Up @@ -260,16 +261,14 @@ private void waitForLoadLogsInDir(LogRecoveryTask recoveryTask) throws Throwable
* @param dataDir the local data directory chosen for the bucket
* @param tablePath the table path of the bucket belongs to
* @param tableBucket the table bucket
* @param logFormat the log format
* @param tieredLogLocalSegments the number of segments to retain in local for tiered log
* @param tableConfig the resolved table config
* @param isChangelog whether the log is a changelog of primary key table
*/
public LogTablet getOrCreateLog(
File dataDir,
PhysicalTablePath tablePath,
TableBucket tableBucket,
LogFormat logFormat,
int tieredLogLocalSegments,
TableConfig tableConfig,
boolean isChangelog)
throws Exception {
return inLock(
Expand All @@ -287,11 +286,10 @@ public LogTablet getOrCreateLog(
tablePath,
tabletDir,
conf,
tableConfig,
serverMetricGroup,
0L,
scheduler,
logFormat,
tieredLogLocalSegments,
isChangelog,
clock,
true);
Expand Down Expand Up @@ -392,11 +390,10 @@ private LogTablet loadLog(
physicalTablePath,
tabletDir,
conf,
new ResolvedTableConfig(tableInfo.getProperties(), conf),
serverMetricGroup,
logRecoveryPoint,
scheduler,
tableInfo.getTableConfig().getLogFormat(),
tableInfo.getTableConfig().getTieredLogLocalSegments(),
tableInfo.hasPrimaryKey(),
clock,
isCleanShutdown);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.exception.CorruptRecordException;
import org.apache.fluss.exception.DuplicateSequenceException;
import org.apache.fluss.exception.FlussRuntimeException;
Expand Down Expand Up @@ -140,14 +141,13 @@ private LogTablet(
Configuration conf,
Scheduler scheduler,
WriterStateManager writerStateManager,
LogFormat logFormat,
int tieredLogLocalSegments,
TableConfig tableConfig,
boolean isChangelog,
Clock clock) {
this.dataDir = dataDir;
this.physicalPath = physicalPath;
this.localLog = localLog;
this.maxSegmentFileSize = (int) conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes();
this.maxSegmentFileSize = (int) tableConfig.getLogSegmentSize().getBytes();
this.logFlushIntervalMessages = conf.get(ConfigOptions.LOG_FLUSH_INTERVAL_MESSAGES);
int writerExpirationCheckIntervalMs =
(int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis();
Expand All @@ -162,11 +162,11 @@ private LogTablet(
() -> removeExpiredWriter(System.currentTimeMillis()),
writerExpirationCheckIntervalMs,
writerExpirationCheckIntervalMs);
this.logFormat = logFormat;
this.logFormat = tableConfig.getLogFormat();
checkArgument(
tieredLogLocalSegments > 0,
tableConfig.getTieredLogLocalSegments() > 0,
"log segments to retain in local must be greater than 0");
this.tieredLogLocalSegments = tieredLogLocalSegments;
this.tieredLogLocalSegments = tableConfig.getTieredLogLocalSegments();

this.clock = clock;
this.isChangeLog = isChangelog;
Expand Down Expand Up @@ -307,15 +307,16 @@ public static LogTablet create(
PhysicalTablePath tablePath,
File tabletDir,
Configuration conf,
TableConfig tableConfig,
TabletServerMetricGroup serverMetricGroup,
long recoveryPoint,
Scheduler scheduler,
LogFormat logFormat,
int tieredLogLocalSegments,
boolean isChangelog,
Clock clock,
boolean isCleanShutdown)
throws Exception {
LogFormat logFormat = tableConfig.getLogFormat();

// create the log directory if it doesn't exist
Files.createDirectories(tabletDir.toPath());

Expand Down Expand Up @@ -358,8 +359,7 @@ public static LogTablet create(
conf,
scheduler,
writerStateManager,
logFormat,
tieredLogLocalSegments,
tableConfig,
isChangelog,
clock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.compression.ArrowCompressionInfo;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.InvalidColumnProjectionException;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.rpc.util.PredicateMessageUtils;
import org.apache.fluss.server.SequenceIDCounter;
import org.apache.fluss.server.config.ResolvedTableConfig;
import org.apache.fluss.server.coordinator.CoordinatorContext;
import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
import org.apache.fluss.server.kv.KvManager;
Expand Down Expand Up @@ -249,7 +251,8 @@ public Replica(
TableInfo tableInfo,
Clock clock,
RemoteLogManager remoteLogManager,
ScannerManager scannerManager)
ScannerManager scannerManager,
@Nullable Configuration configuration)
throws Exception {
this.physicalPath = physicalPath;
this.tableBucket = tableBucket;
Expand All @@ -271,7 +274,7 @@ public Replica(
tableInfo.getSchemaId(),
tableInfo.getSchema());
this.tableInfo = tableInfo;
this.tableConfig = tableInfo.getTableConfig();
this.tableConfig = new ResolvedTableConfig(tableInfo.getProperties(), configuration);
this.logFormat = tableConfig.getLogFormat();
this.arrowCompressionInfo = tableConfig.getArrowCompressionInfo();
this.snapshotContext = snapshotContext;
Expand Down Expand Up @@ -2154,12 +2157,7 @@ private LogTablet createLog(
throws Exception {
LogTablet log =
logManager.getOrCreateLog(
dataDir,
physicalPath,
tableBucket,
tableConfig.getLogFormat(),
tableConfig.getTieredLogLocalSegments(),
isKvTable());
dataDir, physicalPath, tableBucket, tableConfig, isKvTable());
// update high watermark.
Optional<Long> watermarkOpt = lazyHighWatermarkCheckpoint.fetch(tableBucket);
long watermark =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1994,7 +1994,8 @@ protected Optional<Replica> maybeCreateReplica(NotifyLeaderAndIsrData data) {
tableInfo,
clock,
remoteLogManager,
scannerManager);
scannerManager,
conf);
if (!existingLogTabletOpt.isPresent()) {
localDiskManager.recordReplicaLoad(dataDir, isKvTable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void validateTableDescriptor(
ConfigOption<?> option = TABLE_OPTIONS.get(key);
validateOptionValue(tableConf, option);
}
checkLogSegmentFileSize(tableConf);

// check distribution
checkDistribution(tableDescriptor, maxBucketNum);
Expand Down Expand Up @@ -417,6 +418,17 @@ private static void checkTieredLog(Configuration tableConf) {
}
}

private static void checkLogSegmentFileSize(Configuration tableConf) {
if (tableConf.contains(ConfigOptions.LOG_SEGMENT_FILE_SIZE)
&& tableConf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes()
> Integer.MAX_VALUE) {
throw new InvalidConfigException(
String.format(
"Invalid configuration for %s, it must be less than or equal %d bytes.",
ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE));
}
}

private static void checkPartition(
Configuration tableConf, List<String> partitionKeys, RowType rowType) {
boolean isPartitioned = !partitionKeys.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.server.config;

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ResolvedTableConfig}. */
class ResolvedTableConfigTest {

@Test
void testLogSegmentSizeFallsBackToServerConfig() {
Configuration tableConf = new Configuration();
Configuration serverConf = new Configuration();
serverConf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("8kb"));

ResolvedTableConfig tableConfig = new ResolvedTableConfig(tableConf, serverConf);

assertThat(tableConfig.getLogSegmentSize()).isEqualTo(MemorySize.parse("8kb"));

tableConf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("4kb"));

assertThat(tableConfig.getLogSegmentSize()).isEqualTo(MemorySize.parse("4kb"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.SchemaInfo;
Expand Down Expand Up @@ -371,7 +370,11 @@ private KvTablet getOrCreateKv(
tablePath.getDatabaseName(), tablePath.getTableName(), partitionName);
LogTablet logTablet =
logManager.getOrCreateLog(
tempDir, physicalTablePath, tableBucket, LogFormat.ARROW, 1, true);
tempDir,
physicalTablePath,
tableBucket,
new TableConfig(new Configuration()),
true);
return kvManager.getOrCreateKv(
physicalTablePath,
tableBucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,10 @@ void setUp() throws Exception {
physicalTablePath,
logTabletDir,
conf,
new TableConfig(conf),
TestingMetricGroups.TABLET_SERVER_METRICS,
0,
new FlussScheduler(1),
LogFormat.ARROW,
1,
true,
SystemClock.getInstance(),
true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,10 @@ void setUp() throws Exception {
physicalTablePath,
logTabletDir,
conf,
new TableConfig(conf),
TestingMetricGroups.TABLET_SERVER_METRICS,
0,
new FlussScheduler(1),
LogFormat.ARROW,
1,
true,
SystemClock.getInstance(),
true);
Expand Down
Loading
Loading