Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ object HudiScanSupport extends Logging {
"hoodie.table.type")
private val hudiQueryTypeKeys =
Seq("hoodie.datasource.query.type", "hoodie.datasource.view.type")
private val hudiIncrementalInstantKeys =
Seq("hoodie.datasource.read.begin.instanttime", "hoodie.datasource.read.end.instanttime")
private val hudiBaseFileFormatKeys = Seq(
"hoodie.table.base.file.format",
"hoodie.datasource.write.base.file.format",
Expand Down Expand Up @@ -115,6 +117,9 @@ object HudiScanSupport extends Logging {
if (hasTimeTravel(options)) {
return false
}
if (hasIncrementalQuery(options)) {
return false
}

val tableType = tableTypeFromOptions(options)
.orElse(tableTypeFromCatalog(catalogTable))
Expand All @@ -128,7 +133,7 @@ object HudiScanSupport extends Logging {
queryType match {
case Some(query) if readOptimizedQueryTypes.contains(query) => true
case Some("snapshot") => !isMor
case Some("incremental" | "realtime") => false
case Some("realtime") => false
case Some(_) => false
case None =>
// MOR snapshot reads may need log-file merging. Native scan is safe only
Expand Down Expand Up @@ -257,6 +262,11 @@ object HudiScanSupport extends Logging {
"hoodie.datasource.read.as.of.timestamp")).isDefined
}

/**
 * Reports whether the supplied read options describe an incremental query.
 *
 * The result is true when either of the following holds:
 *  - the query type returned by `queryTypeFromOptions` equals "incremental", ignoring case;
 *  - `caseInsensitiveValue` yields a value for one of `hudiIncrementalInstantKeys`
 *    (the begin/end instant-time read options).
 *
 * @param options data source options to inspect
 * @return true if the options indicate an incremental query, false otherwise
 */
private def hasIncrementalQuery(options: Map[String, String]): Boolean = {
  val incrementalTypeRequested =
    queryTypeFromOptions(options).exists { queryType =>
      queryType.equalsIgnoreCase("incremental")
    }
  // The instant-time keys are only consulted when the query type did not already decide.
  if (incrementalTypeRequested) {
    true
  } else {
    caseInsensitiveValue(options, hudiIncrementalInstantKeys).isDefined
  }
}

private def normalizePath(rawPath: String): String = {
try {
val uri = new URI(rawPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,33 @@ class HudiScanSupportSuite extends SparkFunSuite with SharedSparkSession {
"hoodie.datasource.query.type" -> "snapshot")))
}

test("hudi isSupported rejects incremental query options") {
val fileFormatName =
"org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat"
val cowOptions = Map("hoodie.datasource.write.table.type" -> "COPY_ON_WRITE")

assert(
!HudiScanSupport.isSupported(
fileFormatName,
cowOptions + ("hoodie.datasource.query.type" -> "incremental")))
assert(
!HudiScanSupport.isSupported(
fileFormatName,
cowOptions + ("hoodie.datasource.view.type" -> "incremental")))
assert(
!HudiScanSupport.isSupported(
fileFormatName,
cowOptions + ("hoodie.datasource.read.begin.instanttime" -> "20240101010101")))
assert(
!HudiScanSupport.isSupported(
fileFormatName,
cowOptions + ("hoodie.datasource.read.end.instanttime" -> "20240102010101")))
assert(
!HudiScanSupport.isSupported(
fileFormatName,
cowOptions + ("Hoodie.DataSource.Read.Begin.InstantTime" -> "20240101010101")))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a case for the legacy `hoodie.datasource.view.type=incremental` key — `queryTypeFromOptions` checks both keys, so it already works, but an explicit assertion would document the behavior:
assert(!HudiScanSupport.isSupported(fileFormatName, cowOptions + ("hoodie.datasource.view.type" -> "incremental")))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. I added an explicit assertion for the legacy `hoodie.datasource.view.type=incremental` option to document that it is also rejected for native scan conversion and falls back to the regular read path.

}

test("hudi isSupported rejects non-Hudi formats") {
assert(
!HudiScanSupport.isSupported(
Expand Down
Loading