From 3b69ec84382f3050851c4644ff9191234af6e441 Mon Sep 17 00:00:00 2001 From: WanKun Date: Wed, 13 May 2026 20:24:21 +0800 Subject: [PATCH 1/4] Reduce Velox hash shuffle partition buffer memory by evicting large partitions after split --- .../VeloxCelebornColumnarShuffleWriter.scala | 1 + .../VeloxUniffleColumnarShuffleWriter.java | 1 + .../spark/shuffle/ColumnarShuffleWriter.scala | 1 + cpp/core/jni/JniWrapper.cc | 4 ++- cpp/core/shuffle/Options.h | 14 +++++--- cpp/core/shuffle/Payload.cc | 2 +- cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 32 +++++++++++++++++++ cpp/velox/shuffle/VeloxHashShuffleWriter.h | 6 +++- .../vectorized/ShuffleWriterJniWrapper.java | 1 + .../apache/gluten/config/GlutenConfig.scala | 10 ++++++ 10 files changed, 65 insertions(+), 7 deletions(-) diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index 783d68a00cd9..a80fce39cbcf 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId), nativeBufferSize, GlutenConfig.get.columnarShuffleReallocThreshold, + GlutenConfig.get.columnarShuffleEvictPartitionSize, partitionWriterHandle ) case SortShuffleWriterType => diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index ef35818c7b59..6a45cbd53cb8 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ 
b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -185,6 +185,7 @@ protected void writeImpl(Iterator> records) { columnarDep.nativePartitioning(), partitionId), nativeBufferSize, reallocThreshold, + GlutenConfig.get().columnarShuffleEvictPartitionSize(), partitionWriterHandle); } diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 01f4bd06bab6..7c38653cb235 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V]( taskContext.partitionId), nativeBufferSize, reallocThreshold, + GlutenConfig.get.columnarShuffleEvictPartitionSize, partitionWriterHandle ) } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 9e194be6ea84..86cfce616e54 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint startPartitionId, jint splitBufferSize, jdouble splitBufferReallocThreshold, + jint evictPartitionSize, jlong partitionWriterHandle) { JNI_METHOD_START const auto ctx = getRuntime(env, wrapper); @@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe toPartitioning(jStringToCString(env, partitioningNameJstr)), startPartitionId, splitBufferSize, - splitBufferReallocThreshold); + splitBufferReallocThreshold, + evictPartitionSize); return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions)); JNI_METHOD_END(kInvalidObjectHandle) diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 1d7f9ad9f9c7..82d9f0aadd11 100644 --- a/cpp/core/shuffle/Options.h +++ 
b/cpp/core/shuffle/Options.h @@ -27,6 +27,7 @@ namespace gluten { static constexpr int16_t kDefaultBatchSize = 4096; +static constexpr int32_t kDefaultEvictPartitionSize = 256 * 1024; static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096; static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20; static constexpr int64_t kDefaultPushMemoryThreshold = 4096; @@ -81,6 +82,7 @@ struct ShuffleWriterOptions { struct HashShuffleWriterOptions : ShuffleWriterOptions { int32_t splitBufferSize = kDefaultShuffleWriterBufferSize; double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold; + int32_t evictPartitionSize = kDefaultEvictPartitionSize; HashShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {} @@ -88,10 +90,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { Partitioning partitioning, int32_t startPartitionId, int32_t partitionBufferSize, - double partitionBufferReallocThreshold) + double partitionBufferReallocThreshold, + int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize) : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), - splitBufferReallocThreshold(partitionBufferReallocThreshold) {} + splitBufferReallocThreshold(partitionBufferReallocThreshold), + evictPartitionSize(partitionEvictPartitionSize) {} protected: HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {} @@ -101,10 +105,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { Partitioning partitioning, int32_t startPartitionId, int32_t partitionBufferSize, - double partitionBufferReallocThreshold) + double partitionBufferReallocThreshold, + int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize) : ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), - splitBufferReallocThreshold(partitionBufferReallocThreshold) {} + 
splitBufferReallocThreshold(partitionBufferReallocThreshold), + evictPartitionSize(partitionEvictPartitionSize) {} }; struct SortShuffleWriterOptions : ShuffleWriterOptions { diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc index 230806cd2f26..5d02a844833c 100644 --- a/cpp/core/shuffle/Payload.cc +++ b/cpp/core/shuffle/Payload.cc @@ -60,7 +60,7 @@ arrow::Result readPayloadType(arrow::io::InputStream* is) { } arrow::Result compressBuffer( - const std::shared_ptr& buffer, + const std::shared_ptr buffer, uint8_t* output, int64_t outputLength, arrow::util::Codec* codec) { diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index f25d4ac88764..6b3b2c709997 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -440,9 +440,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector& printPartitionBuffer(); setSplitState(SplitState::kInit); + if (evictPartitionSize_ > 0) { + // After split, evict large partition buffers to free up memory for the next input RowVector. 
+ const auto partitionBytes = estimatePartitionBufferBytes(); + for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) { + if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= evictPartitionSize_) { + RETURN_NOT_OK(evictPartitionBuffers(pid, false)); + } + } + } return arrow::Status::OK(); } +std::vector VeloxHashShuffleWriter::estimatePartitionBufferBytes() const { + std::vector partitionBytes(numPartitions_, 0); + + for (const auto& columnBuffers : partitionBuffers_) { + for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) { + for (const auto& buffer : columnBuffers[pid]) { + if (buffer) { + partitionBytes[pid] += buffer->capacity(); + } + } + } + } + + for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) { + const auto& buffer = complexTypeFlushBuffer_[pid]; + if (buffer) { + partitionBytes[pid] += buffer->capacity(); + } + } + + return partitionBytes; +} + arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) { SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]); diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h index 9203c70594cd..114df80b45c9 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h @@ -214,7 +214,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { MemoryManager* memoryManager) : VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager), splitBufferSize_(options->splitBufferSize), - splitBufferReallocThreshold_(options->splitBufferReallocThreshold) { + splitBufferReallocThreshold_(options->splitBufferReallocThreshold), + evictPartitionSize_(options->evictPartitionSize) { arenas_.resize(numPartitions); } @@ -223,6 +224,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { arrow::Status initColumnTypes(const facebook::velox::RowVector& rv); + std::vector estimatePartitionBufferBytes() const; + arrow::Status splitRowVector(const 
facebook::velox::RowVector& rv); arrow::Status initFromRowVector(const facebook::velox::RowVector& rv); @@ -326,6 +329,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { protected: int32_t splitBufferSize_; double splitBufferReallocThreshold_; + int32_t evictPartitionSize_; std::shared_ptr schema_; diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index a389da68603e..b45d23f91d3b 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -43,6 +43,7 @@ public native long createHashShuffleWriter( int startPartitionId, int splitBufferSize, double splitBufferReallocThreshold, + int evictPartitionSize, long partitionWriterHandle); public native long createSortShuffleWriter( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 0e2877ef4e7d..dd2bb9547e6c 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -225,6 +225,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def columnarShuffleReallocThreshold: Double = getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD) + def columnarShuffleEvictPartitionSize: Int = getConf(COLUMNAR_SHUFFLE_EVICT_PARTITION_SIZE) + def columnarShuffleMergeThreshold: Double = getConf(SHUFFLE_WRITER_MERGE_THRESHOLD) def columnarShuffleCodec: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC) @@ -1061,6 +1063,14 @@ object GlutenConfig extends ConfigRegistry { .checkValue(v => v >= 0 && v <= 1, "Buffer reallocation threshold must between [0, 1]") .createWithDefault(0.25) + val COLUMNAR_SHUFFLE_EVICT_PARTITION_SIZE = + 
buildConf("spark.gluten.sql.columnar.shuffle.evictPartitionSize") + .doc( + "For Velox hash shuffle writer, evict partition buffers larger than this threshold " + + "after splitting an input batch.") + .intConf + .createWithDefault(256 * 1024) + val COLUMNAR_SHUFFLE_CODEC = buildConf("spark.gluten.sql.columnar.shuffle.codec") .doc( From 5d40890b63697daf3e02bb60ea934307635cd3db Mon Sep 17 00:00:00 2001 From: wankun Date: Wed, 13 May 2026 13:59:59 +0000 Subject: [PATCH 2/4] update doc --- docs/Configuration.md | 253 +++++++++++++++++++++--------------------- 1 file changed, 127 insertions(+), 126 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 294e6c010ff0..b5a2083036ef 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -20,132 +20,133 @@ nav_order: 15 ## Gluten configurations -| Key | Default | Description | -|--------------------------------------------------------------------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| spark.gluten.costModel | legacy | The class name of user-defined cost model that will be used by Gluten's transition planner. If not specified, a legacy built-in cost model will be used. The legacy cost model helps RAS planner exhaustively offload computations, and helps transition planner choose columnar-to-columnar transition over others. | -| spark.gluten.enabled | true | Whether to enable gluten. Default value is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for spark.plugins. | -| spark.gluten.execution.resource.expired.time | 86400 | Expired time of execution with resource relation has cached. 
| -| spark.gluten.expression.blacklist | <undefined> | A black list of expression to skip transform, multiple values separated by commas. | -| spark.gluten.loadLibFromJar | false | Whether to load shared libraries from jars. | -| spark.gluten.loadLibOS | <undefined> | The shared library loader's OS name. | -| spark.gluten.loadLibOSVersion | <undefined> | The shared library loader's OS version. | -| spark.gluten.memory.isolation | false | Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. | -| spark.gluten.memory.overAcquiredMemoryRatio | 0.3 | If larger than 0, Velox backend will try over-acquire this ratio of the total allocated memory as backup to avoid OOM. | -| spark.gluten.memory.reservationBlockSize | 8MB | Block size of native reservation listener reserve memory from Spark. | -| spark.gluten.numTaskSlotsPerExecutor | -1 | Must provide default value since non-execution operations (e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated | -| spark.gluten.shuffleWriter.bufferSize | <undefined> | -| spark.gluten.soft-affinity.duplicateReading.maxCacheItems | 10000 | Enable Soft Affinity duplicate reading detection | -| spark.gluten.soft-affinity.duplicateReadingDetect.enabled | false | If true, Enable Soft Affinity duplicate reading detection | -| spark.gluten.soft-affinity.enabled | false | Whether to enable Soft Affinity scheduling. 
| -| spark.gluten.soft-affinity.min.target-hosts | 1 | For on HDFS, if there are already target hosts, and then prefer to use the original target hosts to schedule | -| spark.gluten.soft-affinity.replications.num | 2 | Calculate the number of the replications for scheduling to the target executors per file | -| spark.gluten.sql.adaptive.costEvaluator.enabled | true | If true, use org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost evaluator class, else follow the configuration spark.sql.adaptive.customCostEvaluatorClass. | -| spark.gluten.sql.ansiFallback.enabled | true | When true (default), Gluten will fall back to Spark when ANSI mode is enabled. When false, Gluten will attempt to execute in ANSI mode. | -| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | true | Config to enable BroadcastNestedLoopJoinExecTransformer. | -| spark.gluten.sql.cacheWholeStageTransformerContext | false | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. It is used to get substrait plan node and native plan string. | -| spark.gluten.sql.cartesianProductTransformerEnabled | true | Config to enable CartesianProductExecTransformer. | -| spark.gluten.sql.collapseGetJsonObject.enabled | false | Collapse nested get_json_object functions as one for optimization. | -| spark.gluten.sql.columnar.appendData | true | Enable or disable columnar v2 command append data. | -| spark.gluten.sql.columnar.arrowUdf | true | Enable or disable columnar arrow udf. | -| spark.gluten.sql.columnar.batchscan | true | Enable or disable columnar batchscan. | -| spark.gluten.sql.columnar.broadcastExchange | true | Enable or disable columnar broadcastExchange. | -| spark.gluten.sql.columnar.broadcastJoin | true | Enable or disable columnar broadcastJoin. | -| spark.gluten.sql.columnar.cast.avg | true | -| spark.gluten.sql.columnar.coalesce | true | Enable or disable columnar coalesce. 
| -| spark.gluten.sql.columnar.collectLimit | true | Enable or disable columnar collectLimit. | -| spark.gluten.sql.columnar.collectTail | true | Enable or disable columnar collectTail. | -| spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true | Enable or disable nested column pruning in hivetablescan. | -| spark.gluten.sql.columnar.enableVanillaVectorizedReaders | true | Enable or disable vanilla vectorized scan. | -| spark.gluten.sql.columnar.executor.libpath || The gluten executor library path. | -| spark.gluten.sql.columnar.expand | true | Enable or disable columnar expand. | -| spark.gluten.sql.columnar.fallback.expressions.threshold | 50 | Fall back filter/project if number of nested expressions reaches this threshold, considering Spark codegen can bring better performance for such case. | -| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | true | When true, the fallback policy ignores the RowToColumnar when counting fallback number. | -| spark.gluten.sql.columnar.fallback.preferColumnar | true | When true, the fallback policy prefers to use Gluten plan rather than vanilla Spark plan if the both of them contains ColumnarToRow and the vanilla Spark plan ColumnarToRow number is not smaller than Gluten plan. | -| spark.gluten.sql.columnar.filescan | true | Enable or disable columnar filescan. | -| spark.gluten.sql.columnar.filter | true | Enable or disable columnar filter. | -| spark.gluten.sql.columnar.force.hashagg | true | Whether to force to use gluten's hash agg for replacing vanilla spark's sort agg. | -| spark.gluten.sql.columnar.forceShuffledHashJoin | true | -| spark.gluten.sql.columnar.generate | true | -| spark.gluten.sql.columnar.hashagg | true | Enable or disable columnar hashagg. | -| spark.gluten.sql.columnar.hivetablescan | true | Enable or disable columnar hivetablescan. | -| spark.gluten.sql.columnar.libname | gluten | The gluten library name. | -| spark.gluten.sql.columnar.libpath || The gluten library path. 
| -| spark.gluten.sql.columnar.limit | true | -| spark.gluten.sql.columnar.maxBatchSize | 4096 | -| spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | -| spark.gluten.sql.columnar.overwritePartitionsDynamic | true | Enable or disable columnar v2 command overwrite partitions dynamic. | -| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | -| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | -| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | -| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | -| spark.gluten.sql.columnar.physicalJoinOptimizationOutputSize | 52 | Fallback to row operators if there are several continuous joins and matched output size. | -| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | -| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | -| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. 
| -| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | -| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | -| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | -| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | -| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | -| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | -| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | -| spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | -| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | -| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | -| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | -| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. 
| -| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | -| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | -| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | 1MB | Buffer size in bytes for sort-based shuffle reader deserializing raw input to columnar batch. | -| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. | -| spark.gluten.sql.columnar.shuffle.typeAwareCompress.enabled | false | Enable type-aware compression (e.g. FFor for 64-bit integers) in shuffle. Not compatible with dictionary encoding; if both are enabled, type-aware compression is automatically disabled. | -| spark.gluten.sql.columnar.shuffledHashJoin | true | Enable or disable columnar shuffledHashJoin. | -| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true | Whether to allow Gluten to choose an optimal build side for shuffled hash join. | -| spark.gluten.sql.columnar.smallFileThreshold | 0.5 | The total size threshold of small files in table scan.To avoid small files being placed into the same partition, Gluten will try to distribute small files into different partitions when the total size of small files is below this threshold. | -| spark.gluten.sql.columnar.sort | true | Enable or disable columnar sort. | -| spark.gluten.sql.columnar.sortMergeJoin | true | Enable or disable columnar sortMergeJoin. This should be set with preferSortMergeJoin=false. | -| spark.gluten.sql.columnar.tableCache | false | Enable or disable columnar table cache. | -| spark.gluten.sql.columnar.takeOrderedAndProject | true | -| spark.gluten.sql.columnar.union | true | Enable or disable columnar union. 
| -| spark.gluten.sql.columnar.wholeStage.fallback.threshold | -1 | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. | -| spark.gluten.sql.columnar.window | true | Enable or disable columnar window. | -| spark.gluten.sql.columnar.window.group.limit | true | Enable or disable columnar window group limit. | -| spark.gluten.sql.columnar.writeToDataSourceV2 | true | Enable or disable columnar v2 command write to data source v2. | -| spark.gluten.sql.columnarSampleEnabled | false | Disable or enable columnar sample. | -| spark.gluten.sql.columnarToRowMemoryThreshold | 64MB | -| spark.gluten.sql.countDistinctWithoutExpand | false | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! | -| spark.gluten.sql.extendedColumnPruning.enabled | true | Do extended nested column pruning for cases ignored by vanilla Spark. | -| spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). User should enable this property if their incompatibility is intolerable. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. | -| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. 
| -| spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. | -| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. | -| spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader | -| spark.gluten.sql.native.bloomFilter | true | -| spark.gluten.sql.native.hive.writer.enabled | true | This is config to specify whether to enable the native columnar writer for HiveFileFormat. Currently only supports HiveFileFormat with Parquet as the output file type. | -| spark.gluten.sql.native.hyperLogLog.Aggregate | true | -| spark.gluten.sql.native.parquet.write.blockRows | 100000000 | -| spark.gluten.sql.native.union | false | Enable or disable native union where computation is completely offloaded to backend. | -| spark.gluten.sql.native.writeColumnMetadataExclusionList | comment | Native write files does not support column metadata. Metadata in list would be removed to support native write files. Multiple values separated by commas. | -| spark.gluten.sql.native.writer.enabled | <undefined> | This is config to specify whether to enable the native columnar parquet/orc writer | -| spark.gluten.sql.orc.charType.scan.fallback.enabled | true | Force fallback for orc char type scan. | -| spark.gluten.sql.pushAggregateThroughJoin.enabled | false | Enables the push-aggregate-through-join optimization in Gluten. When enabled, aggregate operators may be pushed below joins during logical optimization and corresponding physical plans may be rewritten to execute the aggregation earlier. | -| spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. 
A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. | -| spark.gluten.sql.removeNativeWriteFilesSortAndProject | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. | -| spark.gluten.sql.rewrite.dateTimestampComparison | true | Rewrite the comparision between date and timestamp to timestamp comparison.For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` | -| spark.gluten.sql.scan.fileSchemeValidation.enabled | true | When true, enable file path scheme validation for scan. Validation will fail if file scheme is not supported by registered file systems, which will cause scan operator fall back. | -| spark.gluten.sql.supported.flattenNestedFunctions | and,or | Flatten nested functions as one for optimization. | -| spark.gluten.sql.text.input.empty.as.default | false | treat empty fields in CSV input as default values. | -| spark.gluten.sql.text.input.max.block.size | 8KB | the max block size for text input rows | -| spark.gluten.sql.validation.printStackOnFailure | false | -| spark.gluten.storage.hdfsViewfs.enabled | false | If enabled, gluten will convert the viewfs path to hdfs path in scala side | -| spark.gluten.supported.hive.udfs || Supported hive udf names. | -| spark.gluten.supported.python.udfs || Supported python udf names. | -| spark.gluten.supported.scala.udfs || Supported scala udf names. | -| spark.gluten.ui.enabled | true | Whether to enable the gluten web UI, If true, attach the gluten UI page to the Spark web UI. 
| +| Key | Default | Description | +|---------------------------------------------------------------------|-------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| spark.gluten.costModel | legacy | The class name of user-defined cost model that will be used by Gluten's transition planner. If not specified, a legacy built-in cost model will be used. The legacy cost model helps RAS planner exhaustively offload computations, and helps transition planner choose columnar-to-columnar transition over others. | +| spark.gluten.enabled | true | Whether to enable gluten. Default value is true. Just an experimental property. Recommend to enable/disable Gluten through the setting for spark.plugins. | +| spark.gluten.execution.resource.expired.time | 86400 | Expired time of execution with resource relation has cached. | +| spark.gluten.expression.blacklist | <undefined> | A black list of expression to skip transform, multiple values separated by commas. | +| spark.gluten.loadLibFromJar | false | Whether to load shared libraries from jars. | +| spark.gluten.loadLibOS | <undefined> | The shared library loader's OS name. | +| spark.gluten.loadLibOSVersion | <undefined> | The shared library loader's OS version. | +| spark.gluten.memory.isolation | false | Enable isolated memory mode. If true, Gluten controls the maximum off-heap memory can be used by each task to X, X = executor memory / max task slots. It's recommended to set true if Gluten serves concurrent queries within a single session, since not all memory Gluten allocated is guaranteed to be spillable. In the case, the feature should be enabled to avoid OOM. 
| +| spark.gluten.memory.overAcquiredMemoryRatio | 0.3 | If larger than 0, Velox backend will try over-acquire this ratio of the total allocated memory as backup to avoid OOM. | +| spark.gluten.memory.reservationBlockSize | 8MB | Block size of native reservation listener reserve memory from Spark. | +| spark.gluten.numTaskSlotsPerExecutor | -1 | Must provide default value since non-execution operations (e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated | +| spark.gluten.shuffleWriter.bufferSize | <undefined> | +| spark.gluten.soft-affinity.duplicateReading.maxCacheItems | 10000 | Enable Soft Affinity duplicate reading detection | +| spark.gluten.soft-affinity.duplicateReadingDetect.enabled | false | If true, Enable Soft Affinity duplicate reading detection | +| spark.gluten.soft-affinity.enabled | false | Whether to enable Soft Affinity scheduling. | +| spark.gluten.soft-affinity.min.target-hosts | 1 | For on HDFS, if there are already target hosts, and then prefer to use the original target hosts to schedule | +| spark.gluten.soft-affinity.replications.num | 2 | Calculate the number of the replications for scheduling to the target executors per file | +| spark.gluten.sql.adaptive.costEvaluator.enabled | true | If true, use org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator as custom cost evaluator class, else follow the configuration spark.sql.adaptive.customCostEvaluatorClass. | +| spark.gluten.sql.ansiFallback.enabled | true | When true (default), Gluten will fall back to Spark when ANSI mode is enabled. When false, Gluten will attempt to execute in ANSI mode. | +| spark.gluten.sql.broadcastNestedLoopJoinTransformerEnabled | true | Config to enable BroadcastNestedLoopJoinExecTransformer. | +| spark.gluten.sql.cacheWholeStageTransformerContext | false | When true, `WholeStageTransformer` will cache the `WholeStageTransformerContext` when executing. 
It is used to get substrait plan node and native plan string. | +| spark.gluten.sql.cartesianProductTransformerEnabled | true | Config to enable CartesianProductExecTransformer. | +| spark.gluten.sql.collapseGetJsonObject.enabled | false | Collapse nested get_json_object functions as one for optimization. | +| spark.gluten.sql.columnar.appendData | true | Enable or disable columnar v2 command append data. | +| spark.gluten.sql.columnar.arrowUdf | true | Enable or disable columnar arrow udf. | +| spark.gluten.sql.columnar.batchscan | true | Enable or disable columnar batchscan. | +| spark.gluten.sql.columnar.broadcastExchange | true | Enable or disable columnar broadcastExchange. | +| spark.gluten.sql.columnar.broadcastJoin | true | Enable or disable columnar broadcastJoin. | +| spark.gluten.sql.columnar.cast.avg | true | +| spark.gluten.sql.columnar.coalesce | true | Enable or disable columnar coalesce. | +| spark.gluten.sql.columnar.collectLimit | true | Enable or disable columnar collectLimit. | +| spark.gluten.sql.columnar.collectTail | true | Enable or disable columnar collectTail. | +| spark.gluten.sql.columnar.enableNestedColumnPruningInHiveTableScan | true | Enable or disable nested column pruning in hivetablescan. | +| spark.gluten.sql.columnar.enableVanillaVectorizedReaders | true | Enable or disable vanilla vectorized scan. | +| spark.gluten.sql.columnar.executor.libpath || The gluten executor library path. | +| spark.gluten.sql.columnar.expand | true | Enable or disable columnar expand. | +| spark.gluten.sql.columnar.fallback.expressions.threshold | 50 | Fall back filter/project if number of nested expressions reaches this threshold, considering Spark codegen can bring better performance for such case. | +| spark.gluten.sql.columnar.fallback.ignoreRowToColumnar | true | When true, the fallback policy ignores the RowToColumnar when counting fallback number. 
| +| spark.gluten.sql.columnar.fallback.preferColumnar | true | When true, the fallback policy prefers to use Gluten plan rather than vanilla Spark plan if the both of them contains ColumnarToRow and the vanilla Spark plan ColumnarToRow number is not smaller than Gluten plan. | +| spark.gluten.sql.columnar.filescan | true | Enable or disable columnar filescan. | +| spark.gluten.sql.columnar.filter | true | Enable or disable columnar filter. | +| spark.gluten.sql.columnar.force.hashagg | true | Whether to force to use gluten's hash agg for replacing vanilla spark's sort agg. | +| spark.gluten.sql.columnar.forceShuffledHashJoin | true | +| spark.gluten.sql.columnar.generate | true | +| spark.gluten.sql.columnar.hashagg | true | Enable or disable columnar hashagg. | +| spark.gluten.sql.columnar.hivetablescan | true | Enable or disable columnar hivetablescan. | +| spark.gluten.sql.columnar.libname | gluten | The gluten library name. | +| spark.gluten.sql.columnar.libpath || The gluten library path. | +| spark.gluten.sql.columnar.limit | true | +| spark.gluten.sql.columnar.maxBatchSize | 4096 | +| spark.gluten.sql.columnar.overwriteByExpression | true | Enable or disable columnar v2 command overwrite by expression. | +| spark.gluten.sql.columnar.overwritePartitionsDynamic | true | Enable or disable columnar v2 command overwrite partitions dynamic. | +| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB | +| spark.gluten.sql.columnar.partial.generate | true | Evaluates the non-offload-able HiveUDTF using vanilla Spark generator | +| spark.gluten.sql.columnar.partial.project | true | Break up one project node into 2 phases when some of the expressions are non offload-able. 
Phase one is a regular offloaded project transformer that evaluates the offload-able expressions in native, phase two preserves the output from phase one and evaluates the remaining non-offload-able expressions using vanilla Spark projections | +| spark.gluten.sql.columnar.physicalJoinOptimizationLevel | 12 | Fallback to row operators if there are several continuous joins. | +| spark.gluten.sql.columnar.physicalJoinOptimizationOutputSize | 52 | Fallback to row operators if there are several continuous joins and matched output size. | +| spark.gluten.sql.columnar.physicalJoinOptimizeEnable | false | Enable or disable columnar physicalJoinOptimize. | +| spark.gluten.sql.columnar.preferStreamingAggregate | true | Velox backend supports `StreamingAggregate`. `StreamingAggregate` uses the less memory as it does not need to hold all groups in memory, so it could avoid spill. When true and the child output ordering satisfies the grouping key then Gluten will choose `StreamingAggregate` as the native operator. | +| spark.gluten.sql.columnar.project | true | Enable or disable columnar project. | +| spark.gluten.sql.columnar.project.collapse | true | Combines two columnar project operators into one and perform alias substitution | +| spark.gluten.sql.columnar.query.fallback.threshold | -1 | The threshold for whether query will fall back by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.range | true | Enable or disable columnar range. | +| spark.gluten.sql.columnar.replaceData | true | Enable or disable columnar v2 command replace data. | +| spark.gluten.sql.columnar.scanOnly | false | When enabled, only scan and the filter after scan will be offloaded to native. | +| spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | +| spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. 
| +| spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. | +| spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | +| spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | +| spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | +| spark.gluten.sql.columnar.shuffle.evictPartitionSize | 262144 | For Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. | +| spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. | +| spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. | +| spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize | 1MB | Buffer size in bytes for sort-based shuffle reader deserializing raw input to columnar batch. | +| spark.gluten.sql.columnar.shuffle.sort.partitions.threshold | 4000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of partitions is greater than this threshold. 
| +| spark.gluten.sql.columnar.shuffle.typeAwareCompress.enabled | false | Enable type-aware compression (e.g. FFor for 64-bit integers) in shuffle. Not compatible with dictionary encoding; if both are enabled, type-aware compression is automatically disabled. | +| spark.gluten.sql.columnar.shuffledHashJoin | true | Enable or disable columnar shuffledHashJoin. | +| spark.gluten.sql.columnar.shuffledHashJoin.optimizeBuildSide | true | Whether to allow Gluten to choose an optimal build side for shuffled hash join. | +| spark.gluten.sql.columnar.smallFileThreshold | 0.5 | The total size threshold of small files in table scan.To avoid small files being placed into the same partition, Gluten will try to distribute small files into different partitions when the total size of small files is below this threshold. | +| spark.gluten.sql.columnar.sort | true | Enable or disable columnar sort. | +| spark.gluten.sql.columnar.sortMergeJoin | true | Enable or disable columnar sortMergeJoin. This should be set with preferSortMergeJoin=false. | +| spark.gluten.sql.columnar.tableCache | false | Enable or disable columnar table cache. | +| spark.gluten.sql.columnar.takeOrderedAndProject | true | +| spark.gluten.sql.columnar.union | true | Enable or disable columnar union. | +| spark.gluten.sql.columnar.wholeStage.fallback.threshold | -1 | The threshold for whether whole stage will fall back in AQE supported case by counting the number of ColumnarToRow & vanilla leaf node. | +| spark.gluten.sql.columnar.window | true | Enable or disable columnar window. | +| spark.gluten.sql.columnar.window.group.limit | true | Enable or disable columnar window group limit. | +| spark.gluten.sql.columnar.writeToDataSourceV2 | true | Enable or disable columnar v2 command write to data source v2. | +| spark.gluten.sql.columnarSampleEnabled | false | Disable or enable columnar sample. 
| +| spark.gluten.sql.columnarToRowMemoryThreshold | 64MB | +| spark.gluten.sql.countDistinctWithoutExpand | false | Convert Count Distinct to a UDAF called count_distinct to prevent SparkPlanner converting it to Expand+Count. WARNING: When enabled, count distinct queries will fail to fallback!!! | +| spark.gluten.sql.extendedColumnPruning.enabled | true | Do extended nested column pruning for cases ignored by vanilla Spark. | +| spark.gluten.sql.fallbackRegexpExpressions | false | If true, fall back all regexp expressions. There are a few incompatible cases between RE2 (used by native engine) and java.util.regex (used by Spark). User should enable this property if their incompatibility is intolerable. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet | false | If enabled, Gluten will not offload scan when unexpected metadata is detected. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10 | If supplied, metadata of `limit` number of Parquet files will be checked to determine whether to fall back to java scan. | +| spark.gluten.sql.fallbackUnexpectedMetadataParquet.samplePercentage | 0.1 | The percentage of root paths to sample for metadata validation when the number of root paths is large. Value range is (0, 1.0]. 1.0 means check all paths (no sampling). A smaller value reduces validation cost for tables with many partitions. | +| spark.gluten.sql.injectNativePlanStringToExplain | false | When true, Gluten will inject native plan tree to Spark's explain output. | +| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true | Whether to merge two phases aggregate if there are no other operators between them. | +| spark.gluten.sql.native.arrow.reader.enabled | false | This is config to specify whether to enable the native columnar csv reader | +| spark.gluten.sql.native.bloomFilter | true | +| spark.gluten.sql.native.hive.writer.enabled | true | This is config to specify whether to enable the native columnar writer for HiveFileFormat. 
Currently only supports HiveFileFormat with Parquet as the output file type. | +| spark.gluten.sql.native.hyperLogLog.Aggregate | true | +| spark.gluten.sql.native.parquet.write.blockRows | 100000000 | +| spark.gluten.sql.native.union | false | Enable or disable native union where computation is completely offloaded to backend. | +| spark.gluten.sql.native.writeColumnMetadataExclusionList | comment | Native write files does not support column metadata. Metadata in list would be removed to support native write files. Multiple values separated by commas. | +| spark.gluten.sql.native.writer.enabled | <undefined> | This is config to specify whether to enable the native columnar parquet/orc writer | +| spark.gluten.sql.orc.charType.scan.fallback.enabled | true | Force fallback for orc char type scan. | +| spark.gluten.sql.pushAggregateThroughJoin.enabled | false | Enables the push-aggregate-through-join optimization in Gluten. When enabled, aggregate operators may be pushed below joins during logical optimization and corresponding physical plans may be rewritten to execute the aggregation earlier. | +| spark.gluten.sql.pushAggregateThroughJoin.maxDepth | 2147483647 | Maximum join traversal depth when applying the push-aggregate-through-join optimization. A value of 1 allows pushing an aggregate through a single join; larger values allow the rule to traverse and push through multiple consecutive joins. | +| spark.gluten.sql.removeNativeWriteFilesSortAndProject | true | When true, Gluten will remove the vanilla Spark V1Writes added sort and project for velox backend. | +| spark.gluten.sql.rewrite.dateTimestampComparison | true | Rewrite the comparison between date and timestamp to timestamp comparison. For example `from_unixtime(ts) > date` will be rewritten to `ts > to_unixtime(date)` | +| spark.gluten.sql.scan.fileSchemeValidation.enabled | true | When true, enable file path scheme validation for scan.
Validation will fail if file scheme is not supported by registered file systems, which will cause scan operator fall back. | +| spark.gluten.sql.supported.flattenNestedFunctions | and,or | Flatten nested functions as one for optimization. | +| spark.gluten.sql.text.input.empty.as.default | false | treat empty fields in CSV input as default values. | +| spark.gluten.sql.text.input.max.block.size | 8KB | the max block size for text input rows | +| spark.gluten.sql.validation.printStackOnFailure | false | +| spark.gluten.storage.hdfsViewfs.enabled | false | If enabled, gluten will convert the viewfs path to hdfs path in scala side | +| spark.gluten.supported.hive.udfs || Supported hive udf names. | +| spark.gluten.supported.python.udfs || Supported python udf names. | +| spark.gluten.supported.scala.udfs || Supported scala udf names. | +| spark.gluten.ui.enabled | true | Whether to enable the gluten web UI, If true, attach the gluten UI page to the Spark web UI. | ## Gluten *experimental* configurations From 715585d1a1bf36c2503631861077ed8aeb3dbcb3 Mon Sep 17 00:00:00 2001 From: WanKun Date: Fri, 15 May 2026 13:36:54 +0800 Subject: [PATCH 3/4] Rename conf name --- .../shuffle/VeloxCelebornColumnarShuffleWriter.scala | 2 +- .../writer/VeloxUniffleColumnarShuffleWriter.java | 2 +- .../apache/spark/shuffle/ColumnarShuffleWriter.scala | 2 +- cpp/core/jni/JniWrapper.cc | 4 ++-- cpp/core/shuffle/Options.h | 12 ++++++------ cpp/velox/shuffle/VeloxHashShuffleWriter.cc | 4 ++-- cpp/velox/shuffle/VeloxHashShuffleWriter.h | 4 ++-- .../gluten/vectorized/ShuffleWriterJniWrapper.java | 2 +- .../org/apache/gluten/config/GlutenConfig.scala | 11 ++++++----- 9 files changed, 22 insertions(+), 21 deletions(-) diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index a80fce39cbcf..03208adbcf34 100644 --- 
a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -150,7 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId), nativeBufferSize, GlutenConfig.get.columnarShuffleReallocThreshold, - GlutenConfig.get.columnarShuffleEvictPartitionSize, + GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold, partitionWriterHandle ) case SortShuffleWriterType => diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 6a45cbd53cb8..e01f97ba3ea9 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -185,7 +185,7 @@ protected void writeImpl(Iterator> records) { columnarDep.nativePartitioning(), partitionId), nativeBufferSize, reallocThreshold, - GlutenConfig.get().columnarShuffleEvictPartitionSize(), + GlutenConfig.get().columnarShufflePartitionBufferEvictThreshold(), partitionWriterHandle); } diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 7c38653cb235..ff1c66e03731 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -192,7 +192,7 @@ class ColumnarShuffleWriter[K, V]( taskContext.partitionId), nativeBufferSize, reallocThreshold, - GlutenConfig.get.columnarShuffleEvictPartitionSize, + 
GlutenConfig.get.columnarShufflePartitionBufferEvictThreshold, partitionWriterHandle ) } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 86cfce616e54..6cb3fd6ffcfc 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -990,7 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe jint startPartitionId, jint splitBufferSize, jdouble splitBufferReallocThreshold, - jint evictPartitionSize, + jint partitionBufferEvictThreshold, jlong partitionWriterHandle) { JNI_METHOD_START const auto ctx = getRuntime(env, wrapper); @@ -1006,7 +1006,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe startPartitionId, splitBufferSize, splitBufferReallocThreshold, - evictPartitionSize); + partitionBufferEvictThreshold); return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions)); JNI_METHOD_END(kInvalidObjectHandle) diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 82d9f0aadd11..a0d91c32bb16 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -27,7 +27,7 @@ namespace gluten { static constexpr int16_t kDefaultBatchSize = 4096; -static constexpr int32_t kDefaultEvictPartitionSize = 256 * 1024; +static constexpr int32_t kDefaultPartitionBufferEvictThreshold = -1; static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096; static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20; static constexpr int64_t kDefaultPushMemoryThreshold = 4096; @@ -82,7 +82,7 @@ struct ShuffleWriterOptions { struct HashShuffleWriterOptions : ShuffleWriterOptions { int32_t splitBufferSize = kDefaultShuffleWriterBufferSize; double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold; - int32_t evictPartitionSize = kDefaultEvictPartitionSize; + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold; HashShuffleWriterOptions() : 
ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {} @@ -91,11 +91,11 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { int32_t startPartitionId, int32_t partitionBufferSize, double partitionBufferReallocThreshold, - int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize) + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold) : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), splitBufferReallocThreshold(partitionBufferReallocThreshold), - evictPartitionSize(partitionEvictPartitionSize) {} + partitionBufferEvictThreshold(partitionBufferEvictThreshold) {} protected: HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {} @@ -106,11 +106,11 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions { int32_t startPartitionId, int32_t partitionBufferSize, double partitionBufferReallocThreshold, - int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize) + int32_t partitionBufferEvictThreshold = kDefaultPartitionBufferEvictThreshold) : ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId), splitBufferSize(partitionBufferSize), splitBufferReallocThreshold(partitionBufferReallocThreshold), - evictPartitionSize(partitionEvictPartitionSize) {} + partitionBufferEvictThreshold(partitionBufferEvictThreshold) {} }; struct SortShuffleWriterOptions : ShuffleWriterOptions { diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 6b3b2c709997..526e080f82d8 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -440,11 +440,11 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector& printPartitionBuffer(); setSplitState(SplitState::kInit); - if (evictPartitionSize_ > 0) { + if (partitionBufferEvictThreshold_ > 0) { // After split, evict large partition 
buffers to free up memory for the next input RowVector. const auto partitionBytes = estimatePartitionBufferBytes(); for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) { - if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= evictPartitionSize_) { + if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= partitionBufferEvictThreshold_) { RETURN_NOT_OK(evictPartitionBuffers(pid, false)); } } diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.h b/cpp/velox/shuffle/VeloxHashShuffleWriter.h index 114df80b45c9..e746dddab8cf 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.h @@ -215,7 +215,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { : VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager), splitBufferSize_(options->splitBufferSize), splitBufferReallocThreshold_(options->splitBufferReallocThreshold), - evictPartitionSize_(options->evictPartitionSize) { + partitionBufferEvictThreshold_(options->partitionBufferEvictThreshold) { arenas_.resize(numPartitions); } @@ -329,7 +329,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter { protected: int32_t splitBufferSize_; double splitBufferReallocThreshold_; - int32_t evictPartitionSize_; + int32_t partitionBufferEvictThreshold_; std::shared_ptr schema_; diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java index b45d23f91d3b..87685f8505c2 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java @@ -43,7 +43,7 @@ public native long createHashShuffleWriter( int startPartitionId, int splitBufferSize, double splitBufferReallocThreshold, - int evictPartitionSize, + int partitionBufferEvictThreshold, long partitionWriterHandle); public native long 
createSortShuffleWriter( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index dd2bb9547e6c..97214e1348a2 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -225,7 +225,8 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def columnarShuffleReallocThreshold: Double = getConf(COLUMNAR_SHUFFLE_REALLOC_THRESHOLD) - def columnarShuffleEvictPartitionSize: Int = getConf(COLUMNAR_SHUFFLE_EVICT_PARTITION_SIZE) + def columnarShufflePartitionBufferEvictThreshold: Int = + getConf(COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD) def columnarShuffleMergeThreshold: Double = getConf(SHUFFLE_WRITER_MERGE_THRESHOLD) @@ -1063,13 +1064,13 @@ object GlutenConfig extends ConfigRegistry { .checkValue(v => v >= 0 && v <= 1, "Buffer reallocation threshold must between [0, 1]") .createWithDefault(0.25) - val COLUMNAR_SHUFFLE_EVICT_PARTITION_SIZE = - buildConf("spark.gluten.sql.columnar.shuffle.evictPartitionSize") + val COLUMNAR_SHUFFLE_PARTITION_BUFFER_EVICT_THRESHOLD = + buildConf("spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold") .doc( "For Velox hash shuffle writer, evict partition buffers larger than this threshold " + - "after splitting an input batch.") + "after splitting an input batch. 
Use non-positive value to disable this feature.") .intConf - .createWithDefault(256 * 1024) + .createWithDefault(-1) val COLUMNAR_SHUFFLE_CODEC = buildConf("spark.gluten.sql.columnar.shuffle.codec") From 26c6145d9f9a35351b8f996ee8f20c4ead75b7ee Mon Sep 17 00:00:00 2001 From: wankunde Date: Fri, 15 May 2026 10:10:08 +0000 Subject: [PATCH 4/4] update docs with dev/gen-all-config-docs.sh --- docs/Configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index b5a2083036ef..28c59953592e 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -94,8 +94,8 @@ nav_order: 15 | spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | -| spark.gluten.sql.columnar.shuffle.evictPartitionSize | 262144 | For Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. | | spark.gluten.sql.columnar.shuffle.merge.threshold | 0.25 | +| spark.gluten.sql.columnar.shuffle.partitionBufferEvictThreshold | -1 | For Velox hash shuffle writer, evict partition buffers larger than this threshold after splitting an input batch. Use non-positive value to disable this feature. | | spark.gluten.sql.columnar.shuffle.readerBufferSize | 1MB | Buffer size in bytes for shuffle reader reading input stream from local or remote. | | spark.gluten.sql.columnar.shuffle.realloc.threshold | 0.25 | | spark.gluten.sql.columnar.shuffle.sort.columns.threshold | 100000 | The threshold to determine whether to use sort-based columnar shuffle. Sort-based shuffle will be used if the number of columns is greater than this threshold. |