Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,34 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
argument: ExpressionTransformer,
function: ExpressionTransformer,
expr: ArraySort): ExpressionTransformer = {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
// Spark's default array_sort comparator includes null-handling logic that
// Velox's rewriteArraySortCall cannot parse. Detect it and strip the lambda
// so Velox uses its 1-arg array_sort (ascending, nulls-last) which has
// identical semantics.
if (isDefaultArraySortComparator(expr)) {
logInfo("Stripping default comparator from array_sort for Velox offloading")
GenericExpressionTransformer(substraitExprName, Seq(argument), expr)
} else {
GenericExpressionTransformer(substraitExprName, Seq(argument, function), expr)
}
}

/**
 * Tells whether `expr` carries Spark's default `array_sort` comparator.
 *
 * The check rebuilds the default comparator with `ArraySort.comparator`, using the lambda's own
 * two arguments, and compares it to the lambda body via `semanticEquals`. Anything that is not a
 * two-argument [[LambdaFunction]] is treated as a custom comparator.
 *
 * Why it matters: the default comparator wraps the plain ascending comparison in null-handling
 * if-else logic that Velox's SimpleComparisonMatcher cannot parse, while Velox's 1-arg array_sort
 * already sorts ascending with nulls last. A caller can therefore drop the lambda when this
 * returns true.
 *
 * @param expr
 *   the ArraySort expression whose `function` is inspected
 * @return
 *   true only if the lambda body is semantically equal to the default comparator
 */
private def isDefaultArraySortComparator(expr: ArraySort): Boolean =
  expr.function match {
    // The sequence pattern only matches a lambda with exactly two arguments.
    case LambdaFunction(lambdaBody, Seq(first, second), _) =>
      lambdaBody.semanticEquals(ArraySort.comparator(first, second))
    case _ =>
      false
  }

/** Transform array exists to Substrait */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,38 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}

test("array_sort - default comparator offloading") {
  // Queries that are each expected to end up in a ProjectExecTransformer, listed in the order
  // they are executed.
  val offloadedQueries = Seq(
    // array_sort with no explicit lambda: Spark supplies its null-handling default comparator,
    // which Velox cannot parse unless the comparator is stripped first.
    "select a, array_sort(a) from t_arr_sort_default",
    // sort_array with its default ordering.
    "select a, sort_array(a) from t_arr_sort_default",
    // sort_array in descending order.
    "select a, sort_array(a, false) from t_arr_sort_default",
    // User-supplied descending comparator: has to stay offloaded in the 2-arg form.
    "select a, array_sort(a, (l, r) -> " +
      "IF(l > r, -1, IF(l < r, 1, 0))) from t_arr_sort_default"
  )

  withTable("t_arr_sort_default") {
    sql("create table t_arr_sort_default (a array<int>) using parquet")
    // Rows cover an unsorted array, an already sorted one, one holding a null, and an empty one.
    sql(
      "insert into t_arr_sort_default values " +
        "(array(3, 1, 2, 5, 4)), (array(10, 20, 30)), (array(null, 2, 1)), (array())")

    offloadedQueries.foreach {
      query =>
        runQueryAndCompare(query) {
          checkGlutenPlan[ProjectExecTransformer]
        }
    }
  }
}

test("Support bool type filter in scan") {
withTable("t") {
sql("create table t (id int, b boolean) using parquet")
Expand Down
Loading