Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,24 @@
<td>Integer</td>
<td>To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.</td>
</tr>
<tr>
<td><h5>manifest-sort.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to invoke manifest sort rewrite during commit.</td>
</tr>
<tr>
<td><h5>manifest-sort.partition-field</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Partition field name to sort manifest entries by. Validated by schema validation; If not configured, defaults to the first partition field.</td>
</tr>
<tr>
<td><h5>manifest-sort.max-rewrite-size</h5></td>
<td style="word-wrap: break-word;">256 mb</td>
<td>MemorySize</td>
<td>Maximum total size of manifest files to rewrite in a single sort rewrite pass. Sections exceeding this limit are skipped. Set to a larger value to allow more aggressive sort rewriting.</td>
</tr>
<tr>
<td><h5>manifest.target-file-size</h5></td>
<td style="word-wrap: break-word;">8 mb</td>
Expand Down
37 changes: 37 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,30 @@ public InlineElement getDescription() {
"To avoid frequent manifest merges, this parameter specifies the minimum number "
+ "of ManifestFileMeta to merge.");

public static final ConfigOption<Boolean> MANIFEST_SORT_ENABLED =
key("manifest-sort.enabled")
.booleanType()
.defaultValue(false)
.withDescription("Whether to invoke manifest sort rewrite during commit.");

public static final ConfigOption<String> MANIFEST_SORT_PARTITION_FIELD =
key("manifest-sort.partition-field")
.stringType()
.noDefaultValue()
.withDescription(
"Partition field name to sort manifest entries by. Validated by"
+ " schema validation; If not configured, defaults to the first partition field.");

public static final ConfigOption<MemorySize> MANIFEST_SORT_MAX_REWRITE_SIZE =
key("manifest-sort.max-rewrite-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(256))
.withDescription(
"Maximum total size of manifest files to rewrite in a single"
+ " sort rewrite pass. Sections exceeding this limit are"
+ " skipped. Set to a larger value to allow more aggressive"
+ " sort rewriting.");

public static final ConfigOption<String> UPSERT_KEY =
key("upsert-key")
.stringType()
Expand Down Expand Up @@ -2564,6 +2588,19 @@ public MemorySize manifestFullCompactionThresholdSize() {
return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
}

public boolean manifestSortEnabled() {
return options.get(MANIFEST_SORT_ENABLED);
}

@Nullable
public String manifestSortPartitionField() {
return options.get(MANIFEST_SORT_PARTITION_FIELD);
}

public long manifestSortMaxRewriteSize() {
return options.get(MANIFEST_SORT_MAX_REWRITE_SIZE).getBytes();
}

public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.paimon.operation.commit.SuccessCommitResult;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -958,13 +960,7 @@ CommitResult tryCommitOnce(
// try to merge old manifest files to create base manifest list
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
options.manifestTargetSize().getBytes(),
options.manifestMergeMinCount(),
options.manifestFullCompactionThresholdSize().getBytes(),
partitionType,
options.scanManifestParallelism());
mergeBeforeManifests, manifestFile, partitionType, options);
baseManifestList = manifestList.write(mergeAfterManifests);

if (options.rowTrackingEnabled()) {
Expand Down Expand Up @@ -1184,16 +1180,16 @@ private boolean compactManifestOnce() {
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;

// the fist trial
// the fist trial: use a copied options with forced full compaction settings
Options compactOptions = Options.fromMap(options.toMap());
compactOptions.set(CoreOptions.MANIFEST_MERGE_MIN_COUNT, 1);
compactOptions.set(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE, MemorySize.ofBytes(1));
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests,
manifestFile,
options.manifestTargetSize().getBytes(),
1,
1,
partitionType,
options.scanManifestParallelism());
new CoreOptions(compactOptions));

if (new HashSet<>(mergeBeforeManifests).equals(new HashSet<>(mergeAfterManifests))) {
// no need to commit this snapshot, because no compact were happened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry;
Expand Down Expand Up @@ -48,7 +49,7 @@
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Util for merging manifest files. */
/** Manifest file merger with standard merge logic and optional sort rewrite. */
public class ManifestFileMerger {

private static final Logger LOG = LoggerFactory.getLogger(ManifestFileMerger.class);
Expand All @@ -62,15 +63,28 @@ public class ManifestFileMerger {
public static List<ManifestFileMeta> merge(
List<ManifestFileMeta> input,
ManifestFile manifestFile,
long suggestedMetaSize,
int suggestedMinMetaCount,
long manifestFullCompactionSize,
RowType partitionType,
@Nullable Integer manifestReadParallelism) {
CoreOptions options) {
// Extract configuration from options
long suggestedMetaSize = options.manifestTargetSize().getBytes();
int suggestedMinMetaCount = options.manifestMergeMinCount();
long manifestFullCompactionSize = options.manifestFullCompactionThresholdSize().getBytes();
Integer manifestReadParallelism = options.scanManifestParallelism();

// these are the newly created manifest files, clean them up if exception occurs
List<ManifestFileMeta> newFilesForAbort = new ArrayList<>();

try {
// If manifest-sort.enabled is enabled and there are partition fields, use
// trySortRewrite
if (options.manifestSortEnabled() && partitionType.getFieldCount() > 0) {
Optional<List<ManifestFileMeta>> sorted =
ManifestFileSorter.trySortRewrite(
input, newFilesForAbort, manifestFile, partitionType, options);
return sorted.orElse(input);
}

// Otherwise try full compaction first, then minor compaction if needed
Optional<List<ManifestFileMeta>> fullCompacted =
tryFullCompaction(
input,
Expand Down Expand Up @@ -234,7 +248,6 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
}

// 2.2. merge

if (toBeMerged.size() <= 1) {
return Optional.empty();
}
Expand Down Expand Up @@ -295,21 +308,21 @@ private static FullCompactionReadResult readForFullCompaction(
return new FullCompactionReadResult(file, requireChange, entries);
}

private static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
static Set<BinaryRow> computeDeletePartitions(Set<FileEntry.Identifier> deleteEntries) {
Set<BinaryRow> partitions = new HashSet<>();
for (FileEntry.Identifier identifier : deleteEntries) {
partitions.add(identifier.partition);
}
return partitions;
}

private static class FullCompactionReadResult {
static class FullCompactionReadResult {

private final ManifestFileMeta file;
private final boolean requireChange;
private final List<ManifestEntry> entries;
final ManifestFileMeta file;
final boolean requireChange;
final List<ManifestEntry> entries;

private FullCompactionReadResult(
FullCompactionReadResult(
ManifestFileMeta file, boolean requireChange, List<ManifestEntry> entries) {
this.file = file;
this.requireChange = requireChange;
Expand Down
Loading
Loading