diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala index 2ccb2adb24b..b5025c1ae34 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala @@ -484,6 +484,21 @@ trait DeltaConfigsBase extends DeltaLogging { _ > 0, "needs to be a positive integer.") + /** + * The commit interval at which the log-compaction post-commit hook produces a log compaction + * file (`<x>.<y>.compacted.json`), when log compaction is enabled + * (`deltaLog.minorCompaction.useForWrites`). A compaction is attempted after a commit whose + * version is a multiple of this interval. For the produced files to be non-overlapping and usable + * by readers, the checkpoint interval ([[CHECKPOINT_INTERVAL]]) should be a multiple of (and + * larger than) this value (e.g. the defaults: checkpoint every 10 commits, compaction every 5). + */ + val LOG_COMPACTION_INTERVAL = buildConfig[Int]( + "logCompactionInterval", + "5", + _.toInt, + _ >= 2, + "needs to be an integer >= 2.") + /** * This is the property that describes the table redirection detail. It is a JSON string format * of the `TableRedirectConfiguration` class, which includes following attributes: diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/LogCompaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/LogCompaction.scala new file mode 100644 index 00000000000..ebf32fe0603 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/LogCompaction.scala @@ -0,0 +1,269 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.nio.file.FileAlreadyExistsException + +import org.apache.spark.sql.delta.actions.{Action, InMemoryLogReplay} +import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames} +import org.apache.spark.sql.delta.util.FileNames.{CompactedDeltaFile, DeltaFile} + +import org.apache.spark.internal.MDC +import org.apache.spark.sql.SparkSession + +/** + * Utilities for creating log compaction files. A log compaction file aggregates the actions + * from a range of commit files `[startVersion, endVersion]` into a single JSON file by following + * the same action reconciliation rules as a checkpoint, but without full state reconstruction. + * + * Per the Delta protocol, log compaction files: + * - Reside in `_delta_log` and are named `<x>.<y>.compacted.json`. + * - Contain the reconciled actions for the commit range (one action per line), without + * `commitInfo` actions (these are stripped during reconciliation, same as checkpoints). + * - Are optional: writers may produce them and readers may consume them. They do not require any + * protocol or table-feature upgrade; readers that do not understand them simply ignore them. + * - Replace the corresponding individual commit files during snapshot construction on the read + * path, speeding it up while keeping the checkpoint interval high.
+ */ +object LogCompaction extends DeltaLogging { + + /** opType under which all log compaction telemetry is recorded. */ + private[delta] val OP_TYPE = "delta.logCompaction" + + // Possible values of `LogCompactionMetrics.status`. + private[delta] val STATUS_COMPLETED = "completed" + private[delta] val STATUS_SKIPPED = "skipped" + + // Possible values of `LogCompactionMetrics.skipReason` (only set when status is "skipped"). + private[delta] val SKIP_REASON_TARGET_EXISTS = "targetAlreadyExists" + private[delta] val SKIP_REASON_CONCURRENT_WRITE = "concurrentlyCreated" + private[delta] val SKIP_REASON_WINDOW_TOO_LARGE = "windowTooLarge" + + /** + * Creates a log compaction file for the table covering commits `[startVersion, endVersion]` + * (both inclusive). The resulting file contains the reconciled actions for the range (following + * the same rules as checkpoint creation) without `commitInfo`. + * + * The individual commit files in the range are read using the path resolution of the provided + * `snapshot`, so this works for both regular tables and coordinated-commits / catalog-managed + * tables (where recent commits may live under `_delta_log/_staged_commits`). + * + * This is a no-op if a compaction file for the exact `[startVersion, endVersion]` range already + * exists (idempotent): the common case is short-circuited by an existence check that avoids + * redundant reconciliation, and the write itself uses `overwrite = false` so a concurrent writer + * that produced the same (deterministic) file cannot be clobbered. + * + * It is also a no-op if the combined size of the window's commit files exceeds + * `deltaLog.minorCompaction.maxWindowSizeBytes`: reconciliation happens in a single in-memory log + * replay on the driver, so this bounds the driver memory a single compaction can consume. 
+ * + * Each invocation emits a `delta.logCompaction.stats` telemetry event (see + * [[LogCompactionMetrics]]) capturing the outcome (completed / skipped with a reason), duration, + * number of commits and actions, and the resulting file size. + * + * @param deltaLog The [[DeltaLog]] instance for the table. + * @param snapshot A snapshot at or after `endVersion`, used to resolve commit file paths. + * @param startVersion The start version of the compaction range (inclusive). + * @param endVersion The end version of the compaction range (inclusive). + */ + def compact( + deltaLog: DeltaLog, + snapshot: Snapshot, + startVersion: Long, + endVersion: Long): Unit = { + require( + endVersion > startVersion, + s"endVersion ($endVersion) must be greater than startVersion ($startVersion)") + + val hadoopConf = deltaLog.newDeltaHadoopConf() + val compactedFilePath = + FileNames.compactedDeltaFile(deltaLog.logPath, startVersion, endVersion) + val fs = deltaLog.logPath.getFileSystem(hadoopConf) + val startTimeMs = System.currentTimeMillis() + def elapsedMs: Long = System.currentTimeMillis() - startTimeMs + + // Idempotency fast path: skip the (potentially expensive) reconciliation and write if the + // compaction file for this exact range already exists, e.g. because a concurrent writer + // already produced it. This avoids recomputing the file in the common case; the write below + // additionally closes the residual race window atomically via overwrite = false. 
+ if (fs.exists(compactedFilePath)) { + logInfo( + log"Skipping log compaction; file already exists " + + log"${MDC(DeltaLogKeys.PATH, compactedFilePath)}") + recordCompactionStats(deltaLog, LogCompactionMetrics( + startVersion = startVersion, + endVersion = endVersion, + status = STATUS_SKIPPED, + durationMs = elapsedMs, + skipReason = Some(SKIP_REASON_TARGET_EXISTS))) + return + } + + // Driver-memory guard: the reconciliation below replays the whole window in a single + // in-memory log replay on the driver (unlike checkpointing and snapshot construction, which + // reconcile distributedly). A window containing very large commits could therefore pressure + // driver memory, so skip compaction when the combined size of the window's commit files + // exceeds the configured threshold. The commits remain available as individual delta files and + // are subsumed by the next checkpoint; readers transparently fall back to them. + val windowSizeBytes = windowCommitSizeBytes(snapshot, startVersion, endVersion) + val maxWindowSizeBytes = + SparkSession.active.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_MAX_WINDOW_SIZE) + if (maxWindowSizeBytes > 0 && windowSizeBytes > maxWindowSizeBytes) { + logInfo( + log"Skipping log compaction for versions " + + log"[${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " + + log"${MDC(DeltaLogKeys.END_VERSION, endVersion)}]; window size " + + log"${MDC(DeltaLogKeys.NUM_BYTES, windowSizeBytes)} exceeds the configured maximum " + + log"${MDC(DeltaLogKeys.MAX_SIZE, maxWindowSizeBytes)}") + recordCompactionStats(deltaLog, LogCompactionMetrics( + startVersion = startVersion, + endVersion = endVersion, + status = STATUS_SKIPPED, + durationMs = elapsedMs, + skipReason = Some(SKIP_REASON_WINDOW_TOO_LARGE), + windowSizeBytes = windowSizeBytes)) + return + } + + val fileProvider = DeltaCommitFileProvider(snapshot) + val logReplay = new InMemoryLogReplay( + minFileRetentionTimestamp = None, + minSetTransactionRetentionTimestamp = None, + // A 
compaction file incrementally replaces its commit range, so (unlike a checkpoint) it + // must retain `removed = true` DomainMetadata tombstones to suppress earlier adds. + retainDomainMetadataTombstones = true) + + (startVersion to endVersion).foreach { version => + val file = fileProvider.deltaFile(version) + // `readAsIterator` returns a ClosableIterator backed by an open input stream. Keep a + // reference to it (rather than chaining `.map`, which discards the close handle) and close + // it explicitly so the stream is released even if reconciliation is interrupted mid-file. + val actions = deltaLog.store.readAsIterator(file, hadoopConf) + try { + logReplay.append(version, actions.map(Action.fromJson)) + } finally { + actions.close() + } + } + + // Count the reconciled actions as a side effect while the store consumes the iterator below. + var numActions = 0L + val actionsToWrite = logReplay.checkpoint.map { action => + numActions += 1 + action.json + } + + // Write with overwrite = false so that, in the residual race where another writer produced the + // same compaction file between the existence check above and this write, we don't clobber it. + // The reconciled content for a given [startVersion, endVersion] range is deterministic, so an + // already-present file is equivalent; a FileAlreadyExistsException is therefore treated as a + // successful no-op rather than an error. 
+ try { + deltaLog.store.write( + path = compactedFilePath, + actions = actionsToWrite, + overwrite = false, + hadoopConf = hadoopConf) + } catch { + case _: FileAlreadyExistsException => + logInfo( + log"Skipping log compaction; file already exists " + + log"${MDC(DeltaLogKeys.PATH, compactedFilePath)}") + recordCompactionStats(deltaLog, LogCompactionMetrics( + startVersion = startVersion, + endVersion = endVersion, + status = STATUS_SKIPPED, + durationMs = elapsedMs, + skipReason = Some(SKIP_REASON_CONCURRENT_WRITE))) + return + } + + recordCompactionStats(deltaLog, LogCompactionMetrics( + startVersion = startVersion, + endVersion = endVersion, + status = STATUS_COMPLETED, + durationMs = elapsedMs, + numCommitsCompacted = endVersion - startVersion + 1, + numActions = numActions, + compactedFileSizeBytes = fs.getFileStatus(compactedFilePath).getLen, + windowSizeBytes = windowSizeBytes)) + + logInfo( + log"Created log compaction file " + + log"${MDC(DeltaLogKeys.PATH, compactedFilePath)} " + + log"for versions [${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " + + log"${MDC(DeltaLogKeys.END_VERSION, endVersion)}]") + } + + /** + * Returns the combined size, in bytes, of the commit files for versions + * `[startVersion, endVersion]` as seen by `snapshot`'s log segment. Used to bound the driver + * memory consumed by reconciling the window. The sizes come from the already-listed + * [[org.apache.hadoop.fs.FileStatus]] entries, so this adds no extra file-system calls. + * + * Best-effort: this assumes `snapshot.logSegment` covers `[startVersion, endVersion]`, which + * holds on the post-commit-hook path (the window is always entirely after the checkpoint and is + * represented by individual commit files in the segment). 
If called directly with a snapshot + * whose segment does not cover the range, in-range entries may be missing and the result will + * under-count (possibly `0`) - so the size guard fails open (compaction proceeds) rather than + * ever wrongly tripping. + */ + private def windowCommitSizeBytes( + snapshot: Snapshot, + startVersion: Long, + endVersion: Long): Long = { + snapshot.logSegment.deltas.iterator.collect { + case DeltaFile(fileStatus, version) if version >= startVersion && version <= endVersion => + fileStatus.getLen + case CompactedDeltaFile(fileStatus, lo, hi) if lo >= startVersion && hi <= endVersion => + fileStatus.getLen + }.sum + } + + /** Emits a single log compaction telemetry event. */ + private def recordCompactionStats(deltaLog: DeltaLog, metrics: LogCompactionMetrics): Unit = + recordDeltaEvent(deltaLog, opType = s"$OP_TYPE.stats", data = metrics) +} + +/** + * Telemetry for a single [[LogCompaction.compact]] invocation, emitted as the JSON blob of the + * `delta.logCompaction.stats` event. + * + * @param startVersion The (inclusive) start version of the compaction range. + * @param endVersion The (inclusive) end version of the compaction range. + * @param status `completed` if a file was written, `skipped` if none was (see `skipReason`). + * @param durationMs Wall-clock time spent in `compact`, in milliseconds. + * @param skipReason When `status == skipped`, why it was skipped (see `SKIP_REASON_*`). + * @param numCommitsCompacted Number of commit files reconciled into the compaction file. + * @param numActions Number of actions written to the compaction file. + * @param compactedFileSizeBytes Size of the written compaction file, in bytes. + * @param windowSizeBytes Combined size of the window's commit files, in bytes (the value the + * driver-memory guard checks).
+ */ +case class LogCompactionMetrics( + startVersion: Long, + endVersion: Long, + status: String, + durationMs: Long, + skipReason: Option[String] = None, + numCommitsCompacted: Long = 0L, + numActions: Long = 0L, + compactedFileSizeBytes: Long = 0L, + windowSizeBytes: Long = 0L) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala index 5fdf570f974..35b4b53ca97 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala @@ -95,7 +95,14 @@ trait MetadataCleanup extends DeltaLogging { val fs = logPath.getFileSystem(newDeltaHadoopConf()) var numDeleted = 0 - val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime.getTime) + // Expired log compaction files are peeled off the same `_delta_log` listing that drives the + // delta/checkpoint/checksum cleanup below, so we don't pay for a second full directory + // listing (expensive on object stores). They are collected here as that listing is consumed + // and deleted separately further down. See `listExpiredDeltaLogs` for the (best-effort, + // self-healing) semantics in the rare case where the listing is not fully traversed. + val expiredCompactionDeltaFiles = ArrayBuffer.empty[FileStatus] + val expiredDeltaLogs = listExpiredDeltaLogs( + fileCutOffTime.getTime, onExpiredCompactionFile = expiredCompactionDeltaFiles += _) if (expiredDeltaLogs.hasNext) { // Trigger compatibility checkpoint creation logic only when this round of metadata cleanup // is going to delete any deltas/checkpoint files. @@ -122,6 +129,12 @@ trait MetadataCleanup extends DeltaLogging { } } } + // Log compaction files are optional, derived optimization files. 
They were peeled off the + // shared listing above as it was consumed by the deletion loop, and are deleted here in a + // separate pass - never through the BufferingLogDeletionIterator - so that they can never + // influence the retention decisions for actual commit/checkpoint/checksum files, which is + // critical because incorrectly deleting those would corrupt the table. + numDeleted += expiredCompactionDeltaFiles.count(file => fs.delete(file.getPath, false)) val commitDirPath = FileNames.commitDirPath(logPath) // Commit Directory might not exist on tables created in older versions and // never updated since. @@ -165,13 +178,56 @@ trait MetadataCleanup extends DeltaLogging { * considered as expired, it must: * - have a checkpoint file after it * - be older than `fileCutOffTime` + * + * Log compaction files (`<x>.<y>.compacted.json`) are derived/optional optimization files that + * span a `[startVersion, endVersion]` range. They must NOT be fed into the + * [[BufferingLogDeletionIterator]] (which adjusts time-travel timestamps per single commit and + * whose retention decisions for real commit files could be perturbed by a range-spanning file). + * Instead, to avoid a second full `_delta_log` listing (expensive on object stores), expired + * compaction files are reported via `onExpiredCompactionFile` for deletion in a separate pass. + * A compaction file is expired when: + * - its start version is at or before the latest checkpoint version (read from + * `_last_checkpoint`), so the checkpoint already subsumes the commit range it covers and the + * file is no longer selected when building the latest snapshot (see + * `useCompactedDeltasForLogSegment`), and + * - it is older than `fileCutOffTime` - the age gate that actually preserves any file still + * needed within the retention / time-travel window.
(So although the version gate uses the + * latest checkpoint rather than the protocol's retention `cutOffCheckpoint`, the age gate + * ensures no file inside the retention window is ever deleted.) + * + * Expired compaction files are peeled off this same listing as it is consumed by the returned + * [[BufferingLogDeletionIterator]] (reported via `onExpiredCompactionFile`), which avoids a + * second listing. This peeling is best-effort: the iterator may stop pulling from the listing + * early when it reaches a non-expired segment before a checkpoint, in which case any compaction + * files sorting after that point are not seen in this round. That is safe and self-healing - + * compaction files are derived/optional, a file with `startVersion <= checkpointVersion` is + * already subsumed by the checkpoint and is never selected by readers (see + * `useCompactedDeltasForLogSegment`), and it is collected by a later cleanup round once the + * retention cutoff advances past the blocking segment. Commit/checkpoint/checksum retention is + * identical either way (those files are not affected by the peeling). The + * `metadataCleanupAllowed` validation path passes the default no-op callback. 
*/ - private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = { + private def listExpiredDeltaLogs( + fileCutOffTime: Long, + onExpiredCompactionFile: FileStatus => Unit = _ => ()): Iterator[FileStatus] = { val latestCheckpoint = readLastCheckpointFile() if (latestCheckpoint.isEmpty) return Iterator.empty - val threshold = latestCheckpoint.get.version - 1L + val checkpointVersion = latestCheckpoint.get.version + val threshold = checkpointVersion - 1L val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf()) - .filter(f => isCheckpointFile(f) || isDeltaFile(f) || isChecksumFile(f)) + .filter { file => + if (isCompactedDeltaFile(file)) { + // Peel expired compaction files off this shared listing (deleted in a separate pass), + // and exclude every compaction file from the BufferingLogDeletionIterator below. + if (file.getModificationTime <= fileCutOffTime && + compactedDeltaVersions(file)._1 <= checkpointVersion) { + onExpiredCompactionFile(file) + } + false + } else { + isCheckpointFile(file) || isDeltaFile(file) || isChecksumFile(file) + } + } new BufferingLogDeletionIterator( files, fileCutOffTime, threshold, getDeltaFileChecksumOrCheckpointVersion) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 68634fdbebe..aa9928d8e05 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.delta.constraints.{Constraints, Invariants} import org.apache.spark.sql.delta.coordinatedcommits.{CatalogOwnedTableUtils, CoordinatedCommitsUtils, TableCommitCoordinatorClient} import org.apache.spark.sql.delta.coordinatedcommits.CatalogTrackedInfo import org.apache.spark.sql.delta.files._ -import org.apache.spark.sql.delta.hooks.{CheckpointHook, ChecksumHook, 
GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} +import org.apache.spark.sql.delta.hooks.{CheckpointHook, ChecksumHook, GenerateSymlinkManifest, HudiConverterHook, IcebergConverterHook, LogCompactionHook, PostCommitHook, UpdateCatalogFactory} import org.apache.spark.sql.delta.hooks.metrics.UpdateMetricsHook import org.apache.spark.sql.delta.util.CatalogTableUtils import org.apache.spark.sql.delta.implicits.addFileEncoder @@ -475,6 +475,7 @@ trait OptimisticTransactionImpl extends TransactionHelper } // The CheckpointHook will only checkpoint if necessary, so always register it to run. registerPostCommitHook(CheckpointHook) + registerPostCommitHook(LogCompactionHook) registerPostCommitHook(HudiConverterHook) /** The protocol of the snapshot that this transaction is reading at. */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala index b4a66590a20..70a4f6b1c8e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/InMemoryLogReplay.scala @@ -29,6 +29,10 @@ import java.net.URI * A [[RemoveFile]] "corresponds" to the [[AddFile]] that matches both the parquet file URI * *and* the deletion vector's URI (if any). * - The most recent version for any `appId` in a [[SetTransaction]] wins. + * - The most recent [[DomainMetadata]] for any `domain` wins. A `removed = true` action is a + * tombstone: it is excluded from the output by default (snapshot/checkpoint semantics), but + * retained when `retainDomainMetadataTombstones` is set, so that an incremental log compaction + * can suppress an earlier `add` that precedes the compacted window. * - The most recent [[Metadata]] wins. * - The most recent [[Protocol]] version wins. 
* - For each `(path, dv id)` tuple, this class should always output only one [[FileAction]] @@ -38,7 +42,8 @@ import java.net.URI */ class InMemoryLogReplay( minFileRetentionTimestamp: Option[Long], - minSetTransactionRetentionTimestamp: Option[Long]) extends LogReplay { + minSetTransactionRetentionTimestamp: Option[Long], + retainDomainMetadataTombstones: Boolean = false) extends LogReplay { import InMemoryLogReplay._ @@ -62,9 +67,11 @@ class InMemoryLogReplay( actions.foreach { case a: SetTransaction => transactions(a.appId) = a - case a: DomainMetadata if a.removed => - domainMetadatas.remove(a.domain) - case a: DomainMetadata if !a.removed => + case a: DomainMetadata => + // Keep the latest action per domain, *including* `removed = true` tombstones. A full + // checkpoint/snapshot excludes tombstones (see `getDomainMetadatas`), but an incremental + // log compaction must retain them so that a domain whose `add` precedes the compacted + // window is still suppressed when the compaction replaces those commits. domainMetadatas(a.domain) = a case _: CheckpointOnlyAction => // Ignore this while doing LogReplay case a: Metadata => @@ -107,7 +114,17 @@ class InMemoryLogReplay( } } - private[delta] def getDomainMetadatas: Iterable[DomainMetadata] = domainMetadatas.values + private[delta] def getDomainMetadatas: Iterable[DomainMetadata] = + if (retainDomainMetadataTombstones) { + // Log compaction is an incremental replacement for its commit range, so it must retain + // `removed = true` tombstones to suppress earlier (pre-window) adds, just like RemoveFile + // tombstones are retained for files. + domainMetadatas.values + } else { + // Snapshots and checkpoints exclude domain tombstones: per the protocol, removed domains + // are not returned by snapshot reads and are not preserved in checkpoints. + domainMetadatas.values.filterNot(_.removed) + } /** Returns the current state of the Table as an iterator of actions. 
*/ override def checkpoint: Iterator[Action] = { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/LogCompactionHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/LogCompactionHook.scala new file mode 100644 index 00000000000..228c21e99c0 --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/LogCompactionHook.scala @@ -0,0 +1,88 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.hooks + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.sources.DeltaSQLConf + +import org.apache.spark.sql.SparkSession + +/** + * Post-commit hook that creates log compaction files (`<x>.<y>.compacted.json`) to speed up + * snapshot construction without the cost of a full checkpoint. + * + * When enabled (`deltaLog.minorCompaction.useForWrites`), after a commit whose version is a + * multiple of the configured interval (table property `delta.logCompactionInterval`), this hook + * reconciles the most recent window of commits into a single compaction file. The window is + * `[startVersion, committedVersion]` where + * `startVersion = max(committedVersion - interval + 1, lastCheckpointVersion + 1)`. Bounding + * the window below by the last checkpoint avoids producing a compaction that spans (and is thus + * subsumed by) a checkpoint.
+ * + * Reconciling a fixed window keeps the produced compaction files non-overlapping so that, when the + * checkpoint interval is a multiple of the compaction interval, they tile the commit range and can + * be chained by the reader (see `useCompactedDeltasForLogSegment`). Producing an ever-growing range + * would instead yield overlapping files where the reader only ever uses the smallest one. + * + * If a checkpoint was just written for this commit, the hook does nothing, because a checkpoint + * already subsumes the commits a compaction would cover. + * + * Per the protocol, compaction files (like checkpoints) may only be produced for commit versions + * already published in `_delta_log`. On catalog-managed / coordinated-commits tables the hook + * therefore synchronously backfills the window's commits before compacting, mirroring how + * checkpoint writing publishes commits via `Snapshot.ensureCommitFilesBackfilled`. + * + * Log compaction files are optional and do not require any protocol or table-feature upgrade. + * Readers that support compacted deltas (`deltaLog.minorCompaction.useForReads`) use them to speed + * up snapshot construction; readers that don't simply ignore them. + */ +object LogCompactionHook extends PostCommitHook { + + override val name: String = "Post commit log compaction" + + override def run(spark: SparkSession, txn: CommittedTransaction): Unit = { + if (!spark.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES)) return + // A checkpoint already subsumes the commits a compaction would cover, so skip when one was + // just written for this commit. + if (txn.needsCheckpoint) return + + val interval = + DeltaConfigs.LOG_COMPACTION_INTERVAL.fromMetaData(txn.postCommitSnapshot.metadata) + val endVersion = txn.committedVersion + // Only compact on interval boundaries to keep the produced windows non-overlapping. 
+ if (endVersion <= 0 || endVersion % interval != 0) return + + // Don't start a compaction at or before the latest checkpoint: those commits are already + // subsumed by the checkpoint, so such a compaction would never be used by the reader. + val checkpointVersion = txn.postCommitSnapshot.logSegment.checkpointProvider.version + val startVersion = math.max(endVersion - interval + 1, checkpointVersion + 1) + + // Need at least two commits in the range to produce a useful compaction. + if (endVersion <= startVersion) return + + // Per the protocol, a log compaction file may only be produced for versions already published + // (backfilled) in `_delta_log` (i.e. `_delta_log/<v>.json` must exist for every `v` in the + // range). On coordinated-commits / catalog-managed tables recent commits may still be staged + // under `_delta_log/_staged_commits`, so we synchronously backfill them first - the same way + // checkpoint writing ensures published commits (see `Snapshot.ensureCommitFilesBackfilled`, + // invoked from `Checkpoints.writeCheckpointFiles`). This is a no-op on filesystem-based tables + // (no commit coordinator).
+ txn.postCommitSnapshot.ensureCommitFilesBackfilled(txn.catalogTable) + + LogCompaction.compact(txn.deltaLog, txn.postCommitSnapshot, startVersion, endVersion) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 8f2eab60947..c8c4777455e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2967,6 +2967,34 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils { .booleanConf .createWithDefault(true) + val DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES = + buildConf("deltaLog.minorCompaction.useForWrites") + .doc( + "If true, a post-commit hook will periodically create log compaction files " + + "(`..compacted.json`) that aggregate the actions of a range of commits. " + + "Readers that support compacted deltas (see `deltaLog.minorCompaction.useForReads`) " + + "can use them to speed up snapshot construction without the cost of a full " + + "checkpoint. This is most effective when the checkpoint interval is a multiple of " + + "and larger than `delta.logCompactionInterval`. Compaction files are optional " + + "and do not require any protocol or table feature upgrade.") + .internal() + .booleanConf + .createWithDefault(true) + + val DELTALOG_MINOR_COMPACTION_MAX_WINDOW_SIZE = + buildConf("deltaLog.minorCompaction.maxWindowSizeBytes") + .doc( + "The maximum combined size (in bytes) of the commit files in a log compaction window. " + + "The log compaction post-commit hook reconciles a window of commits in a single " + + "in-memory log replay on the driver, so a window containing very large commits could " + + "pressure driver memory. 
If the combined size of the window's commit files exceeds this " + + "threshold, log compaction is skipped for that window; the commits remain available as " + + "individual delta files and are subsumed by the next checkpoint. Set to a non-positive " + + "value to disable the guard (no size limit).") + .internal() + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1024L * 1024L * 1024L) // 1 GiB + val ICEBERG_MAX_COMMITS_TO_CONVERT = buildConf("iceberg.maxPendingCommits") .doc(""" |The maximum number of pending Delta commits to convert to Iceberg incrementally. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala index 7e5f3663e3b..9c368eac8c4 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogMinorCompactionSuite.scala @@ -38,6 +38,13 @@ class DeltaLogMinorCompactionSuite extends QueryTest with DeltaSQLTestUtils with CatalogOwnedTestBaseSuite { + // This suite exercises *reading* of compaction files that it creates manually via + // `minorCompactDeltaLog`. Disable the log-compaction post-commit hook (on by default) so its + // automatically produced compaction files don't interfere with the precise sets asserted here. + // The write path is covered by LogCompactionSuite. 
+ override protected def sparkConf: SparkConf = + super.sparkConf.set(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key, "false") + /** Helper method to do minor compaction of [[DeltaLog]] from [startVersion, endVersion] */ private def minorCompactDeltaLog( tablePath: String, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala index 506e882fbf0..ceb771fd414 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala @@ -323,6 +323,53 @@ class DeltaRetentionSuite extends QueryTest } } + test("log compaction files are cleaned up based on their start version") { + withTempDir { tempDir => + val clock = new ManualClock(getStartTimeForRetentionTest) + val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock) + val logPath = new File(log.logPath.toUri) + + def compactedRanges(): Set[(Long, Long)] = { + logPath.listFiles() + .map(f => new Path(f.getCanonicalPath)) + .filter(FileNames.isCompactedDeltaFile) + .map(FileNames.compactedDeltaVersions) + .toSet + } + + // Commit versions 0..4 and create a checkpoint at version 4 (the cutoff checkpoint). + startTxnWithManualLogCleanup(log).commit(createTestAddFile(encodedPath = "0") :: Nil, testOp) + (1 to 4).foreach { i => + log.startTransaction().commit(createTestAddFile(encodedPath = i.toString) :: Nil, testOp) + } + // A compaction starting before the cutoff checkpoint must be deleted. + LogCompaction.compact(log, log.update(), startVersion = 1, endVersion = 3) + log.checkpoint() + + // Commit versions 5..6 (after the checkpoint) and compact them. This compaction starts after + // the cutoff checkpoint, so it must be retained. 
+ (5 to 6).foreach { i => + log.startTransaction().commit(createTestAddFile(encodedPath = i.toString) :: Nil, testOp) + } + LogCompaction.compact(log, log.update(), startVersion = 5, endVersion = 6) + // A compaction whose start version equals the checkpoint version (4) is also subsumed by the + // checkpoint and must be deleted (boundary of the `startVersion <= checkpointVersion` rule). + LogCompaction.compact(log, log.update(), startVersion = 4, endVersion = 6) + + assert(compactedRanges() === Set((1L, 3L), (4L, 6L), (5L, 6L))) + + clock.advance(intervalStringToMillis(DeltaConfigs.LOG_RETENTION.defaultValue) + + intervalStringToMillis("interval 1 day")) + log.cleanUpExpiredLogs(log.update()) + + // The [1, 3] and [4, 6] compactions (start version <= the cutoff checkpoint version 4) are + // deleted, while the [5, 6] compaction (start version after the checkpoint) is retained. + assert(compactedRanges() === Set((5L, 6L))) + // Sanity check: the pre-checkpoint commit files were also cleaned up. 
+ assert(!getFileVersions(getDeltaFiles(logPath)).contains(1L)) + } + } + test("allow users to expire transaction identifiers from checkpoints") { withTempDir { dir => val clock = new ManualClock(getStartTimeForRetentionTest) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala index 304d8365394..1e478f4a8fe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala @@ -47,6 +47,11 @@ trait DeltaRetentionSuiteBase extends QueryTest protected override def sparkConf: SparkConf = super.sparkConf // Disable the log cleanup because it runs asynchronously and causes test flakiness .set("spark.databricks.delta.properties.defaults.enableExpiredLogCleanup", "false") + // The log-compaction post-commit hook is enabled by default and, with the default compaction + // interval smaller than the checkpoint interval, would create compaction files that interfere + // with these tests' precise log-file assertions. Disable it here; the compaction-cleanup test + // in this suite creates the compaction files it needs explicitly via `LogCompaction.compact`. + .set(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key, "false") protected def intervalStringToMillis(str: String): Long = { DeltaConfigs.getMilliSeconds( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionSuite.scala new file mode 100644 index 00000000000..5f079d715e0 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionSuite.scala @@ -0,0 +1,448 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, DomainMetadata, SetTransaction} +import org.apache.spark.sql.delta.hooks.LogCompactionHook +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Tests for the writing side of log compaction files: the [[LogCompaction]] writer and the + * [[LogCompactionHook]] post-commit hook that produces `..compacted.json` files. + * + * The reading side (consuming compaction files for snapshot construction) is covered by + * [[DeltaLogMinorCompactionSuite]]. + */ +class LogCompactionSuite extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest { + + /** Returns the sorted (startVersion, endVersion) ranges of compaction files on disk. */ + private def compactedRanges(deltaLog: DeltaLog): Seq[(Long, Long)] = { + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + fs.listStatus(deltaLog.logPath) + .filter(FileNames.isCompactedDeltaFile) + .map(f => FileNames.compactedDeltaVersions(f.getPath)) + .sorted + .toSeq + } + + /** Returns the sorted versions of checkpoint files on disk. 
*/ + private def checkpointVersions(deltaLog: DeltaLog): Seq[Long] = { + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + fs.listStatus(deltaLog.logPath) + .filter(FileNames.isCheckpointFile) + .map(f => FileNames.checkpointVersion(f.getPath)) + .distinct + .sorted + .toSeq + } + + /** Appends a single row with the given value, producing one new commit. */ + private def appendRow(path: String, value: Long): Unit = { + spark.range(value, value + 1).write.format("delta").mode("append").save(path) + } + + /** + * Appends rows until the table reaches `targetVersion` (inclusive). The committed values are + * `0..targetVersion`, one per commit, so the table contents are `spark.range(targetVersion + 1)`. + */ + private def commitUpToVersion(path: String, targetVersion: Long): DeltaLog = { + (0L to targetVersion).foreach(v => appendRow(path, v)) + val deltaLog = DeltaLog.forTable(spark, path) + assert(deltaLog.update().version === targetVersion) + deltaLog + } + + test("log compaction can be disabled") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "false", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 12) + assert(compactedRanges(deltaLog).isEmpty, + "no compaction files should be produced when the feature is disabled") + } + } + } + + test("log compaction is enabled by default") { + // Relies on the default `deltaLog.minorCompaction.useForWrites = true` and the default + // compaction interval of 5; a checkpoint interval larger than the compaction interval is still + // required for the hook to produce a compaction. 
+ withSQLConf( + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 5) + assert(compactedRanges(deltaLog) === Seq((1L, 5L)), + "a compaction over the default interval should be produced with the default config") + } + } + } + + test("hook produces non-overlapping fixed windows at the configured interval") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "true", + DeltaConfigs.LOG_COMPACTION_INTERVAL.defaultTablePropertyKey -> "5", + // High checkpoint interval so no checkpoint interferes with the windows. + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 10) + // Triggered at v5 -> [1, 5] and v10 -> [6, 10]. The version 0 commit fills the [0, 0] gap. + assert(compactedRanges(deltaLog) === Seq((1L, 5L), (6L, 10L))) + assert(checkpointVersions(deltaLog).isEmpty) + } + } + } + + test("produced compaction files are used for snapshot construction and preserve state") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "true", + DeltaConfigs.LOG_COMPACTION_INTERVAL.defaultTablePropertyKey -> "5", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + commitUpToVersion(path, 10) + + DeltaLog.clearCache() + val compactedSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + val usedCompactions = compactedSnapshot.logSegment.deltas + .map(_.getPath) + .filter(FileNames.isCompactedDeltaFile) + .map(p => FileNames.compactedDeltaVersions(p)) + .sorted + assert(usedCompactions === Seq((1L, 5L), (6L, 10L)), + "snapshot should be backed by the compaction files instead of individual commits") + + // The snapshot built from compaction files must match the one built from raw commits. 
+ withSQLConf(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") { + DeltaLog.clearCache() + val rawSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert(rawSnapshot.logSegment.deltas + .forall(f => !FileNames.isCompactedDeltaFile(f.getPath))) + assert(rawSnapshot.computeChecksum === compactedSnapshot.computeChecksum) + checkAnswer(rawSnapshot.stateDF, compactedSnapshot.stateDF) + checkAnswer(rawSnapshot.allFiles.toDF(), compactedSnapshot.allFiles.toDF()) + } + + checkAnswer(spark.read.format("delta").load(path), spark.range(11).toDF()) + } + } + } + + test("a checkpoint subsumes compaction and bounds the next window") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "true", + DeltaConfigs.LOG_COMPACTION_INTERVAL.defaultTablePropertyKey -> "5", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "10") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 15) + // v5 -> [1, 5] + // v10 -> skipped, a checkpoint is written instead (checkpoint subsumes compaction) + // v15 -> [11, 15], bounded below by checkpoint(10) + 1 = 11 (= v15 - interval + 1 here) + assert(checkpointVersions(deltaLog).contains(10L)) + assert(compactedRanges(deltaLog) === Seq((1L, 5L), (11L, 15L))) + + // The snapshot should use the checkpoint at 10 plus the [11, 15] compaction.
+ DeltaLog.clearCache() + val snapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert(snapshot.logSegment.checkpointProvider.version === 10L) + val usedCompactions = snapshot.logSegment.deltas + .map(_.getPath) + .filter(FileNames.isCompactedDeltaFile) + .map(p => FileNames.compactedDeltaVersions(p)) + .sorted + assert(usedCompactions === Seq((11L, 15L))) + } + } + } + + test("no compaction is produced before a full window of commits exists") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "true", + DeltaConfigs.LOG_COMPACTION_INTERVAL.defaultTablePropertyKey -> "5", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + // Only versions 0..4 exist; the first interval boundary (v5) hasn't been reached. + val deltaLog = commitUpToVersion(path, 4) + assert(compactedRanges(deltaLog).isEmpty) + } + } + } + + test("hook is registered on every transaction") { + withTempDir { dir => + val path = dir.getCanonicalPath + appendRow(path, 0) + val txn = DeltaLog.forTable(spark, path).startTransaction() + assert(txn.containsPostCommitHook(LogCompactionHook)) + } + } + + test("LogCompaction.compact writes a reconciled file without commitInfo") { + withSQLConf( + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 5) + + LogCompaction.compact(deltaLog, deltaLog.update(), startVersion = 1, endVersion = 4) + + val compactedPath = FileNames.compactedDeltaFile(deltaLog.logPath, 1, 4) + val actions = deltaLog.store + .read(compactedPath, deltaLog.newDeltaHadoopConf()) + .map(Action.fromJson) + // commitInfo actions are stripped during reconciliation, just like checkpoints.
+ assert(actions.forall(!_.isInstanceOf[CommitInfo])) + assert(actions.exists(_.isInstanceOf[AddFile])) + } + } + } + + test("LogCompaction.compact is a no-op when the target file already exists") { + withSQLConf( + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 5) + val hadoopConf = deltaLog.newDeltaHadoopConf() + val compactedPath = FileNames.compactedDeltaFile(deltaLog.logPath, 1, 4) + + // Pre-create the compaction file with sentinel content that real reconciliation would + // never produce, so we can detect whether `compact` overwrote it. + val sentinel = """{"sentinel":"do-not-overwrite"}""" + deltaLog.store.write(compactedPath, Iterator(sentinel), overwrite = true, hadoopConf) + + LogCompaction.compact(deltaLog, deltaLog.update(), startVersion = 1, endVersion = 4) + + // The existing file must be left untouched: `compact` skips when the target exists. + assert(deltaLog.store.read(compactedPath, hadoopConf).toSeq === Seq(sentinel), + "an existing compaction file should not be recomputed or overwritten") + } + } + } + + test("LogCompaction.compact emits telemetry for completed and skipped compactions") { + withSQLConf( + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 5) + val snapshot = deltaLog.update() + + // A completed compaction reports the range, commit/action counts, and file size. 
+ val completedLogs = DeltaTestUtils.collectUsageLogs("delta.logCompaction.stats") { + LogCompaction.compact(deltaLog, snapshot, startVersion = 1, endVersion = 4) + } + assert(completedLogs.size === 1, "exactly one stats event should be emitted") + val completed = + JsonUtils.mapper.readValue[LogCompactionMetrics](completedLogs.head.blob) + assert(completed.status === LogCompaction.STATUS_COMPLETED) + assert(completed.skipReason.isEmpty) + assert(completed.startVersion === 1 && completed.endVersion === 4) + assert(completed.numCommitsCompacted === 4) + assert(completed.numActions > 0, "the compaction should contain reconciled actions") + assert(completed.compactedFileSizeBytes > 0, "the written file should have a size") + assert(completed.durationMs >= 0) + + // Compacting the same range again is skipped with the target-already-exists reason. + val skippedLogs = DeltaTestUtils.collectUsageLogs("delta.logCompaction.stats") { + LogCompaction.compact(deltaLog, snapshot, startVersion = 1, endVersion = 4) + } + assert(skippedLogs.size === 1) + val skipped = JsonUtils.mapper.readValue[LogCompactionMetrics](skippedLogs.head.blob) + assert(skipped.status === LogCompaction.STATUS_SKIPPED) + assert(skipped.skipReason === Some(LogCompaction.SKIP_REASON_TARGET_EXISTS)) + } + } + } + + test("compaction preserves non-trivial action types (DVs, row tracking, domain metadata, txn)") { + withSQLConf( + // Keep versions deterministic: no auto-compaction, and no checkpoint within the range. + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "false", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + // Deletion vectors + row tracking produce non-trivial actions (DV-bearing AddFiles and + // per-file baseRowId); domainMetadata is enabled explicitly so the raw DomainMetadata + // commit below is allowed. 
+ sql( + s"""CREATE TABLE delta.`$path` (id LONG) USING delta TBLPROPERTIES ( + | 'delta.enableDeletionVectors' = 'true', + | 'delta.enableRowTracking' = 'true', + | 'delta.feature.domainMetadata' = 'supported')""".stripMargin) + spark.range(0, 50).write.format("delta").mode("append").save(path) // v1 + spark.range(50, 100).write.format("delta").mode("append").save(path) // v2 + // v3: a partial-file delete produces deletion vectors. + sql(s"DELETE FROM delta.`$path` WHERE id IN (3, 60)") + + val deltaLog = DeltaLog.forTable(spark, path) + // v4: an explicit domain metadata and a transaction identifier, committed directly. + deltaLog.startTransaction().commit( + Seq( + DomainMetadata("test.compaction.domain", """{"k":"v"}""", removed = false), + SetTransaction("app-id", 7, Some(123L))), + DeltaOperations.ManualUpdate) + + val endVersion = deltaLog.update().version + LogCompaction.compact( + deltaLog, deltaLog.update(), startVersion = 1, endVersion = endVersion) + + // The compaction file must preserve every non-trivial action type. 
+ val compactedPath = FileNames.compactedDeltaFile(deltaLog.logPath, 1, endVersion) + val actions = deltaLog.store + .read(compactedPath, deltaLog.newDeltaHadoopConf()) + .map(Action.fromJson) + val addFiles = actions.collect { case a: AddFile => a } + assert(addFiles.exists(_.deletionVector != null), + "a deletion-vector-bearing AddFile must be preserved") + assert(addFiles.exists(_.baseRowId.isDefined), + "row-tracking baseRowId must be preserved on AddFiles") + assert( + actions.collect { case d: DomainMetadata => d.domain }.contains("test.compaction.domain"), + "domain metadata must be preserved") + assert( + actions.collect { case s: SetTransaction => s } + .exists(s => s.appId == "app-id" && s.version == 7), + "the transaction identifier must be preserved") + assert(actions.forall(!_.isInstanceOf[CommitInfo]), "commitInfo must be stripped") + + // The snapshot built from the compaction must be identical to one built from raw commits. + DeltaLog.clearCache() + val compactedSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert( + compactedSnapshot.logSegment.deltas.exists(f => + FileNames.isCompactedDeltaFile(f.getPath)), + "the snapshot should be backed by the compaction file") + withSQLConf(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") { + DeltaLog.clearCache() + val rawSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert(rawSnapshot.computeChecksum === compactedSnapshot.computeChecksum) + checkAnswer(compactedSnapshot.allFiles.toDF(), rawSnapshot.allFiles.toDF()) + assert(compactedSnapshot.domainMetadata.toSet === rawSnapshot.domainMetadata.toSet) + assert(compactedSnapshot.setTransactions.toSet === rawSnapshot.setTransactions.toSet) + } + + // The data must read back correctly (deleted rows excluded). 
+ checkAnswer(spark.read.format("delta").load(path), + spark.range(0, 100).where("id NOT IN (3, 60)").toDF()) + } + } + } + + test("compaction retains a domain metadata removal whose add precedes the window") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "false", + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + sql(s"CREATE TABLE delta.`$path` (id LONG) USING delta " + + "TBLPROPERTIES ('delta.feature.domainMetadata' = 'supported')") // v0 + val deltaLog = DeltaLog.forTable(spark, path) + // v1: add the domain - this is OUTSIDE the compacted window below. + deltaLog.startTransaction().commit( + Seq(DomainMetadata("test.domain", """{"k":"v"}""", removed = false)), + DeltaOperations.ManualUpdate) + // v2: remove the domain - this is INSIDE the compacted window. + deltaLog.startTransaction().commit( + Seq(DomainMetadata("test.domain", "", removed = true)), + DeltaOperations.ManualUpdate) + spark.range(0, 1).write.format("delta").mode("append").save(path) // v3 + + val endVersion = deltaLog.update().version + // Compact [2, endVersion]: the window contains the removal but NOT the original add. + LogCompaction.compact( + deltaLog, deltaLog.update(), startVersion = 2, endVersion = endVersion) + + // The compaction file must carry the removal tombstone (like a RemoveFile tombstone) so it + // suppresses the earlier add when it replaces the commit range. + val compactedPath = FileNames.compactedDeltaFile(deltaLog.logPath, 2, endVersion) + val actions = deltaLog.store + .read(compactedPath, deltaLog.newDeltaHadoopConf()) + .map(Action.fromJson) + assert( + actions.collect { case d: DomainMetadata if d.removed => d.domain } + .contains("test.domain"), + "the removal tombstone for an out-of-window domain add must be preserved") + + // The compaction-backed snapshot must agree with the raw-commit snapshot: domain removed. 
+ DeltaLog.clearCache() + val compactedSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert( + compactedSnapshot.logSegment.deltas.exists(f => + FileNames.isCompactedDeltaFile(f.getPath)), + "the snapshot should be backed by the compaction file") + assert(!compactedSnapshot.domainMetadata.exists(_.domain == "test.domain"), + "the domain must be absent (removed) in the compaction-backed snapshot") + withSQLConf(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS.key -> "false") { + DeltaLog.clearCache() + val rawSnapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert(rawSnapshot.domainMetadata.map(_.domain).toSet === + compactedSnapshot.domainMetadata.map(_.domain).toSet) + } + } + } + } + + test("LogCompaction.compact skips a window whose commit files exceed the size guard") { + withSQLConf( + DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + val deltaLog = commitUpToVersion(path, 5) + val snapshot = deltaLog.update() + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + val compactedPath = FileNames.compactedDeltaFile(deltaLog.logPath, 1, 4) + + // A 1-byte cap is exceeded by the (non-empty) window, so compaction is skipped: no file is + // written, and a `windowTooLarge` skip event carrying the measured window size is emitted. 
+ val skippedLogs = withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_MAX_WINDOW_SIZE.key -> "1") { + DeltaTestUtils.collectUsageLogs("delta.logCompaction.stats") { + LogCompaction.compact(deltaLog, snapshot, startVersion = 1, endVersion = 4) + } + } + assert(!fs.exists(compactedPath), + "no compaction file should be written when the size guard trips") + assert(skippedLogs.size === 1) + val skipped = JsonUtils.mapper.readValue[LogCompactionMetrics](skippedLogs.head.blob) + assert(skipped.status === LogCompaction.STATUS_SKIPPED) + assert(skipped.skipReason === Some(LogCompaction.SKIP_REASON_WINDOW_TOO_LARGE)) + assert(skipped.windowSizeBytes > 1, + "the measured window size should exceed the configured cap") + + // With the guard disabled (non-positive threshold), the same window is compacted. + withSQLConf(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_MAX_WINDOW_SIZE.key -> "0") { + LogCompaction.compact(deltaLog, snapshot, startVersion = 1, endVersion = 4) + } + assert(fs.exists(compactedPath), + "the window should be compacted when the size guard is disabled") + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionWithCatalogOwnedSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionWithCatalogOwnedSuite.scala new file mode 100644 index 00000000000..d7013a39c8b --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/LogCompactionWithCatalogOwnedSuite.scala @@ -0,0 +1,85 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.coordinatedcommits.CatalogOwnedTestBaseSuite +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.FileNames + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Verifies that [[org.apache.spark.sql.delta.hooks.LogCompactionHook]] honours the protocol + * requirement that a log compaction file may only be produced for versions already published + * (backfilled) in `_delta_log` (see PROTOCOL.md, "Maintenance Operations on Catalog-managed + * Tables"). + * + * On catalog-managed (catalog-owned / coordinated-commits) tables, committed versions can still be + * staged under `_delta_log/_staged_commits`. The hook synchronously backfills the window's commits + * before compacting (the same way checkpoint writing publishes commits via + * `Snapshot.ensureCommitFilesBackfilled`), so every produced compaction covers only published + * versions. + */ +class LogCompactionWithCatalogOwnedSuite extends QueryTest + with SharedSparkSession + with DeltaSQLCommandTest + with CatalogOwnedTestBaseSuite { + + // Batched backfill so committed versions would otherwise be staged when the hook runs. + override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(2) + + test("hook backfills then compacts, producing compactions over published versions") { + withSQLConf( + DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key -> "true", + DeltaConfigs.LOG_COMPACTION_INTERVAL.defaultTablePropertyKey -> "5", + // High checkpoint interval so no checkpoint interferes with the produced windows.
+ DeltaConfigs.CHECKPOINT_INTERVAL.defaultTablePropertyKey -> "100") { + withTempDir { dir => + val path = dir.getCanonicalPath + (0L to 10L).foreach { v => + spark.range(v, v + 1).write.format("delta").mode("append").save(path) + } + val deltaLog = DeltaLog.forTable(spark, path) + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + + val compactions = fs.listStatus(deltaLog.logPath) + .filter(FileNames.isCompactedDeltaFile) + .map(f => FileNames.compactedDeltaVersions(f.getPath)) + .sorted + + // The hook backfills before compacting, so it produces the same windows it would on a + // filesystem table: v5 -> [1, 5] and v10 -> [6, 10]. + assert(compactions.toSeq === Seq((1L, 5L), (6L, 10L))) + + // Every produced compaction must cover only published (backfilled) versions: the backfilled + // commit file `_delta_log/{endV}.json` must exist. + compactions.foreach { case (startV, endV) => + assert(fs.exists(FileNames.unsafeDeltaFile(deltaLog.logPath, endV)), + s"compaction [$startV, $endV] covers a non-published (unbackfilled) end version") + } + + // The compaction files are used for snapshot construction and the table reads correctly.
+ DeltaLog.clearCache() + val snapshot = DeltaLog.forTable(spark, path).unsafeVolatileSnapshot + assert(snapshot.logSegment.deltas.exists(f => FileNames.isCompactedDeltaFile(f.getPath))) + checkAnswer(spark.read.format("delta").load(path), spark.range(11).toDF()) + } + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index 70d6669067a..d9f35356a95 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -786,6 +786,12 @@ class SnapshotManagementParallelListingSuite extends QueryTest override protected def sparkConf: SparkConf = super.sparkConf.set(logStoreClassConfKey, classOf[CountDownLatchLogStore].getName) + // This suite simulates coordinated-commits backfills with a custom commit coordinator and + // asserts a precise staged/backfilled listing state. The log-compaction post-commit hook is + // on by default and, at the default interval (5), would call `ensureCommitFilesBackfilled` + // mid-sequence and perturb that state. Disable the write hook here; compaction writing is + // covered by LogCompactionSuite.
+ .set(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key, "false") override protected def beforeEach(): Unit = { super.beforeEach() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala index d0b753b5d8e..2ed3f2088c2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala @@ -41,6 +41,7 @@ import org.apache.commons.text.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{QueryTest, SaveMode, SparkSession} +import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.test.SharedSparkSession @@ -52,6 +53,13 @@ class TableRedirectSuite extends QueryTest with DeltaCheckpointTestUtils with DeltaSQLTestUtils { + // Several tests assert on the exact set/count of `_delta_log` files. The log-compaction + // post-commit hook is on by default and, at the default interval (5), writes + // `{start}.{end}.compacted.json` files (which also end in `.json`) that would inflate those + // counts. Disable the write hook here; compaction writing is covered by LogCompactionSuite.
+ override protected def sparkConf: SparkConf = + super.sparkConf.set(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key, "false") + private def validateState( deltaLog: DeltaLog, redirectState: RedirectState, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala index 81a42271e1f..e2c3f6c4388 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala @@ -65,6 +65,12 @@ class CoordinatedCommitsSuite super.sparkConf .set(COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey, "tracking-in-memory") .set(COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey, JsonUtils.toJson(Map())) + // Several tests assert on the exact set of backfilled/staged `_delta_log` files. The + // log-compaction post-commit hook is on by default and, at the default interval (5), would + // both write `{start}.{end}.compacted.json` files and force backfills + // (ensureCommitFilesBackfilled) mid-sequence, perturbing those assertions. Disable the write + // hook here; compaction writing is covered by LogCompactionSuite. + .set(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_WRITES.key, "false") } test("helper method that recovers config from abstract metadata works properly") {