Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,12 @@ class IcebergConverter
targetSnapshot,
"delta.iceberg.conversion.unsupportedActions",
data = Map(
"version" -> targetSnapshot.version,
"commitInfo" -> commitInfo.map(_.operation).getOrElse(""),
// The latest snapshot version targeted by this conversion.
"latestVersion" -> targetSnapshot.version,
// The Delta version of the specific commit that failed to convert. This is the
// commit currently being translated, not the latest snapshot being synced.
"offendingVersion" -> deltaVersion,
"operation" -> commitInfo.map(_.operation).getOrElse(""),
"hasAdd" -> addFiles.nonEmpty.toString,
"hasRemove" -> removeFiles.nonEmpty.toString,
"dataChange" -> dataChange.toString,
Expand All @@ -882,8 +886,9 @@ class IcebergConverter
)
logError(
s"""Unsupported combination of actions for incremental conversion. Context:
|version -> ${targetSnapshot.version},
|commitInfo -> ${commitInfo.map(_.operation).getOrElse("")},
|latestVersion -> ${targetSnapshot.version},
|offendingVersion -> $deltaVersion,
|operation -> ${commitInfo.map(_.operation).getOrElse("")},
|hasAdd -> ${addFiles.nonEmpty.toString},
|hasRemove -> ${removeFiles.nonEmpty.toString},
|dataChange -> ${dataChange.toString},
Expand All @@ -896,8 +901,11 @@ class IcebergConverter
targetSnapshot,
"delta.iceberg.conversion.convertActions",
data = Map(
"version" -> targetSnapshot.version,
"commitInfo" -> commitInfo.map(_.operation).getOrElse(""),
// The latest snapshot version targeted by this conversion.
"latestVersion" -> targetSnapshot.version,
// The Delta version of the commit that was converted in this call.
"version" -> deltaVersion,
"operation" -> commitInfo.map(_.operation).getOrElse(""),
"txnHelper" -> txnHelper.getClass.getSimpleName
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ class IcebergConverterForTest extends IcebergConverter {

class UniFormConverterSuite extends
QueryTest with SharedSparkSession with DeltaSQLCommandTest with NonSparkReadIceberg {
def constructDummyAddFile(path: String = "s3://path1/1.parquet"): AddFile = {
def constructDummyAddFile(
path: String = "s3://path1/1.parquet",
dataChange: Boolean = true): AddFile = {
AddFile(
path = path,
dataChange = true,
dataChange = dataChange,
partitionValues = Map.empty[String, String],
size = 100,
modificationTime = System.currentTimeMillis(),
Expand All @@ -85,7 +87,8 @@ class UniFormConverterSuite extends
readSnapshot: Snapshot,
newActions: Seq[Action],
catalogTable: CatalogTable,
newMetadata: Metadata): CurrentTransactionInfo = {
newMetadata: Metadata,
operation: String = "WRITE"): CurrentTransactionInfo = {
new CurrentTransactionInfo(
txnId = s"test-txn-001",
readPredicates = Vector.empty,
Expand All @@ -96,7 +99,7 @@ class UniFormConverterSuite extends
protocol = readSnapshot.protocol,
actions = newActions,
readSnapshot = readSnapshot,
commitInfo = Some(CommitInfo.empty(Some(version))),
commitInfo = Some(CommitInfo.empty(Some(version)).copy(operation = operation)),
readRowIdHighWatermark = 0L,
catalogTable = Some(catalogTable),
domainMetadata = Seq.empty,
Expand Down Expand Up @@ -543,4 +546,135 @@ class UniFormConverterSuite extends
assertDeltaCommitRangeEvent(events, staleSnapshot.version + 1, attemptDeltaVersion)
}
}

/** Reads the data map from the first usage record matching the given opType. */
private def firstEventData(
events: Seq[com.databricks.spark.util.UsageRecord],
opType: String): Map[String, Any] = {
val matched = filterUsageRecords(events, opType)
assert(matched.nonEmpty, s"Expected at least one $opType event")
JsonUtils.fromJson[Map[String, Any]](matched.head.blob)
}

test("convertActions success event reports latestVersion and per-commit version separately") {
val tableName = "test_convert_actions_event"
withTable(tableName) {
spark.sql(
s"""CREATE TABLE $tableName (id INT) USING DELTA
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
spark.sql(s"INSERT INTO $tableName VALUES (1)")
spark.sql(s"INSERT INTO $tableName VALUES (2)")
val tableId = TableIdentifier(tableName)
val deltaLog = DeltaLog.forTable(spark, tableId)
val snapshot = deltaLog.update()
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)

// Full conversion first to seed the Iceberg metadata.
val converter = new IcebergConverterForTest()
val metadataPath = converter.convertSnapshotAndReturnMetadataPath(snapshot, catalogTable)

// Incremental conversion of a single new AddFile (append-only -> success).
val catalogTableWithIcebergInfo =
catalogTableWithDeltaUniformIceberg(catalogTable, metadataPath, snapshot.version)
val attemptDeltaVersion = snapshot.version + 1
val txnInfo = constructDummyTxnInfo(
version = attemptDeltaVersion,
readSnapshot = snapshot,
newActions = Seq(constructDummyAddFile()),
catalogTable = catalogTableWithIcebergInfo,
newMetadata = snapshot.metadata
)

val events = Log4jUsageLogger.track {
converter.convertUncommitedTxn(
txnInfo, attemptDeltaVersion, deltaLog, catalogTableWithIcebergInfo)
}

val data = firstEventData(events, "delta.iceberg.conversion.convertActions")
// New key: the latest snapshot being targeted.
assert(data.contains("latestVersion"),
"convertActions event should contain latestVersion")
assert(data("latestVersion").toString === attemptDeltaVersion.toString)
// Existing key now carries the per-commit version that was converted, not the head.
assert(data("version").toString === attemptDeltaVersion.toString)
// Renamed from commitInfo -> operation.
assert(data("operation").toString === "WRITE",
"convertActions event should contain operation (renamed from commitInfo)")
assert(!data.contains("commitInfo"),
"convertActions event should no longer contain the old commitInfo key")
assert(data.contains("txnHelper"))
}
}

test("unsupportedActions failure event reports offendingVersion and renamed keys") {
val tableName = "test_unsupported_actions_event"
withTable(tableName) {
spark.sql(
s"""CREATE TABLE $tableName (id INT) USING DELTA
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.enableIcebergCompatV2' = 'true',
| 'delta.universalFormat.enabledFormats' = 'iceberg'
|)""".stripMargin)
spark.sql(s"INSERT INTO $tableName VALUES (1)")
spark.sql(s"INSERT INTO $tableName VALUES (2)")
val tableId = TableIdentifier(tableName)
val deltaLog = DeltaLog.forTable(spark, tableId)
val snapshot = deltaLog.update()
val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)

// Full conversion first to seed the Iceberg metadata.
val converter = new IcebergConverterForTest()
val metadataPath = converter.convertSnapshotAndReturnMetadataPath(snapshot, catalogTable)

// Incremental conversion of an add-only commit with a *mix* of dataChange values
// (one true, one false) -> dataChange=Some -> unsupported combination of actions.
val catalogTableWithIcebergInfo =
catalogTableWithDeltaUniformIceberg(catalogTable, metadataPath, snapshot.version)
val attemptDeltaVersion = snapshot.version + 1
val txnInfo = constructDummyTxnInfo(
version = attemptDeltaVersion,
readSnapshot = snapshot,
newActions = Seq(
constructDummyAddFile(path = "s3://path1/dc-true.parquet", dataChange = true),
constructDummyAddFile(path = "s3://path1/dc-false.parquet", dataChange = false)
),
catalogTable = catalogTableWithIcebergInfo,
newMetadata = snapshot.metadata
)

val events = Log4jUsageLogger.track {
val ex = intercept[UnsupportedOperationException] {
converter.convertUncommitedTxn(
txnInfo, attemptDeltaVersion, deltaLog, catalogTableWithIcebergInfo)
}
assert(ex.getMessage.contains(
"Unsupported combination of actions for incremental conversion"))
}

val data = firstEventData(events, "delta.iceberg.conversion.unsupportedActions")
// New key: the actual failing commit version (was not reported before this change).
assert(data.contains("offendingVersion"),
"unsupportedActions event should contain offendingVersion")
assert(data("offendingVersion").toString === attemptDeltaVersion.toString)
// Renamed from version -> latestVersion.
assert(data.contains("latestVersion"),
"unsupportedActions event should contain latestVersion (renamed from version)")
assert(!data.contains("version"),
"unsupportedActions event should no longer contain the old version key")
// Renamed from commitInfo -> operation.
assert(data("operation").toString === "WRITE",
"unsupportedActions event should contain operation (renamed from commitInfo)")
assert(!data.contains("commitInfo"),
"unsupportedActions event should no longer contain the old commitInfo key")
// The classification that triggered the failure.
assert(data("hasAdd").toString === "true")
assert(data("hasRemove").toString === "false")
assert(data("dataChange").toString === "Some")
}
}
}
Loading