diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala
index 91a26508d..8ac7e9fb8 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.auron
import org.apache.spark.sql.execution.SparkPlan
trait AuronConvertProvider {
- def isEnabled: Boolean
+ def isEnabled(exec: SparkPlan): Boolean
def isSupported(exec: SparkPlan): Boolean
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index cc12a176a..1d721657b 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -161,7 +161,7 @@ object AuronConverters extends Logging {
case e: BroadcastExchangeExec if enableBroadcastExchange =>
tryConvert(e, convertBroadcastExchangeExec)
case e: FileSourceScanExec if enableScan => // scan
- extConvertProviders.find(p => p.isEnabled && p.isSupported(e)) match {
+ extConvertProviders.find(p => p.isEnabled(e) && p.isSupported(e)) match {
case Some(provider) => tryConvert(e, provider.convert)
case None => tryConvert(e, convertFileSourceScanExec)
}
@@ -239,23 +239,34 @@ object AuronConverters extends Logging {
case exec: ForceNativeExecutionWrapperBase => exec
case exec =>
- extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
- case Some(provider) => tryConvert(exec, provider.convert)
- case None =>
- Shims.get.convertMoreSparkPlan(exec) match {
- case Some(exec) =>
- exec.setTagValue(convertibleTag, true)
- exec.setTagValue(convertStrategyTag, AlwaysConvert)
- exec
- case None =>
- if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
+ try {
+ extConvertProviders.find(h => h.isEnabled(exec) && h.isSupported(exec)) match {
+ case Some(provider) => tryConvert(exec, provider.convert)
+ case None =>
+ Shims.get.convertMoreSparkPlan(exec) match {
+ case Some(exec) =>
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
exec
- } else {
- addNeverConvertReasonTag(exec)
- }
- }
+ case None =>
+ if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
+ exec.setTagValue(convertibleTag, true)
+ exec.setTagValue(convertStrategyTag, AlwaysConvert)
+ exec
+ } else {
+ addNeverConvertReasonTag(exec)
+ }
+ }
+ }
+ } catch {
+ case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
+ // A provider check threw: keep Spark's plan and record why (getMessage may be null).
+ val reason = Option(e.getMessage).getOrElse(e.toString)
+ exec.setTagValue(convertToNonNativeTag, true)
+ exec.setTagValue(convertibleTag, false)
+ exec.setTagValue(convertStrategyTag, NeverConvert)
+ exec.setTagValue(neverConvertReasonTag, reason.replaceFirst("^assertion failed: ?", ""))
+ exec
}
}
}
diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
index da96d5644..19f0caf1f 100644
--- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
+++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiConvertProvider.scala
@@ -27,17 +27,22 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration
class HudiConvertProvider extends AuronConvertProvider with Logging {
- override def isEnabled: Boolean = {
- val sparkVersion = org.apache.spark.SPARK_VERSION
- val versionParts = sparkVersion.split("[\\.-]", 3)
- val maybeMajor = versionParts.headOption.flatMap(part => Try(part.toInt).toOption)
- val maybeMinor =
- if (versionParts.length >= 2) Try(versionParts(1).toInt).toOption else None
- val supported = (for {
- major <- maybeMajor
- minor <- maybeMinor
- } yield major == 3 && minor >= 0 && minor <= 5).getOrElse(false)
- SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && supported
+ /**
+ * Reports whether the Hudi provider may be tried for `exec`: the node must be a
+ * FileSourceScanExec, the Hudi scan switch must be on, and Spark must be 3.0 through 3.5.
+ * Whether the scan is actually Hudi-backed is not examined here.
+ */
+ override def isEnabled(exec: SparkPlan): Boolean = exec match {
+ case _: FileSourceScanExec =>
+ // Split on '.' or '-' and read the first two components as major and minor.
+ val parts = org.apache.spark.SPARK_VERSION.split("[\\.-]", 3)
+ val majorOpt = parts.headOption.flatMap(p => Try(p.toInt).toOption)
+ val minorOpt = if (parts.length >= 2) Try(parts(1).toInt).toOption else None
+ // A component that is missing or not an integer makes the version unsupported.
+ val versionOk =
+ majorOpt.exists(major => minorOpt.exists(minor => major == 3 && minor >= 0 && minor <= 5))
+ SparkAuronConfiguration.ENABLE_HUDI_SCAN.get() && versionOk
+ case _ => false
+ }
override def isSupported(exec: SparkPlan): Boolean = {
diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml
index bb686f481..ec5b3db23 100644
--- a/thirdparty/auron-iceberg/pom.xml
+++ b/thirdparty/auron-iceberg/pom.xml
@@ -81,6 +81,11 @@
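<version>${project.version}</version>
<scope>test</scope>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>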
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
index aae3f576e..a2ef9e3aa 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala
@@ -28,18 +28,18 @@ import org.apache.auron.util.SemanticVersion
class IcebergConvertProvider extends AuronConvertProvider with Logging {
- override def isEnabled: Boolean = {
- val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()
- if (!enabled) {
- return false
- }
- if (!sparkCompatible) {
- logWarning(
- s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " +
+ // Only BatchScanExec nodes are considered. For those, a disabled switch or an unsupported
+ // Spark version fails an assertion (carrying the reason) instead of returning false.
+ override def isEnabled(exec: SparkPlan): Boolean = exec match {
+ case _: BatchScanExec =>
+ val scanEnabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()
+ assert(scanEnabled, "Conversion disabled: auron.enable.iceberg.scan=false.")
+ assert(
+ sparkCompatible,
s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")
- return false
+ // Both assertions passed, so the switch is necessarily on at this point.
+ scanEnabled
+ case _ => false
- true
}
override def isSupported(exec: SparkPlan): Boolean = {
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
index f2820d154..0bede5c63 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
@@ -48,31 +48,30 @@ object IcebergScanSupport extends Logging {
val scan = exec.scan
val scanClassName = scan.getClass.getName
// Only handle Iceberg scans; other sources must stay on Spark's path.
- if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) {
- return None
- }
+ // A failed check now throws with its reason instead of silently returning None.
+ val isIcebergScan = AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)
+ assert(isIcebergScan, "Not iceberg scans.")
val readSchema = scan.readSchema
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
// Native scan can project file-level metadata columns such as _file via partition values.
// Metadata columns that require per-row materialization (for example _pos) still fallback.
- if (unsupportedMetadataColumns.nonEmpty) {
- return None
- }
+ assert(
+ unsupportedMetadataColumns.isEmpty,
+ "Has per-row materialization (for example _pos).")
val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
// Supported metadata columns are materialized via per-file constant values rather than
// read from the Iceberg data file payload.
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))
- if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
- return None
- }
+ val fileTypesSupported =
+ fileSchema.fields.forall(f => NativeConverters.isTypeSupported(f.dataType))
+ assert(fileTypesSupported, "Has unsupported Iceberg data-file schema type.")
- if (!partitionSchema.fields.forall(field =>
- NativeConverters.isTypeSupported(field.dataType))) {
- return None
- }
+ val partitionTypesSupported =
+ partitionSchema.fields.forall(f => NativeConverters.isTypeSupported(f.dataType))
+ assert(partitionTypesSupported, "Has unsupported schema type.")
val partitions = inputPartitions(exec)
// Empty scan (e.g. empty table) should still build a plan to return no rows.
@@ -90,27 +89,25 @@ object IcebergScanSupport extends Logging {
val icebergPartitions = partitions.flatMap(icebergPartition)
// All partitions must be Iceberg SparkInputPartition; otherwise fallback.
- if (icebergPartitions.size != partitions.size) {
- return None
- }
+ assert(
+ partitions.size == icebergPartitions.size,
+ "All partitions must be Iceberg SparkInputPartition.")
val fileTasks = icebergPartitions.flatMap(_.fileTasks)
// Native scan does not apply delete files; only allow pure data files (COW).
- if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) {
- return None
- }
+ assert(
+ fileTasks.forall(t => t.deletes() == null || t.deletes().isEmpty),
+ "Not iceberg cow table.")
// Native scan handles a single file format; mixed formats must fallback.
val formats = fileTasks.map(_.file().format()).distinct
- if (formats.size > 1) {
- return None
- }
+ assert(formats.size <= 1, "Not all data file format is a single file format.")
val format = formats.headOption.getOrElse(FileFormat.PARQUET)
- if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
- return None
- }
+ assert(
+ format == FileFormat.PARQUET || format == FileFormat.ORC,
+ "Only support parquet or orc.")
val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
Some(
@@ -195,9 +192,9 @@ object IcebergScanSupport extends Logging {
private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = {
val className = partition.getClass.getName
// Only accept Iceberg SparkInputPartition to access task groups.
- if (!AuronIcebergSourceUtil.getClassOfSparkInputPartition().isInstance(partition)) {
- return None
- }
+ val isIcebergPartition =
+ AuronIcebergSourceUtil.getClassOfSparkInputPartition().isInstance(partition)
+ assert(isIcebergPartition, "Not iceberg scans.")
try {
// SparkInputPartition is package-private; use reflection to read its task group.
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index 7250d0fef..d856846f2 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -30,6 +30,8 @@ import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.auron.iceberg.IcebergScanSupport
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.execution.ExplainUtils.collectFirst
import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
@@ -38,6 +40,37 @@ class AuronIcebergIntegrationSuite
extends org.apache.spark.sql.QueryTest
with BaseAuronIcebergSuite {
+ test("iceberg native scan with auron.enable.iceberg.scan=false") {
+ withTable("local.db.t2") {
+ withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "false") {
+ sql("create table local.db.t2 using iceberg as select 1 as id, 'a' as v")
+ val df = sql("select * from local.db.t2")
+ df.collect()
+ val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
+ // Compare as an Option so a missing scan node or tag fails the assertion, not .get.
+ val reason = collectFirst(df.queryExecution.executedPlan) { case s: BatchScanExec => s }
+ .flatMap(_.getTagValue(neverConvertReasonTag))
+ assert(reason.contains("Conversion disabled: auron.enable.iceberg.scan=false."))
+ }
+ }
+ }
+
+ test(
+ "iceberg scan falls back when reading unsupported metadata columns and check never convert reason") {
+ withTable("local.db.t4_pos") {
+ sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
+ withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
+ val df = sql("select _pos from local.db.t4_pos")
+ df.collect()
+ val neverConvertReasonTag: TreeNodeTag[String] = TreeNodeTag("auron.never.convert.reason")
+ // Compare as an Option so a missing scan node or tag fails the assertion, not .get.
+ val reason = collectFirst(df.queryExecution.executedPlan) { case s: BatchScanExec => s }
+ .flatMap(_.getTagValue(neverConvertReasonTag))
+ assert(reason.contains("Has per-row materialization (for example _pos)."))
+ }
+ }
+ }
+
test("test iceberg integrate ") {
withTable("local.db.t1") {
sql(
diff --git a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
index f9d75c439..21b2ed03a 100644
--- a/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
+++ b/thirdparty/auron-paimon/src/main/scala/org/apache/spark/sql/hive/auron/paimon/PaimonConvertProvider.scala
@@ -28,7 +28,14 @@ import org.apache.auron.spark.configuration.SparkAuronConfiguration
class PaimonConvertProvider extends AuronConvertProvider with Logging {
- override def isEnabled: Boolean = SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+ /**
+ * Reports whether the Paimon provider may be tried for `exec`: only HiveTableScanExec
+ * nodes qualify, and only while the Paimon scan switch is on.
+ */
+ override def isEnabled(exec: SparkPlan): Boolean = exec match {
+ case _: HiveTableScanExec => SparkAuronConfiguration.ENABLE_PAIMON_SCAN.get()
+ case _ => false
+ }
override def isSupported(exec: SparkPlan): Boolean = {
exec match {