Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,21 @@ trait DeltaConfigsBase extends DeltaLogging {
_ > 0,
"needs to be a positive integer.")

/**
 * How often, in commits, the log-compaction post-commit hook writes a log compaction file
 * (`<x>.<y>.compacted.json`). Only relevant when log compaction is enabled
 * (`deltaLog.minorCompaction.useForWrites`): a compaction is attempted after every commit whose
 * version is a multiple of this interval.
 *
 * The value is parsed as an integer and must be at least 2; the default is 5.
 *
 * To keep the produced files non-overlapping and usable by readers, the checkpoint interval
 * ([[CHECKPOINT_INTERVAL]]) should be a multiple of, and larger than, this value - e.g. the
 * defaults: a checkpoint every 10 commits and a compaction every 5.
 */
val LOG_COMPACTION_INTERVAL = buildConfig[Int](
  "logCompactionInterval",
  "5",
  rawValue => rawValue.toInt,
  interval => interval >= 2,
  "needs to be an integer >= 2.")

/**
* This is the property that describes the table redirection detail. It is a JSON string format
* of the `TableRedirectConfiguration` class, which includes following attributes:
Expand Down
269 changes: 269 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/LogCompaction.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.nio.file.FileAlreadyExistsException

import org.apache.spark.sql.delta.actions.{Action, InMemoryLogReplay}
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
import org.apache.spark.sql.delta.util.FileNames.{CompactedDeltaFile, DeltaFile}

import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession

/**
* Utilities for creating log compaction files. A log compaction file aggregates the actions
* from a range of commit files `[startVersion, endVersion]` into a single JSON file by following
* the same action reconciliation rules as a checkpoint, but without full state reconstruction.
*
* Per the Delta protocol, log compaction files:
* - Reside in `_delta_log` and are named `<startVersion>.<endVersion>.compacted.json`.
* - Contain the reconciled actions for the commit range (one action per line), without
* `commitInfo` actions (these are stripped during reconciliation, same as checkpoints).
* - Are optional: writers may produce them and readers may consume them. They do not require any
* protocol or table-feature upgrade; readers that do not understand them simply ignore them.
* - Replace the corresponding individual commit files during snapshot construction on the read
* path, speeding it up while keeping the checkpoint interval high.
*/
object LogCompaction extends DeltaLogging {

  /** opType under which all log compaction telemetry is recorded. */
  private[delta] val OP_TYPE = "delta.logCompaction"

  // Possible values of `LogCompactionMetrics.status`.
  private[delta] val STATUS_COMPLETED = "completed"
  private[delta] val STATUS_SKIPPED = "skipped"

  // Possible values of `LogCompactionMetrics.skipReason` (only set when status is "skipped").
  private[delta] val SKIP_REASON_TARGET_EXISTS = "targetAlreadyExists"
  private[delta] val SKIP_REASON_CONCURRENT_WRITE = "concurrentlyCreated"
  private[delta] val SKIP_REASON_WINDOW_TOO_LARGE = "windowTooLarge"

  /**
   * Creates a log compaction file for the table covering commits `[startVersion, endVersion]`
   * (both inclusive). The resulting file contains the reconciled actions for the range (following
   * the same rules as checkpoint creation) without `commitInfo`.
   *
   * The individual commit files in the range are read using the path resolution of the provided
   * `snapshot`, so this works for both regular tables and coordinated-commits / catalog-managed
   * tables (where recent commits may live under `_delta_log/_staged_commits`).
   *
   * This is a no-op if a compaction file for the exact `[startVersion, endVersion]` range already
   * exists (idempotent): the common case is short-circuited by an existence check that avoids
   * redundant reconciliation, and the write itself uses `overwrite = false` so a concurrent writer
   * that produced the same (deterministic) file cannot be clobbered.
   *
   * It is also a no-op if the combined size of the window's commit files exceeds
   * `deltaLog.minorCompaction.maxWindowSizeBytes`: reconciliation happens in a single in-memory
   * log replay on the driver, so this bounds the driver memory a single compaction can consume.
   * A non-positive threshold disables the guard.
   *
   * Each invocation emits a `delta.logCompaction.stats` telemetry event (see
   * [[LogCompactionMetrics]]) capturing the outcome (completed / skipped with a reason), duration,
   * number of commits and actions, and the resulting file size. Looking up the resulting file
   * size is best-effort: if the lookup fails after the file has been written, the compaction is
   * still reported as completed with a size of `0`.
   *
   * @param deltaLog The [[DeltaLog]] instance for the table.
   * @param snapshot A snapshot at or after `endVersion`, used to resolve commit file paths.
   * @param startVersion The start version of the compaction range (inclusive).
   * @param endVersion The end version of the compaction range (inclusive).
   * @throws IllegalArgumentException if `endVersion` is not strictly greater than `startVersion`.
   */
  def compact(
      deltaLog: DeltaLog,
      snapshot: Snapshot,
      startVersion: Long,
      endVersion: Long): Unit = {
    require(
      endVersion > startVersion,
      s"endVersion ($endVersion) must be greater than startVersion ($startVersion)")

    val hadoopConf = deltaLog.newDeltaHadoopConf()
    val compactedFilePath =
      FileNames.compactedDeltaFile(deltaLog.logPath, startVersion, endVersion)
    val fs = deltaLog.logPath.getFileSystem(hadoopConf)
    val startTimeMs = System.currentTimeMillis()
    def elapsedMs: Long = System.currentTimeMillis() - startTimeMs

    // Single place that emits the "skipped" telemetry event, so every skip path reports the same
    // set of fields. `knownWindowSizeBytes` stays 0 when the skip happens before the window size
    // has been computed.
    def recordSkipped(reason: String, knownWindowSizeBytes: Long = 0L): Unit =
      recordCompactionStats(deltaLog, LogCompactionMetrics(
        startVersion = startVersion,
        endVersion = endVersion,
        status = STATUS_SKIPPED,
        durationMs = elapsedMs,
        skipReason = Some(reason),
        windowSizeBytes = knownWindowSizeBytes))

    // Shared log line for both "target already exists" outcomes (fast path and write race).
    def logTargetAlreadyExists(): Unit =
      logInfo(
        log"Skipping log compaction; file already exists " +
        log"${MDC(DeltaLogKeys.PATH, compactedFilePath)}")

    // Idempotency fast path: skip the (potentially expensive) reconciliation and write if the
    // compaction file for this exact range already exists, e.g. because a concurrent writer
    // already produced it. This avoids recomputing the file in the common case; the write below
    // additionally closes the residual race window atomically via overwrite = false.
    if (fs.exists(compactedFilePath)) {
      logTargetAlreadyExists()
      recordSkipped(SKIP_REASON_TARGET_EXISTS)
      return
    }

    // Driver-memory guard: the reconciliation below replays the whole window in a single
    // in-memory log replay on the driver (unlike checkpointing and snapshot construction, which
    // reconcile distributedly). A window containing very large commits could therefore pressure
    // driver memory, so skip compaction when the combined size of the window's commit files
    // exceeds the configured threshold. The commits remain available as individual delta files
    // and are subsumed by the next checkpoint; readers transparently fall back to them.
    val windowSizeBytes = windowCommitSizeBytes(snapshot, startVersion, endVersion)
    val maxWindowSizeBytes =
      SparkSession.active.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_MAX_WINDOW_SIZE)
    if (maxWindowSizeBytes > 0 && windowSizeBytes > maxWindowSizeBytes) {
      logInfo(
        log"Skipping log compaction for versions " +
        log"[${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " +
        log"${MDC(DeltaLogKeys.END_VERSION, endVersion)}]; window size " +
        log"${MDC(DeltaLogKeys.NUM_BYTES, windowSizeBytes)} exceeds the configured maximum " +
        log"${MDC(DeltaLogKeys.MAX_SIZE, maxWindowSizeBytes)}")
      recordSkipped(SKIP_REASON_WINDOW_TOO_LARGE, windowSizeBytes)
      return
    }

    val fileProvider = DeltaCommitFileProvider(snapshot)
    val logReplay = new InMemoryLogReplay(
      minFileRetentionTimestamp = None,
      minSetTransactionRetentionTimestamp = None,
      // A compaction file incrementally replaces its commit range, so (unlike a checkpoint) it
      // must retain `removed = true` DomainMetadata tombstones to suppress earlier adds.
      retainDomainMetadataTombstones = true)

    (startVersion to endVersion).foreach { version =>
      val file = fileProvider.deltaFile(version)
      // `readAsIterator` returns a ClosableIterator backed by an open input stream. Keep a
      // reference to it (rather than chaining `.map`, which discards the close handle) and close
      // it explicitly so the stream is released even if reconciliation is interrupted mid-file.
      val actions = deltaLog.store.readAsIterator(file, hadoopConf)
      try {
        logReplay.append(version, actions.map(Action.fromJson))
      } finally {
        actions.close()
      }
    }

    // Count the reconciled actions as a side effect while the store consumes the iterator below.
    var numActions = 0L
    val actionsToWrite = logReplay.checkpoint.map { action =>
      numActions += 1
      action.json
    }

    // Write with overwrite = false so that, in the residual race where another writer produced
    // the same compaction file between the existence check above and this write, we don't
    // clobber it. The reconciled content for a given [startVersion, endVersion] range is
    // deterministic, so an already-present file is equivalent; a FileAlreadyExistsException is
    // therefore treated as a successful no-op rather than an error.
    try {
      deltaLog.store.write(
        path = compactedFilePath,
        actions = actionsToWrite,
        overwrite = false,
        hadoopConf = hadoopConf)
    } catch {
      case _: FileAlreadyExistsException =>
        logTargetAlreadyExists()
        // The window size is already known at this point, so report it like the other
        // post-guard outcomes do.
        recordSkipped(SKIP_REASON_CONCURRENT_WRITE, windowSizeBytes)
        return
    }

    // The file is durably written at this point. The size lookup below only feeds telemetry, so
    // a failure to stat the file must not turn a successful compaction into a failure (nor
    // suppress the "completed" event); fall back to 0 (the metric's "unknown" default) instead.
    val compactedFileSizeBytes =
      try {
        fs.getFileStatus(compactedFilePath).getLen
      } catch {
        case scala.util.control.NonFatal(e) =>
          logWarning(
            log"Unable to determine the size of log compaction file " +
            log"${MDC(DeltaLogKeys.PATH, compactedFilePath)}; reporting 0 bytes",
            e)
          0L
      }

    recordCompactionStats(deltaLog, LogCompactionMetrics(
      startVersion = startVersion,
      endVersion = endVersion,
      status = STATUS_COMPLETED,
      durationMs = elapsedMs,
      numCommitsCompacted = endVersion - startVersion + 1,
      numActions = numActions,
      compactedFileSizeBytes = compactedFileSizeBytes,
      windowSizeBytes = windowSizeBytes))

    logInfo(
      log"Created log compaction file " +
      log"${MDC(DeltaLogKeys.PATH, compactedFilePath)} " +
      log"for versions [${MDC(DeltaLogKeys.START_VERSION, startVersion)}, " +
      log"${MDC(DeltaLogKeys.END_VERSION, endVersion)}]")
  }

  /**
   * Returns the combined size, in bytes, of the commit files for versions
   * `[startVersion, endVersion]` as seen by `snapshot`'s log segment. Used to bound the driver
   * memory consumed by reconciling the window. The sizes come from the already-listed
   * [[org.apache.hadoop.fs.FileStatus]] entries, so this adds no extra file-system calls.
   *
   * Both individual commit files whose version lies in the range and compacted delta files whose
   * whole `[lo, hi]` span lies inside the range are counted; anything else is ignored.
   *
   * Best-effort: this assumes `snapshot.logSegment` covers `[startVersion, endVersion]`, which
   * holds on the post-commit-hook path (the window is always entirely after the checkpoint and is
   * represented by individual commit files in the segment). If called directly with a snapshot
   * whose segment does not cover the range, in-range entries may be missing and the result will
   * under-count (possibly `0`) - so the size guard fails open (compaction proceeds) rather than
   * ever wrongly tripping.
   *
   * @param snapshot The snapshot whose log segment supplies the file listing.
   * @param startVersion The start version of the window (inclusive).
   * @param endVersion The end version of the window (inclusive).
   * @return The summed length of the matching log segment entries, `0` if none match.
   */
  private def windowCommitSizeBytes(
      snapshot: Snapshot,
      startVersion: Long,
      endVersion: Long): Long = {
    snapshot.logSegment.deltas.iterator.collect {
      case DeltaFile(fileStatus, version) if version >= startVersion && version <= endVersion =>
        fileStatus.getLen
      case CompactedDeltaFile(fileStatus, lo, hi) if lo >= startVersion && hi <= endVersion =>
        fileStatus.getLen
    }.sum
  }

  /** Emits a single log compaction telemetry event under the `delta.logCompaction.stats` opType. */
  private def recordCompactionStats(deltaLog: DeltaLog, metrics: LogCompactionMetrics): Unit =
    recordDeltaEvent(deltaLog, opType = s"$OP_TYPE.stats", data = metrics)
}

/**
 * Telemetry for a single [[LogCompaction.compact]] invocation, emitted as the JSON blob of the
 * `delta.logCompaction.stats` event.
 *
 * Only the first four fields are mandatory; the remaining ones default to `None` / `0` and are
 * filled in by the caller where they apply to the reported outcome.
 *
 * @param startVersion The (inclusive) start version of the compaction range.
 * @param endVersion The (inclusive) end version of the compaction range.
 * @param status `completed` if this invocation wrote the compaction file, `skipped` if it did
 *               not (the reason is given by `skipReason`).
 * @param durationMs Wall-clock time spent in `compact`, in milliseconds.
 * @param skipReason When `status == skipped`, why it was skipped (see `SKIP_REASON_*`): the
 *                   target file already existed, it was concurrently created by another writer,
 *                   or the window exceeded the configured maximum size. `None` otherwise.
 * @param numCommitsCompacted Number of commit files reconciled into the compaction file.
 * @param numActions Number of actions written to the compaction file.
 * @param compactedFileSizeBytes Size of the written compaction file, in bytes.
 * @param windowSizeBytes Combined size of the window's commit files, in bytes (the value the
 *                        driver-memory guard checks).
 */
case class LogCompactionMetrics(
    startVersion: Long,
    endVersion: Long,
    status: String,
    durationMs: Long,
    skipReason: Option[String] = None,
    numCommitsCompacted: Long = 0L,
    numActions: Long = 0L,
    compactedFileSizeBytes: Long = 0L,
    windowSizeBytes: Long = 0L)
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ trait MetadataCleanup extends DeltaLogging {

val fs = logPath.getFileSystem(newDeltaHadoopConf())
var numDeleted = 0
val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime.getTime)
// Expired log compaction files are peeled off the same `_delta_log` listing that drives the
// delta/checkpoint/checksum cleanup below, so we don't pay for a second full directory
// listing (expensive on object stores). They are collected here as that listing is consumed
// and deleted separately further down. See `listExpiredDeltaLogs` for the (best-effort,
// self-healing) semantics in the rare case where the listing is not fully traversed.
val expiredCompactionDeltaFiles = ArrayBuffer.empty[FileStatus]
val expiredDeltaLogs = listExpiredDeltaLogs(
fileCutOffTime.getTime, onExpiredCompactionFile = expiredCompactionDeltaFiles += _)
if (expiredDeltaLogs.hasNext) {
// Trigger compatibility checkpoint creation logic only when this round of metadata cleanup
// is going to delete any deltas/checkpoint files.
Expand All @@ -122,6 +129,12 @@ trait MetadataCleanup extends DeltaLogging {
}
}
}
// Log compaction files are optional, derived optimization files. They were peeled off the
// shared listing above as it was consumed by the deletion loop, and are deleted here in a
// separate pass - never through the BufferingLogDeletionIterator - so that they can never
// influence the retention decisions for actual commit/checkpoint/checksum files, which is
// critical because incorrectly deleting those would corrupt the table.
numDeleted += expiredCompactionDeltaFiles.count(file => fs.delete(file.getPath, false))
val commitDirPath = FileNames.commitDirPath(logPath)
// Commit Directory might not exist on tables created in older versions and
// never updated since.
Expand Down Expand Up @@ -165,13 +178,56 @@ trait MetadataCleanup extends DeltaLogging {
* considered as expired, it must:
* - have a checkpoint file after it
* - be older than `fileCutOffTime`
*
* Log compaction files (`<x>.<y>.compacted.json`) are derived/optional optimization files that
* span a `[startVersion, endVersion]` range. They must NOT be fed into the
* [[BufferingLogDeletionIterator]] (which adjusts time-travel timestamps per single commit and
* whose retention decisions for real commit files could be perturbed by a range-spanning file).
* Instead, to avoid a second full `_delta_log` listing (expensive on object stores), expired
* compaction files are reported via `onExpiredCompactionFile` for deletion in a separate pass.
* A compaction file is expired when:
* - its start version is at or before the latest checkpoint version (read from
* `_last_checkpoint`), so the checkpoint already subsumes the commit range it covers and the
* file is no longer selected when building the latest snapshot (see
* `useCompactedDeltasForLogSegment`), and
* - it is older than `fileCutOffTime` - the age gate that actually preserves any file still
* needed within the retention / time-travel window. (So although the version gate uses the
* latest checkpoint rather than the protocol's retention `cutOffCheckpoint`, the age gate
* ensures no file inside the retention window is ever deleted.)
*
* Expired compaction files are peeled off this same listing as it is consumed by the returned
* [[BufferingLogDeletionIterator]] (reported via `onExpiredCompactionFile`), which avoids a
* second listing. This peeling is best-effort: the iterator may stop pulling from the listing
* early when it reaches a non-expired segment before a checkpoint, in which case any compaction
* files sorting after that point are not seen in this round. That is safe and self-healing -
* compaction files are derived/optional, a file with `startVersion <= checkpointVersion` is
* already subsumed by the checkpoint and is never selected by readers (see
* `useCompactedDeltasForLogSegment`), and it is collected by a later cleanup round once the
* retention cutoff advances past the blocking segment. Commit/checkpoint/checksum retention is
* identical either way (those files are not affected by the peeling). The
* `metadataCleanupAllowed` validation path passes the default no-op callback.
*/
private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
private def listExpiredDeltaLogs(
fileCutOffTime: Long,
onExpiredCompactionFile: FileStatus => Unit = _ => ()): Iterator[FileStatus] = {
val latestCheckpoint = readLastCheckpointFile()
if (latestCheckpoint.isEmpty) return Iterator.empty
val threshold = latestCheckpoint.get.version - 1L
val checkpointVersion = latestCheckpoint.get.version
val threshold = checkpointVersion - 1L
val files = store.listFrom(listingPrefix(logPath, 0), newDeltaHadoopConf())
.filter(f => isCheckpointFile(f) || isDeltaFile(f) || isChecksumFile(f))
.filter { file =>
if (isCompactedDeltaFile(file)) {
// Peel expired compaction files off this shared listing (deleted in a separate pass),
// and exclude every compaction file from the BufferingLogDeletionIterator below.
if (file.getModificationTime <= fileCutOffTime &&
compactedDeltaVersions(file)._1 <= checkpointVersion) {
onExpiredCompactionFile(file)
}
false
} else {
isCheckpointFile(file) || isDeltaFile(file) || isChecksumFile(file)
}
}

new BufferingLogDeletionIterator(
files, fileCutOffTime, threshold, getDeltaFileChecksumOrCheckpointVersion)
Expand Down
Loading