feat(cubestore): execute sorted partial aggregate per partition below the merge, propagate LIMIT into it #10995
waralexrom wants to merge 9 commits into
Claude finished @waralexrom's task in 3m 19s. Code Review verdict: ship after addressing notes · 0 high, 0 medium, 4 low
Summary The optimization is well-motivated and the implementation is careful in the places that matter most for correctness:
Inline findings (all low severity, none blocking)
Other observations
Codecov Report
✅ All modified and coverable lines are covered by tests.
@@            Coverage Diff             @@
##           master   #10995      +/-   ##
==========================================
+ Coverage   58.44%   58.51%   +0.06%
==========================================
  Files         216      216
  Lines       17248    17265      +17
  Branches     3520     3524       +4
==========================================
+ Hits        10080    10102      +22
+ Misses       6660     6648      -12
- Partials      508      515       +7
Claude finished @waralexrom's task in 1m 26s. Code Review verdict: ship · 0 high, 0 medium, 0 low (all prior findings addressed)
Summary All four low-severity findings from the prior review (15:18Z) are addressed across
Nothing else to flag. The plan-shape-based safety (limit never lands above a merge of per-partition partial states), the closed-groups-only emit contract, and the cross-partition-duplicate-key path through the router's Final aggregate continue to read correctly. Test coverage on this round is appropriately targeted at the regressions each fix could re-introduce. Pre-existing nits left in place from prior review (out of scope for this PR):
Claude finished @waralexrom's task in 1m 58s. Code Review verdict: ship · 0 high, 0 medium, 0 low
Summary The only new code since
Nothing to flag. Pre-existing nits called out in earlier rounds ( All four prior review threads are resolved upstream.
… LIMIT into it

Rewrite the worker plan PartialAggregate(Sorted) -> SortPreservingMerge into SortPreservingMerge -> per-partition PartialAggregate(Sorted), so the merge carries reduced partial states instead of all raw rows. Only sorted streaming aggregates are pushed: they hold O(1) accumulators per partition, while a hash aggregate would multiply its O(num_groups) memory by the partition count.

Because the merged stream now contains duplicate group keys from different partitions, a plain worker row limit could truncate a group's partial states and silently corrupt its total. add_limit_to_workers turns the worker limit into a per-partition group limit on the aggregate plus a widened row budget (limit * partitions) on the merge (TailLimit for the reverse case).

InlineAggregateStream honors the group limit: the emit-early threshold becomes min(batch_size, remaining limit) instead of a hard batch_size (4096), only closed groups are emitted, and input reading stops once the limit is reached, so a downstream LIMIT short-circuits the scan.
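The plan rewrite described in this commit can be illustrated with a minimal sketch. It assumes a toy model where a partition is a key-sorted slice of `(key, value)` rows and the aggregate is a sum; the function names are hypothetical and are not the cubestore or DataFusion API.

```rust
/// One partial state: a group key and the running sum for that key.
type Partial = (u32, i64);

/// Sorted streaming partial aggregate over one partition.
/// Rows must be sorted by key, so only the last output entry is ever updated.
fn partial_aggregate_sorted(rows: &[(u32, i64)]) -> Vec<Partial> {
    let mut out: Vec<Partial> = Vec::new();
    for &(key, value) in rows {
        if let Some(last) = out.last_mut() {
            if last.0 == key {
                last.1 += value;
                continue;
            }
        }
        out.push((key, value));
    }
    out
}

/// Stand-in for a sort-preserving merge: one key-ordered stream, duplicates kept.
fn merge_sorted(partitions: &[Vec<Partial>]) -> Vec<Partial> {
    let mut merged: Vec<Partial> = partitions.iter().flatten().copied().collect();
    merged.sort_by_key(|&(key, _)| key);
    merged
}

/// Final aggregate (router side in this sketch): folds duplicate keys of the merged stream.
fn final_aggregate(merged: &[Partial]) -> Vec<Partial> {
    partial_aggregate_sorted(merged)
}
```

With partitions `[(1, 10), (1, 5), (2, 7)]` and `[(1, 1), (3, 4), (3, 6)]`, aggregating each partition first sends 4 partial states through the merge instead of 6 raw rows, and the final aggregate still yields `[(1, 16), (2, 7), (3, 10)]`.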
…ndow

TailLimitStream collected its whole input to take the tail, materializing all worker rows for reverse limits; now it keeps a sliding window of trailing batches covering 'limit' rows, newer rows displace older ones. TailLimitExec returns the last 'limit' rows of each input partition instead of requiring a single one.

The reverse worker limit above merged per-partition partial aggregates becomes a per-partition tail below the merge: within a partition the aggregate emits unique group keys, so 'limit' rows there are 'limit' complete groups and the merge carries at most 'limit' rows per partition instead of all groups with cross-partition duplicates. Groups beyond the last 'limit' may arrive with partial totals, but the router orders by the group key and its own limit drops them.
… first aggregate

InlineAggregateStream could emit more groups than its limit: emit-early ran once per input batch while a single batch can bring an arbitrary group backlog, and the final emit was unclamped. Now the backlog is drained in emit threshold chunks at the top of the poll loop (also stops reading input as early as possible) and the final emit is clamped by the remaining limit.

add_limit_to_workers is rewritten as a limit descent: probe from the worker input through sort preserving merges to the first aggregate and place a per-partition row limit directly above it (LocalLimit forward, TailLimit reverse, GlobalLimit for a single partition), additionally passing the limit into InlineAggregateExec for the early input stop. Within a partition the aggregate emits unique group keys, so the row limit cuts at group boundaries; correctness no longer depends on which pass produced the plan shape, and a merge of per-partition partial aggregates never gets a row limit above it -- the fetch = limit * partitions widening is gone. Plans without aggregation keep the plain top-level limit.

TailLimitExec now declares maintains_input_order, matching its passthrough properties.
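The group-limit contract (only closed groups are emitted, the final emit is clamped, input reading stops once the limit is reached) can be sketched as follows. This is a simplified row-at-a-time model with a hypothetical function name; the real stream works on record batches and drains its backlog in chunks, which is not modelled here.

```rust
/// Sorted streaming sum that stops after `limit` closed groups.
/// Returns the emitted groups and the number of input rows consumed.
fn aggregate_with_group_limit(
    batches: &[Vec<(u32, i64)>],
    limit: usize,
) -> (Vec<(u32, i64)>, usize) {
    let mut emitted: Vec<(u32, i64)> = Vec::new();
    let mut open: Option<(u32, i64)> = None; // group that may still receive rows
    let mut rows_read = 0;

    'input: for batch in batches {
        for &(key, value) in batch {
            // Stop reading input as soon as the limit is satisfied.
            if emitted.len() >= limit {
                break 'input;
            }
            rows_read += 1;
            if let Some(group) = open.as_mut() {
                if group.0 == key {
                    group.1 += value;
                    continue;
                }
            }
            // The key changed: the previous group is closed and can be emitted.
            if let Some(closed) = open.take() {
                emitted.push(closed);
            }
            open = Some((key, value));
        }
    }

    // Final emit, clamped by the remaining limit: the open group only counts
    // as closed when the input ended before the limit was reached.
    if emitted.len() < limit {
        if let Some(last) = open.take() {
            emitted.push(last);
        }
    }
    (emitted, rows_read)
}
```

For the batches `[(1, 1), (1, 2), (2, 5)]`, `[(2, 5), (3, 1), (4, 1)]`, `[(5, 1)]` and `limit = 2`, the sketch returns `[(1, 3), (2, 10)]` after reading 5 of the 7 rows; the row with key 3 has to be read because it is what closes group 2.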
…ognized shapes

The limit descent now looks through ProjectionExec: a row limit is a plain count and commutes with column renames. Without this, a projection between the worker and the merge would route the limit into the generic fallback, whose LimitPushdown descends through projections into merges and could place a row fetch above a duplicate-bearing merge of per-partition partial states.

For shapes the descent doesn't recognize, the fallback now refuses to add a worker limit at all when the subtree contains a per-partition partial aggregate: skipping the limit is always correct, the router applies the real one.
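Why a row fetch above a duplicate-bearing merge is unsafe, while a per-partition limit is not, can be seen in a small sketch. It assumes sum partial states keyed by an integer and uses hypothetical function names; it is an illustration of the argument, not the planner code.

```rust
use std::collections::BTreeMap;

type Partial = (u32, i64);

/// Final aggregate on the router: sums partial states per key, ordered by key.
fn final_aggregate(partials: &[Partial]) -> Vec<Partial> {
    let mut totals: BTreeMap<u32, i64> = BTreeMap::new();
    for &(key, sum) in partials {
        *totals.entry(key).or_insert(0) += sum;
    }
    totals.into_iter().collect()
}

/// Merge of per-partition partial states: key-ordered, duplicate keys kept.
fn merge(partitions: &[Vec<Partial>]) -> Vec<Partial> {
    let mut merged: Vec<Partial> = partitions.iter().flatten().copied().collect();
    merged.sort_by_key(|&(key, _)| key);
    merged
}

/// Unsafe placement: the row limit sits above the merge and can cut a group
/// between two of its partial states.
fn limit_above_merge(partitions: &[Vec<Partial>], limit: usize) -> Vec<Partial> {
    let truncated: Vec<Partial> = merge(partitions).into_iter().take(limit).collect();
    final_aggregate(&truncated).into_iter().take(limit).collect()
}

/// Safe placement: the row limit sits directly above each partial aggregate,
/// where keys are unique, so it cuts at group boundaries.
fn limit_per_partition(partitions: &[Vec<Partial>], limit: usize) -> Vec<Partial> {
    let limited: Vec<Vec<Partial>> = partitions
        .iter()
        .map(|p| p.iter().copied().take(limit).collect())
        .collect();
    final_aggregate(&merge(&limited)).into_iter().take(limit).collect()
}
```

With partitions `[(1, 10), (2, 20), (3, 30), (5, 50)]` and `[(1, 1), (2, 2), (4, 4), (6, 6)]` and `limit = 3`, the per-partition placement returns `[(1, 11), (2, 22), (3, 30)]`. The limit above the merge keeps only `(1, 10), (1, 1), (2, 20)` and reports group 2 as 20 instead of 22.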
…h size

A batch covering the whole window replaces it on arrival, cut to the last 'limit' rows; smaller batches go through the usual suffix eviction. The window now never holds more than 2 * limit rows regardless of input batch sizes (previously a single large batch stayed in the window whole until the end of the input).
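A minimal sketch of such a window, assuming batches are plain `Vec<i64>` values and using a hypothetical `TailWindow` type rather than the actual `TailLimitStream`:

```rust
use std::collections::VecDeque;

/// Sliding window of trailing batches that always covers the last `limit` rows.
struct TailWindow {
    limit: usize,
    batches: VecDeque<Vec<i64>>,
    rows: usize,
}

impl TailWindow {
    fn new(limit: usize) -> Self {
        TailWindow { limit, batches: VecDeque::new(), rows: 0 }
    }

    /// Number of rows currently buffered.
    fn buffered_rows(&self) -> usize {
        self.rows
    }

    fn push(&mut self, batch: Vec<i64>) {
        if self.limit == 0 || batch.is_empty() {
            return;
        }
        if batch.len() >= self.limit {
            // A batch covering the whole window replaces it, cut to the last `limit` rows.
            let start = batch.len() - self.limit;
            self.batches.clear();
            self.batches.push_back(batch[start..].to_vec());
            self.rows = self.limit;
            return;
        }
        self.rows += batch.len();
        self.batches.push_back(batch);
        // Suffix eviction: drop the oldest batch while the rest still covers `limit` rows.
        while let Some(front_len) = self.batches.front().map(|b| b.len()) {
            if self.rows - front_len < self.limit {
                break;
            }
            self.rows -= front_len;
            self.batches.pop_front();
        }
    }

    /// Returns the last `limit` rows seen so far.
    fn finish(self) -> Vec<i64> {
        let limit = self.limit;
        let all: Vec<i64> = self.batches.into_iter().flatten().collect();
        let start = all.len().saturating_sub(limit);
        all[start..].to_vec()
    }
}
```

In this sketch every buffered batch has at most `limit` rows and the oldest one is evicted whenever the rest still covers `limit`, so the buffered row count stays below `2 * limit`. Pushing `[1, 2]`, `[3]`, `[4, 5]`, `[6]` into a window with `limit = 3` leaves `[4, 5, 6]`.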
…ensive threshold guard
A global (no GROUP BY) aggregate doesn't use the input ordering, but the scan still merges its partitions with a SortPreservingMergeExec when the index has a sort key. For such queries the sort key often comes from the equality filters (it is picked for index selection and partition pruning, not as an ordering requirement), so the filters make the merge keys constant and every merge comparison becomes a full-length tie across all chunks -- pure waste.

The new pre-optimize pass replaces such merges under the aggregate with plain partition coalescing, descending through filters, projections and unions. Restricted to hash aggregates without group expressions (one accumulator set per partition even when later optimizations make them run per partition; a grouped hash aggregate would multiply its hash table by the partition count) and without ordering requirements (array_agg(ORDER BY) and other order-sensitive aggregates keep their merge). The deduplicating merge under LastRowByUniqueKey is out of the descent's reach and stays intact.
The worker limit only descended when ORDER BY columns prefixed the index sort key. A bare GROUP BY ... LIMIT n (no ORDER BY) left limit_and_reverse unset, so workers fully aggregated the index and the result was capped only on the router -- a wide-key global aggregate could exhaust memory.

get_limit_for_pushdown now also fires when there is no ORDER BY: it reads the group-by keys (new ChooseIndexContext.group_by) and pushes the limit when the group-by key set is a prefix of the index sort key, so groups are emitted in sort order and the per-partition group limit covers the global first N.

Guards against unsound pushdown:
- unusable_sort: an ORDER BY that can't be reduced to a column prefix (mixed asc/desc or a non-column expr) still constrains output order, so the group-by fallback is skipped.
- group_by_has_having: a HAVING above the aggregate drops groups on the router after workers truncate, undercounting the result; the fallback is skipped when a filter is seen above the group-by aggregate.

WHERE below the aggregate stays safe and reverse is forced off for the no-ORDER-BY path.
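The prefix condition and the two guards can be sketched as below. The function names and the flat `&[&str]` column lists are assumptions made for illustration; the actual check lives in `get_limit_for_pushdown` and works on `ChooseIndexContext`.

```rust
/// True when the set of group-by columns equals the first `group_by.len()`
/// columns of the index sort key, in any order.
fn group_by_is_sort_key_prefix(group_by: &[&str], sort_key: &[&str]) -> bool {
    if group_by.is_empty() || group_by.len() > sort_key.len() {
        return false;
    }
    let prefix = &sort_key[..group_by.len()];
    prefix.iter().all(|col| group_by.contains(col))
        && group_by.iter().all(|col| prefix.contains(col))
}

/// Hypothetical combination of the prefix check with the two guards:
/// an ORDER BY that cannot be reduced to a column prefix, or a HAVING above
/// the aggregate, disables the no-ORDER-BY pushdown.
fn can_push_group_limit(
    group_by: &[&str],
    sort_key: &[&str],
    unusable_sort: bool,
    group_by_has_having: bool,
) -> bool {
    !unusable_sort && !group_by_has_having && group_by_is_sort_key_prefix(group_by, sort_key)
}
```

For a sort key `(a, b, c)`, `GROUP BY b, a` qualifies because rows of one group are contiguous in index order, while `GROUP BY a, c` does not: rows of one `(a, c)` group can be separated by rows with other values of `b`.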
… ORDER BY

For GROUP BY ... ORDER BY <group cols> LIMIT n where the ORDER BY is a subset of the group keys but not an index sort-key prefix, the limit can't ride the index (no early scan termination) and the worker would otherwise aggregate the whole index and emit every group -- OOM on wide pre-aggregations.

ChooseIndex now computes a worker_sort_and_limit descriptor: the full group key reordered with the ORDER BY columns first, plus the limit. It is carried on ClusterSendNode through serialization (the worker plan has no ORDER BY of its own, so it can't be re-derived there). A final physical pass push_worker_sort_and_limit, run last in finalize, rewrites the worker subtree to SortPreservingMerge(worker_order) <- Sort(worker_order, fetch=n, per partition) <- partial and rebuilds the router final aggregate over SortPreservingMerge(worker_order).

Correctness: the per-partition sort key is the full group key, so a group in the global first n has fewer than n smaller-keyed groups and survives the fetch in every partition it appears in -- all of its partial states reach the final aggregate, so sums are correct even for groups split across partitions. Sorting by the ORDER BY columns alone would drop partial states of tied groups.

Gated off when an ORDER BY can't be reduced to columns (mixed asc/desc), when a HAVING sits above the aggregate, and preserved through pull_up_cluster_send so the FROM ... AS alias (SubqueryAlias) doesn't silently drop the descriptor.

Tests: ported worker_sort_and_limit_cluster (two-worker cluster), added limit_pushdown_group_nonprefix_order (sum correctness across chunk-spanning groups) and limit_pushdown_group_having (HAVING is not pushed).
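The correctness argument about tied groups can be checked on a toy case. The sketch assumes `GROUP BY a, b ORDER BY a LIMIT 1`, sum partial states, and a stable per-partition sort; the helper names are hypothetical.

```rust
use std::collections::BTreeMap;

type Key = (i32, &'static str); // group key (a, b)
type Partial = (Key, i64);

/// Per-partition top-n: sort partial states by `sort_key` and keep the first `n`.
fn top_n_by<K: Ord>(
    partition: &[Partial],
    n: usize,
    sort_key: impl Fn(&Key) -> K,
) -> Vec<Partial> {
    let mut rows = partition.to_vec();
    rows.sort_by_key(|(key, _)| sort_key(key)); // stable: ties keep arrival order
    rows.truncate(n);
    rows
}

/// Final aggregate: sums whatever partial states survived the per-partition fetch.
fn final_aggregate(partitions: &[Vec<Partial>]) -> BTreeMap<Key, i64> {
    let mut totals = BTreeMap::new();
    for &(key, sum) in partitions.iter().flatten() {
        *totals.entry(key).or_insert(0) += sum;
    }
    totals
}
```

Take partition 0 as `[((1, "x"), 10), ((1, "y"), 5)]` and partition 1 as `[((1, "y"), 7), ((1, "x"), 3)]`. Sorting by the full key `(a, b)` keeps `(1, "x")` in both partitions and the total is 13. Sorting by `a` alone keeps a different tied group in each partition, so `(1, "x")` is reported as 10 and `(1, "y")` as 7, both incomplete.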
21fb7d4 to f09cf57
Claude finished @waralexrom's task in 4m 34s. Code Review verdict: ship · 0 high, 0 medium, 3 low
Summary Two new commits, both add limit-pushdown coverage that the original PR didn't have:
Inline findings (3 low, none blocking)
Other
/// Descends through merge/coalesce/check-memory wrappers to the ClusterSend feeding a final
/// aggregate.
fn find_cluster_send(p: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
    let any = p.as_any();
    if any.is::<ClusterSendExec>() {
        return Some(p.clone());
    }
    if any.is::<SortPreservingMergeExec>()
        || any.is::<CoalescePartitionsExec>()
        || any.is::<CheckMemoryExec>()
    {
        return find_cluster_send(p.children().into_iter().next()?);
    }
    None
}

/// The partial aggregate inside a worker subtree, reached through merge/coalesce wrappers. Returns
/// `None` for any other shape (including an already-rewritten subtree, whose partial now sits under
/// a `SortExec` -- not a wrapper we descend -- keeping the pass idempotent).
fn locate_partial_aggregate(p: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
    let mut candidate = p.clone();
    loop {
        let any = candidate.as_any();
        if let Some(a) = any.downcast_ref::<InlineAggregateExec>() {
            return (*a.mode() == InlineAggregateMode::Partial).then_some(candidate.clone());
        }
        if let Some(a) = any.downcast_ref::<AggregateExec>() {
            return (*a.mode() == AggregateMode::Partial).then_some(candidate.clone());
        }
        if any.is::<SortPreservingMergeExec>()
            || any.is::<CoalescePartitionsExec>()
            || any.is::<CheckMemoryExec>()
        {
            candidate = candidate.children().into_iter().next()?.clone();
            continue;
        }
        return None;
    }
}
find_cluster_send and locate_partial_aggregate only descend through SortPreservingMergeExec / CoalescePartitionsExec / CheckMemoryExec. first_aggregate_below_merges (line 628) — added a few commits ago to address the prior projection-descent finding — also descends through ProjectionExec for the same reason: a column-rename projection can legitimately sit between the partial aggregate and the merge, and treating it as a "stop" silently no-ops the optimization (here: the worker-bounded sort isn't installed, so we go back to aggregating every group — the OOM scenario this PR is meant to prevent).
For symmetry with the earlier fix and to avoid quietly losing the bound when a projection sneaks in, consider extending both helpers (and find_cluster_send on the router side) to descend through ProjectionExec too. A row-count limit and a column-rename projection commute, same as for first_aggregate_below_merges.
Low severity: not a correctness bug (when it no-ops the plan is unchanged), but it defeats the whole point of this commit for any worker subtree that contains a projection.
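The effect of treating a projection as a stop can be sketched on a toy plan type. The `Plan` enum and the `through_projection` flag are hypothetical stand-ins for illustration and do not use the DataFusion `ExecutionPlan` API.

```rust
/// Toy plan node; variant names mirror the operators discussed above.
enum Plan {
    SortPreservingMerge(Box<Plan>),
    CoalescePartitions(Box<Plan>),
    CheckMemory(Box<Plan>),
    Projection(Box<Plan>),
    Sort(Box<Plan>),
    PartialAggregate,
}

/// Walks down single-child wrappers to the partial aggregate.
/// `through_projection` toggles the behaviour suggested in the comment above.
fn locate_partial_aggregate(plan: &Plan, through_projection: bool) -> Option<&Plan> {
    let mut candidate = plan;
    loop {
        candidate = match candidate {
            Plan::PartialAggregate => return Some(candidate),
            Plan::SortPreservingMerge(child)
            | Plan::CoalescePartitions(child)
            | Plan::CheckMemory(child) => &**child,
            Plan::Projection(child) if through_projection => &**child,
            // Any other node (including Sort) stops the descent, which keeps
            // an already-rewritten subtree from being rewritten again.
            _ => return None,
        };
    }
}
```

For `SortPreservingMerge(Projection(PartialAggregate))` the lookup returns `None` without the flag, so the rewrite would silently not apply, and finds the aggregate with it. A subtree whose aggregate sits under `Sort` is rejected in both modes.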
let sort = ctx.sort.as_ref().filter(|s| !s.is_empty())?;
let group_by = ctx.group_by.as_ref().filter(|g| !g.is_empty())?;

// Every ORDER BY column must be a group-by column; map it to its group-key position.
let mut cols: Vec<(usize, bool, bool)> = Vec::with_capacity(group_by.len());
let mut used = vec![false; group_by.len()];
for name in sort {
    let idx = group_by.iter().position(|g| g == name)?;
    if used[idx] {
        continue;
    }
    used[idx] = true;
    cols.push((idx, ctx.sort_is_asc, !ctx.sort_is_asc));
}
nulls_first is hard-coded to !ctx.sort_is_asc (DataFusion's default: ASC→NULLS LAST, DESC→NULLS FIRST). The same default is replayed in worker_ordering (distributed_partial_aggregate.rs).
sort_to_column_names never captures the SortExpr's actual nulls_first flag, so a query that explicitly says ORDER BY x ASC NULLS FIRST (non-default) will be pushed to workers as ASC NULLS LAST with fetch = N. Per-partition top-N would then drop NULL-keyed groups that the global query needs to keep, and the result is silently wrong.
In practice cubestore index keys are non-nullable for typed columns, so the buggy path is reachable mostly through nullable text/numeric columns appearing in the GROUP BY. Low severity, but worth either:
- preserving the original nulls_first through sort_to_column_names → ChooseIndexContext → WorkerSortAndLimit, or
- gating the optimization off when any group/sort column is nullable, or
- at minimum a comment here calling out the default-NULLS assumption.
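The failure mode can be reproduced on a toy partition of nullable group keys. This sketch uses `Option<i32>` keys and hypothetical helper names, and only models the ascending case.

```rust
use std::cmp::Ordering;

/// Ascending comparison of nullable keys with explicit NULL placement.
fn cmp_asc(a: &Option<i32>, b: &Option<i32>, nulls_first: bool) -> Ordering {
    match (a, b) {
        (None, None) => Ordering::Equal,
        (None, Some(_)) => {
            if nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (Some(_), None) => {
            if nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        (Some(x), Some(y)) => x.cmp(y),
    }
}

/// Per-partition top-n of group keys under the given NULL placement.
fn top_n(keys: &[Option<i32>], n: usize, nulls_first: bool) -> Vec<Option<i32>> {
    let mut sorted = keys.to_vec();
    sorted.sort_by(|a, b| cmp_asc(a, b, nulls_first));
    sorted.truncate(n);
    sorted
}
```

For keys `[Some(3), None, Some(1), Some(2)]` and `n = 2`, the query's `ASC NULLS FIRST` order expects `[None, Some(1)]`, while a worker that replays the default `ASC NULLS LAST` keeps `[Some(1), Some(2)]` and the NULL-keyed group never reaches the router.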
- return Ok(ClusterSendNode::new(id, Arc::new(p), snapshots, limit).into_plan());
+ let mut new_send = ClusterSendNode::new(id, Arc::new(p), snapshots, limit);
+ new_send.worker_sort_and_limit = worker_sort_and_limit;
+ return Ok(new_send.into_plan());
The Projection/Filter/SubqueryAlias/Unnest branch now preserves worker_sort_and_limit (matches the commit message about SubqueryAlias). But the Sort branch above (~line 1764) and the Union branch below (~line 1821) silently drop it by reconstructing ClusterSendNode::new(...) (which defaults to None).
For Sort-with-fetch this is fine — the new Sort under the cluster send obviates the descriptor. For Union, dropping is probably also correct (row positions across legs don't necessarily match the descriptor's group-column indices). But it's silent — worth either an explicit // drop worker_sort_and_limit: … comment on each path, or preserving it where safe. Low severity — optimization loss only.
Summary
For prefix GROUP BY queries the dominant worker cost was SortPreservingMergeExec merging all raw rows, because the sorted partial aggregate sat above the merge. This PR pushes the sorted partial aggregate below the merge (so the merge carries reduced partial states instead of raw rows) and propagates LIMIT down to the aggregate so it emits early and stops scanning (previously the sorted aggregate's emit threshold was a hard batch_size = 4096, so any result under 4096 groups read its entire input before emitting anything and LIMIT could not short-circuit).

Measured on a 10M-row stand: the worker step of a 1000-group prefix GROUP BY is ~3.7x faster (357ms → 97ms) with no RSS increase; GROUP BY ... ORDER BY <prefix> LIMIT 10 goes from ~365ms to ~14ms (×28).

Changes
- push_sorted_partial_aggregate_below_merge (pre-optimize pass): rewrites PartialAggregate(Sorted) → SortPreservingMerge → source(N partitions) into SortPreservingMerge → per-partition PartialAggregate(Sorted). Only sorted streaming aggregates are pushed: they hold O(1) accumulators per partition, while a hash aggregate would multiply its O(num_groups) memory by the partition count. Duplicate group keys across partitions are combined by the router's Final aggregate, same as partial states from different workers.
- InlineAggregateStream honors a group limit with a strict contract: emit-early threshold becomes min(batch_size, remaining), only closed groups are emitted, the backlog from large input batches is drained in chunks, the final emit is clamped, and input reading stops as soon as the limit is reached.
- add_limit_to_workers is rewritten as a limit descent: the worker limit descends through sort preserving merges and lands directly above the first aggregate as a per-partition row limit (LocalLimit forward / TailLimit reverse / GlobalLimit for a single partition), additionally passing the limit into InlineAggregateExec for the early input stop. A row limit never sits above a merge of per-partition partial aggregates (it would truncate a group's partial states across duplicate keys and silently corrupt totals), and the safety is derived from the plan shape rather than from which pass produced it.
- TailLimitExec: keeps a sliding window of trailing batches (O(limit) memory) instead of collecting its whole input, returns the last limit rows of each input partition, and declares maintains_input_order.

Testing
- GROUP BY with overlapping keys across union partitions with/without LIMIT, DESC, ORDER BY <aggregate>; 7500 groups (above the 4096 batch size) with LIMIT 10 / LIMIT 5000 / DESC; plan shape assertions for the new MergeSort → LocalLimit → InlinePartialAggregate, limit and MergeSort → TailLimit → InlinePartialAggregate forms.
- … (over_10k_join is flaky on master under parallel load too; verified 4/5 failures on both master and this branch under the same load, unrelated).