From 38bab7170fb412fb0b1c8711b67037df1da88e4b Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 13 May 2026 12:01:30 +0100 Subject: [PATCH 1/3] simplify the logic in AppendBatchResizeForShuffleInputAndOutput --- .../backendsapi/velox/VeloxRuleApi.scala | 2 +- ...dBatchResizeForShuffleInputAndOutput.scala | 119 +++++++++--------- 2 files changed, 63 insertions(+), 58 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 9ce400de56e4..d63928527df5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -108,7 +108,7 @@ object VeloxRuleApi { offloads)) // Legacy: Post-transform rules. - injector.injectPostTransform(_ => AppendBatchResizeForShuffleInputAndOutput()) + injector.injectPostTransform(c => AppendBatchResizeForShuffleInputAndOutput(c.caller.isAqe())) injector.injectPostTransform(_ => GpuBufferBatchResizeForShuffleInputOutput()) injector.injectPostTransform(_ => UnionTransformerRule()) injector.injectPostTransform(_ => PartialFallbackRules()) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index fcce64d65222..f8b1a25c6898 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -28,11 +28,13 @@ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in * good shape. */ -case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { +case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean) + extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { if (VeloxConfig.get.enableColumnarCudf) { return plan } + val resizeBatchesShuffleInputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleInput val resizeBatchesShuffleOutputEnabled = VeloxConfig.get.veloxResizeBatchesShuffleOutput if (!resizeBatchesShuffleInputEnabled && !resizeBatchesShuffleOutputEnabled) { @@ -41,65 +43,68 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes - plan.transformUp { + + val newPlan = if (resizeBatchesShuffleInputEnabled) { + addResizeBatchesForShuffleInput(plan, range.min, range.max, preferredBatchBytes) + } else { + plan + } + + val resultPlan = if (resizeBatchesShuffleOutputEnabled) { + addResizeBatchesForShuffleOutput(newPlan, range.min, range.max, preferredBatchBytes) + } else { + newPlan + } + + resultPlan + } + + private def addResizeBatchesForShuffleInput( + plan: SparkPlan, + min: Int, + max: Int, + preferredBatchBytes: Long): SparkPlan = { + plan match { case shuffle: ColumnarShuffleExchangeExec - if resizeBatchesShuffleInputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleInput => + if shuffle.shuffleWriterType.requiresResizingShuffleInput => val appendBatches = - VeloxResizeBatchesExec(shuffle.child, range.min, range.max, preferredBatchBytes) + VeloxResizeBatchesExec(shuffle.child, min, max, preferredBatchBytes) shuffle.withNewChildren(Seq(appendBatches)) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes) - case a @ AQEShuffleReadExec( - ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a, range.min, range.max, preferredBatchBytes) - // Since it's transformed in a bottom to up order, so we may first encounter - // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec), - // then we see AQEShuffleReadExec - case a @ AQEShuffleReadExec( - VeloxResizeBatchesExec( - s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), - _, - _, - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes) - case a @ AQEShuffleReadExec( - VeloxResizeBatchesExec( - s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _), - _, - _, - _), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max, preferredBatchBytes) - case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes) - case s @ ShuffleQueryStageExec( - _, - ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), - _) - if resizeBatchesShuffleOutputEnabled && - shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(s, range.min, range.max, preferredBatchBytes) + case other => + other.withNewChildren(other.children.map( + p => addResizeBatchesForShuffleInput(p, min, max, preferredBatchBytes))) + } + } + + private def addResizeBatchesForShuffleOutput( + plan: SparkPlan, + min: Int, + max: Int, + preferredBatchBytes: Long): SparkPlan = { + plan match { + case shuffle: ColumnarShuffleExchangeExec + if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(shuffle, min, max, preferredBatchBytes) + case reused @ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) + if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(reused, min, max, preferredBatchBytes) + case s: ShuffleQueryStageExec if requiresResizingShuffleOutput(s) => + VeloxResizeBatchesExec(s, min, max, preferredBatchBytes) + case a @ AQEShuffleReadExec(s @ ShuffleQueryStageExec(_, _, _), _) + if requiresResizingShuffleOutput(s) => + VeloxResizeBatchesExec(a, min, max, preferredBatchBytes) + case other => + other.withNewChildren(other.children.map( + p => addResizeBatchesForShuffleOutput(p, min, max, preferredBatchBytes))) + } + } + + private def requiresResizingShuffleOutput(s: ShuffleQueryStageExec): Boolean = { + s.shuffle match { + case c: ColumnarShuffleExchangeExec + if c.shuffleWriterType.requiresResizingShuffleOutput => + true + case _ => false } } } From da5d84526ce2e4b48a8afe6cd7c9d76cb3f11ed4 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 13 May 2026 15:59:34 +0100 Subject: [PATCH 2/3] fix test --- ...pendBatchResizeForShuffleInputAndOutput.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index f8b1a25c6898..f511257aaf9a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -64,15 +64,13 @@ case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean) min: Int, max: Int, preferredBatchBytes: Long): SparkPlan = { - plan match { - case shuffle: ColumnarShuffleExchangeExec - if shuffle.shuffleWriterType.requiresResizingShuffleInput => - val appendBatches = - VeloxResizeBatchesExec(shuffle.child, min, max, preferredBatchBytes) - shuffle.withNewChildren(Seq(appendBatches)) - case other => - other.withNewChildren(other.children.map( - p => addResizeBatchesForShuffleInput(p, min, max, preferredBatchBytes))) + plan.transformUp { + case shuffle: ColumnarShuffleExchangeExec + if shuffle.shuffleWriterType.requiresResizingShuffleInput => + val appendBatches = + VeloxResizeBatchesExec(shuffle.child, min, max, preferredBatchBytes) + shuffle.withNewChildren(Seq(appendBatches)) + case other => other } } From cc9794ab7d810ef30f96f772b0f77ebe9da67f68 Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 13 May 2026 16:33:35 +0100 Subject: [PATCH 3/3] fix --- ...ndBatchResizeForShuffleInputAndOutput.scala | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index f511257aaf9a..a7309c341a9b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -22,7 +22,6 @@ import org.apache.gluten.execution.VeloxResizeBatchesExec import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} -import org.apache.spark.sql.execution.exchange.ReusedExchangeExec /** * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in @@ -65,12 +64,11 @@ case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean) max: Int, preferredBatchBytes: Long): SparkPlan = { plan.transformUp { - case shuffle: ColumnarShuffleExchangeExec - if shuffle.shuffleWriterType.requiresResizingShuffleInput => - val appendBatches = - VeloxResizeBatchesExec(shuffle.child, min, max, preferredBatchBytes) - shuffle.withNewChildren(Seq(appendBatches)) - case other => other + case shuffle: ColumnarShuffleExchangeExec + if shuffle.shuffleWriterType.requiresResizingShuffleInput => + val appendBatches = + VeloxResizeBatchesExec(shuffle.child, min, max, preferredBatchBytes) + shuffle.withNewChildren(Seq(appendBatches)) } } @@ -80,12 +78,6 @@ case class AppendBatchResizeForShuffleInputAndOutput(isAdaptiveContext: Boolean) max: Int, preferredBatchBytes: Long): SparkPlan = { plan match { - case shuffle: ColumnarShuffleExchangeExec - if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(shuffle, min, max, preferredBatchBytes) - case reused @ ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) - if !isAdaptiveContext && shuffle.shuffleWriterType.requiresResizingShuffleOutput => - VeloxResizeBatchesExec(reused, min, max, preferredBatchBytes) case s: ShuffleQueryStageExec if requiresResizingShuffleOutput(s) => VeloxResizeBatchesExec(s, min, max, preferredBatchBytes) case a @ AQEShuffleReadExec(s @ ShuffleQueryStageExec(_, _, _), _)