diff --git a/spark/src/main/java/org/apache/spark/sql/delta/ColumnTypeChangeSupport.java b/spark/src/main/java/org/apache/spark/sql/delta/ColumnTypeChangeSupport.java new file mode 100644 index 00000000000..fcd99c732cb --- /dev/null +++ b/spark/src/main/java/org/apache/spark/sql/delta/ColumnTypeChangeSupport.java @@ -0,0 +1,102 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.delta; + +import org.apache.spark.sql.delta.constraints.Constraints$; +import org.apache.spark.sql.delta.schema.SchemaUtils$; +import org.apache.spark.sql.delta.sources.DeltaSQLConf$; +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata; +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.AtomicType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.StructField; +import scala.Option; +import scala.collection.immutable.List; + +/** + * Connector-agnostic checks for whether the data type of a column or nested field can be changed + * on a Delta table. + */ +public final class ColumnTypeChangeSupport { + + private ColumnTypeChangeSupport() {} + + /** Returns whether the type of the field at {@code fieldNames} can be changed to {@code newType}. */ + public static boolean supportsTypeChange( + SparkSession spark, + AbstractProtocol protocol, + AbstractMetadata metadata, + List fieldNames, + DataType newType) { + + + SQLConf conf = spark.sessionState().conf(); + if (!TypeWidening$.MODULE$.isEnabled(protocol, metadata)) { + return false; + } + Option fromField = + SchemaUtils$.MODULE$.findNestedFieldIgnoreCase(metadata.schema(), fieldNames, true); + if (fromField.isEmpty() + || !(fromField.get().dataType() instanceof AtomicType) + || !(newType instanceof AtomicType)) { + return false; + } + AtomicType fromType = (AtomicType) fromField.get().dataType(); + AtomicType toType = (AtomicType) newType; + return isSupportedWidening(conf, metadata, fromType, toType) + && !hasBlockingDependency(spark, protocol, metadata, fieldNames); + } + + private static boolean isSupportedWidening( + SQLConf conf, AbstractMetadata metadata, AtomicType fromType, AtomicType toType) { + boolean uniformIcebergCompatibleOnly = + UniversalFormat$.MODULE$.icebergEnabled(metadata.configuration()); + String mode = conf.getConf(DeltaSQLConf$.MODULE$.DELTA_ALLOW_AUTOMATIC_WIDENING()); + switch (mode) { + case "ALWAYS": + return TypeWidening$.MODULE$.isTypeChangeSupported( + fromType, toType, uniformIcebergCompatibleOnly); + case "SAME_FAMILY_TYPE": + return TypeWidening$.MODULE$.isTypeChangeSupportedForSchemaEvolution( + fromType, toType, uniformIcebergCompatibleOnly); + default: // NEVER + return false; + } + } + + /** + * Whether a column fits any condition that prevents changing its type, e.g it's referenced by a + * generated column expression or a CHECK constraint that may become malformed. + */ + private static boolean hasBlockingDependency( + SparkSession spark, + AbstractProtocol protocol, + AbstractMetadata metadata, + List fieldNames) { + + boolean dependentGeneratedColumn = + !SchemaUtils$.MODULE$ + .findDependentGeneratedColumns(spark, fieldNames, protocol, metadata.schema()) + .isEmpty(); + boolean dependentConstraint = + !Constraints$.MODULE$.findDependentConstraints(spark, fieldNames, metadata).isEmpty(); + return dependentGeneratedColumn || dependentConstraint; + } +} diff --git a/spark/src/main/scala-shims/spark-4.1/SparkTableShims.scala b/spark/src/main/scala-shims/spark-4.1/SparkTableShims.scala deleted file mode 100644 index 89a03a655da..00000000000 --- a/spark/src/main/scala-shims/spark-4.1/SparkTableShims.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (2021) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.delta - -import org.apache.spark.sql.connector.catalog.TableCapability - -/** Shim to build [[DeltaV2Table]] against different Spark versions. */ -object SparkTableShims { - // Capability [[TableCapability.AUTOMATIC_SCHEMA_EVOLUTION]] is available in Spark 4.1, but - // schema evolution isn't properly supported yet in MERGE/INSERT there so ignore it. - val schemaEvolutionCapability: Option[TableCapability] = None -} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala b/spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala index da0b312a26c..c294766e8c7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala @@ -22,6 +22,7 @@ import java.util.Locale import org.apache.spark.sql.delta.DataFrameUtils import org.apache.spark.sql.delta.ClassicColumnConversions._ import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils.quoteIdentifier @@ -77,7 +78,7 @@ import org.apache.spark.sql.types.{Metadata => FieldMetadata} */ object GeneratedColumn extends DeltaLogging with AnalysisHelper { - def satisfyGeneratedColumnProtocol(protocol: Protocol): Boolean = + def satisfyGeneratedColumnProtocol(protocol: AbstractProtocol): Boolean = protocol.isFeatureSupported(GeneratedColumnsTableFeature) /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index fdff612ae38..cbd479df02b 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.functions.{col, lit} @@ -29,7 +30,7 @@ object TypeWidening { /** * Returns whether the protocol version supports the Type Widening table feature. */ - def isSupported(protocol: Protocol): Boolean = + def isSupported(protocol: AbstractProtocol): Boolean = Seq(TypeWideningPreviewTableFeature, TypeWideningTableFeature) .exists(protocol.isFeatureSupported) @@ -39,8 +40,8 @@ object TypeWidening { * not. When Type Widening is enabled, the type of existing columns or fields can be widened * using ALTER TABLE CHANGE COLUMN. */ - def isEnabled(protocol: Protocol, metadata: Metadata): Boolean = { - val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMetaData(metadata) + def isEnabled(protocol: AbstractProtocol, metadata: AbstractMetadata): Boolean = { + val isEnabled = DeltaConfigs.ENABLE_TYPE_WIDENING.fromMap(metadata.configuration) if (isEnabled && !isSupported(protocol)) { throw new IllegalStateException( s"Table property '${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' is " + diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala index 54d264d5cac..9d06a7f2bae 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/constraints/Constraints.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf.ValidateCheckConstraintsMode +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata import org.apache.spark.SparkThrowable import org.apache.spark.internal.MDC @@ -117,7 +118,7 @@ object Constraints extends DeltaLogging { def findDependentConstraints( sparkSession: SparkSession, columnName: Seq[String], - metadata: Metadata): Map[String, String] = { + metadata: AbstractMetadata): Map[String, String] = { metadata.configuration.filter { case (key, constraint) if key.toLowerCase(Locale.ROOT).startsWith("delta.constraints.") => SchemaUtils.containsDependentExpression( diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 137e096b88a..17b11675279 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMod import org.apache.spark.sql.delta.{RowCommitVersion, RowId} import org.apache.spark.sql.delta.ClassicColumnConversions._ import org.apache.spark.sql.delta.actions.Protocol +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging @@ -1656,7 +1657,7 @@ def normalizeColumnNamesInDataType( def findDependentGeneratedColumns( sparkSession: SparkSession, targetColumn: Seq[String], - protocol: Protocol, + protocol: AbstractProtocol, schema: StructType): Map[String, String] = { if (GeneratedColumn.satisfyGeneratedColumnProtocol(protocol) && GeneratedColumn.hasGeneratedColumns(schema)) { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala index e054d2f30c4..86aaea9b32d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/v2/interop/AbstractProtocol.scala @@ -16,6 +16,8 @@ package org.apache.spark.sql.delta.v2.interop +import org.apache.spark.sql.delta.TableFeature + /** * Abstract trait for protocol actions in Delta. This trait provides a common * abstraction that can be implemented by both Spark's V1 Protocol and Kernel's Protocol @@ -51,5 +53,12 @@ trait AbstractProtocol { minWriterVersion == other.minWriterVersion && readerFeatures == other.readerFeatures && writerFeatures == other.writerFeatures + + /** + * Check if a `feature` is supported by this protocol. This means either (a) the protocol does + * not support table features but implicitly supports the feature, or (b) the protocol supports + * table features and explicitly references the feature. + */ + def isFeatureSupported(feature: TableFeature): Boolean } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InMemoryTestTableMixin.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDSv2TestMixin.scala similarity index 51% rename from spark/src/test/scala/org/apache/spark/sql/delta/InMemoryTestTableMixin.scala rename to spark/src/test/scala/org/apache/spark/sql/delta/DeltaDSv2TestMixin.scala index edaec4b0558..1dffa08a32c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InMemoryTestTableMixin.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDSv2TestMixin.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.test.SharedSparkSession /** * Tag for tests that access Delta internals (e.g., DeltaLog, physical scans, usage logs) * and are therefore incompatible with the DSv2 InMemoryTable test path. - * Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active. + * Tests tagged with this are automatically skipped in DSv2 mode. */ case class DSv2Incompatible(reason: String) extends org.scalatest.Tag("DSv2Incompatible") @@ -32,28 +32,28 @@ case class DSv2Incompatible(reason: String) extends org.scalatest.Tag("DSv2Incom * should be implemented sometime in the future. * Not [[DSv2Incompatible]] -- that one is for tests that are completely unsupported and would * never pass with DSv2. - * Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active. + * Tests tagged with this are automatically skipped in DSv2 mode. */ case class DSv2TemporarilyIncompatible(reason: String) extends org.scalatest.Tag("DSv2TemporarilyIncompatible") /** * Tag for tests that assert the physical Delta execution plan shape. - * Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active. + * Tests tagged with this are automatically skipped in DSv2 mode. */ case class ChecksPhysicalDeltaPlan(reason: String = "checks physical Delta plan") extends org.scalatest.Tag("ChecksPhysicalDeltaPlan") /** * Tag for tests that inspect or mutate Delta internals such as DeltaLog or file stats. - * Tests tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active. + * Tests tagged with this are automatically skipped in DSv2 mode. */ case class ChecksDeltaInternals(reason: String = "checks Delta internals") extends org.scalatest.Tag("ChecksDeltaInternals") /** * Tag for tests that inspect Delta metrics. - * Test tagged with this are automatically skipped when [[InMemoryTestTableMixin]] is active. + * Test tagged with this are automatically skipped in DSv2 mode. */ case class ChecksDeltaMetrics(reason: String = "checks Delta metrics") extends org.scalatest.Tag("ChecksDeltaMetrics") @@ -66,34 +66,67 @@ case class ChecksDeltaMetrics(reason: String = "checks Delta metrics") case object DSv2DMLSchemaEvolution extends org.scalatest.Tag("DSv2DMLSchemaEvolution") /** - * Mixin trait that configures the session catalog to use [[InMemoryDeltaCatalog]], - * routing DML operations through Spark's V2 execution path via [[InMemorySparkTable]]. + * Shared skip logic for tests that cannot run against the DSv2 connector, used by both + * [[InMemoryTestTableMixin]] (STRICT + in-memory test table) and [[DSv2TestMixin]] (AUTO + real + * [[io.delta.spark.internal.v2.catalog.DeltaV2Table]]). Centralizes every skip reason + * so both mixins skip the same set of DSv2-incompatible tests. */ -trait InMemoryTestTableMixin extends SharedSparkSession with InMemoryTestTableMixinShims { +trait DSv2IncompatibilityTagging extends SharedSparkSession with InMemoryTestTableMixinShims { - override def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.catalog.spark_catalog", classOf[InMemoryDeltaCatalog].getName) + /** + * Extra tags appended to every test before evaluating skip reasons. Defaults to none; may be + * overridden by mixins that need to add suite-wide tags. + */ + protected def additionalDSv2TestTags: Seq[org.scalatest.Tag] = Seq.empty - override protected def test - (testName: String, testTags: org.scalatest.Tag*) - (testFun: => Any) - (implicit pos: org.scalactic.source.Position): Unit = { + /** + * Returns a `(tagName, reason)` pair when the test is tagged as incompatible with the DSv2 + * connector, or `None` when the test can run. Covers: [[DSv2Incompatible]] (fundamentally + * unsupported), [[DSv2TemporarilyIncompatible]] (not implemented yet), + * [[ChecksPhysicalDeltaPlan]] and [[ChecksDeltaInternals]] (assert V1 plan/internals), and + * [[DSv2DMLSchemaEvolution]] (only on Spark 4.2+). + */ + protected def dsv2SkipReason( + testTags: Seq[org.scalatest.Tag]): Option[(String, String)] = testTags.collectFirst { - case t: DSv2Incompatible => - "DSv2Incompatible" -> t.reason - case t: DSv2TemporarilyIncompatible => - "DSv2TemporarilyIncompatible" -> t.reason - case t: ChecksPhysicalDeltaPlan => - "ChecksPhysicalDeltaPlan" -> t.reason - case t: ChecksDeltaInternals => - "ChecksDeltaInternals" -> t.reason + case t: DSv2Incompatible => "DSv2Incompatible" -> t.reason + case t: DSv2TemporarilyIncompatible => "DSv2TemporarilyIncompatible" -> t.reason + case t: ChecksPhysicalDeltaPlan => "ChecksPhysicalDeltaPlan" -> t.reason + case t: ChecksDeltaInternals => "ChecksDeltaInternals" -> t.reason case DSv2DMLSchemaEvolution if !v2DmlSchemaEvolutionSupported => "DSV2DMLSchemaEvolution" -> "DSv2 DML schema evolution not supported on this spark version" - } match { + } + + /** + * Shared `test` override for all DSv2 test mixins: appends [[additionalDSv2TestTags]], then skips + * tests that [[dsv2SkipReason]] marks incompatible with the DSv2 connector and runs the rest. + */ + override protected def test(testName: String, testTags: org.scalatest.Tag*)(testFun: => Any)( + implicit pos: org.scalactic.source.Position): Unit = { + val allTags = testTags ++ additionalDSv2TestTags + dsv2SkipReason(allTags) match { case Some((tagName, reason)) => - ignore(testName + s" ($tagName: $reason)", testTags: _*)(testFun) + ignore(testName + s" ($tagName: $reason)", allTags: _*)(testFun) case None => - super.test(testName, testTags: _*)(testFun) + super.test(testName, allTags: _*)(testFun) } } } + +/** + * Mixin trait that configures the session catalog to use [[InMemoryDeltaCatalog]], + * routing DML operations through Spark's V2 execution path via [[InMemorySparkTable]]. + */ +trait InMemoryTestTableMixin extends SharedSparkSession with DSv2IncompatibilityTagging { + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.spark_catalog", classOf[InMemoryDeltaCatalog].getName) + +} + +/** Mixin for test suites that exercise the real DSv2 Delta connector. */ +trait DeltaDSv2TestMixin extends SharedSparkSession with DSv2IncompatibilityTagging { + + protected override def sparkConf: SparkConf = + super.sparkConf.set(DeltaSQLConf.V2_ENABLE_MODE.key, "STRICT") +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala index dfac9a427dd..e8338f08461 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala @@ -416,7 +416,7 @@ trait DeltaInsertIntoTest } /** Collects all the types of insert previously defined. */ - protected lazy val allInsertTypes: Set[Insert] = Set( + protected def allInsertTypes: Set[Insert] = Set( SQLInsertOverwriteReplaceWhere, SQLInsertOverwritePartitionByPosition, SQLInsertOverwritePartitionColList, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/InMemorySparkTable.scala b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/InMemorySparkTable.scala index 3ff5260ddee..7c24a8b2a3c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/catalog/InMemorySparkTable.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/catalog/InMemorySparkTable.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.catalog import java.util -import org.apache.spark.sql.delta.SparkTableShims +import io.delta.spark.internal.v2.catalog.DeltaV2TableShims import org.apache.spark.sql.connector.catalog.{InMemoryRowLevelOperationTable, TableCapability} import org.apache.spark.sql.connector.expressions.Transform @@ -41,7 +41,8 @@ class InMemorySparkTable( override def capabilities(): util.Set[TableCapability] = { val caps = new util.HashSet[TableCapability](super.capabilities()) - SparkTableShims.schemaEvolutionCapability.foreach(caps.add) + val schemaEvolution = DeltaV2TableShims.schemaEvolutionCapability() + if (schemaEvolution.isPresent) caps.add(schemaEvolution.get) caps } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala index 2f8676c6008..7ee97e8a829 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/sources/DeltaSourceMetadataEvolutionSupportSuite.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.delta.sources -import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaOptions} +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaColumnMappingMode, DeltaOptions, TableFeature} import org.apache.spark.sql.delta.{DeltaTestUtilsBase, DeltaThrowable, NoMapping} import org.apache.spark.sql.delta.v2.interop.{AbstractMetadata, AbstractProtocol} @@ -694,6 +694,7 @@ class DeltaSourceMetadataEvolutionSupportSuite override def minWriterVersion: Int = writerV override def readerFeatures: Option[Set[String]] = readerFs override def writerFeatures: Option[Set[String]] = writerFs + override def isFeatureSupported(feature: TableFeature): Boolean = false } val readMetadata = mkMetadata() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala index 8393d16435d..4e1aaf5c532 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningConstraintsSuite.scala @@ -16,10 +16,10 @@ package org.apache.spark.sql.delta.typewidening -import org.apache.spark.sql.delta.DeltaAnalysisException -import org.apache.spark.sql.delta.DeltaDMLTestUtilsNameBased +import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.SparkArithmeticException import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -32,10 +32,47 @@ class TypeWideningConstraintsSuite extends QueryTest with TypeWideningTestMixin with DeltaDMLTestUtilsNameBased - with TypeWideningConstraintsTests + with TypeWideningConstraintsTests { + + // Valid DSv1 vs. DSv2 behavior difference: DSv1 rejects widening a map key/value referenced by + // a CHECK constraint up front. See DSv2 below. + override protected def checkMapConstraintTypeEvolutionError( + field: String)(insert: => Unit): Unit = + checkError( + intercept[DeltaAnalysisException](insert), + "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH", + parameters = Map( + "columnName" -> field, + "columnType" -> "TINYINT", + "dataType" -> "INT", + "constraints" -> "delta.constraints.ck -> s . arr [ 0 ] [ 3 ] = 3" + )) +} + +/** Runs the type widening constraint tests against DSv2. */ +class TypeWideningConstraintsDSv2Suite + extends QueryTest + with TypeWideningDSv2TestMixin + with TypeWideningConstraintsTests { + + // Valid DSv1 vs. DSv2 behavior difference: DSv2 skips evolving an unsupported map key/value + // instead of a hard-fail. The write then fails due to a cast overflow since the field isn't + // widened. See DSv1 above. + override protected def checkMapConstraintTypeEvolutionError( + field: String)(insert: => Unit): Unit = + checkError( + intercept[SparkArithmeticException](insert), + "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"INT\"", + "targetType" -> "\"TINYINT\"", + "columnName" -> field.split('.').map(part => s"`$part`").mkString("."))) +} trait TypeWideningConstraintsTests { self: QueryTest with TypeWideningTestMixin => + protected def checkMapConstraintTypeEvolutionError(field: String)(insert: => Unit): Unit + test("not null constraint with type change") { withTable("t") { sql("CREATE TABLE t (a byte NOT NULL) USING DELTA") @@ -234,36 +271,18 @@ trait TypeWideningConstraintsTests { self: QueryTest with TypeWideningTestMixin withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { // Insert by name is not supported by type evolution. - checkError( - intercept[DeltaAnalysisException] { - // Migrate map's key to int type. - spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(999999 -> 1, 3 -> 3)))))) - .toDF("s").withColumn("s", col("s").cast("struct>>")) - .write.format("delta").mode("append").saveAsTable("t") - }, - "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH", - parameters = Map( - "columnName" -> "s.arr.element.key", - "columnType" -> "TINYINT", - "dataType" -> "INT", - "constraints" -> "delta.constraints.ck -> s . arr [ 0 ] [ 3 ] = 3" - ) - ) - checkError( - intercept[DeltaAnalysisException] { - // Migrate map's value to int type. - spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(1 -> 999999, 3 -> 3)))))) - .toDF("s").withColumn("s", col("s").cast("struct>>")) - .write.format("delta").mode("append").saveAsTable("t") - }, - "DELTA_CONSTRAINT_DATA_TYPE_MISMATCH", - parameters = Map( - "columnName" -> "s.arr.element.value", - "columnType" -> "TINYINT", - "dataType" -> "INT", - "constraints" -> "delta.constraints.ck -> s . arr [ 0 ] [ 3 ] = 3" - ) - ) + checkMapConstraintTypeEvolutionError("s.arr.element.key") { + // Migrate map's key to int type. + spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(999999 -> 1, 3 -> 3)))))) + .toDF("s").withColumn("s", col("s").cast("struct>>")) + .write.format("delta").mode("append").saveAsTable("t") + } + checkMapConstraintTypeEvolutionError("s.arr.element.value") { + // Migrate map's value to int type. + spark.createDataFrame(Seq(Tuple1(Tuple1(Array(Map(1 -> 999999, 3 -> 3)))))) + .toDF("s").withColumn("s", col("s").cast("struct>>")) + .write.format("delta").mode("append").saveAsTable("t") + } } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala index 3593e5b0bf8..796a4679312 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningGeneratedColumnsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.typewidening import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.SparkArithmeticException import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.functions.col import org.apache.spark.sql.types._ @@ -31,11 +32,51 @@ class TypeWideningGeneratedColumnsSuite with TypeWideningTestMixin with DeltaDMLTestUtilsNameBased with GeneratedColumnTest - with TypeWideningGeneratedColumnTests + with TypeWideningGeneratedColumnTests { + + // Valid DSv1 vs. DSv2 behavior difference: DSv1 rejects widening a map key/value referenced by + // a generated column up front. See DSv2 below. + override protected def checkArrayElementGeneratedColumnError(insert: => Unit): Unit = + checkError( + intercept[DeltaAnalysisException](insert), + "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH", + parameters = Map( + "columnName" -> "a.element", + "columnType" -> "TINYINT", + "dataType" -> "INT", + "generatedColumns" -> "gen -> hash(a[0])")) +} + +/** Runs the type widening generated column tests against DSv2. */ +class TypeWideningGeneratedColumnsDSv2Suite + extends QueryTest + with TypeWideningDSv2TestMixin + with GeneratedColumnTest + with TypeWideningGeneratedColumnTests { + + // Valid DSv1 vs. DSv2 behavior difference: DSv2 skips evolving an unsupported map key/value + // instead of a hard-fail. The write then fails due to a cast overflow since the field isn't + // widened. See DSv1 above. + override protected def checkArrayElementGeneratedColumnError(insert: => Unit): Unit = + checkError( + intercept[SparkArithmeticException](insert), + "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"INT\"", + "targetType" -> "\"TINYINT\"", + "columnName" -> "`a`.`element`")) +} trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest { self: QueryTest with TypeWideningTestMixin => + /** + * Asserts the error raised when `insert` attempts to widen the array element referenced by the + * generated column to an out-of-range int. The failure differs between the DSv1 and DSv2 write + * paths, so each suite overrides this with the matching assertion. + */ + protected def checkArrayElementGeneratedColumnError(insert: => Unit): Unit + test("generated column with type change") { withTable("t") { createTable( @@ -197,20 +238,11 @@ trait TypeWideningGeneratedColumnTests extends GeneratedColumnTest { withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { // Insert by name is not supported by type evolution. - checkError( - intercept[DeltaAnalysisException] { - spark.createDataFrame(Seq(Tuple1(Array(200000, 12345)))) - .toDF("a").withColumn("a", col("a").cast("array")) - .write.format("delta").mode("append").saveAsTable("t") - }, - "DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH", - parameters = Map( - "columnName" -> "a.element", - "columnType" -> "TINYINT", - "dataType" -> "INT", - "generatedColumns" -> "gen -> hash(a[0])" - ) - ) + checkArrayElementGeneratedColumnError { + spark.createDataFrame(Seq(Tuple1(Array(200000, 12345)))) + .toDF("a").withColumn("a", col("a").cast("array")) + .write.format("delta").mode("append").saveAsTable("t") + } checkAnswer(sql("SELECT gen FROM t"), Row(1765031574)) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionBasicSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionBasicSuite.scala index 2a16a3b2334..0ff5e67e991 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionBasicSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionBasicSuite.scala @@ -39,7 +39,18 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap class TypeWideningInsertSchemaEvolutionBasicSuite extends QueryTest with TypeWideningTestMixin - with DeltaDMLTestUtilsNameBased + with TypeWideningInsertSchemaEvolutionBasicTests { + + protected override def sparkConf: SparkConf = { + super.sparkConf + .set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true") + } +} + +/** Runs the type widening INSERT schema evolution tests against DSv2. */ +class TypeWideningInsertSchemaEvolutionBasicDSv2Suite + extends QueryTest + with TypeWideningDSv2TestMixin with TypeWideningInsertSchemaEvolutionBasicTests { protected override def sparkConf: SparkConf = { @@ -91,35 +102,38 @@ trait TypeWideningInsertSchemaEvolutionBasicTests s"${testCase.fromType.sql} -> ${testCase.toType.sql}") { append(testCase.initialValuesDF) - withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString, - DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "never") { - testCase.additionalValuesDF - .write - .format("delta") - .mode("append") - .option("mergeSchema", "true") - .insertInto(tableSQLIdentifier) + withSQLConf(DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "never") { + mayOverflow { + testCase.additionalValuesDF + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .insertInto(tableSQLIdentifier) + } } assert(readDeltaTableByIdentifier().schema("value").dataType === testCase.fromType) } } - test(s"INSERT - logs for missed opportunity for conversion") { + test(s"INSERT - logs for missed opportunity for conversion", + DSv2TemporarilyIncompatible("no usage logs on the DSv2 write path")) { val testCase = restrictedAutomaticWideningTestCases.head append(testCase.initialValuesDF) + val events = Log4jUsageLogger.track { - withSQLConf( - SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString, - DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "same_family_type") { - testCase.additionalValuesDF - .write - .format("delta") - .mode("append") - .option("mergeSchema", "true") - .insertInto(tableSQLIdentifier) + withSQLConf(DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "same_family_type") { + mayOverflow { + testCase.additionalValuesDF + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .insertInto(tableSQLIdentifier) + } } } @@ -128,18 +142,19 @@ trait TypeWideningInsertSchemaEvolutionBasicTests event.tags.get("opType") == Option("delta.typeWidening.missedAutomaticWidening"))) } - test(s"INSERT - no logs for lack of missed opportunity for conversion") { + test(s"INSERT - no logs for lack of missed opportunity for conversion", + DSv2TemporarilyIncompatible("usage logging differs on the DSv2 write path")) { val testCase = supportedTestCases.head append(testCase.initialValuesDF) val events = Log4jUsageLogger.track { - withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + mayOverflow { testCase.additionalValuesDF - .write - .format("delta") - .mode("append") - .option("mergeSchema", "true") - .insertInto(tableSQLIdentifier) + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .insertInto(tableSQLIdentifier) } } @@ -174,14 +189,13 @@ trait TypeWideningInsertSchemaEvolutionBasicTests s"${testCase.fromType.sql} -> ${testCase.toType.sql}") { append(testCase.initialValuesDF) // Test cases for some of the unsupported type changes may overflow while others only have - // values that can be implicitly cast to the narrower type - e.g. double ->float. - // We set storeAssignmentPolicy to LEGACY to ignore overflows, this test only ensures - // that the table schema didn't evolve. - withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + // values that can be implicitly cast to the narrower type - e.g. double -> float. The write + // either succeeds or overflows; this test only ensures that the table schema didn't evolve. + mayOverflow { testCase.additionalValuesDF.write.mode("append") .insertInto(tableSQLIdentifier) - assert(readDeltaTableByIdentifier().schema("value").dataType === testCase.fromType) } + assert(readDeltaTableByIdentifier().schema("value").dataType === testCase.fromType) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionExtendedSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionExtendedSuite.scala index ac6c767f247..e7ffaf528fa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionExtendedSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionExtendedSuite.scala @@ -29,6 +29,15 @@ class TypeWideningInsertSchemaEvolutionExtendedSuite with DeltaDMLTestUtilsNameBased with TypeWideningInsertSchemaEvolutionExtendedTests +/** Runs the extended type widening INSERT schema evolution tests against DSv2. */ +class TypeWideningInsertSchemaEvolutionExtendedDSv2Suite + extends QueryTest + with TypeWideningDSv2TestMixin + with TypeWideningInsertSchemaEvolutionExtendedTests { + // Schema evolution isn't supported yet for streaming writes in DSv2. + protected override def allInsertTypes: Set[Insert] = super.allInsertTypes - StreamingInsert +} + trait TypeWideningInsertSchemaEvolutionExtendedTests extends DeltaInsertIntoTest with TypeWideningTestCases { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala index 7b4455c8504..dc7ad7c8002 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMergeIntoSchemaEvolutionSuite.scala @@ -21,8 +21,6 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.SparkConf import org.apache.spark.sql.QueryTest -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.types._ /** @@ -31,8 +29,7 @@ import org.apache.spark.sql.types._ */ class TypeWideningMergeIntoSchemaEvolutionSuite extends TypeWideningMergeIntoSchemaEvolutionTests - with TypeWideningTestMixin - with DeltaDMLTestUtilsNameBased { + with TypeWideningTestMixin { protected override def sparkConf: SparkConf = { super.sparkConf @@ -78,15 +75,16 @@ trait TypeWideningMergeIntoSchemaEvolutionTests extends QueryTest sql("CREATE TABLE source (a int) USING DELTA") sql("INSERT INTO source VALUES (1), (2)") - withSQLConf(DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "never", - SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + withSQLConf(DeltaSQLConf.DELTA_ALLOW_AUTOMATIC_WIDENING.key -> "never") { // Merge int values into short column. This should not widen the target schema. - executeMerge( - tgt = s"$tableSQLIdentifier t", - src = "source", - cond = "0 = 1", - clauses = insert("*") - ) + mayOverflow { + executeMerge( + tgt = s"$tableSQLIdentifier t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + } assert(readDeltaTableByIdentifier().schema("a").dataType === ShortType) } } @@ -128,17 +126,16 @@ trait TypeWideningMergeIntoSchemaEvolutionTests extends QueryTest append(testCase.initialValuesDF) // Test cases for some of the unsupported type changes may overflow while others only have - // values that can be implicitly cast to the narrower type - e.g. double ->float. - // We set storeAssignmentPolicy to LEGACY to ignore overflows, this test only ensures - // that the table schema didn't evolve. - withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + // values that can be implicitly cast to the narrower type - e.g. double -> float. The merge + // either succeeds or overflows; this test only ensures that the table schema didn't evolve. + mayOverflow { executeMerge( tgt = s"$tableSQLIdentifier t", src = "source", cond = "0 = 1", clauses = insert("*")) - assert(readDeltaTableByIdentifier().schema("value").dataType === testCase.fromType) } + assert(readDeltaTableByIdentifier().schema("value").dataType === testCase.fromType) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala index 1823f021a83..fed617d7d4e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.DeltaFileOperations import com.google.common.math.DoubleMath -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkThrowable} import org.apache.spark.sql.{DataFrame, Encoder, QueryTest} import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.functions.col @@ -80,6 +80,21 @@ trait TypeWideningTestMixin TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata) } + /** + * Runs `write`, tolerating a cast overflow failure. Used in tests that want to check that column + * doesn't get automatically widened. Depending on the data, the write may succeed or overflow. + */ + protected def mayOverflow(write: => Unit): Unit = { + try { + write + } catch { + case e: ArithmeticException with SparkThrowable if Set( + "CAST_OVERFLOW_IN_TABLE_INSERT", + "CAST_OVERFLOW", + "DELTA_CAST_OVERFLOW_IN_TABLE_WRITE").contains(e.getCondition) => + } + } + /** Short-hand to create type widening metadata for struct fields. */ protected def typeWideningMetadata( from: AtomicType, @@ -129,6 +144,20 @@ trait TypeWideningTestMixin } } +/** + * Mixin for type widening test suites that run against the DSv2 Delta connector. + * Use name-based access as path-based isn't well-supported in DSv2. + */ +trait TypeWideningDSv2TestMixin + extends TypeWideningTestMixin + with DeltaDSv2TestMixin + with DeltaDMLTestUtilsNameBased { self: QueryTest => + + protected override def sparkConf: SparkConf = { + super.sparkConf.set(DeltaSQLConf.V2_ENABLE_MODE.key, "AUTO") + } +} + /** * Mixin trait containing helpers to test dropping the type widening table feature. */ diff --git a/spark/src/main/scala-shims/spark-4.2/SparkTableShims.scala b/spark/v2/src/main/java-shims/spark-4.0/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java similarity index 57% rename from spark/src/main/scala-shims/spark-4.2/SparkTableShims.scala rename to spark/v2/src/main/java-shims/spark-4.0/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java index d797e752b5c..7a12e3ac6b2 100644 --- a/spark/src/main/scala-shims/spark-4.2/SparkTableShims.scala +++ b/spark/v2/src/main/java-shims/spark-4.0/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java @@ -14,15 +14,19 @@ * limitations under the License. */ -package org.apache.spark.sql.delta +package io.delta.spark.internal.v2.catalog; -import org.apache.spark.sql.connector.catalog.TableCapability +import java.util.Optional; +import org.apache.spark.sql.connector.catalog.TableCapability; /** - * Shim to build [[DeltaV2Table]] against different Spark versions. - * This is the shim for the latest version - Spark 4.2. + * Shim to build the DSv2 Delta table connector against different Spark versions. + * This is the shim for Spark 4.0. */ -object SparkTableShims { - val schemaEvolutionCapability: Option[TableCapability] = - Some(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION) +public abstract class DeltaV2TableShims { + + /** No schema evolution capability in DSv2 before Spark 4.2. */ + public static Optional schemaEvolutionCapability() { + return Optional.empty(); + } } diff --git a/spark/src/main/scala-shims/spark-4.0/SparkTableShims.scala b/spark/v2/src/main/java-shims/spark-4.1/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java similarity index 56% rename from spark/src/main/scala-shims/spark-4.0/SparkTableShims.scala rename to spark/v2/src/main/java-shims/spark-4.1/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java index d44f141607f..2a211ae4b10 100644 --- a/spark/src/main/scala-shims/spark-4.0/SparkTableShims.scala +++ b/spark/v2/src/main/java-shims/spark-4.1/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java @@ -14,12 +14,19 @@ * limitations under the License. */ -package org.apache.spark.sql.delta +package io.delta.spark.internal.v2.catalog; -import org.apache.spark.sql.connector.catalog.TableCapability +import java.util.Optional; +import org.apache.spark.sql.connector.catalog.TableCapability; -/** Shim to build [[DeltaV2Table]] against different Spark versions. */ -object SparkTableShims { - // Capability [[TableCapability.AUTOMATIC_SCHEMA_EVOLUTION]] is not available in Spark 4.0. - val schemaEvolutionCapability: Option[TableCapability] = None +/** + * Shim to build the DSv2 Delta table connector against different Spark versions. + * This is the shim for Spark 4.1. + */ +public abstract class DeltaV2TableShims { + + /** No schema evolution capability in DSv2 before Spark 4.2. */ + public static Optional schemaEvolutionCapability() { + return Optional.empty(); + } } diff --git a/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java new file mode 100644 index 00000000000..e23c7990512 --- /dev/null +++ b/spark/v2/src/main/java-shims/spark-4.2/io/delta/spark/internal/v2/catalog/DeltaV2TableShims.java @@ -0,0 +1,66 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.spark.internal.v2.catalog; + +import org.apache.spark.sql.delta.ColumnTypeChangeSupport; +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata; +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol; +import io.delta.spark.internal.v2.utils.ScalaUtils; +import java.util.Optional; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.SupportsSchemaEvolution; +import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.catalog.TableChange; + +/** + * Shim to build the DSv2 Delta table connector against different Spark versions. + * This is the shim for Spark 4.2: + * - Schema evolution support in DSV2 (add columns, change type) was added in Spark 4.2. + */ +public abstract class DeltaV2TableShims implements SupportsSchemaEvolution { + + /** Implemented in DeltaV2Table to provide access to the table protocol/metadata. */ + protected abstract AbstractProtocol protocol(); + protected abstract AbstractMetadata metadata(); + + /** + * Whether an analyzer-proposed schema-evolution column change (adding a column or widening a + * column type) can be applied to this Delta table. Adding a column is always allowed; changing a + * column type delegates to the connector-agnostic {@link ColumnTypeChangeSupport}. + */ + @Override + public boolean supportsColumnChange(TableChange.ColumnChange change) { + if (change instanceof TableChange.AddColumn) { + return true; + } + if (change instanceof TableChange.UpdateColumnType) { + TableChange.UpdateColumnType typeChange = (TableChange.UpdateColumnType) change; + return ColumnTypeChangeSupport.supportsTypeChange( + SparkSession.active(), + protocol(), + metadata(), + ScalaUtils.toScalaList(typeChange.fieldNames()), + typeChange.newDataType()); + } + return false; + } + + /** The schema evolution capability to advertise. */ + public static Optional schemaEvolutionCapability() { + return Optional.of(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION); + } +} diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java index 86713cb9ef3..f06ebeb7f5e 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/adapters/KernelProtocolAdapter.java @@ -17,6 +17,8 @@ import io.delta.kernel.internal.actions.Protocol; import java.util.Objects; +import org.apache.spark.sql.delta.TableFeature; +import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils; import org.apache.spark.sql.delta.v2.interop.AbstractProtocol; import scala.Option; import scala.collection.immutable.Set; @@ -69,4 +71,27 @@ public Option> writerFeatures() { } return cachedWriterFeatures; } + + /** + * Check if a {@code feature} is supported by this protocol. This means either (a) the protocol + * does not support table features but implicitly supports the feature, or (b) the protocol + * supports table features and explicitly references the feature. + */ + @Override + public boolean isFeatureSupported(TableFeature feature) { + boolean supportsReaderFeatures = + minReaderVersion() >= TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION(); + boolean supportsWriterFeatures = + minWriterVersion() >= TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION(); + boolean explicitlySupported = + (!readerFeatures().isEmpty() && readerFeatures().get().contains(feature.name())) + || (!writerFeatures().isEmpty() && writerFeatures().get().contains(feature.name())); + boolean implicitlySupported = + feature.isLegacyFeature() + && !supportsReaderFeatures + && minReaderVersion() >= feature.minReaderVersion() + && !supportsWriterFeatures + && minWriterVersion() >= feature.minWriterVersion(); + return explicitlySupported || implicitlySupported; + } } diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java index 3ca3b495cf5..92d4b539843 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/catalog/DeltaV2Table.java @@ -25,6 +25,8 @@ import io.delta.kernel.engine.Engine; import io.delta.kernel.internal.SnapshotImpl; import io.delta.kernel.internal.rowtracking.RowTracking; +import io.delta.spark.internal.v2.adapters.KernelMetadataAdapter; +import io.delta.spark.internal.v2.adapters.KernelProtocolAdapter; import io.delta.spark.internal.v2.read.MetadataEvolutionHandler; import io.delta.spark.internal.v2.read.SparkScanBuilder; import io.delta.spark.internal.v2.read.cdc.CDCSchemaContext; @@ -66,10 +68,11 @@ import org.apache.spark.sql.delta.DeltaTableUtils; import org.apache.spark.sql.delta.RowCommitVersion$; import org.apache.spark.sql.delta.RowId$; -import org.apache.spark.sql.delta.SparkTableShims$; import org.apache.spark.sql.delta.catalog.DeltaV2TableMarker; import org.apache.spark.sql.delta.commands.cdc.CDCReader; import org.apache.spark.sql.delta.sources.PersistedMetadata; +import org.apache.spark.sql.delta.v2.interop.AbstractMetadata; +import org.apache.spark.sql.delta.v2.interop.AbstractProtocol; import org.apache.spark.sql.execution.datasources.FileFormat$; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -79,7 +82,7 @@ import scala.jdk.javaapi.CollectionConverters; /** DataSource V2 Table implementation for Delta Lake using the Delta Kernel API. */ -public class DeltaV2Table +public class DeltaV2Table extends DeltaV2TableShims implements Table, SupportsRead, SupportsWrite, SupportsMetadataColumns, DeltaV2TableMarker { private static final String METADATA_COLUMN_NAME = FileFormat$.MODULE$.METADATA_NAME(); private static final String ROW_ID_METADATA_FIELD_NAME = RowId$.MODULE$.ROW_ID(); @@ -94,11 +97,7 @@ private static Set buildCapabilities() { TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ, TableCapability.BATCH_WRITE); - scala.Option schemaEvolution = - SparkTableShims$.MODULE$.schemaEvolutionCapability(); - if (schemaEvolution.isDefined()) { - caps.add(schemaEvolution.get()); - } + DeltaV2TableShims.schemaEvolutionCapability().ifPresent(caps::add); return Collections.unmodifiableSet(caps); } @@ -376,6 +375,16 @@ public Set capabilities() { return CAPABILITIES; } + /** The table protocol from the initial snapshot. */ + protected AbstractProtocol protocol() { + return new KernelProtocolAdapter(((SnapshotImpl) initialSnapshot).getProtocol()); + } + + /** The table metadata from the initial snapshot. */ + protected AbstractMetadata metadata() { + return new KernelMetadataAdapter(((SnapshotImpl) initialSnapshot).getMetadata()); + } + /** * Exposes file-level and row-tracking metadata via a single DSv2 metadata struct column. * diff --git a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/ScalaUtils.java b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/ScalaUtils.java index dc097ec8f4d..5bb3bf3898c 100644 --- a/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/ScalaUtils.java +++ b/spark/v2/src/main/java/io/delta/spark/internal/v2/utils/ScalaUtils.java @@ -15,11 +15,13 @@ */ package io.delta.spark.internal.v2.utils; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import java.util.Optional; import scala.Option; import scala.Tuple2; +import scala.collection.JavaConverters; import scala.collection.immutable.Map$; import scala.collection.mutable.Builder; import scala.jdk.javaapi.CollectionConverters; @@ -51,6 +53,10 @@ public static Map toJavaMap( return CollectionConverters.asJava(scalaMap); } + public static scala.collection.immutable.List toScalaList(T[] values) { + return JavaConverters.asScalaBuffer(Arrays.asList(values)).toList(); + } + /** * Converts a Java {@link Optional} to a Scala {@link Option}. *