From e893155d62cc126de0105d5ef74497a6bddb3d8a Mon Sep 17 00:00:00 2001
From: Mariam-Almesfer
Date: Sun, 10 May 2026 13:37:12 +0300
Subject: [PATCH] Enable hour(timestamp_ntz) native execution in Velox backend

---
 .../DateFunctionsValidateSuite.scala          |  7 ++---
 .../columnar/validator/Validators.scala       | 28 +++++++++++++++++--
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala
index e5e25c88b6f3..47b3e5b692e7 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala
@@ -599,12 +599,9 @@
       checkGlutenPlan[BatchScanExecTransformer]
     }
 
-    // Ensures the fallback of unsupported function works.
+    // hour(timestamp_ntz) runs natively; output is int (no NTZ propagation).
     runQueryAndCompare("select hour(ts) from view") {
-      df =>
-        assert(collect(df.queryExecution.executedPlan) {
-          case p if p.isInstanceOf[ProjectExec] => p
-        }.nonEmpty)
+      checkGlutenPlan[ProjectExecTransformer]
     }
   }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 3218e045f543..8b6c8d219395 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -262,9 +262,33 @@
         case p if HiveTableScanExecTransformer.isHiveTableScan(p) => true
         case _ => false
       }
-      val hasNTZ = plan.output.exists(a => containsNTZ(a.dataType)) ||
+      // True only if all NTZ-producing nodes in p's subtree are file scans.
+      // File scans run natively in Velox; their NTZ data never crosses
+      // RowToVeloxColumnar and therefore keeps correct semantics.
+      // In-memory sources (LocalTableScan) go through RowToVeloxColumnar
+      // which treats NTZ as UTC-adjusted, causing wrong results downstream.
+      def ntzOnlyFromFileScans(p: SparkPlan): Boolean = {
+        if (!p.output.exists(a => containsNTZ(a.dataType))) true
+        else
+          p match {
+            case _: BatchScanExec => true
+            case _: FileSourceScanExec => true
+            case q if HiveTableScanExecTransformer.isHiveTableScan(q) => true
+            case _ if p.children.isEmpty => false
+            case _ => p.children.forall(ntzOnlyFromFileScans)
+          }
+      }
+      // Allow plan nodes that consume NTZ from file scans but produce non-NTZ
+      // output (e.g. hour(timestamp_ntz) -> int). Nodes that propagate NTZ in
+      // their output still fall back until full NTZ support is complete.
+      // Exception: WriteFilesExec - its own output is metadata (no NTZ), but
+      // the native parquet writer incorrectly writes NTZ columns from its child
+      // as UTC-adjusted timestamps, so we must fall back when the child has NTZ.
+      val outputHasNTZ = plan.output.exists(a => containsNTZ(a.dataType))
+      val writeChildHasNTZ = plan.isInstanceOf[WriteFilesExec] &&
         plan.children.exists(_.output.exists(a => containsNTZ(a.dataType)))
-      if (isScan || !hasNTZ) {
+      val childNTZFromFileScans = plan.children.forall(ntzOnlyFromFileScans)
+      if (isScan || (!outputHasNTZ && !writeChildHasNTZ && childNTZFromFileScans)) {
         return pass()
       }
     }