Skip to content

feat(core): support sort manifest entries by partition#7866

Open
baiyangtx wants to merge 20 commits into
apache:masterfrom
baiyangtx:manifest-sort
Open

feat(core): support sort manifest entries by partition#7866
baiyangtx wants to merge 20 commits into
apache:masterfrom
baiyangtx:manifest-sort

Conversation

@baiyangtx
Copy link
Copy Markdown
Contributor

Purpose

Manifest entries are currently written in arrival order, which scatters entries belonging
to the same partition across multiple manifest files. This leads to:

  • Inefficient partition-pruning during reads — the reader must scan many manifest files
    to find all entries for a single partition.
  • Suboptimal data locality when compaction reorganizes files.

This PR introduces manifest entry sorting by partition, so that entries for the same
partition are clustered together within manifest files.

Changes

New configuration options:

Option Default Description
manifest.merge.sorted true Sort entries by partition during manifest full compaction
manifest.merge.sort-on-commit false Sort entries by partition during manifest full merge in commit
manifest.delta.sorted true Sort entries by partition when writing manifest delta

Core implementation:

  • ManifestFileMerger: Introduces mergeSortedByPartition() and mergeUnsorted().
    When manifest.merge.sorted is enabled, entries are collected into a
    BinaryExternalSortBuffer (with spill-to-disk support), then written in
    partition-major order: (partition, bucket, level).
  • ManifestFileMerger.createManifestEntryComparator(): Comparator used for
    sorting delta manifests, falling back to pure-Java comparison when codegen
    is unavailable.
  • FileStoreCommitImpl: Wires sort parameters into all three paths —
    commit manifest merge, delta file writing, and manifest compaction.

Tests

# Manifest merge with sorting
mvn -pl paimon-core -am -DfailIfNoTests=false \
    -Dtest=ManifestFileMetaTest test

# No-partition edge case
mvn -pl paimon-core -am -DfailIfNoTests=false \
    -Dtest=NoPartitionManifestFileMetaTest test

# Spark compact procedure
mvn -pl paimon-spark/paimon-spark-ut -am -DfailIfNoTests=false \
    -Dtest=CompactProcedureTestBase test

Copy link
Copy Markdown
Contributor

@Aitozi Aitozi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM +1, Please resolve the conflict

@Aitozi
Copy link
Copy Markdown
Contributor

Aitozi commented May 21, 2026

CC @JingsongLi for another look

@JingsongLi
Copy link
Copy Markdown
Contributor

Hi @baiyangtx @Aitozi , please take a look to #7842 , I didn't look closely, maybe the abilities of these two PR implementations are the same.

@baiyangtx
Copy link
Copy Markdown
Contributor Author

baiyangtx commented May 22, 2026

Hi @baiyangtx @Aitozi , please take a look to #7842 , I didn't look closely, maybe the abilities of these two PR implementations are the same.

Thanks @discivigour for working on this too. My PR #7866 takes a different approach — sorting inline during the existing full compaction path, rather than a separate post-commit sort pipeline. Key differences:

  • No new compact type — sorting happens inside ManifestFileMerger.tryFullCompaction, reusing the existing full compaction trigger and lifecycle. No additional manifest scans or rewrite passes.
  • Memory-safe with external sort — uses BinaryExternalSortBuffer (already battle-tested across Paimon). When entries exceed manifest.merge.sort.buffer, it automatically spills to disk. No risk of OOM regardless of manifest count.
  • Finer-grained config — separate toggles for merge compaction, commit-time merge, and delta writing, instead of a single switch.

Happy to align with the community's direction, but I believe the inline approach is simpler and less invasive.

张永翔 added 14 commits May 22, 2026 14:44
- Sort delta entries during commit
- Sort base entries via full merge

See merge request: !854
The manifest entry sorting feature introduced in the parent commit
used FileKind.DELETE in mergeUnsorted() and mergeSortedByPartition()
without importing org.apache.paimon.manifest.FileKind, causing all
modules that depend on paimon-core to fail compilation.
The sorting refactor eliminated the single call to sequentialBatchedExecute,
but left the static import, causing Checkstyle to fail the build.
- FileStoreCommitImpl: use options.xxx() instead of local variables
  (manifestTargetSize, manifestMergeMinCount, etc. don't exist)
- FileStoreCommitImpl: replace coreOptions with options
- ManifestFileMerger: replace entry.toBytes() with
  entrySerializer.serializeToBytes() (toBytes() doesn't exist on apache/master)
- ManifestFileMerger: extract createPartitionRecordComparator() to handle
  partition row comparison (InternalRowUtils.compare takes DataTypeRoot,
  not RowType; newRecordComparator takes boolean[], not boolean)
- Remove stale reader Function referencing deleted readForFullCompaction
- Add missing DataType import, remove unused Function/singletonList imports
createPartitionRecordComparator now returns RecordComparator instead of
Comparator<BinaryRow>, since RecordComparator extends
Comparator<InternalRow> and newRecordComparator already returns
RecordComparator. This avoids a type incompatibility at the assignment
site where the result was used as RecordComparator.
Manifest sorting changes the read order of results across partitions.
The test compared results using containsExactly which requires exact
order; changed to containsExactlyInAnyOrder to match the same approach
used in CompactProcedureTestBase.scala.
Manifest sorting changes read order; the test expected sorted output.
Added explicit sort by id to make the test order-independent.
… assertion

- testFullCompactionReadManifestsInParallel: removed parallel read
  assertions since reads are now sequential in mergeSortedByPartition
- BlobTableTest/MultipleBlobTableTest: disable MANIFEST_MERGE_SORTED
  and MANIFEST_DELTA_SORTED since blob/vector-store code depends on
  original file ordering
…rtions

The manifest sorting feature changes the order of results. All
containsExactlyElementsOf in IncrementalClusterActionITCase need to
be changed to containsExactlyInAnyOrderElementsOf, not just the one
in the assertResult helper.
…in Spark tests

CompactProcedureTestBase and RescaleProcedureTest had additional
containsExactlyElementsOf calls that were not updated in the
original PR. All changed to containsExactlyInAnyOrderElementsOf.
- LanceFormatTest: add ORDER BY a to LIMIT queries
- SparkWriteITCase.testWriteWithDefaultValue: add ORDER BY a to
  all SELECT queries whose results are compared via toString()
Comment on lines +422 to +423
private static int mergeSortedByPartition(
List<ManifestFileMeta> toBeMerged,
Copy link
Copy Markdown
Contributor

@discivigour discivigour May 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @baiyangtx for working on this too. Can this code sort the out-of-order large file manifest that already exists in the table? If the user wants to enable this parameter halfway, it seems that the feature won't work.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@discivigour Good point — my approach indeed doesn't handle existing out-of-order manifests. I think our two PRs are actually complementary: yours handles the bootstrap/rewrite path for historical manifests, mine maintains sorted order going forward via inline compaction. I'd like to collaborate and align the configuration surface first so users see one coherent "manifest sort" feature rather than two separate mechanisms. WDYT?

Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: feat(core): support sort manifest entries by partition

Good motivation -- clustering manifest entries by partition will improve partition-pruning scan performance. Below are several issues I found while reviewing.


1. Loss of ADD filter during manifest reads (performance regression)

In the original readForFullCompaction, the code called:

manifestFile.read(file.fileName(), file.fileSize(), FileEntry.addFilter(), Filter.alwaysTrue())

Both mergeUnsorted() and mergeSortedByPartition() now call:

manifestFile.read(file.fileName(), file.fileSize())

...and then filter FileKind.DELETE in Java. This means DELETE entries are now deserialized from disk only to be discarded. For manifest files with many DELETE entries (e.g., after bulk deletes or partition drops), this is a regression. Please push the add-filter down into the reader call.


2. Parallel manifest reading was silently removed

The original code used sequentialBatchedExecute(reader, toBeMerged, manifestReadParallelism) to read manifest files in parallel. The refactored mergeUnsorted() and mergeSortedByPartition() now iterate sequentially over toBeMerged. For large tables with hundreds of manifest files needing compaction, this could significantly increase compaction latency. The manifestReadParallelism parameter is now effectively unused in the full compaction path.

The test testFullCompactionReadManifestsInParallel was modified to remove the parallelism assertions (maxConcurrentManifestReads), which confirms the feature was dropped rather than preserved.


3. IOManager created from System.getProperty("java.io.tmpdir")

In mergeSortedByPartition():

ioManager = IOManager.create(System.getProperty("java.io.tmpdir"));

This bypasses any user-configured temp directory or spill path. The commit code already has access to an IOManager (or its configuration) through the FileStore. Please pass the IOManager (or temp directories) from the caller rather than hard-coding the system temp directory. This also avoids creating a new IOManager per compaction.


4. Test assertions weakened to containsExactlyInAnyOrderElementsOf

Many tests (especially clustering/sort-related ones like testLocalSortClusterUnpartitionedTable) were changed from containsExactlyElementsOf to containsExactlyInAnyOrderElementsOf. This fundamentally weakens the test -- e.g., the comment in IncrementalClusterActionITCase says "verify internal order: within the single output file, rows must be sorted ascending by (a, b)" but then uses an assertion that does not verify ordering.

If the manifest sorting changes the read order for these tests, the correct fix is to add an ORDER BY clause to the query (as done in SparkWriteITCase) rather than dropping the ordering guarantee from the assertion. Otherwise these tests no longer validate that clustering produces correctly sorted data.


5. Inconsistent sort key between delta sort and full compaction sort

createManifestEntryComparator (used for delta sorting) orders by (partition, bucket, level, fileName), while mergeSortedByPartition (used for full compaction) orders by (partition, bucket, level). Without a stable tiebreaker in the full compaction sort, entries with the same (partition, bucket, level) have non-deterministic order. Consider adding fileName to the full compaction comparator as well for consistency and deterministic output.


6. Default values: manifest.delta.sorted=true changes write-path behavior silently

manifest.delta.sorted defaults to true, meaning every commit will now incur sorting overhead on the delta file list. Given that manifest.merge.sort-on-commit defaults to false explicitly to avoid commit-path latency impact, it seems inconsistent that delta sorting is on by default. Consider defaulting to false or at least calling this out in migration/upgrade notes since it changes existing behavior.


Minor

  • The variable name partitionRmpr in createManifestEntrySortBuffer seems like a typo. Consider partitionCmpr or partitionComparator.
  • The overload chain for merge() and tryFullCompaction() (3 overloads each) is getting long. A parameter object or builder might improve readability in the future.

Overall the feature is valuable and the use of BinaryExternalSortBuffer for spill-to-disk is a sound approach. Addressing the performance regressions (filter pushdown, parallel reads) and the weakened test assertions would strengthen this PR significantly.

zhangyongxiang.alpha added 6 commits May 25, 2026 10:47
1. Restore ADD filter in manifest reads (mergeUnsorted and
   mergeSortedByPartition) to avoid deserializing DELETE entries
2. Add fileName tiebreaker to full compaction sort key for
   deterministic output
3. Change manifest.delta.sorted default to false (consistent
   with sort-on-commit default)
4. Rename partitionRmpr to partitionCmp
Thread @nullable IOManager from caller through tryFullCompaction
and merge methods to mergeSortedByPartition. Falls back to
java.io.tmpdir when null. Overloads and existing callers pass null
to preserve existing behavior.
…UnpartitionedTable

Restore containsExactlyElementsOf for the 'verify internal order'
assertion. With parallelism=1 and single output file, rows within
the file are guaranteed to be sorted by (a, b) per local-sort.
Missed in previous commit - the merge method also needs ioManager
parameter threaded through to tryFullCompaction.
@baiyangtx
Copy link
Copy Markdown
Contributor Author

baiyangtx commented May 25, 2026

Thanks @JingsongLi for the thorough review. I've addressed most of the feedback:

  1. ADD filter regression — Restored FileEntry.addFilter() in both mergeUnsorted() and mergeSortedByPartition(). DELETE entries are now filtered at the IO level as before.

  2. IOManager hard-coded tmpdir — Added @Nullable IOManager parameter to tryFullCompaction and merge. Falls back to System.getProperty("java.io.tmpdir") when null is passed, so existing callers are unaffected.

  3. Inconsistent sort key — Added fileName as the 5th field in the sort row, serving as a deterministic tiebreaker after (partition, bucket, level).

  4. manifest.delta.sorted default — Changed from true to false, consistent with manifest.merge.sort-on-commit.

  5. Test assertion for internal order — Reverted containsExactlyInAnyOrderElementsOf back to containsExactlyElementsOf for the testLocalSortClusterUnpartitionedTable internal-order assertion. With parallelism=1 and a single output file, the order is deterministic.

  6. Minor — Renamed partitionRmprpartitionCmp.

For discussion — parallel manifest reads (2): The original sequentialBatchedExecute was removed because the sorting path needs all entries collected before spilling to external sort. I'd appreciate guidance on whether to restore parallel reads for the non-sorted path only, or if the simplicity of unified sequential reads is acceptable here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants