
[VL] Restore hash shuffle reader payload merging #12097

Open

zhli1142015 wants to merge 2 commits into apache:main from zhli1142015:vl-hash-shuffle-reader-stream-merge

Conversation

zhli1142015 (Contributor) commented May 15, 2026

What changes are proposed in this pull request?

After #10499, the hash shuffle reader changed from potentially merging multiple payloads into larger batches to returning one batch per payload. That kept shuffle-read output batches small and increased downstream overhead.

Restore reader-side coalescing for mergeable plain hash shuffle payloads, but flush at Spark shuffle stream boundaries so payloads from different input streams are never combined. Keep dictionary and complex-type payloads unmerged, reset dictionary state per stream, and carry over a payload that would exceed the configured batch size.

Add stream-local merge tests covering multi-column primitive/bool/string/nullable data, per-stream merge boundaries, carry-over, dictionary and complex-type paths, and invalid batch sizes.
Current behavior:
(screenshot omitted)

Expected behavior:
(screenshot omitted)
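The coalescing policy described above can be sketched as follows. This is a minimal Python simulation of the C++ reader logic this PR describes; the payload shape, field names, and function name are illustrative assumptions, not Gluten APIs.

```python
# Sketch of stream-local payload coalescing (illustrative, not Gluten code).
def coalesce_payloads(streams, batch_size):
    """Merge mergeable plain payloads within each stream.

    Flushes at stream boundaries so payloads from different input streams
    are never combined, emits dictionary/complex payloads unmerged, and
    carries over a payload that would push the pending batch past batch_size.
    """
    if batch_size <= 0:
        raise ValueError("batch size must be positive")
    out = []
    for stream in streams:  # per-stream loop: the merge never crosses streams
        pending = None
        for payload in stream:
            if payload["kind"] != "plain":
                # dictionary / complex-type payloads pass through as-is
                if pending is not None:
                    out.append(pending)
                    pending = None
                out.append(dict(payload))
                continue
            if pending is not None and pending["rows"] + payload["rows"] > batch_size:
                out.append(pending)  # carry over instead of exceeding the cap
                pending = None
            if pending is None:
                pending = dict(payload)
            else:
                pending["rows"] += payload["rows"]
        if pending is not None:
            out.append(pending)  # flush at the stream boundary
    return out
```

For example, with `batch_size = 4`, one stream of plain payloads of 2, 2, and 3 rows yields batches of 4 and 3 rows, and the trailing 3-row batch is never merged with the next stream's payloads.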

How was this patch tested?

Unit tests.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: GitHub Copilot

The github-actions bot added the VELOX label on May 15, 2026.
zhli1142015 (Contributor, Author):

cc @FelixYBW and @marin-ma thanks

zhli1142015 requested a review from marin-ma on May 15, 2026 at 08:35.
marin-ma (Contributor):

The payload merging in the shuffle reader was removed because its functionality duplicated VeloxResizeBatchesExec. The merging is expected to be handled by the downstream VeloxResizeBatchesExec. Did you see a performance drop after the payload merging was removed from the shuffle read?

> Restore reader-side coalescing for mergeable plain hash shuffle payloads, but flush at Spark shuffle stream boundaries so payloads from different input streams are never combined.

Another benefit of using VeloxResizeBatchesExec to combine the batches is that it doesn't have the input stream boundary limitation.

zhli1142015 (Contributor, Author) commented May 15, 2026

@marin-ma I re-checked the current code paths. VeloxResizeBatchesExec can address a similar symptom, but I do not think it is an exact replacement for this reader-side merge. In our internal evaluations, we have seen cases where the cost of VeloxResizeBatchesExec is higher than its benefit and causes regressions, so we have not been relying on it for those workloads.

The issue we are trying to address here is not really a shuffle regression by itself. After #10499, the hash shuffle reader can emit one RowVector / ColumnarBatch per payload. When the shuffle output contains many small payloads, the downstream operators after shuffle have to consume many small vectors, and that is where the overhead shows up. Restoring coalescing inside the hash shuffle reader reduces this fragmentation before those vectors reach the downstream operators.

I also re-checked the side-effect risk in this PR. The merge is local to the hash shuffle reader, capped by batchSize, flushed at Spark shuffle stream boundaries, and skipped for dictionary / complex payloads. If the next payload cannot be merged, the code carries it over instead of crossing the size limit. So the behavior remains conservative and should not change ordering or mix data across streams.

VeloxResizeBatchesExec can still be useful when it is enabled, especially because it can combine across stream boundaries, but this PR does not remove or block that path. These two mechanisms can coexist: the reader-side fast path handles the common plain hash-shuffle payload fragmentation early, and VeloxResizeBatchesExec can still do additional resizing afterwards if a deployment chooses to enable it. So I do not see this as an either-or choice; keeping this targeted reader-side coalescing looks reasonable to me.
Please let me know your thoughts. Thanks.

marin-ma (Contributor):

@zhli1142015 I still think the "fast path" duplicates VeloxResizeBatchesExec, as they are basically doing the same thing, except that the operator can handle all types of BaseVector.

If the goal is to have the shuffle reader produce larger output, can we simply follow the native implementation in VeloxResizeBatchesExec and use the Velox API to handle all types of vectors, including complex data types and dictionary encoding?

Gate the reader-side raw payload merge fast path behind a Velox config and document how it complements VeloxResizeBatchesExec.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
zhli1142015 (Contributor, Author) commented May 15, 2026

> @zhli1142015 I still think the "fast path" duplicates VeloxResizeBatchesExec, as they are basically doing the same thing, except that the operator can handle all types of BaseVector.
>
> If the goal is to have the shuffle reader produce larger output, can we simply follow the native implementation in VeloxResizeBatchesExec and use the Velox API to handle all types of vectors, including complex data types and dictionary encoding?

We are using reader-side raw payload merge mainly because it has lower cost than VeloxResizeBatchesExec: it merges plain hash shuffle payload buffers before Velox vectors are materialized, so it avoids the generic RowVector append/resizing overhead for this case. I think it is better to treat this as a fast path rather than a replacement for VeloxResizeBatchesExec.

For completeness, users can still enable VeloxResizeBatchesExec separately to cover the generic cases that this raw-payload fast path intentionally does not handle, such as complex types or dictionary-encoded payloads.
I added a dedicated config for this fast path, defaulting to false, so users can choose whether to enable reader-side raw payload merge, VeloxResizeBatchesExec, or both depending on their workload.
Does this sound ok to you?
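The cost argument above, that raw payload buffers can be merged before any Velox vectors are materialized, can be illustrated with a toy fixed-width column. The byte layout and function names below are illustrative assumptions for a single int64 column, not the actual Gluten payload format.

```python
import struct

def serialize_int64_column(values):
    # toy stand-in for a serialized fixed-width column buffer in a payload
    return struct.pack(f"<{len(values)}q", *values)

def merge_raw(buffers):
    # raw payload merge: one byte-level concatenation per column,
    # with no per-payload vector objects ever materialized
    return b"".join(buffers)

def deserialize_int64_column(buf):
    # materialization happens once, only after the merge
    return list(struct.unpack(f"<{len(buf) // 8}q", buf))
```

For example, merging the serialized buffers for `[1, 2]` and `[3, 4]` and then deserializing once yields `[1, 2, 3, 4]`, whereas the generic operator-side approach would materialize and append a vector per payload first.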

The github-actions bot added the DOCS label on May 15, 2026.