Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.delta.commands

// scalastyle:off import.ordering.noEmptyLine
import java.util.Locale
import java.util.concurrent.TimeUnit

import scala.util.Try
Expand Down Expand Up @@ -48,7 +49,7 @@ import org.apache.spark.sql.execution.command.{LeafRunnableCommand, RunnableComm
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -813,10 +814,73 @@ case class CreateDeltaTableCommand(
// Unity Catalog table id stored in `io.unitycatalog.tableId`.
newMetadata = newMetadata.copy(id = txn.snapshot.metadata.id)
}

// Carry over table and column comments from the old table when the new DDL
// does not explicitly specify them.
if (sparkSession.conf.get(DeltaSQLConf.RETAIN_COMMENTS_DURING_REPLACE_TABLE)) {
if (newMetadata.description == null) {
newMetadata = newMetadata.copy(description = txn.metadata.description)
}
val updatedSchema = carryOverStructTypeComments(txn.metadata.schema, newMetadata.schema)
newMetadata = newMetadata.copy(schemaString = updatedSchema.json)
}

txn.updateMetadataForNewTableInReplace(newMetadata)
}
}

/**
 * Produces a copy of `newStruct` in which fields inherit column comments from
 * the same-named fields of `oldStruct`.
 *
 * Field names are compared case-insensitively (lower-cased with `Locale.ROOT`).
 * A comment is only inherited when the new field's metadata has no "comment"
 * entry, so a comment given explicitly in the new schema always wins. Whenever
 * an old field with the same name exists, the field's data type is additionally
 * passed through `carryOverDataTypeComments` so that comments on nested fields
 * are carried over as well. Fields without a counterpart in `oldStruct` are
 * returned as they are.
 *
 * @param oldStruct schema to take comments from
 * @param newStruct schema to receive the comments
 * @return a struct with the fields of `newStruct`, in the same order
 */
private def carryOverStructTypeComments(
    oldStruct: StructType,
    newStruct: StructType): StructType = {
  def normalized(name: String): String = name.toLowerCase(Locale.ROOT)

  // Lookup table of the old fields, keyed by normalized name.
  val previousByName = oldStruct.fields.map(f => normalized(f.name) -> f).toMap

  val mergedFields = newStruct.fields.map { current =>
    previousByName.get(normalized(current.name)) match {
      case None =>
        // Nothing to inherit from: neither a comment nor nested comments.
        current
      case Some(previous) =>
        // An explicit comment on the new field suppresses inheritance.
        val inherited =
          if (current.metadata.contains("comment")) None else previous.getComment()
        val commented = inherited.fold(current)(c => current.withComment(c))
        // Descend into the data type regardless of where the top-level comment came from.
        commented.copy(
          dataType = carryOverDataTypeComments(previous.dataType, commented.dataType))
    }
  }
  StructType(mergedFields)
}

/**
 * Walks `oldType` and `newType` in parallel and carries column comments over
 * wherever both sides have the same kind of container.
 *
 *  - struct / struct: delegated to `carryOverStructTypeComments`
 *  - array / array: the element types are processed recursively
 *  - map / map: the key types and the value types are processed recursively
 *
 * For arrays and maps the nullability flag of the new type is preserved. For any
 * other combination (including mismatched kinds) `newType` is returned unchanged.
 *
 * @param oldType data type to take comments from
 * @param newType data type to receive the comments
 * @return `newType`, possibly with comments added to nested struct fields
 */
private def carryOverDataTypeComments(oldType: DataType, newType: DataType): DataType = {
  newType match {
    case targetStruct: StructType =>
      oldType match {
        case sourceStruct: StructType =>
          carryOverStructTypeComments(sourceStruct, targetStruct)
        case _ => newType
      }
    case targetArray: ArrayType =>
      oldType match {
        case sourceArray: ArrayType =>
          // copy keeps the new array's containsNull flag
          targetArray.copy(elementType =
            carryOverDataTypeComments(sourceArray.elementType, targetArray.elementType))
        case _ => newType
      }
    case targetMap: MapType =>
      oldType match {
        case sourceMap: MapType =>
          // copy keeps the new map's valueContainsNull flag
          targetMap.copy(
            keyType = carryOverDataTypeComments(sourceMap.keyType, targetMap.keyType),
            valueType = carryOverDataTypeComments(sourceMap.valueType, targetMap.valueType))
        case _ => newType
      }
    case _ => newType
  }
}

/** Returns true if the current operation could be replacing a table. */
private def isReplace: Boolean = {
operation == TableCreationModes.CreateOrReplace ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2919,6 +2919,17 @@ trait DeltaSQLConfBase extends DeltaSQLConfUtils {
.booleanConf
.createWithDefault(true)

/**
 * Internal boolean flag, enabled by default, registered under the key suffix
 * "retainCommentsDuringReplace".
 * NOTE(review): the full key depends on the prefix that `buildConf` prepends,
 * which is not visible here - confirm before documenting the key externally.
 *
 * When the flag is on, replacing a table keeps the old table's description and
 * column comments wherever the new definition does not provide its own.
 * Setting it to false turns that carry-over off.
 */
val RETAIN_COMMENTS_DURING_REPLACE_TABLE =
buildConf("retainCommentsDuringReplace")
.internal()
.doc(
"""
|If enabled, CREATE OR REPLACE TABLE retains table and column comments
|from the old table when the new DDL does not explicitly specify them.
|""".stripMargin)
.booleanConf
.createWithDefault(true)

val ALLOW_COLUMN_MAPPING_REMOVAL =
buildConf("columnMapping.allowRemoval")
.internal()
Expand Down
Loading
Loading