diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java index adbbce0af4..c5d9d52668 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java @@ -43,6 +43,8 @@ public class FlussConfigUtils { static { TABLE_OPTIONS = extractConfigOptions("table."); + TABLE_OPTIONS.put( + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), ConfigOptions.LOG_SEGMENT_FILE_SIZE); CLIENT_OPTIONS = extractConfigOptions("client."); ALTERABLE_TABLE_OPTIONS = Arrays.asList( @@ -54,7 +56,7 @@ public class FlussConfigUtils { } public static boolean isTableStorageConfig(String key) { - return key.startsWith(TABLE_PREFIX); + return TABLE_OPTIONS.containsKey(key); } public static boolean isAlterableTableOption(String key) { diff --git a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java index 13cb49e9ed..d5942c93ed 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java @@ -53,6 +53,11 @@ public TableConfig(Configuration config) { this.config = config; } + /** Gets the log segment file size of the table. */ + public MemorySize getLogSegmentSize() { + return config.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE); + } + /** Gets the replication factor of the table. */ public int getReplicationFactor() { return config.get(ConfigOptions.TABLE_REPLICATION_FACTOR); diff --git a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java index e2690c54ba..fb38ac1e58 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java @@ -45,7 +45,12 @@ void testExtractOptions() { assertThat(k).startsWith("table."); assertThat(v.key()).startsWith("table."); }); - assertThat(tableOptions.size()).isEqualTo(TABLE_OPTIONS.size()); + assertThat(TABLE_OPTIONS).containsAllEntriesOf(tableOptions); + assertThat(TABLE_OPTIONS) + .containsEntry( + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), + ConfigOptions.LOG_SEGMENT_FILE_SIZE); + assertThat(TABLE_OPTIONS.size()).isEqualTo(tableOptions.size() + 1); Map> clientOptions = extractConfigOptions("client."); assertThat(clientOptions).isNotEmpty(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/config/ResolvedTableConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/config/ResolvedTableConfig.java new file mode 100644 index 0000000000..afbdbeb4eb --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/config/ResolvedTableConfig.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.config; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.config.TableConfig; + +import javax.annotation.Nullable; + +/** Server-side table config that falls back to server configuration for runtime options. */ +public final class ResolvedTableConfig extends TableConfig { + + private final Configuration tableConfig; + private final Configuration serverConfig; + + /** + * Creates a resolved table config. + * + * @param tableConfig the table properties configuration + * @param serverConfig the server configuration to fall back to for runtime options + */ + public ResolvedTableConfig(Configuration tableConfig, @Nullable Configuration serverConfig) { + super(tableConfig); + this.tableConfig = tableConfig; + this.serverConfig = serverConfig == null ? new Configuration() : serverConfig; + } + + /** Gets the log segment file size resolved from table properties or server configuration. */ + @Override + public MemorySize getLogSegmentSize() { + if (tableConfig.contains(ConfigOptions.LOG_SEGMENT_FILE_SIZE)) { + return tableConfig.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE); + } + return serverConfig.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java index ae04328202..eea1d43ab6 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java @@ -20,16 +20,17 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.FlussException; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.SchemaNotExistException; -import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.TabletManagerBase; +import org.apache.fluss.server.config.ResolvedTableConfig; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.storage.LocalDiskManager; @@ -260,16 +261,14 @@ private void waitForLoadLogsInDir(LogRecoveryTask recoveryTask) throws Throwable * @param dataDir the local data directory chosen for the bucket * @param tablePath the table path of the bucket belongs to * @param tableBucket the table bucket - * @param logFormat the log format - * @param tieredLogLocalSegments the number of segments to retain in local for tiered log + * @param tableConfig the resolved table config * @param isChangelog whether the log is a changelog of primary key table */ public LogTablet getOrCreateLog( File dataDir, PhysicalTablePath tablePath, TableBucket tableBucket, - LogFormat logFormat, - int tieredLogLocalSegments, + TableConfig tableConfig, boolean isChangelog) throws Exception { return inLock( @@ -287,11 +286,10 @@ public LogTablet getOrCreateLog( tablePath, tabletDir, conf, + tableConfig, serverMetricGroup, 0L, scheduler, - logFormat, - tieredLogLocalSegments, isChangelog, clock, true); @@ -392,11 +390,10 @@ private LogTablet loadLog( physicalTablePath, tabletDir, conf, + new ResolvedTableConfig(tableInfo.getProperties(), conf), serverMetricGroup, logRecoveryPoint, scheduler, - tableInfo.getTableConfig().getLogFormat(), - tableInfo.getTableConfig().getTieredLogLocalSegments(), tableInfo.hasPrimaryKey(), clock, isCleanShutdown); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java index 23c60c82e1..ade66d215b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.CorruptRecordException; import org.apache.fluss.exception.DuplicateSequenceException; import org.apache.fluss.exception.FlussRuntimeException; @@ -140,14 +141,13 @@ private LogTablet( Configuration conf, Scheduler scheduler, WriterStateManager writerStateManager, - LogFormat logFormat, - int tieredLogLocalSegments, + TableConfig tableConfig, boolean isChangelog, Clock clock) { this.dataDir = dataDir; this.physicalPath = physicalPath; this.localLog = localLog; - this.maxSegmentFileSize = (int) conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes(); + this.maxSegmentFileSize = (int) tableConfig.getLogSegmentSize().getBytes(); this.logFlushIntervalMessages = conf.get(ConfigOptions.LOG_FLUSH_INTERVAL_MESSAGES); int writerExpirationCheckIntervalMs = (int) conf.get(ConfigOptions.WRITER_ID_EXPIRATION_CHECK_INTERVAL).toMillis(); @@ -162,11 +162,11 @@ private LogTablet( () -> removeExpiredWriter(System.currentTimeMillis()), writerExpirationCheckIntervalMs, writerExpirationCheckIntervalMs); - this.logFormat = logFormat; + this.logFormat = tableConfig.getLogFormat(); checkArgument( - tieredLogLocalSegments > 0, + tableConfig.getTieredLogLocalSegments() > 0, "log segments to retain in local must be greater than 0"); - this.tieredLogLocalSegments = tieredLogLocalSegments; + this.tieredLogLocalSegments = tableConfig.getTieredLogLocalSegments(); this.clock = clock; this.isChangeLog = isChangelog; @@ -307,15 +307,16 @@ public static LogTablet create( PhysicalTablePath tablePath, File tabletDir, Configuration conf, + TableConfig tableConfig, TabletServerMetricGroup serverMetricGroup, long recoveryPoint, Scheduler scheduler, - LogFormat logFormat, - int tieredLogLocalSegments, boolean isChangelog, Clock clock, boolean isCleanShutdown) throws Exception { + LogFormat logFormat = tableConfig.getLogFormat(); + // create the log directory if it doesn't exist Files.createDirectories(tabletDir.toPath()); @@ -358,8 +359,7 @@ public static LogTablet create( conf, scheduler, writerStateManager, - logFormat, - tieredLogLocalSegments, + tableConfig, isChangelog, clock); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 1f8b893ab4..b3d5dbc062 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -20,6 +20,7 @@ import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.compression.ArrowCompressionInfo; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.FencedLeaderEpochException; import org.apache.fluss.exception.InvalidColumnProjectionException; @@ -54,6 +55,7 @@ import org.apache.fluss.rpc.protocol.MergeMode; import org.apache.fluss.rpc.util.PredicateMessageUtils; import org.apache.fluss.server.SequenceIDCounter; +import org.apache.fluss.server.config.ResolvedTableConfig; import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.kv.KvManager; @@ -249,7 +251,8 @@ public Replica( TableInfo tableInfo, Clock clock, RemoteLogManager remoteLogManager, - ScannerManager scannerManager) + ScannerManager scannerManager, + @Nullable Configuration configuration) throws Exception { this.physicalPath = physicalPath; this.tableBucket = tableBucket; @@ -271,7 +274,7 @@ public Replica( tableInfo.getSchemaId(), tableInfo.getSchema()); this.tableInfo = tableInfo; - this.tableConfig = tableInfo.getTableConfig(); + this.tableConfig = new ResolvedTableConfig(tableInfo.getProperties(), configuration); this.logFormat = tableConfig.getLogFormat(); this.arrowCompressionInfo = tableConfig.getArrowCompressionInfo(); this.snapshotContext = snapshotContext; @@ -2154,12 +2157,7 @@ private LogTablet createLog( throws Exception { LogTablet log = logManager.getOrCreateLog( - dataDir, - physicalPath, - tableBucket, - tableConfig.getLogFormat(), - tableConfig.getTieredLogLocalSegments(), - isKvTable()); + dataDir, physicalPath, tableBucket, tableConfig, isKvTable()); // update high watermark. Optional watermarkOpt = lazyHighWatermarkCheckpoint.fetch(tableBucket); long watermark = diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 71fd233b68..76328de1f2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -1994,7 +1994,8 @@ protected Optional maybeCreateReplica(NotifyLeaderAndIsrData data) { tableInfo, clock, remoteLogManager, - scannerManager); + scannerManager, + conf); if (!existingLogTabletOpt.isPresent()) { localDiskManager.recordReplicaLoad(dataDir, isKvTable); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java index fcd5f1688a..740130020f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java @@ -112,6 +112,7 @@ public static void validateTableDescriptor( ConfigOption option = TABLE_OPTIONS.get(key); validateOptionValue(tableConf, option); } + checkLogSegmentFileSize(tableConf); // check distribution checkDistribution(tableDescriptor, maxBucketNum); @@ -417,6 +418,17 @@ private static void checkTieredLog(Configuration tableConf) { } } + private static void checkLogSegmentFileSize(Configuration tableConf) { + if (tableConf.contains(ConfigOptions.LOG_SEGMENT_FILE_SIZE) + && tableConf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() + > Integer.MAX_VALUE) { + throw new InvalidConfigException( + String.format( + "Invalid configuration for %s, it must be less than or equal %d bytes.", + ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), Integer.MAX_VALUE)); + } + } + private static void checkPartition( Configuration tableConf, List partitionKeys, RowType rowType) { boolean isPartitioned = !partitionKeys.isEmpty(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/config/ResolvedTableConfigTest.java b/fluss-server/src/test/java/org/apache/fluss/server/config/ResolvedTableConfigTest.java new file mode 100644 index 0000000000..2be850af2b --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/config/ResolvedTableConfigTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.config; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ResolvedTableConfig}. */ +class ResolvedTableConfigTest { + + @Test + void testLogSegmentSizeFallsBackToServerConfig() { + Configuration tableConf = new Configuration(); + Configuration serverConf = new Configuration(); + serverConf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("8kb")); + + ResolvedTableConfig tableConfig = new ResolvedTableConfig(tableConf, serverConf); + + assertThat(tableConfig.getLogSegmentSize()).isEqualTo(MemorySize.parse("8kb")); + + tableConf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("4kb")); + + assertThat(tableConfig.getLogSegmentSize()).isEqualTo(MemorySize.parse("4kb")); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java index 2c54c1665e..0f61576fce 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.KvFormat; -import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaGetter; import org.apache.fluss.metadata.SchemaInfo; @@ -371,7 +370,11 @@ private KvTablet getOrCreateKv( tablePath.getDatabaseName(), tablePath.getTableName(), partitionName); LogTablet logTablet = logManager.getOrCreateLog( - tempDir, physicalTablePath, tableBucket, LogFormat.ARROW, 1, true); + tempDir, + physicalTablePath, + tableBucket, + new TableConfig(new Configuration()), + true); return kvManager.getOrCreateKv( physicalTablePath, tableBucket, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java index 97ae1d9cc9..a9141ba527 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletMergeModeTest.java @@ -125,11 +125,10 @@ void setUp() throws Exception { physicalTablePath, logTabletDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, 0, new FlussScheduler(1), - LogFormat.ARROW, - 1, true, SystemClock.getInstance(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java index 1303521708..0ee197c07b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletSchemaEvolutionTest.java @@ -113,11 +113,10 @@ void setUp() throws Exception { physicalTablePath, logTabletDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, 0, new FlussScheduler(1), - LogFormat.ARROW, - 1, true, SystemClock.getInstance(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index d7ef347015..39ce70ed4d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -174,11 +174,10 @@ private LogTablet createLogTablet(File tempLogDir, long tableId, PhysicalTablePa tablePath, logTabletDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, 0, new FlussScheduler(1), - LogFormat.ARROW, - 1, true, SystemClock.getInstance(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java index 3843290f5d..339e30df2d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/scan/ScannerManagerTest.java @@ -23,7 +23,6 @@ import org.apache.fluss.exception.TooManyScannersException; import org.apache.fluss.memory.TestingMemorySegmentPool; import org.apache.fluss.metadata.KvFormat; -import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.metadata.TableBucket; @@ -103,11 +102,10 @@ void setUp() throws Exception { physicalTablePath, logTabletDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, 0, new FlussScheduler(1), - LogFormat.ARROW, - 1, true, org.apache.fluss.utils.clock.SystemClock.getInstance(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java index e3d5b9ab39..d69806be3b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/DroppedTableRecoveryTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.config.Configuration; import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.KvFormat; -import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.SchemaInfo; import org.apache.fluss.metadata.TableBucket; @@ -148,16 +147,14 @@ void testMultipleLogTabletResidualDataDirectoriesCleanup() throws Exception { tempDir, PhysicalTablePath.of(tablePath), tableBucket1, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); LogTablet log2 = logManager.getOrCreateLog( tempDir, PhysicalTablePath.of(tablePath), tableBucket2, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); // Write some data to both logs @@ -209,8 +206,7 @@ void testLogTabletResidualDataCleanupWithPartitionedTable() throws Exception { tempDir, partitionedTablePath, partitionedTableBucket, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); // Write some data to the log @@ -256,16 +252,14 @@ void testMultipleKvTabletResidualDataDirectoriesCleanup() throws Exception { tempDir, PhysicalTablePath.of(tablePath), tableBucket1, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); LogTablet log2 = logManager.getOrCreateLog( tempDir, PhysicalTablePath.of(tablePath), tableBucket2, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); // Write some data to both logs @@ -356,8 +350,7 @@ void testKvTabletResidualDataCleanupWithPartitionedTable() throws Exception { tempDir, partitionedTablePath, partitionedTableBucket, - LogFormat.ARROW, - 1, + new TableConfig(new Configuration()), false); // Write some data to the log diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java index 075af47c48..26e08cfad4 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java @@ -20,6 +20,7 @@ import org.apache.fluss.compression.ArrowCompressionInfo; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.MemorySize; +import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.record.ChangeType; @@ -650,11 +651,10 @@ private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint) PhysicalTablePath.of(DATA1_TABLE_PATH), logDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, recoveryPoint, scheduler, - LogFormat.ARROW, - 1, false, SystemClock.getInstance(), isCleanShutdown); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index a6087e39b3..0a3911d64a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -18,12 +18,15 @@ package org.apache.fluss.server.log; import org.apache.fluss.config.ConfigOptions; -import org.apache.fluss.metadata.LogFormat; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.MemorySize; +import org.apache.fluss.config.TableConfig; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.LogTestBase; import org.apache.fluss.record.MemoryLogRecords; +import org.apache.fluss.server.config.ResolvedTableConfig; import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.storage.LocalDiskManager; @@ -175,6 +178,29 @@ void testGetNonExistentLog() { assertThat(log.isPresent()).isFalse(); } + @Test + void testTableLogSegmentFileSizeOverridesServerConfig() throws Exception { + initTableBuckets(null); + MemoryLogRecords records = genMemoryLogRecordsByObject(DATA1); + Configuration tableProperties = new Configuration(); + tableProperties.set( + ConfigOptions.LOG_SEGMENT_FILE_SIZE, + new MemorySize(records.sizeInBytes() * 2L - 1)); + + LogTablet log = + logManager.getOrCreateLog( + tempDir, + PhysicalTablePath.of(tablePath1), + tableBucket1, + new ResolvedTableConfig(tableProperties, conf), + false); + + log.appendAsLeader(records); + log.appendAsLeader(records); + + assertThat(log.logSegments().size()).isEqualTo(2); + } + @ParameterizedTest @MethodSource("partitionProvider") void testCheckpointRecoveryPoints(String partitionName) throws Exception { @@ -400,15 +426,18 @@ private LogTablet getOrCreateLog( PhysicalTablePath.of( tablePath.getDatabaseName(), tablePath.getTableName(), partitionName), tableBucket, - LogFormat.ARROW, - 1, + createTableConfig(), false); } private LogTablet createLog(TablePath tablePath, TableBucket tableBucket, File dataDir) throws Exception { return logManager.getOrCreateLog( - dataDir, PhysicalTablePath.of(tablePath), tableBucket, LogFormat.ARROW, 1, false); + dataDir, PhysicalTablePath.of(tablePath), tableBucket, createTableConfig(), false); + } + + private TableConfig createTableConfig() { + return new ResolvedTableConfig(new Configuration(), conf); } private void initTableBuckets(@Nullable String partitionName) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java index 46685cfe05..fa6766e6ae 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java @@ -20,8 +20,8 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; +import org.apache.fluss.config.TableConfig; import org.apache.fluss.exception.OutOfOrderSequenceException; -import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.record.LogRecord; import org.apache.fluss.record.LogRecordBatch; @@ -92,11 +92,10 @@ public void setup() throws Exception { PhysicalTablePath.of(DATA1_TABLE_PATH), logDir, conf, + new TableConfig(conf), TestingMetricGroups.TABLET_SERVER_METRICS, 0, scheduler, - LogFormat.ARROW, - 1, false, SystemClock.getInstance(), true); @@ -496,11 +495,10 @@ private LogTablet createLogTablet(Configuration config) throws Exception { PhysicalTablePath.of(DATA1_TABLE_PATH), logDir, config, + new TableConfig(config), TestingMetricGroups.TABLET_SERVER_METRICS, 0, scheduler, - LogFormat.ARROW, - 1, false, SystemClock.getInstance(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index bad818a9e9..f459f98a9d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -543,7 +543,8 @@ private Replica makeReplica( DATA1_TABLE_INFO, manualClock, remoteLogManager, - scannerManager); + scannerManager, + conf); } private void initRemoteLogEnv() throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java new file mode 100644 index 0000000000..a52ba1d3f7 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/utils/TableDescriptorValidationTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.utils; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.InvalidConfigException; +import org.apache.fluss.metadata.Schema; +import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.types.DataTypes; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link TableDescriptorValidation}. */ +class TableDescriptorValidationTest { + + @Test + void testValidateLogSegmentFileSizeTableProperty() { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(Schema.newBuilder().column("id", DataTypes.INT()).build()) + .distributedBy(1) + .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "1") + .property(ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), "1kb") + .build(); + + TableDescriptorValidation.validateTableDescriptor(tableDescriptor, 1, null); + } + + @Test + void testValidateLogSegmentFileSizeTablePropertyTooLarge() { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(Schema.newBuilder().column("id", DataTypes.INT()).build()) + .distributedBy(1) + .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "1") + .property(ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), "3g") + .build(); + + assertThatThrownBy( + () -> + TableDescriptorValidation.validateTableDescriptor( + tableDescriptor, 1, null)) + .isInstanceOf(InvalidConfigException.class) + .hasMessageContaining( + "Invalid configuration for log.segment.file-size, it must be less than or equal"); + } +}