@@ -17,7 +17,7 @@
package org.apache.spark.shuffle

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
import org.apache.gluten.utils.ArrowAbiUtil
@@ -95,6 +95,7 @@ private class CelebornColumnarBatchSerializerInstance(
val batchSize = GlutenConfig.get.maxBatchSize
val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge
val handle = jniWrapper
.make(
cSchema.memoryAddress(),
@@ -103,7 +104,8 @@ private class CelebornColumnarBatchSerializerInstance(
batchSize,
readerBufferSize,
deserializerBufferSize,
shuffleWriterType.name,
enableHashShuffleReaderStreamMerge
)
// Close the shuffle reader instance as late as the end of task processing,
// since the native reader could hold a reference to memory pool that
@@ -37,6 +37,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
def veloxResizeBatchesShuffleOutput: Boolean =
getConf(COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_OUTPUT)

def enableHashShuffleReaderStreamMerge: Boolean =
getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED)

case class ResizeRange(min: Int, max: Int) {
assert(max >= min)
assert(min > 0, "Min batch size should be larger than 0")
@@ -322,6 +325,20 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(false)

val COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled")
.doc(
"Enables a reader-side raw payload merge fast path for plain hash shuffle payloads " +
"within each shuffle input stream. Payload buffers are merged before Velox vectors " +
"are materialized, so this path has lower per-batch overhead than generic " +
"VeloxResizeBatchesExec resizing, but it only covers plain payloads: complex types " +
"and dictionary-encoded payloads are not merged. VeloxResizeBatchesExec can still " +
"be enabled separately as a generic complement for the types and encodings this " +
"fast path does not cover. If false, each hash shuffle payload is returned as its " +
"own columnar batch.")
.booleanConf
.createWithDefault(false)
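
The fast path is opt-in (default false above); it can be switched on per job through the ordinary Spark conf mechanism, for example spark-submit --conf spark.gluten.sql.columnar.backend.velox.hashShuffle.reader.streamMerge.enabled=true.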

val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE =
buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize")
.doc(
@@ -17,7 +17,7 @@
package org.apache.gluten.vectorized

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.{GlutenConfig, ShuffleWriterType, VeloxConfig}
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.runtime.Runtimes
@@ -104,14 +104,17 @@ private class ColumnarBatchSerializerInstanceImpl(
val batchSize = GlutenConfig.get.maxBatchSize
val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize
val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize
val enableHashShuffleReaderStreamMerge = VeloxConfig.get.enableHashShuffleReaderStreamMerge
val shuffleReaderHandle = jniWrapper.make(
cSchema.memoryAddress(),
compressionCodec,
compressionCodecBackend,
batchSize,
readerBufferSize,
deserializerBufferSize,
shuffleWriterType.name,
enableHashShuffleReaderStreamMerge
)
// Close the shuffle reader instance as late as the end of task processing,
// since the native reader could hold a reference to memory pool that
// was used to create all buffers read from shuffle reader. The pool
4 changes: 3 additions & 1 deletion cpp/core/jni/JniWrapper.cc
@@ -1211,7 +1211,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
jint batchSize,
jlong readerBufferSize,
jlong deserializerBufferSize,
jstring shuffleWriterType,
jboolean enableHashShuffleReaderStreamMerge) {
JNI_METHOD_START
auto ctx = getRuntime(env, wrapper);

@@ -1223,6 +1224,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe
options.batchSize = batchSize;
options.readerBufferSize = readerBufferSize;
options.deserializerBufferSize = deserializerBufferSize;
options.enableHashShuffleReaderStreamMerge = enableHashShuffleReaderStreamMerge;

options.shuffleWriterType = ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterType));
std::shared_ptr<arrow::Schema> schema =
4 changes: 4 additions & 0 deletions cpp/core/shuffle/Options.h
@@ -63,6 +63,10 @@ struct ShuffleReaderOptions {

// Buffer size when deserializing rows into columnar batches. Only used for sort-based shuffle.
int64_t deserializerBufferSize = kDefaultDeserializerBufferSize;

// Whether to enable the reader-side raw payload merge fast path for plain hash shuffle payloads within one input
// stream.
bool enableHashShuffleReaderStreamMerge = false;
};

struct ShuffleWriterOptions {
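
A minimal sketch of populating the new reader option, assuming the gluten namespace and the shuffle/Options.h include path used elsewhere in cpp/core (only the field names come from the struct above; schema, codec, and runtime wiring are omitted):

#include "shuffle/Options.h"

gluten::ShuffleReaderOptions makeReaderOptions(bool streamMerge) {
  gluten::ShuffleReaderOptions options;
  // Keep the batch-size and buffer-size defaults declared in Options.h; only
  // the new fast-path toggle is set explicitly.
  options.enableHashShuffleReaderStreamMerge = streamMerge;
  return options;
}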
3 changes: 2 additions & 1 deletion cpp/velox/compute/VeloxRuntime.cc
@@ -585,7 +585,8 @@ std::shared_ptr<ShuffleReader> VeloxRuntime::createShuffleReader(
options.readerBufferSize,
options.deserializerBufferSize,
memoryManager(),
options.shuffleWriterType,
options.enableHashShuffleReaderStreamMerge);

return std::make_shared<VeloxShuffleReader>(std::move(deserializerFactory));
}
173 changes: 150 additions & 23 deletions cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -56,6 +56,11 @@ arrow::Result<BlockType> readBlockType(arrow::io::InputStream* inputStream) {
return type;
}

uint32_t validateHashShuffleReaderBatchSize(int32_t batchSize) {
GLUTEN_CHECK(batchSize > 0, fmt::format("Hash shuffle reader batch size must be positive, but got {}", batchSize));
return static_cast<uint32_t>(batchSize);
}

struct BufferViewReleaser {
BufferViewReleaser() : BufferViewReleaser(nullptr) {}

@@ -300,6 +305,23 @@ std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}

std::shared_ptr<VeloxColumnarBatch> makeColumnarBatch(
RowTypePtr type,
std::unique_ptr<InMemoryPayload> payload,
memory::MemoryPool* pool,
int64_t& deserializeTime) {
ScopedTimer timer(&deserializeTime);
std::vector<BufferPtr> veloxBuffers;
auto numBuffers = payload->numBuffers();
veloxBuffers.reserve(numBuffers);
for (size_t i = 0; i < numBuffers; ++i) {
GLUTEN_ASSIGN_OR_THROW(auto buffer, payload->readBufferAt(i));
veloxBuffers.push_back(convertToVeloxBuffer(std::move(buffer)));
}
auto rowVector = deserialize(type, payload->numRows(), veloxBuffers, {}, {}, pool);
return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
}

arrow::Result<BufferPtr>
readDictionaryBuffer(arrow::io::InputStream* in, facebook::velox::memory::MemoryPool* pool, arrow::util::Codec* codec) {
size_t bufferSize;
@@ -444,23 +466,45 @@ VeloxHashShuffleReaderDeserializer::VeloxHashShuffleReaderDeserializer(
const std::shared_ptr<arrow::Schema>& schema,
const std::shared_ptr<arrow::util::Codec>& codec,
const facebook::velox::RowTypePtr& rowType,
int32_t batchSize,
int64_t readerBufferSize,
VeloxMemoryManager* memoryManager,
std::vector<bool> isValidityBuffer,
bool hasComplexType,
bool enableStreamMerge,
int64_t& deserializeTime,
int64_t& decompressTime)
: streamReader_(streamReader),
schema_(schema),
codec_(codec),
rowType_(rowType),
batchSize_(validateHashShuffleReaderBatchSize(batchSize)),
readerBufferSize_(readerBufferSize),
memoryManager_(memoryManager),
isValidityBuffer_(std::move(isValidityBuffer)),
hasComplexType_(hasComplexType),
enableStreamMerge_(enableStreamMerge),
deserializeTime_(deserializeTime),
decompressTime_(decompressTime) {}

bool VeloxHashShuffleReaderDeserializer::shouldSkipMerge() const {
// Stream merge is a reader-side raw payload fast path: for plain payloads it
// concatenates buffers before Velox vectors are materialized, avoiding the generic
// RowVector append cost paid by VeloxResizeBatchesExec. Keep complex and dictionary
// payloads on the existing per-payload path; VeloxResizeBatchesExec can be enabled
// separately as the generic complement for those cases.
return !enableStreamMerge_ || hasComplexType_ || !dictionaryFields_.empty();
}
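
The gating reads as a small decision table (a restatement of the predicate above, not additional logic):

// enableStreamMerge_   hasComplexType_   dictionaryFields_   path taken
// false                any               any                 per-payload (merge skipped)
// true                 true              any                 per-payload (merge skipped)
// true                 false             non-empty           per-payload (merge skipped)
// true                 false             empty               stream-merge fast path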

bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
if (blockTypeResolved_) {
return true;
}

GLUTEN_ASSIGN_OR_THROW(auto blockType, readBlockType(in_.get()));
switch (blockType) {
case BlockType::kEndOfStream:
in_ = nullptr;
return false;
case BlockType::kDictionary: {
VeloxDictionaryReader reader(rowType_, memoryManager_->getLeafMemoryPool().get(), codec_.get());
@@ -485,6 +529,7 @@ bool VeloxHashShuffleReaderDeserializer::resolveNextBlockType() {
default:
throw GlutenException(fmt::format("Unsupported block type: {}", static_cast<int32_t>(blockType)));
}
blockTypeResolved_ = true;
return true;
}

@@ -499,6 +544,12 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() {
return;
}

if (!dictionaryFields_.empty() || !dictionaries_.empty()) {
dictionaryFields_.clear();
dictionaries_.clear();
}
blockTypeResolved_ = false;

if (readerBufferSize_ > 0) {
GLUTEN_ASSIGN_OR_THROW(
in_,
Expand All @@ -510,36 +561,106 @@ void VeloxHashShuffleReaderDeserializer::loadNextStream() {
}

std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
while (true) {
if (in_ == nullptr) {
if (merged_) {
return makeColumnarBatch(
rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
}

loadNextStream();

if (reachedEos_) {
return nullptr;
}
}
if (resolveNextBlockType()) {
break;
}
}

if (shouldSkipMerge()) {
if (merged_) {
return makeColumnarBatch(
rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_);
}

uint32_t numRows = 0;
GLUTEN_ASSIGN_OR_THROW(
auto arrowBuffers,
BlockPayload::deserialize(
in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_));

blockTypeResolved_ = false;

return makeColumnarBatch(
rowType_,
numRows,
std::move(arrowBuffers),
dictionaryFields_,
dictionaries_,
memoryManager_->getLeafMemoryPool().get(),
deserializeTime_);
}

std::vector<std::shared_ptr<arrow::Buffer>> arrowBuffers{};
uint32_t numRows = 0;
while (!merged_ || merged_->numRows() < batchSize_) {
if (in_ == nullptr) {
if (merged_) {
break;
}

loadNextStream();
if (reachedEos_) {
break;
}
}
if (!resolveNextBlockType()) {
continue;
}

if (shouldSkipMerge()) {
break;
}

GLUTEN_ASSIGN_OR_THROW(
arrowBuffers,
BlockPayload::deserialize(
in_.get(), codec_, memoryManager_->defaultArrowMemoryPool(), numRows, deserializeTime_, decompressTime_));

blockTypeResolved_ = false;

if (!merged_) {
merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers));
arrowBuffers.clear();
continue;
}

auto mergedRows = merged_->numRows() + numRows;
if (mergedRows > batchSize_) {
break;
}

auto append = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers));
GLUTEN_ASSIGN_OR_THROW(
merged_,
InMemoryPayload::merge(std::move(merged_), std::move(append), memoryManager_->defaultArrowMemoryPool()));
arrowBuffers.clear();
}

if (!merged_) {
return nullptr;
}

auto columnarBatch =
makeColumnarBatch(rowType_, std::move(merged_), memoryManager_->getLeafMemoryPool().get(), deserializeTime_);

if (!arrowBuffers.empty()) {
merged_ = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, schema_, std::move(arrowBuffers));
}

return columnarBatch;
}
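
The flush rule in the merge loop above can be checked with a small standalone simulation (a sketch, not Gluten code; it ignores stream boundaries and the skip-merge cases and models only the row-count arithmetic):

#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  const uint32_t batchSize = 4096;
  // Five plain payloads of 1000 rows each arriving on one stream.
  std::vector<uint32_t> payloadRows{1000, 1000, 1000, 1000, 1000};
  uint32_t merged = 0;
  for (uint32_t rows : payloadRows) {
    // Mirror of `mergedRows > batchSize_`: flush before the merge would
    // overflow, then seed the next merged payload with the pending one.
    if (merged > 0 && merged + rows > batchSize) {
      std::cout << "flush batch of " << merged << " rows\n"; // prints 4000
      merged = 0;
    }
    merged += rows;
  }
  if (merged > 0) {
    std::cout << "flush batch of " << merged << " rows\n"; // prints 1000
  }
  return 0;
}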

VeloxSortShuffleReaderDeserializer::VeloxSortShuffleReaderDeserializer(
@@ -797,7 +918,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
int64_t readerBufferSize,
int64_t deserializerBufferSize,
VeloxMemoryManager* memoryManager,
ShuffleWriterType shuffleWriterType,
bool enableHashShuffleReaderStreamMerge)
: schema_(schema),
codec_(codec),
veloxCompressionType_(veloxCompressionType),
Expand All @@ -806,7 +928,8 @@ VeloxShuffleReaderDeserializerFactory::VeloxShuffleReaderDeserializerFactory(
readerBufferSize_(readerBufferSize),
deserializerBufferSize_(deserializerBufferSize),
memoryManager_(memoryManager),
shuffleWriterType_(shuffleWriterType),
enableHashShuffleReaderStreamMerge_(enableHashShuffleReaderStreamMerge) {
initFromSchema();
}

@@ -832,8 +955,12 @@ std::unique_ptr<ColumnarBatchIterator> VeloxShuffleReaderDeserializerFactory::cr
schema_,
codec_,
rowType_,
batchSize_,
readerBufferSize_,
memoryManager_,
isValidityBuffer_,
hasComplexType_,
enableHashShuffleReaderStreamMerge_,
deserializeTime_,
decompressTime_);
case ShuffleWriterType::kSortShuffle: