From 8893f6b9a1ea2aefc1e7e120948ec514d3822303 Mon Sep 17 00:00:00 2001 From: weimingdiit Date: Sat, 16 May 2026 12:00:05 +0800 Subject: [PATCH] [AURON #2273] Fallback Hudi incremental queries from native scan conversion Signed-off-by: weimingdiit --- .../sql/auron/hudi/HudiScanSupport.scala | 12 ++++++++- .../sql/auron/hudi/HudiScanSupportSuite.scala | 27 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala index 0ebcb1429..55392c52e 100644 --- a/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala +++ b/thirdparty/auron-hudi/src/main/scala/org/apache/spark/sql/auron/hudi/HudiScanSupport.scala @@ -46,6 +46,8 @@ object HudiScanSupport extends Logging { "hoodie.table.type") private val hudiQueryTypeKeys = Seq("hoodie.datasource.query.type", "hoodie.datasource.view.type") + private val hudiIncrementalInstantKeys = + Seq("hoodie.datasource.read.begin.instanttime", "hoodie.datasource.read.end.instanttime") private val hudiBaseFileFormatKeys = Seq( "hoodie.table.base.file.format", "hoodie.datasource.write.base.file.format", @@ -115,6 +117,9 @@ object HudiScanSupport extends Logging { if (hasTimeTravel(options)) { return false } + if (hasIncrementalQuery(options)) { + return false + } val tableType = tableTypeFromOptions(options) .orElse(tableTypeFromCatalog(catalogTable)) @@ -128,7 +133,7 @@ object HudiScanSupport extends Logging { queryType match { case Some(query) if readOptimizedQueryTypes.contains(query) => true case Some("snapshot") => !isMor - case Some("incremental" | "realtime") => false + case Some("realtime") => false case Some(_) => false case None => // MOR snapshot reads may need log-file merging. Native scan is safe only @@ -257,6 +262,11 @@ object HudiScanSupport extends Logging { "hoodie.datasource.read.as.of.timestamp")).isDefined } + private def hasIncrementalQuery(options: Map[String, String]): Boolean = { + queryTypeFromOptions(options).exists(_.equalsIgnoreCase("incremental")) || + caseInsensitiveValue(options, hudiIncrementalInstantKeys).isDefined + } + private def normalizePath(rawPath: String): String = { try { val uri = new URI(rawPath) diff --git a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala index a2721e1db..1cce6d477 100644 --- a/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala +++ b/thirdparty/auron-hudi/src/test/scala/org/apache/spark/sql/auron/hudi/HudiScanSupportSuite.scala @@ -284,6 +284,33 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession { "hoodie.datasource.query.type" -> "snapshot"))) } + test("hudi isSupported rejects incremental query options") { + val fileFormatName = + "org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat" + val cowOptions = Map("hoodie.datasource.write.table.type" -> "COPY_ON_WRITE") + + assert( + !HudiScanSupport.isSupported( + fileFormatName, + cowOptions + ("hoodie.datasource.query.type" -> "incremental"))) + assert( + !HudiScanSupport.isSupported( + fileFormatName, + cowOptions + ("hoodie.datasource.view.type" -> "incremental"))) + assert( + !HudiScanSupport.isSupported( + fileFormatName, + cowOptions + ("hoodie.datasource.read.begin.instanttime" -> "20240101010101"))) + assert( + !HudiScanSupport.isSupported( + fileFormatName, + cowOptions + ("hoodie.datasource.read.end.instanttime" -> "20240102010101"))) + assert( + !HudiScanSupport.isSupported( + fileFormatName, + cowOptions + ("Hoodie.DataSource.Read.Begin.InstantTime" -> "20240101010101"))) + } + test("hudi isSupported rejects non-Hudi formats") { assert( !HudiScanSupport.isSupported(