Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.hooks.InflightDMLRegistry
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.DeleteCommand.{rewritingFilesMsg, FINDING_TOUCHED_FILES_MSG}
Expand Down Expand Up @@ -126,6 +127,10 @@ case class DeleteCommand(
override lazy val metrics = createMetrics

final override def run(sparkSession: SparkSession): Seq[Row] = {
// Async Auto Compaction DML yield: register this DELETE so concurrent async AC yields.
val deleteTableId = deltaLog.unsafeVolatileTableId
InflightDMLRegistry.acquire(deleteTableId)
try {
recordDeltaOperation(deltaLog, "delta.dml.delete") {
deltaLog.withNewTransaction(catalogTable) { txn =>
DeltaLog.assertRemovable(txn.snapshot)
Expand Down Expand Up @@ -161,6 +166,9 @@ case class DeleteCommand(
} else {
Seq(Row(metrics("numDeletedRows").value))
}
} finally {
InflightDMLRegistry.release(deleteTableId)
}
}

def performDelete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.hooks.InflightDMLRegistry
import org.apache.spark.sql.delta.actions.{Action, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSource, MergeIntoMaterializeSourceReason, MergeStats}
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex, TransactionalWrite}
Expand Down Expand Up @@ -172,13 +173,21 @@ trait MergeIntoCommandBase extends LeafRunnableCommand
}

val (materializeSource, _) = shouldMaterializeSource(spark, source, isInsertOnly)
if (!materializeSource) {
runMerge(spark)
} else {
// If it is determined that source should be materialized, wrap the execution with retries,
// in case the data of the materialized source is lost.
runWithMaterializedSourceLostRetries(
spark, targetFileIndex.deltaLog, metrics, runMerge)
// Async Auto Compaction DML yield: register this MERGE as in-flight on the target table so
// that any concurrent async AC worker yields. See InflightDMLRegistry.
val mergeTableId = targetFileIndex.deltaLog.unsafeVolatileTableId
InflightDMLRegistry.acquire(mergeTableId)
try {
if (!materializeSource) {
runMerge(spark)
} else {
// If it is determined that source should be materialized, wrap the execution with retries,
// in case the data of the materialized source is lost.
runWithMaterializedSourceLostRetries(
spark, targetFileIndex.deltaLog, metrics, runMerge)
}
} finally {
InflightDMLRegistry.release(mergeTableId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.files.SQLMetricsReporting
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.schema.{SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.hooks.{AsyncAutoCompactCancelledException, AsyncAutoCompactService, InflightDMLRegistry}
import org.apache.spark.sql.delta.util.BinPackingUtils

import org.apache.spark.SparkContext
Expand Down Expand Up @@ -574,6 +575,19 @@ class OptimizeExecutor(
optimizeOperation: Operation,
actions: Seq[Action],
metrics: Map[String, SQLMetric])(f: OptimisticTransaction => Boolean): Unit = {
// Async Auto Compaction DML yield: if a long-running DML (MERGE/DELETE/UPDATE/OVERWRITE)
// acquired InflightDMLRegistry on this table between our Spark-job completion and now,
// abort cleanly rather than racing the DML to commit (which would force the DML to throw
// ConcurrentModificationException and re-execute minutes of Spark work). Only applies when
// running on the async worker thread; inline AC and user-initiated OPTIMIZE proceed
// unconditionally.
if (isAutoCompact && AsyncAutoCompactService.isAsyncWorker &&
InflightDMLRegistry.isActive(snapshot.metadata.id)) {
throw new AsyncAutoCompactCancelledException(
snapshot.metadata.id,
s"Async Auto Compaction yielded at pre-commit to in-flight DML on table " +
s"${snapshot.metadata.id}")
}
try {
txn.registerSQLMetrics(sparkSession, metrics)
txn.commit(actions, optimizeOperation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.util.control.NonFatal
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.hooks.InflightDMLRegistry
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, FileAction}
import org.apache.spark.sql.delta.commands.cdc.CDCReader.{CDC_TYPE_COLUMN_NAME, CDC_TYPE_NOT_CDC, CDC_TYPE_UPDATE_POSTIMAGE, CDC_TYPE_UPDATE_PREIMAGE}
Expand Down Expand Up @@ -94,6 +95,10 @@ case class UpdateCommand(
)

final override def run(sparkSession: SparkSession): Seq[Row] = {
// Async Auto Compaction DML yield: register this UPDATE so concurrent async AC yields.
val updateTableId = tahoeFileIndex.deltaLog.unsafeVolatileTableId
InflightDMLRegistry.acquire(updateTableId)
try {
recordDeltaOperation(tahoeFileIndex.deltaLog, "delta.dml.update") {
val deltaLog = tahoeFileIndex.deltaLog
deltaLog.withNewTransaction(catalogTable) { txn =>
Expand All @@ -109,6 +114,9 @@ case class UpdateCommand(
sparkSession.sharedState.cacheManager.recacheByPlan(sparkSession, target)
}
Seq(Row(metrics("numUpdatedRows").value))
} finally {
InflightDMLRegistry.release(updateTableId)
}
}

private def performUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.util.Try

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.hooks.InflightDMLRegistry
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
Expand Down Expand Up @@ -103,6 +104,13 @@ case class WriteIntoDelta(


override def run(sparkSession: SparkSession): Seq[Row] = {
// Async Auto Compaction DML yield: for OVERWRITE (which rewrites files in bulk), register
// this write so concurrent async AC yields. APPEND skips this because appends never
// conflict with AC.
val writeTableId =
if (isOverwriteOperation) Some(deltaLog.unsafeVolatileTableId) else None
writeTableId.foreach(InflightDMLRegistry.acquire)
try {
deltaLog.withNewTransaction(catalogTableOpt) { txn =>
if (hasBeenExecuted(txn, sparkSession, Some(options))) {
return Seq.empty
Expand All @@ -128,6 +136,9 @@ case class WriteIntoDelta(
}
}
Seq.empty
} finally {
writeTableId.foreach(InflightDMLRegistry.release)
}
}

override def writeAndReturnCommitData(
Expand Down
Loading