Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
nativeBufferSize,
GlutenConfig.get.columnarShuffleReallocThreshold,
GlutenConfig.get.columnarShuffleEvictPartitionSize,
partitionWriterHandle
)
case SortShuffleWriterType =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ protected void writeImpl(Iterator<Product2<K, V>> records) {
columnarDep.nativePartitioning(), partitionId),
nativeBufferSize,
reallocThreshold,
GlutenConfig.get().columnarShuffleEvictPartitionSize(),
partitionWriterHandle);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class ColumnarShuffleWriter[K, V](
taskContext.partitionId),
nativeBufferSize,
reallocThreshold,
GlutenConfig.get.columnarShuffleEvictPartitionSize,
partitionWriterHandle
)
}
Expand Down
4 changes: 3 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
jint startPartitionId,
jint splitBufferSize,
jdouble splitBufferReallocThreshold,
jint evictPartitionSize,
jlong partitionWriterHandle) {
JNI_METHOD_START
const auto ctx = getRuntime(env, wrapper);
Expand All @@ -1004,7 +1005,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
toPartitioning(jStringToCString(env, partitioningNameJstr)),
startPartitionId,
splitBufferSize,
splitBufferReallocThreshold);
splitBufferReallocThreshold,
evictPartitionSize);

return ctx->saveObject(ctx->createShuffleWriter(numPartitions, partitionWriter, shuffleWriterOptions));
JNI_METHOD_END(kInvalidObjectHandle)
Expand Down
14 changes: 10 additions & 4 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace gluten {

static constexpr int16_t kDefaultBatchSize = 4096;
static constexpr int32_t kDefaultEvictPartitionSize = 256 * 1024;
static constexpr int32_t kDefaultShuffleWriterBufferSize = 4096;
static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
Expand Down Expand Up @@ -81,17 +82,20 @@ struct ShuffleWriterOptions {
struct HashShuffleWriterOptions : ShuffleWriterOptions {
int32_t splitBufferSize = kDefaultShuffleWriterBufferSize;
double splitBufferReallocThreshold = kDefaultSplitBufferReallocThreshold;
int32_t evictPartitionSize = kDefaultEvictPartitionSize;

HashShuffleWriterOptions() : ShuffleWriterOptions(ShuffleWriterType::kHashShuffle) {}

HashShuffleWriterOptions(
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
double partitionBufferReallocThreshold)
double partitionBufferReallocThreshold,
int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize)
: ShuffleWriterOptions(ShuffleWriterType::kHashShuffle, partitioning, startPartitionId),
splitBufferSize(partitionBufferSize),
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
splitBufferReallocThreshold(partitionBufferReallocThreshold),
evictPartitionSize(partitionEvictPartitionSize) {}

protected:
HashShuffleWriterOptions(ShuffleWriterType shuffleWriterType) : ShuffleWriterOptions(shuffleWriterType) {}
Expand All @@ -101,10 +105,12 @@ struct HashShuffleWriterOptions : ShuffleWriterOptions {
Partitioning partitioning,
int32_t startPartitionId,
int32_t partitionBufferSize,
double partitionBufferReallocThreshold)
double partitionBufferReallocThreshold,
int32_t partitionEvictPartitionSize = kDefaultEvictPartitionSize)
: ShuffleWriterOptions(shuffleWriterType, partitioning, startPartitionId),
splitBufferSize(partitionBufferSize),
splitBufferReallocThreshold(partitionBufferReallocThreshold) {}
splitBufferReallocThreshold(partitionBufferReallocThreshold),
evictPartitionSize(partitionEvictPartitionSize) {}
};

struct SortShuffleWriterOptions : ShuffleWriterOptions {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/shuffle/Payload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ arrow::Result<uint8_t> readPayloadType(arrow::io::InputStream* is) {
}

arrow::Result<int64_t> compressBuffer(
const std::shared_ptr<arrow::Buffer>& buffer,
const std::shared_ptr<arrow::Buffer> buffer,
uint8_t* output,
int64_t outputLength,
arrow::util::Codec* codec) {
Expand Down
32 changes: 32 additions & 0 deletions cpp/velox/shuffle/VeloxHashShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,41 @@ arrow::Status VeloxHashShuffleWriter::doSplit(const facebook::velox::RowVector&
printPartitionBuffer();

setSplitState(SplitState::kInit);
if (evictPartitionSize_ > 0) {
// After split, evict large partition buffers to free up memory for the next input RowVector.
const auto partitionBytes = estimatePartitionBufferBytes();
for (uint32_t pid = 0; pid < partitionBytes.size(); ++pid) {
if (partitionBufferBase_[pid] > 0 && partitionBytes[pid] >= evictPartitionSize_) {
RETURN_NOT_OK(evictPartitionBuffers(pid, false));
}
}
}
return arrow::Status::OK();
}

std::vector<int64_t> VeloxHashShuffleWriter::estimatePartitionBufferBytes() const {
std::vector<int64_t> partitionBytes(numPartitions_, 0);

for (const auto& columnBuffers : partitionBuffers_) {
for (uint32_t pid = 0; pid < columnBuffers.size(); ++pid) {
for (const auto& buffer : columnBuffers[pid]) {
if (buffer) {
partitionBytes[pid] += buffer->capacity();
}
}
}
}

for (uint32_t pid = 0; pid < complexTypeFlushBuffer_.size(); ++pid) {
const auto& buffer = complexTypeFlushBuffer_[pid];
if (buffer) {
partitionBytes[pid] += buffer->capacity();
}
}

return partitionBytes;
}

arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitRV]);

Expand Down
6 changes: 5 additions & 1 deletion cpp/velox/shuffle/VeloxHashShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
MemoryManager* memoryManager)
: VeloxShuffleWriter(numPartitions, partitionWriter, options, memoryManager),
splitBufferSize_(options->splitBufferSize),
splitBufferReallocThreshold_(options->splitBufferReallocThreshold) {
splitBufferReallocThreshold_(options->splitBufferReallocThreshold),
evictPartitionSize_(options->evictPartitionSize) {
arenas_.resize(numPartitions);
}

Expand All @@ -223,6 +224,8 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {

arrow::Status initColumnTypes(const facebook::velox::RowVector& rv);

std::vector<int64_t> estimatePartitionBufferBytes() const;

arrow::Status splitRowVector(const facebook::velox::RowVector& rv);

arrow::Status initFromRowVector(const facebook::velox::RowVector& rv);
Expand Down Expand Up @@ -326,6 +329,7 @@ class VeloxHashShuffleWriter : public VeloxShuffleWriter {
protected:
int32_t splitBufferSize_;
double splitBufferReallocThreshold_;
int32_t evictPartitionSize_;

std::shared_ptr<arrow::Schema> schema_;

Expand Down
Loading
Loading