diff --git a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala index 21bd620f8fa..ed8c3c9706e 100644 --- a/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala +++ b/iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala @@ -872,8 +872,12 @@ class IcebergConverter targetSnapshot, "delta.iceberg.conversion.unsupportedActions", data = Map( - "version" -> targetSnapshot.version, - "commitInfo" -> commitInfo.map(_.operation).getOrElse(""), + // The latest snapshot version targeted by this conversion. + "latestVersion" -> targetSnapshot.version, + // The Delta version of the specific commit that failed to convert. This is the + // commit currently being translated, not the latest snapshot being synced. + "offendingVersion" -> deltaVersion, + "operation" -> commitInfo.map(_.operation).getOrElse(""), "hasAdd" -> addFiles.nonEmpty.toString, "hasRemove" -> removeFiles.nonEmpty.toString, "dataChange" -> dataChange.toString, @@ -882,8 +886,9 @@ class IcebergConverter ) logError( s"""Unsupported combination of actions for incremental conversion. Context: - |version -> ${targetSnapshot.version}, - |commitInfo -> ${commitInfo.map(_.operation).getOrElse("")}, + |latestVersion -> ${targetSnapshot.version}, + |offendingVersion -> $deltaVersion, + |operation -> ${commitInfo.map(_.operation).getOrElse("")}, |hasAdd -> ${addFiles.nonEmpty.toString}, |hasRemove -> ${removeFiles.nonEmpty.toString}, |dataChange -> ${dataChange.toString}, @@ -896,8 +901,11 @@ class IcebergConverter targetSnapshot, "delta.iceberg.conversion.convertActions", data = Map( - "version" -> targetSnapshot.version, - "commitInfo" -> commitInfo.map(_.operation).getOrElse(""), + // The latest snapshot version targeted by this conversion. + "latestVersion" -> targetSnapshot.version, + // The Delta version of the commit that was converted in this call. + "version" -> deltaVersion, + "operation" -> commitInfo.map(_.operation).getOrElse(""), "txnHelper" -> txnHelper.getClass.getSimpleName ) ) diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormConverterSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormConverterSuite.scala index 0bfb77885ae..bae5d725799 100644 --- a/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormConverterSuite.scala +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/UniFormConverterSuite.scala @@ -69,10 +69,12 @@ class IcebergConverterForTest extends IcebergConverter { class UniFormConverterSuite extends QueryTest with SharedSparkSession with DeltaSQLCommandTest with NonSparkReadIceberg { - def constructDummyAddFile(path: String = "s3://path1/1.parquet"): AddFile = { + def constructDummyAddFile( + path: String = "s3://path1/1.parquet", + dataChange: Boolean = true): AddFile = { AddFile( path = path, - dataChange = true, + dataChange = dataChange, partitionValues = Map.empty[String, String], size = 100, modificationTime = System.currentTimeMillis(), @@ -85,7 +87,8 @@ class UniFormConverterSuite extends readSnapshot: Snapshot, newActions: Seq[Action], catalogTable: CatalogTable, - newMetadata: Metadata): CurrentTransactionInfo = { + newMetadata: Metadata, + operation: String = "WRITE"): CurrentTransactionInfo = { new CurrentTransactionInfo( txnId = s"test-txn-001", readPredicates = Vector.empty, @@ -96,7 +99,7 @@ class UniFormConverterSuite extends protocol = readSnapshot.protocol, actions = newActions, readSnapshot = readSnapshot, - commitInfo = Some(CommitInfo.empty(Some(version))), + commitInfo = Some(CommitInfo.empty(Some(version)).copy(operation = operation)), readRowIdHighWatermark = 0L, catalogTable = Some(catalogTable), domainMetadata = Seq.empty, @@ -543,4 +546,135 @@ class UniFormConverterSuite extends assertDeltaCommitRangeEvent(events, staleSnapshot.version + 1, attemptDeltaVersion) } } + + /** Reads the data map from the first usage record matching the given opType. */ + private def firstEventData( + events: Seq[com.databricks.spark.util.UsageRecord], + opType: String): Map[String, Any] = { + val matched = filterUsageRecords(events, opType) + assert(matched.nonEmpty, s"Expected at least one $opType event") + JsonUtils.fromJson[Map[String, Any]](matched.head.blob) + } + + test("convertActions success event reports latestVersion and per-commit version separately") { + val tableName = "test_convert_actions_event" + withTable(tableName) { + spark.sql( + s"""CREATE TABLE $tableName (id INT) USING DELTA + |TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'name', + | 'delta.enableIcebergCompatV2' = 'true', + | 'delta.universalFormat.enabledFormats' = 'iceberg' + |)""".stripMargin) + spark.sql(s"INSERT INTO $tableName VALUES (1)") + spark.sql(s"INSERT INTO $tableName VALUES (2)") + val tableId = TableIdentifier(tableName) + val deltaLog = DeltaLog.forTable(spark, tableId) + val snapshot = deltaLog.update() + val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId) + + // Full conversion first to seed the Iceberg metadata. + val converter = new IcebergConverterForTest() + val metadataPath = converter.convertSnapshotAndReturnMetadataPath(snapshot, catalogTable) + + // Incremental conversion of a single new AddFile (append-only -> success). + val catalogTableWithIcebergInfo = + catalogTableWithDeltaUniformIceberg(catalogTable, metadataPath, snapshot.version) + val attemptDeltaVersion = snapshot.version + 1 + val txnInfo = constructDummyTxnInfo( + version = attemptDeltaVersion, + readSnapshot = snapshot, + newActions = Seq(constructDummyAddFile()), + catalogTable = catalogTableWithIcebergInfo, + newMetadata = snapshot.metadata + ) + + val events = Log4jUsageLogger.track { + converter.convertUncommitedTxn( + txnInfo, attemptDeltaVersion, deltaLog, catalogTableWithIcebergInfo) + } + + val data = firstEventData(events, "delta.iceberg.conversion.convertActions") + // New key: the latest snapshot being targeted. + assert(data.contains("latestVersion"), + "convertActions event should contain latestVersion") + assert(data("latestVersion").toString === attemptDeltaVersion.toString) + // Existing key now carries the per-commit version that was converted, not the head. + assert(data("version").toString === attemptDeltaVersion.toString) + // Renamed from commitInfo -> operation. + assert(data("operation").toString === "WRITE", + "convertActions event should contain operation (renamed from commitInfo)") + assert(!data.contains("commitInfo"), + "convertActions event should no longer contain the old commitInfo key") + assert(data.contains("txnHelper")) + } + } + + test("unsupportedActions failure event reports offendingVersion and renamed keys") { + val tableName = "test_unsupported_actions_event" + withTable(tableName) { + spark.sql( + s"""CREATE TABLE $tableName (id INT) USING DELTA + |TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'name', + | 'delta.enableIcebergCompatV2' = 'true', + | 'delta.universalFormat.enabledFormats' = 'iceberg' + |)""".stripMargin) + spark.sql(s"INSERT INTO $tableName VALUES (1)") + spark.sql(s"INSERT INTO $tableName VALUES (2)") + val tableId = TableIdentifier(tableName) + val deltaLog = DeltaLog.forTable(spark, tableId) + val snapshot = deltaLog.update() + val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId) + + // Full conversion first to seed the Iceberg metadata. + val converter = new IcebergConverterForTest() + val metadataPath = converter.convertSnapshotAndReturnMetadataPath(snapshot, catalogTable) + + // Incremental conversion of an add-only commit with a *mix* of dataChange values + // (one true, one false) -> dataChange=Some -> unsupported combination of actions. + val catalogTableWithIcebergInfo = + catalogTableWithDeltaUniformIceberg(catalogTable, metadataPath, snapshot.version) + val attemptDeltaVersion = snapshot.version + 1 + val txnInfo = constructDummyTxnInfo( + version = attemptDeltaVersion, + readSnapshot = snapshot, + newActions = Seq( + constructDummyAddFile(path = "s3://path1/dc-true.parquet", dataChange = true), + constructDummyAddFile(path = "s3://path1/dc-false.parquet", dataChange = false) + ), + catalogTable = catalogTableWithIcebergInfo, + newMetadata = snapshot.metadata + ) + + val events = Log4jUsageLogger.track { + val ex = intercept[UnsupportedOperationException] { + converter.convertUncommitedTxn( + txnInfo, attemptDeltaVersion, deltaLog, catalogTableWithIcebergInfo) + } + assert(ex.getMessage.contains( + "Unsupported combination of actions for incremental conversion")) + } + + val data = firstEventData(events, "delta.iceberg.conversion.unsupportedActions") + // New key: the actual failing commit version (was not reported before this change). + assert(data.contains("offendingVersion"), + "unsupportedActions event should contain offendingVersion") + assert(data("offendingVersion").toString === attemptDeltaVersion.toString) + // Renamed from version -> latestVersion. + assert(data.contains("latestVersion"), + "unsupportedActions event should contain latestVersion (renamed from version)") + assert(!data.contains("version"), + "unsupportedActions event should no longer contain the old version key") + // Renamed from commitInfo -> operation. + assert(data("operation").toString === "WRITE", + "unsupportedActions event should contain operation (renamed from commitInfo)") + assert(!data.contains("commitInfo"), + "unsupportedActions event should no longer contain the old commitInfo key") + // The classification that triggered the failure. + assert(data("hasAdd").toString === "true") + assert(data("hasRemove").toString === "false") + assert(data("dataChange").toString === "Some") + } + } }