@@ -21,15 +21,15 @@ import org.apache.gluten.extension.columnar.transition.Transitions

import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
-import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec}
+import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, DeltaInvariantCheckerExec, GlutenDeltaInvariantChecker}
import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter, TransactionalWrite}
import org.apache.spark.sql.delta.hooks.AutoCompact
import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, GlutenDeltaOptimizedWriterExec}
import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, WriteJobStatsTracker}
+import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormatWriter, V1WritesUtils, WriteJobStatsTracker}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ScalaExtensions.OptionExt
import org.apache.spark.util.SerializableConfiguration
@@ -91,53 +91,64 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction)

val empty2NullPlan =
convertEmptyToNullIfNeeded(queryExecution.executedPlan, partitioningColumns, constraints)
-val maybeCheckInvariants = if (constraints.isEmpty) {
+val rowInvariantPlan = if (constraints.isEmpty) {
// Compared to vanilla Delta, we simply avoid adding the invariant checker
// when the constraint list is empty, to prevent the unnecessary transitions
// from being added around the invariant checker.
empty2NullPlan
} else {
DeltaInvariantCheckerExec(empty2NullPlan, constraints)
}
+val nativeInvariantChecker = if (V1WritesUtils.getWriteFilesOpt(empty2NullPlan).isEmpty) {
+GlutenDeltaInvariantChecker.create(empty2NullPlan.output, constraints)
+} else {
+None
+}
+val nativeInvariantPlan = nativeInvariantChecker.map(_ => empty2NullPlan).getOrElse(
+rowInvariantPlan)
def toVeloxPlan(plan: SparkPlan): SparkPlan = plan match {
case aqe: AdaptiveSparkPlanExec =>
assert(!aqe.isFinalPlan)
aqe.copy(supportsColumnar = true)
-case _ => Transitions.toBatchPlan(maybeCheckInvariants, VeloxBatchType)
+case _ => Transitions.toBatchPlan(plan, VeloxBatchType)
}
// No need to plan optimized write if the write command is OPTIMIZE, which aims to produce
// evenly-balanced data files already.
-val physicalPlan =
+val (physicalPlan, nativeInvariantCheckerForWrite) =
if (
!isOptimize &&
shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
) {
// We uniformly convert the query plan to a columnar plan. If
// the further write operation turns out to be non-offload-able, the
// columnar plan will be converted back to a row-based plan.
-val veloxPlan = toVeloxPlan(maybeCheckInvariants)
+val veloxPlan = toVeloxPlan(nativeInvariantPlan)
try {
val glutenWriterExec =
GlutenDeltaOptimizedWriterExec(veloxPlan, metadata.partitionColumns, deltaLog)
val validationResult = glutenWriterExec.doValidate()
if (validationResult.ok()) {
-glutenWriterExec
+(glutenWriterExec, nativeInvariantChecker)
} else {
logInfo(
s"GlutenDeltaOptimizedWriterExec: Internal shuffle validated negative," +
s" reason: ${validationResult.reason()}. Falling back to row-based shuffle.")
-DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog)
+(
+DeltaOptimizedWriterExec(rowInvariantPlan, metadata.partitionColumns, deltaLog),
+None)
}
} catch {
case e: AnalysisException =>
logWarning(
s"GlutenDeltaOptimizedWriterExec: Failed to create internal shuffle," +
s" reason: ${e.getMessage()}. Falling back to row-based shuffle.")
-DeltaOptimizedWriterExec(maybeCheckInvariants, metadata.partitionColumns, deltaLog)
+(
+DeltaOptimizedWriterExec(rowInvariantPlan, metadata.partitionColumns, deltaLog),
+None)
}
} else {
-val veloxPlan = toVeloxPlan(maybeCheckInvariants)
-veloxPlan
+val veloxPlan = toVeloxPlan(nativeInvariantPlan)
+(veloxPlan, nativeInvariantChecker)
}

val statsTrackers: ListBuffer[WriteJobStatsTracker] = ListBuffer()
@@ -185,7 +196,8 @@ class GlutenOptimisticTransaction(delegate: OptimisticTransaction)
optionalStatsTracker.toSeq
++ statsTrackers
++ identityTrackerOpt.toSeq,
-options = options
+options = options,
+nativeInvariantChecker = nativeInvariantCheckerForWrite
)
} catch {
case InnerInvariantViolationException(violationException) =>
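Reviewer note on the plan selection above: with no constraints the plan is written unchanged; when every constraint is a top-level NOT NULL and the plan carries no pre-planned WriteFiles node, the native checker replaces DeltaInvariantCheckerExec and its surrounding row/columnar transitions; otherwise the row-based DeltaInvariantCheckerExec path is kept. A minimal end-to-end illustration of the behavior being preserved (plain Delta NOT NULL semantics, not an API added by this PR):

spark.sql("CREATE TABLE t (id STRING NOT NULL) USING delta")
spark.sql("INSERT INTO t VALUES (NULL)") // throws DeltaInvariantViolationException;
// with this change the violation can surface from the native batch check when offloaded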
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta.constraints

import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.gluten.execution.{PlaceholderRow, TerminalRow}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, SchemaUtils}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Native write-time invariant checker for constraints that can be validated without converting
* Velox batches back to Spark rows.
*/
private[delta] case class GlutenDeltaInvariantChecker private (
notNullConstraints: Seq[(Int, NotNull)])
extends Serializable {
@transient private lazy val columnOrdinals: Array[Int] =
notNullConstraints.map(_._1).toArray

def wrap(rows: Iterator[InternalRow]): Iterator[InternalRow] = {
rows.map {
row =>
check(row)
row
}
}

private def check(row: InternalRow): Unit = row match {
case _: PlaceholderRow =>
case terminal: TerminalRow => check(terminal.batch())
case other => checkRow(other)
}

private def check(batch: ColumnarBatch): Unit = {
val failedConstraintIndex = VeloxColumnarBatches.firstNullColumnIndex(batch, columnOrdinals)
if (failedConstraintIndex >= 0) {
throw DeltaInvariantViolationException(notNullConstraints(failedConstraintIndex)._2)
}
}

private def checkRow(row: InternalRow): Unit = {
var i = 0
while (i < notNullConstraints.size) {
val (ordinal, constraint) = notNullConstraints(i)
if (row.isNullAt(ordinal)) {
throw DeltaInvariantViolationException(constraint)
}
i += 1
}
}
}

private[delta] object GlutenDeltaInvariantChecker {
def create(
output: Seq[Attribute],
constraints: Seq[Constraint]): Option[GlutenDeltaInvariantChecker] = {
if (constraints.isEmpty) {
return None
}

val topLevelNotNullConstraints = constraints.collect {
case constraint: NotNull if constraint.column.length == 1 => constraint
}
if (topLevelNotNullConstraints.size != constraints.size) {
return None
}

val checks = topLevelNotNullConstraints.map {
constraint =>
val columnName = constraint.column.head
val ordinal = output.indexWhere {
attribute => SchemaUtils.DELTA_COL_RESOLVER(attribute.name, columnName)
}
if (ordinal < 0) {
return None
}
ordinal -> constraint
}
Some(GlutenDeltaInvariantChecker(checks))
}
}
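Reviewer note: a minimal usage sketch of the new checker. The identifiers below come from this diff; the empty iterator is a stand-in for a write task's input, and the caller is assumed to live under org.apache.spark.sql.delta (the checker is private[delta]):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.delta.constraints.Constraints.NotNull
import org.apache.spark.sql.types.StringType

val output = Seq(AttributeReference("id", StringType, nullable = true)())
// `create` returns Some only when every constraint is a top-level NOT NULL that
// resolves against `output`; any other constraint keeps the row-based path.
GlutenDeltaInvariantChecker.create(output, Seq(NotNull(Seq("id")))) match {
  case Some(checker) =>
    val rows: Iterator[InternalRow] = Iterator.empty // stand-in for task input
    checker.wrap(rows).foreach(_ => ()) // throws DeltaInvariantViolationException on a null id
  case None => // fall back to DeltaInvariantCheckerExec
}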
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.WriterCommitMessage
import org.apache.spark.sql.delta.DeltaOptions
+import org.apache.spark.sql.delta.constraints.GlutenDeltaInvariantChecker
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.stats.GlutenDeltaJobStatsTracker
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -71,6 +72,12 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
/** A variable used in tests to check the final executed plan. */
private var executedPlan: Option[SparkPlan] = None

+private[delta] def getExecutedPlanForTesting: Option[SparkPlan] = executedPlan
+
+private[delta] def clearExecutedPlanForTesting(): Unit = {
+executedPlan = None
+}

// scalastyle:off argcount
/**
* Basic work flow of this command is:
@@ -96,7 +103,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
bucketSpec: Option[BucketSpec],
statsTrackers: Seq[WriteJobStatsTracker],
options: Map[String, String],
-numStaticPartitionCols: Int = 0): Set[String] = {
+numStaticPartitionCols: Int = 0,
+nativeInvariantChecker: Option[GlutenDeltaInvariantChecker] = None): Set[String] = {
require(partitionColumns.size >= numStaticPartitionCols)

val job = Job.getInstance(hadoopConf)
@@ -225,7 +233,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
partitionColumns,
sortColumns,
orderingMatched,
-isNativeWritable
+isNativeWritable,
+nativeInvariantChecker
)
}
}
@@ -242,7 +251,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
partitionColumns: Seq[Attribute],
sortColumns: Seq[Attribute],
orderingMatched: Boolean,
-writeOffloadable: Boolean): Set[String] = {
+writeOffloadable: Boolean,
+nativeInvariantChecker: Option[GlutenDeltaInvariantChecker]): Set[String] = {
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
val empty2NullPlan =
if (projectList.nonEmpty) ProjectExecTransformer(projectList, plan) else plan
@@ -318,7 +328,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
committer,
iterator = iter,
concurrentOutputWriterSpec = concurrentOutputWriterSpec,
-partitionColumnToDataType
+partitionColumnToDataType,
+nativeInvariantChecker
)
},
rddWithNonEmptyPartitions.partitions.indices,
@@ -433,7 +444,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
committer: FileCommitProtocol,
iterator: Iterator[InternalRow],
concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec],
-partitionColumnToDataType: Map[String, DataType]): WriteTaskResult = {
+partitionColumnToDataType: Map[String, DataType],
+nativeInvariantChecker: Option[GlutenDeltaInvariantChecker]): WriteTaskResult = {

val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
@@ -487,7 +499,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
// Execute the task to write rows out and commit the task.
-dataWriter.writeWithIterator(iterator)
+val rowsToWrite = nativeInvariantChecker.map(_.wrap(iterator)).getOrElse(iterator)
+dataWriter.writeWithIterator(rowsToWrite)
dataWriter.commit()
})(
catchBlock = {