Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.spark.sql.delta;

import org.apache.spark.sql.delta.constraints.Constraints$;
import org.apache.spark.sql.delta.schema.SchemaUtils$;
import org.apache.spark.sql.delta.sources.DeltaSQLConf$;
import org.apache.spark.sql.delta.v2.interop.AbstractMetadata;
import org.apache.spark.sql.delta.v2.interop.AbstractProtocol;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.AtomicType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import scala.Option;
import scala.collection.immutable.List;

/**
* Connector-agnostic checks for whether the data type of a column or nested field can be changed
* on a Delta table.
*/
public final class ColumnTypeChangeSupport {

  /** Static helpers only; this class is never instantiated. */
  private ColumnTypeChangeSupport() {}

  /**
   * Decides whether the field addressed by {@code fieldNames} may have its type changed to
   * {@code newType}.
   *
   * <p>The result is {@code true} only when every one of these holds:
   * <ul>
   *   <li>type widening is enabled for the given protocol and metadata,</li>
   *   <li>the field is found in the table schema (case-insensitive lookup),</li>
   *   <li>both the current type and {@code newType} are {@link AtomicType}s,</li>
   *   <li>the change is accepted under the session's automatic widening mode,</li>
   *   <li>no generated column or CHECK constraint depends on the field.</li>
   * </ul>
   *
   * @param spark session whose SQL conf and analysis facilities are used
   * @param protocol protocol of the table being altered
   * @param metadata metadata of the table being altered
   * @param fieldNames name parts identifying the column or nested field
   * @param newType requested data type
   * @return whether the type change is supported
   */
  public static boolean supportsTypeChange(
      SparkSession spark,
      AbstractProtocol protocol,
      AbstractMetadata metadata,
      List<String> fieldNames,
      DataType newType) {
    // The session conf is resolved up front, before any of the checks below run.
    SQLConf sessionConf = spark.sessionState().conf();

    if (!TypeWidening$.MODULE$.isEnabled(protocol, metadata)) {
      return false;
    }

    Option<StructField> existingField =
        SchemaUtils$.MODULE$.findNestedFieldIgnoreCase(metadata.schema(), fieldNames, true);
    if (existingField.isEmpty()) {
      return false;
    }

    DataType currentType = existingField.get().dataType();
    boolean bothAtomic = currentType instanceof AtomicType && newType instanceof AtomicType;
    if (!bothAtomic) {
      return false;
    }

    AtomicType fromType = (AtomicType) currentType;
    AtomicType toType = (AtomicType) newType;
    if (!isSupportedWidening(sessionConf, metadata, fromType, toType)) {
      return false;
    }
    // Dependency lookups only run once the widening itself is known to be acceptable.
    return !hasBlockingDependency(spark, protocol, metadata, fieldNames);
  }

  /**
   * Evaluates the {@code fromType} to {@code toType} change against the widening mode configured
   * in {@code conf}: {@code ALWAYS} uses the general type change check,
   * {@code SAME_FAMILY_TYPE} uses the schema evolution check, and any other value rejects the
   * change. Both checks receive whether Iceberg compatibility is enabled in the table
   * configuration.
   */
  private static boolean isSupportedWidening(
      SQLConf conf, AbstractMetadata metadata, AtomicType fromType, AtomicType toType) {
    boolean icebergCompatibleOnly =
        UniversalFormat$.MODULE$.icebergEnabled(metadata.configuration());
    String wideningMode = conf.getConf(DeltaSQLConf$.MODULE$.DELTA_ALLOW_AUTOMATIC_WIDENING());

    boolean supported;
    switch (wideningMode) {
      case "SAME_FAMILY_TYPE":
        supported =
            TypeWidening$.MODULE$.isTypeChangeSupportedForSchemaEvolution(
                fromType, toType, icebergCompatibleOnly);
        break;
      case "ALWAYS":
        supported =
            TypeWidening$.MODULE$.isTypeChangeSupported(fromType, toType, icebergCompatibleOnly);
        break;
      default: // NEVER
        supported = false;
        break;
    }
    return supported;
  }

  /**
   * Whether a column fits any condition that prevents changing its type, e.g. it's referenced by
   * a generated column expression or a CHECK constraint that may become malformed.
   */
  private static boolean hasBlockingDependency(
      SparkSession spark,
      AbstractProtocol protocol,
      AbstractMetadata metadata,
      List<String> fieldNames) {
    // Both lookups are always performed, generated columns first and constraints second.
    boolean referencedByGeneratedColumn =
        !SchemaUtils$.MODULE$
            .findDependentGeneratedColumns(spark, fieldNames, protocol, metadata.schema())
            .isEmpty();
    boolean referencedByConstraint =
        !Constraints$.MODULE$.findDependentConstraints(spark, fieldNames, metadata).isEmpty();

    if (referencedByGeneratedColumn) {
      return true;
    }
    return referencedByConstraint;
  }
}
26 changes: 0 additions & 26 deletions spark/src/main/scala-shims/spark-4.1/SparkTableShims.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.delta.DataFrameUtils
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.v2.interop.AbstractProtocol
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils.quoteIdentifier
Expand Down Expand Up @@ -77,7 +78,7 @@ import org.apache.spark.sql.types.{Metadata => FieldMetadata}
*/
object GeneratedColumn extends DeltaLogging with AnalysisHelper {

def satisfyGeneratedColumnProtocol(protocol: Protocol): Boolean =
def satisfyGeneratedColumnProtocol(protocol: AbstractProtocol): Boolean =
protocol.isFeatureSupported(GeneratedColumnsTableFeature)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol}

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.functions.{col, lit}
Expand All @@ -29,7 +30,7 @@ object TypeWidening {
/**
* Returns whether the protocol version supports the Type Widening table feature.
*/
def isSupported(protocol: Protocol): Boolean =
def isSupported(protocol: AbstractProtocol): Boolean =
Seq(TypeWideningPreviewTableFeature, TypeWideningTableFeature)
.exists(protocol.isFeatureSupported)

Expand All @@ -39,8 +40,8 @@ object TypeWidening {
* not. When Type Widening is enabled, the type of existing columns or fields can be widened
* using ALTER TABLE CHANGE COLUMN.
*/
def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = {
val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata)
def isEnabled(protocol: AbstractProtocol, metadata: AbstractMetadata): Boolean = {
val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMap(metadata.configuration)
if (isEnabled && !isSupported(protocol)) {
throw new IllegalStateException(
s"Table property '${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' is " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf.ValidateCheckConstraintsMode
import org.apache.spark.sql.delta.v2.interop.AbstractMetadata

import org.apache.spark.SparkThrowable
import org.apache.spark.internal.MDC
Expand Down Expand Up @@ -117,7 +118,7 @@ object Constraints extends DeltaLogging {
def findDependentConstraints(
sparkSession: SparkSession,
columnName: Seq[String],
metadata: Metadata): Map[String, String] = {
metadata: AbstractMetadata): Map[String, String] = {
metadata.configuration.filter {
case (key, constraint) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") =>
SchemaUtils.containsDependentExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMod
import org.apache.spark.sql.delta.{RowCommitVersion, RowId}
import org.apache.spark.sql.delta.ClassicColumnConversions._
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.v2.interop.AbstractProtocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -1656,7 +1657,7 @@ def normalizeColumnNamesInDataType(
def findDependentGeneratedColumns(
sparkSession: SparkSession,
targetColumn: Seq[String],
protocol: Protocol,
protocol: AbstractProtocol,
schema: StructType): Map[String, String] = {
if (GeneratedColumn.satisfyGeneratedColumnProtocol(protocol) &&
GeneratedColumn.hasGeneratedColumns(schema)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.apache.spark.sql.delta.v2.interop

import org.apache.spark.sql.delta.TableFeature

/**
* Abstract trait for protocol actions in Delta. This trait provides a common
* abstraction that can be implemented by both Spark's V1 Protocol and Kernel's Protocol
Expand Down Expand Up @@ -51,5 +53,12 @@ trait AbstractProtocol {
minWriterVersion == other.minWriterVersion &&
readerFeatures == other.readerFeatures &&
writerFeatures == other.writerFeatures

/**
* Check if a `feature` is supported by this protocol. This means either (a) the protocol does
* not support table features but implicitly supports the feature, or (b) the protocol supports
* table features and explicitly references the feature.
*/
def isFeatureSupported(feature: TableFeature): Boolean
}

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.test.SharedSparkSession
/**
* Tag for tests that access Delta internals (e.g., DeltaLog, physical scans, usage logs)
* and are therefore incompatible with the DSv2 InMemoryTable test path.
* Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active.
* Tests tagged with this are automatically skipped in DSv2 mode.
*/
case class DSv2Incompatible(reason: String) extends org.scalatest.Tag("DSv2Incompatible")

Expand All @@ -32,28 +32,28 @@ case class DSv2Incompatible(reason: String) extends org.scalatest.Tag("DSv2Incom
* should be implemented sometime in the future.
* Not [[DSv2Incompatible]] -- that one is for tests that are completely unsupported and would
* never pass with DSv2.
* Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active.
* Tests tagged with this are automatically skipped in DSv2 mode.
*/
case class DSv2TemporarilyIncompatible(reason: String)
extends org.scalatest.Tag("DSv2TemporarilyIncompatible")

/**
* Tag for tests that assert the physical Delta execution plan shape.
* Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active.
* Tests tagged with this are automatically skipped in DSv2 mode.
*/
case class ChecksPhysicalDeltaPlan(reason: String = "checks physical Delta plan")
extends org.scalatest.Tag("ChecksPhysicalDeltaPlan")

/**
* Tag for tests that inspect or mutate Delta internals such as DeltaLog or file stats.
* Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active.
* Tests tagged with this are automatically skipped in DSv2 mode.
*/
case class ChecksDeltaInternals(reason: String = "checks Delta internals")
extends org.scalatest.Tag("ChecksDeltaInternals")

/**
* Tag for tests that inspect Delta metrics.
* Test tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active.
* Tests tagged with this are automatically skipped in DSv2 mode.
*/
case class ChecksDeltaMetrics(reason: String = "checks Delta metrics")
extends org.scalatest.Tag("ChecksDeltaMetrics")
Expand All @@ -66,34 +66,67 @@ case class ChecksDeltaMetrics(reason: String = "checks Delta metrics")
case object DSv2DMLSchemaEvolution extends org.scalatest.Tag("DSv2DMLSchemaEvolution")

/**
* Mixin trait that configures the session catalog to use [[InMemoryDeltaCatalog]],
* routing DML operations through Spark's V2 execution path via [[InMemorySparkTable]].
* Shared skip logic for tests that cannot run against the DSv2 connector, used by both
* [[InMemoryTestTableMixin]] (STRICT + in-memory test table) and [[DeltaDSv2TestMixin]]
* (AUTO + real [[io.delta.spark.internal.v2.catalog.DeltaV2Table]]). Centralizes every skip
* reason so both mixins skip the same set of DSv2-incompatible tests.
*/
trait InMemoryTestTableMixin extends SharedSparkSession with InMemoryTestTableMixinShims {
trait DSv2IncompatibilityTagging extends SharedSparkSession with InMemoryTestTableMixinShims {

override def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.catalog.spark_catalog", classOf[InMemoryDeltaCatalog].getName)
/**
* Extra tags appended to every test before evaluating skip reasons. Defaults to none; may be
* overridden by mixins that need to add suite-wide tags.
*/
protected def additionalDSv2TestTags: Seq[org.scalatest.Tag] = Seq.empty

override protected def test
(testName: String, testTags: org.scalatest.Tag*)
(testFun: => Any)
(implicit pos: org.scalactic.source.Position): Unit = {
/**
* Returns a `(tagName, reason)` pair when the test is tagged as incompatible with the DSv2
* connector, or `None` when the test can run. Covers: [[DSv2Incompatible]] (fundamentally
* unsupported), [[DSv2TemporarilyIncompatible]] (not implemented yet),
* [[ChecksPhysicalDeltaPlan]] and [[ChecksDeltaInternals]] (assert V1 plan/internals), and
* [[DSv2DMLSchemaEvolution]] (only on Spark 4.2+).
*/
protected def dsv2SkipReason(
testTags: Seq[org.scalatest.Tag]): Option[(String, String)] =
testTags.collectFirst {
case t: DSv2Incompatible =>
"DSv2Incompatible" -> t.reason
case t: DSv2TemporarilyIncompatible =>
"DSv2TemporarilyIncompatible" -> t.reason
case t: ChecksPhysicalDeltaPlan =>
"ChecksPhysicalDeltaPlan" -> t.reason
case t: ChecksDeltaInternals =>
"ChecksDeltaInternals" -> t.reason
case t: DSv2Incompatible => "DSv2Incompatible" -> t.reason
case t: DSv2TemporarilyIncompatible => "DSv2TemporarilyIncompatible" -> t.reason
case t: ChecksPhysicalDeltaPlan => "ChecksPhysicalDeltaPlan" -> t.reason
case t: ChecksDeltaInternals => "ChecksDeltaInternals" -> t.reason
case DSv2DMLSchemaEvolution if !v2DmlSchemaEvolutionSupported =>
"DSV2DMLSchemaEvolution" -> "DSv2 DML schema evolution not supported on this spark version"
} match {
}

/**
* Shared `test` override for all DSv2 test mixins: appends [[additionalDSv2TestTags]], then skips
* tests that [[dsv2SkipReason]] marks incompatible with the DSv2 connector and runs the rest.
*/
override protected def test(testName: String, testTags: org.scalatest.Tag*)(testFun: => Any)(
implicit pos: org.scalactic.source.Position): Unit = {
val allTags = testTags ++ additionalDSv2TestTags
dsv2SkipReason(allTags) match {
case Some((tagName, reason)) =>
ignore(testName + s" ($tagName: $reason)", testTags: _*)(testFun)
ignore(testName + s" ($tagName: $reason)", allTags: _*)(testFun)
case None =>
super.test(testName, testTags: _*)(testFun)
super.test(testName, allTags: _*)(testFun)
}
}
}

/**
* Mixin trait that configures the session catalog to use [[InMemoryDeltaCatalog]],
* routing DML operations through Spark's V2 execution path via [[InMemorySparkTable]].
*/
trait InMemoryTestTableMixin extends SharedSparkSession with DSv2IncompatibilityTagging {

  // Start from the inherited conf and register the in-memory Delta catalog as the session
  // catalog.
  override def sparkConf: SparkConf = {
    val inheritedConf = super.sparkConf
    inheritedConf.set("spark.sql.catalog.spark_catalog", classOf[InMemoryDeltaCatalog].getName)
  }

}

/**
 * Mixin for test suites that exercise the real DSv2 Delta connector. It sets
 * [[DeltaSQLConf.V2_ENABLE_MODE]] to `STRICT` on top of the inherited Spark conf, and gets its
 * test skipping behavior from [[DSv2IncompatibilityTagging]].
 */
trait DeltaDSv2TestMixin extends SharedSparkSession with DSv2IncompatibilityTagging {

  protected override def sparkConf: SparkConf = {
    val inheritedConf = super.sparkConf
    inheritedConf.set(DeltaSQLConf.V2_ENABLE_MODE.key, "STRICT")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ trait DeltaInsertIntoTest
}

/** Collects all the types of insert previously defined. */
protected lazy val allInsertTypes: Set[Insert] = Set(
protected def allInsertTypes: Set[Insert] = Set(
SQLInsertOverwriteReplaceWhere,
SQLInsertOverwritePartitionByPosition,
SQLInsertOverwritePartitionColList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.catalog

import java.util

import org.apache.spark.sql.delta.SparkTableShims
import io.delta.spark.internal.v2.catalog.DeltaV2TableShims

import org.apache.spark.sql.connector.catalog.{InMemoryRowLevelOperationTable, TableCapability}
import org.apache.spark.sql.connector.expressions.Transform
Expand All @@ -41,7 +41,8 @@ class InMemorySparkTable(

override def capabilities(): util.Set[TableCapability] = {
val caps = new util.HashSet[TableCapability](super.capabilities())
SparkTableShims.schemaEvolutionCapability.foreach(caps.add)
val schemaEvolution = DeltaV2TableShims.schemaEvolutionCapability()
if (schemaEvolution.isPresent) caps.add(schemaEvolution.get)
caps
}

Expand Down
Loading
Loading