[Spark] Support writing Log Compaction Files#7080
Open
felipepessoto wants to merge 2 commits into
Open
Conversation
c6c544f to
8191bbd
Compare
Delta already supports reading log compaction files
(`<x>.<y>.compacted.json`); this adds the write side for Spark. A
compaction file aggregates the reconciled actions of a commit range
into one file (same action-reconciliation rules as a checkpoint, with
`commitInfo` stripped), letting a compaction-aware reader build a
snapshot from `checkpoint + a few compactions + a few commits` instead
of `checkpoint + many commits`. They are optional and require no
protocol or table-feature upgrade; readers that don't understand them
ignore them.
What this adds:
- LogCompaction: reconciles commits [start, end] via InMemoryLogReplay
and writes <start>.<end>.compacted.json. Commit paths are resolved
through DeltaCommitFileProvider, so it works for filesystem and
coordinated-commits / catalog-managed layouts. Idempotent: skips when
the target exists (fs.exists fast-path) and writes with
overwrite = false, treating a concurrent FileAlreadyExistsException as
a successful no-op (content is deterministic). Bounds driver memory by
skipping windows whose combined commit-file size exceeds
deltaLog.minorCompaction.maxWindowSizeBytes. Emits a
delta.logCompaction.stats event (LogCompactionMetrics) for each call.
- LogCompactionHook: post-commit hook that, when enabled, produces a
compaction over the fixed window
[max(committedVersion - interval + 1, lastCheckpointVersion + 1),
committedVersion] at each interval boundary, and is skipped when a
checkpoint was just written. Fixed, non-overlapping windows are
required so the reader's greedy selection can chain them. Per the
protocol, only published (backfilled) versions are compacted, so on
coordinated-commits / catalog-managed tables it first calls
Snapshot.ensureCommitFilesBackfilled (the same mechanism checkpoint
writing uses). Runs synchronously like CheckpointHook.
- InMemoryLogReplay: add retainDomainMetadataTombstones (default false,
so snapshot/checkpoint/checksum behavior is unchanged). A compaction
is an incremental replacement for its range, so unlike a checkpoint it
must retain removed=true DomainMetadata tombstones to suppress an add
that precedes the window; LogCompaction opts in.
- Metadata cleanup: expired compaction files are deleted inside the
existing cleanUpExpiredLogs flow, under the same gates as
delta/checkpoint/checksum cleanup. A file is removed when its start
version is at or before the latest checkpoint and it is older than the
retention cutoff. Deletion is a separate pass, never fed through
BufferingLogDeletionIterator, so it cannot perturb commit-file
retention; expired files are peeled off the same _delta_log listing to
avoid a second listing (best-effort and self-healing).
- Config: delta.logCompactionInterval (table property, default 5) and
session SQLConfs deltaLog.minorCompaction.{useForWrites (default true),
maxWindowSizeBytes (default 1 GiB)}; the existing read-side
deltaLog.minorCompaction.useForReads is unchanged. The global
delta.checkpointInterval default is left unchanged.
Tests: LogCompactionSuite (windows, snapshot equivalence vs raw commits,
idempotency, telemetry, non-trivial actions incl. DVs/row-tracking/
domain metadata/SetTransaction, domain-tombstone retention, size guard),
LogCompactionWithCatalogOwnedSuite (backfill-then-compact end to end),
and a DeltaRetentionSuite cleanup test. The read path remains covered by
DeltaLogMinorCompactionSuite.
Signed-off-by: Felipe Fujiy Pessoto <felipepessoto@hotmail.com>
8191bbd to
e5aac7c
Compare
Contributor
Author
|
@timothyw553 could you please trigger CI. Do you know who would be the best person to review it? @prakharjain09 / @ryan-johnson-databricks? TIA |
…delta_log assertions Signed-off-by: Felipe Pessoto <felipepessoto@hotmail.com>
Contributor
Author
|
@timothyw553 I fixed some tests relying on precise number of delta log file count. Could you trigger it again, please? |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which Delta project/connector is this regarding?
Description
This PR adds write support for Log Compaction Files to the Spark. Delta already supports reading compaction files during snapshot construction (
spark.databricks.delta.deltaLog.minorCompaction.useForReads,useCompactedDeltasForLogSegment), but there was no way to produce them. This is the writer half, and addresses the Spark portion of #2072.A log compaction file
<x>.<y>.compacted.jsonaggregates the reconciled actions of commits[x, y](same action-reconciliation rules as a checkpoint, withcommitInfostripped) into a single file. Readers that understand them can construct a snapshot fromcheckpoint + a few compactions + a few commitsinstead ofcheckpoint + many commits, reducing the number of individual commit files read during snapshot construction (and making a higher checkpoint interval viable in the future — though this PR does not change the default checkpoint interval). Per the protocol they are optional and require no protocol or table-feature upgrade; readers that don't understand them simply ignore them.Once this is merged, I plan to send a separate PR to address the:
Increase the Checkpoint Interval to 20 without affecting the performance of Read/Write operationsof #2072.What this PR adds
LogCompaction— a small writer that reads commits[startVersion, endVersion], reconciles them viaInMemoryLogReplay, and writes<start>.<end>.compacted.json. Commit-file paths are resolved throughDeltaCommitFileProviderso it works for both filesystem and coordinated-commits / catalog-managed path layouts. Because a compaction file incrementally replaces its commit range (rather than being a full snapshot like a checkpoint), the reconciliation retains tombstones that suppress earlier (pre-window) state:RemoveFiletombstones (already retained byInMemoryLogReplay) andremoved = trueDomainMetadatatombstones — for the latter,compactopts into a newInMemoryLogReplay(retainDomainMetadataTombstones = true)mode, since the default (checkpoint/snapshot) reconciliation drops domain tombstones per the protocol. Without this, a domain removed inside the window whoseaddwas in an earlier checkpoint/commit would wrongly reappear as active in a compaction-backed snapshot. It is idempotent: if the target<start>.<end>.compacted.jsonalready exists it returns early without re-reading or rewriting, so concurrent writers reaching the same interval boundary don't redundantly recompute the same file. The write itself usesoverwrite = false, so the residual race where two writers both pass the existence check is resolved atomically — the loser receives aFileAlreadyExistsException, which (because the reconciled content for a range is deterministic and therefore equivalent) is caught and treated as a successful no-op rather than clobbering the existing file. To bound driver memory (reconciliation runs in a single in-memory log replay on the driver, unlike distributed checkpoint/snapshot reconstruction), a window whose combined commit-file size exceedsdeltaLog.minorCompaction.maxWindowSizeBytes(default 1 GiB) is skipped — measured for free from the snapshot's already-listedFileStatusentries. Eachcompactcall emits adelta.logCompaction.statstelemetry event (LogCompactionMetrics: start/end version, status, duration, commits/actions reconciled, resulting file size, window size, and skip reason) for observability, mirroring how checkpointing records metrics.LogCompactionHook— a post-commit hook that, when enabled, produces a compaction file after a commit whose version is a multiple of the configured interval. It compacts a fixed window[max(committedVersion - interval + 1, lastCheckpointVersion + 1), committedVersion], and is skipped when a checkpoint was just written (a checkpoint already subsumes those commits). Fixed, non-overlapping windows are required so the reader's greedy selection can chain them; an ever-growing range would produce overlapping files where the reader only ever uses the smallest one. To get cleanly tiling, reader-usable files, set the checkpoint interval to a multiple of (and larger than) the compaction interval. Per the protocol, a compaction may only be produced for published (backfilled) versions; on catalog-managed / coordinated-commits tables the hook therefore synchronously backfills the window's commits before compacting viaSnapshot.ensureCommitFilesBackfilled— the same mechanism checkpoint writing uses. This is a no-op on filesystem-based tables.delta.logCompactionInterval(table property, int, default5, must be>= 2) — the compaction interval, persisted per-table likedelta.checkpointInterval.spark.databricks.delta.deltaLog.minorCompaction.useForWrites(session SQLConf, internal, defaulttrue) — whether the post-commit hook produces compaction files. Named to mirror the existing read config.spark.databricks.delta.deltaLog.minorCompaction.maxWindowSizeBytes(session SQLConf, internal, default1 GiB) — driver-memory guard: if the combined size of a window's commit files exceeds this, that window is skipped (reconciliation happens in a single in-memory log replay on the driver). A non-positive value disables the guard.spark.databricks.delta.deltaLog.minorCompaction.useForReads(internal, defaulttrue) — whether snapshots are built using compaction files.delta.checkpointIntervalis unchanged (10). With the default compaction interval of5(a divisor of, and smaller than, the checkpoint interval), the hook produces one mid-cadence compaction (e.g.[1,5]) between checkpoints — so a compaction-aware reader readscheckpoint + 1 compaction + a few commitsinstead ofcheckpoint + up to 9 commits, without changing checkpoint frequency for anyone._last_checkpoint) and older than the retention cutoff is deleted (the age gate preserves anything still inside the retention / time-travel window), in the spirit of the protocol's metadata cleanup section. This runs inside the existingcleanUpExpiredLogsflow, after the sameenableExpiredLogCleanup/metadataCleanupAllowedgates as the existing delta/checkpoint/checksum cleanup, so it inherits the same policy (including for catalog-managed tables) and does not introduce a new cleanup entry point. The deletion itself is a separate pass, intentionally not fed throughBufferingLogDeletionIterator, so it can never influence the retention decisions for actual commit/checkpoint/checksum files. Expired compaction files are peeled off the same_delta_loglisting that drives commit/checkpoint cleanup (no second listing). This peeling is best-effort and self-healing: if the listing iterator stops early at a non-expired segment, a trailing expired compaction file is simply collected in a later cleanup round — it's a derived/optional file already subsumed by the checkpoint and never selected by readers, so this never affects correctness or commit-file retention.Known limitations / follow-ups
5) is smaller than the checkpoint interval (10), suites that make precise_delta_logfile-set assertions across ≥5 commits can now see compaction files. The affected suites disable the write hook in their base (DeltaRetentionSuiteBase,DeltaLogMinorCompactionSuite); a full CI run may surface additional suites needing the same one-line opt-out. (The globaldelta.checkpointIntervaldefault is intentionally not changed by this PR.)CheckpointHook. Moving it to a background thread to avoid the boundary-commit latency spike would be a deliberate new design, revisited only if that latency becomes a concern.How was this patch tested?
New and existing unit tests (Spark):
LogCompactionSuite(new): the feature is enabled by default and can be disabled viadeltaLog.minorCompaction.useForWrites; the hook produces the expected non-overlapping fixed windows at the configured interval; produced files are used for snapshot construction and yield identical state/checksum vs. reading raw commits; a checkpoint subsumes the compaction at its boundary and bounds the next window; no compaction is produced before a full window exists; the hook is registered on every transaction;LogCompaction.compactwrites a reconciled file withoutcommitInfo;LogCompaction.compactis a no-op when the target compaction file already exists (idempotency / skip-if-exists); and completed vs. skipped compactions emit the expecteddelta.logCompaction.statstelemetry (LogCompactionMetrics); and a compaction over commits containing non-trivial action types (deletion vectors, row-trackingbaseRowId, domain metadata, and aSetTransactionidentifier) preserves them and yields a snapshot identical to one built from the raw commits; and a compaction whose window contains aDomainMetadataremoval whoseaddprecedes the window retains the removal tombstone so the compaction-backed snapshot agrees with the raw-commit snapshot (the domain stays removed); and a window whose combined commit-file size exceedsdeltaLog.minorCompaction.maxWindowSizeBytesis skipped (no file written,windowTooLargetelemetry) while a window under the cap is compacted.LogCompactionWithCatalogOwnedSuite(new): on a catalog-owned table with batched backfill, the hook backfills the window's commits and then produces the same compaction windows it would on a filesystem table ([1,5],[6,10]), each covering only published (backfilled) versions; the snapshot uses them and the table reads correctly.DeltaRetentionSuite(new test): old compaction files (start version ≤ the latest checkpoint version, including the boundary where start version equals the checkpoint version) are cleaned up while newer ones are retained; also verified underDeltaRetentionWithCatalogOwnedBatch1Suite(catalog-owned).A full CI run may surface additional suites making precise
_delta_logassertions that need the one-line write-hook opt-out.Does this PR introduce any user-facing changes?
Yes. By default, Delta now writes optional log compaction files (
<x>.<y>.compacted.json) to_delta_logbetween checkpoints (session config…deltaLog.minorCompaction.useForWrites, defaulttrue; table propertydelta.logCompactionInterval, default5). The defaultdelta.checkpointIntervalis unchanged (10). These files require no protocol or table-feature change and are ignored by readers that don't support them; compaction can be turned off viadeltaLog.minorCompaction.useForWrites = false.