feat(cubestore): push Sort(fetch=N) to workers for ORDER BY group-by …#11027
feat(cubestore): push Sort(fetch=N) to workers for ORDER BY group-by …#11027paveltiunov wants to merge 1 commit into
Conversation
…column LIMIT queries When a query has ORDER BY on GROUP BY columns with a LIMIT but doesn't match the index sort prefix (so the existing limit_and_reverse optimization cannot apply), propagate a worker_sort_and_limit through ClusterSendNode serialization to workers. Workers wrap their aggregate output with SortExec(fetch=N), bounding memory to O(limit) via DataFusion's heap-based sort instead of materializing all groups. Co-authored-by: Cursor <cursoragent@cursor.com>
|
Claude finished @paveltiunov's task in 4m 32s —— View job Code ReviewVerdict: Solid optimization with a sound core idea (push Details — checklist, findings, scope
High
Medium
Low
Notes
|
| let output_schema = p.schema(); | ||
| // Map each sort column to its position in the worker output schema | ||
| let mapped: Option<Vec<(usize, bool, bool)>> = sort_cols | ||
| .iter() | ||
| .map(|col_name| { | ||
| output_schema | ||
| .fields() | ||
| .iter() | ||
| .position(|f| f.name() == col_name) | ||
| .map(|pos| (pos, ctx.sort_is_asc, !ctx.sort_is_asc)) | ||
| }) | ||
| .collect(); | ||
| mapped.map(|cols| (cols, ctx.limit.unwrap())) |
There was a problem hiding this comment.
Schema-index mismatch risk. output_schema = p.schema() is the raw table-scan schema (here p is a LogicalPlan::TableScan), but at the worker, apply_worker_sort_and_limit resolves the column by index against w.input.schema(), which is the partial-aggregate output schema after push_aggregate_to_workers has rewritten the plan. These happen to coincide today because GROUP BY columns land at positions 0..N in both schemas, but the contract is fragile: any future projection or column reordering between the scan and the partial-aggregate output would silently sort by the wrong column.
Two ways to harden this:
- Resolve by name in the worker (
apply_worker_sort_and_limit): build sort exprs by searchingschemafor the field by storedcol_name, not by index. Store(name, asc, nulls_first)inworker_sort_and_limitinstead of(usize, bool, bool). - Or assert
output_schema.field(pos).name() == col_nameon both ends.
Option (1) is cleaner and matches how get_limit_for_pushdown already compares by name.
| .fields() | ||
| .iter() | ||
| .position(|f| f.name() == col_name) | ||
| .map(|pos| (pos, ctx.sort_is_asc, !ctx.sort_is_asc)) |
There was a problem hiding this comment.
nulls_first heuristic doesn't match SQL defaults. Hard-coding nulls_first = !ctx.sort_is_asc means:
- ASC → nulls last
- DESC → nulls first
That matches DataFusion's default SortOptions, but ctx.sort is derived in sort_to_column_names, which discards each SortExpr's actual nulls_first flag. If a user wrote ORDER BY x ASC NULLS FIRST LIMIT N, the router's SortExec will use nulls_first = true while the worker's pushed-down SortExec(fetch=N) will use nulls_first = false — the worker can drop the very rows the router would keep, producing wrong results.
Either capture the actual nulls_first in sort_to_column_names/PlanningContext, or restrict the push-down to plans whose router sort matches the default.
| /// When the router plan has `SortExec(fetch=N)` sorting by GROUP BY columns, | ||
| /// and the worker uses `InlineAggregateExec` (streaming aggregate where groups don't overlap), | ||
| /// push a matching `SortExec(fetch=N)` to the worker. DataFusion's SortExec with fetch uses | ||
| /// a bounded heap, so this limits worker output to N rows with O(N) memory. | ||
| fn push_sort_to_workers( | ||
| p: Arc<dyn ExecutionPlan>, | ||
| ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { | ||
| let Some(sort) = p.as_any().downcast_ref::<SortExec>() else { | ||
| return Ok(p); | ||
| }; | ||
| let Some(fetch) = sort.fetch() else { | ||
| return Ok(p); | ||
| }; | ||
|
|
||
| let sort_exprs = sort.expr().clone(); | ||
|
|
||
| // Walk down through Projection → InlineAggregate(Final) → ClusterSend | ||
| let sort_input = sort.input(); | ||
| let (below_proj, col_mapping) = if let Some(proj) = sort_input.as_any().downcast_ref::<ProjectionExec>() { | ||
| // Map sort column indices through the projection | ||
| let mapping: Vec<Option<usize>> = proj.expr().iter().map(|(expr, _name)| { | ||
| expr.as_any() | ||
| .downcast_ref::<datafusion::physical_plan::expressions::Column>() | ||
| .map(|c| c.index()) | ||
| }).collect(); | ||
| (proj.input().clone(), Some(mapping)) | ||
| } else { | ||
| (sort_input.clone(), None) | ||
| }; | ||
|
|
||
| let Some(inline_final) = below_proj.as_any().downcast_ref::<InlineAggregateExec>() else { | ||
| return Ok(p); | ||
| }; | ||
| if *inline_final.mode() != InlineAggregateMode::Final { | ||
| return Ok(p); | ||
| } | ||
| let group_key_len = inline_final.group_expr().expr().len(); | ||
|
|
||
| // Translate sort expressions to pre-projection column indices | ||
| let translated_sort_exprs: Vec<_> = sort_exprs.iter().map(|se| { | ||
| if let Some(col) = se.expr.as_any().downcast_ref::<datafusion::physical_plan::expressions::Column>() { | ||
| let actual_idx = if let Some(ref mapping) = col_mapping { | ||
| mapping.get(col.index()).copied().flatten() | ||
| } else { | ||
| Some(col.index()) | ||
| }; | ||
| actual_idx.filter(|&idx| idx < group_key_len) | ||
| } else { | ||
| None | ||
| } | ||
| }).collect(); | ||
|
|
||
| // All sort columns must be GROUP BY columns | ||
| if translated_sort_exprs.iter().any(|x| x.is_none()) { | ||
| return Ok(p); | ||
| } | ||
|
|
||
| // Find ClusterSendExec below InlineAggregate(Final), possibly through CheckMemoryExec | ||
| let final_input = inline_final.input(); | ||
| let (cluster_send, through_check_memory) = | ||
| if let Some(cs) = final_input.as_any().downcast_ref::<ClusterSendExec>() { | ||
| (cs, false) | ||
| } else if let Some(cm) = final_input.as_any().downcast_ref::<CheckMemoryExec>() { | ||
| if let Some(cs) = cm.input.as_any().downcast_ref::<ClusterSendExec>() { | ||
| (cs, true) | ||
| } else { | ||
| return Ok(p); | ||
| } | ||
| } else { | ||
| return Ok(p); | ||
| }; | ||
|
|
||
| // Don't override if limit_and_reverse is already set | ||
| if cluster_send.limit_and_reverse.is_some() { | ||
| return Ok(p); | ||
| } | ||
|
|
||
| let worker_input = &cluster_send.input_for_optimizations; | ||
|
|
||
| // Verify the worker has InlineAggregateExec(Partial) - confirms groups don't overlap | ||
| let has_inline_partial = worker_input | ||
| .as_any() | ||
| .downcast_ref::<InlineAggregateExec>() | ||
| .map_or(false, |ia| *ia.mode() == InlineAggregateMode::Partial); | ||
| if !has_inline_partial { | ||
| return Ok(p); | ||
| } | ||
|
|
||
| // Build sort expressions for the worker (same column indices, same options) | ||
| let worker_schema = worker_input.schema(); | ||
| let worker_sort_exprs: Vec<_> = sort_exprs.iter().zip(translated_sort_exprs.iter()).map(|(se, &mapped_idx)| { | ||
| let idx = mapped_idx.unwrap(); | ||
| datafusion::physical_expr::PhysicalSortExpr { | ||
| expr: Arc::new(datafusion::physical_plan::expressions::Column::new( | ||
| worker_schema.field(idx).name(), | ||
| idx, | ||
| )), | ||
| options: se.options, | ||
| } | ||
| }).collect(); | ||
|
|
||
| // Wrap the worker plan: SortExec(fetch=N) → InlinePartialAggregate | ||
| let new_worker_input: Arc<dyn ExecutionPlan> = Arc::new( | ||
| SortExec::new(LexOrdering::new(worker_sort_exprs), worker_input.clone()) | ||
| .with_fetch(Some(fetch)), | ||
| ); | ||
|
|
||
| // Rebuild ClusterSendExec with the new worker input | ||
| let new_cluster_send: Arc<dyn ExecutionPlan> = Arc::new( | ||
| cluster_send.with_changed_schema(new_worker_input, cluster_send.required_input_ordering.clone()), | ||
| ); | ||
|
|
||
| // Re-wrap with CheckMemoryExec if it was present | ||
| let new_final_child: Arc<dyn ExecutionPlan> = if through_check_memory { | ||
| final_input.clone().with_new_children(vec![new_cluster_send])? | ||
| } else { | ||
| new_cluster_send | ||
| }; | ||
|
|
||
| // Rebuild InlineAggregate(Final) with new child | ||
| let new_inline_final: Arc<dyn ExecutionPlan> = | ||
| Arc::clone(&below_proj).with_new_children(vec![new_final_child])?; | ||
|
|
||
| // Rebuild Projection if present | ||
| let new_sort_input = if sort_input.as_any().downcast_ref::<ProjectionExec>().is_some() { | ||
| sort_input.clone().with_new_children(vec![new_inline_final])? | ||
| } else { | ||
| new_inline_final | ||
| }; | ||
|
|
||
| // Rebuild SortExec with the new subtree | ||
| p.with_new_children(vec![new_sort_input]) | ||
| } | ||
|
|
||
| /// Apply worker_sort_and_limit on a WorkerExec by wrapping its child | ||
| /// (the partial aggregate) with SortExec(fetch=N). Must run AFTER | ||
| /// push_aggregate_to_workers and replace_with_inline_aggregate. | ||
| fn apply_worker_sort_and_limit( | ||
| p: Arc<dyn ExecutionPlan>, | ||
| ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { | ||
| use crate::queryplanner::planning::WorkerExec; | ||
|
|
||
| let Some(w) = p.as_any().downcast_ref::<WorkerExec>() else { | ||
| return Ok(p); | ||
| }; | ||
| let Some((sort_cols, limit)) = w.worker_sort_and_limit.as_ref() else { | ||
| return Ok(p); | ||
| }; | ||
|
|
||
| let input = &w.input; | ||
| let schema = input.schema(); | ||
| let sort_exprs: Vec<_> = sort_cols | ||
| .iter() | ||
| .map(|(col_idx, asc, nulls_first)| { | ||
| datafusion::physical_expr::PhysicalSortExpr { | ||
| expr: Arc::new( | ||
| datafusion::physical_plan::expressions::Column::new( | ||
| schema.field(*col_idx).name(), | ||
| *col_idx, | ||
| ), | ||
| ), | ||
| options: datafusion::arrow::compute::SortOptions { | ||
| descending: !asc, | ||
| nulls_first: *nulls_first, | ||
| }, | ||
| } | ||
| }) | ||
| .collect(); | ||
| let sort_exec: Arc<dyn ExecutionPlan> = Arc::new( | ||
| SortExec::new(LexOrdering::new(sort_exprs), input.clone()) | ||
| .with_fetch(Some(*limit)), |
There was a problem hiding this comment.
Two parallel mechanisms for the same optimization — please clarify or consolidate.
This PR introduces two distinct paths that both wrap the worker plan with SortExec(fetch=N):
-
push_sort_to_workers(this file, run fromfinalize_physical_plan): walks the router's already-built physical plan (SortExec → [Projection] → InlineAggregate(Final) → [CheckMemory] → ClusterSendExec) and rebuildsClusterSendExecwith aSortExec-wrappedinput_for_optimizations. -
worker_sort_and_limit(logical field onClusterSendNode, serialized to workers, applied inapply_worker_sort_and_limitagainstWorkerExec): the worker re-plans the logical plan and wraps withSortExecwhen WorkerExec is built.
Workers re-plan from the serialized logical plan rather than receiving the router's physical plan, so path (1) doesn't actually reach a remote worker — it only affects local execution. Meanwhile (2) is the path that actually reaches remote workers via serialization.
A few concrete concerns:
- Coverage gap: path (2) only fires when
ChooseIndexbuilds theClusterSendNode. Path (1) fires whenever the post-finalize plan matches the SortExec→…→ClusterSendExec shape. These shouldn't disagree, but they can. - Path (1) over-matches: there's no check that
ctx.sortever existed at logical-planning time. If a later physical-plan rule introduces aSortExec(fetch=N)for an internal reason (e.g. a TopK lowering), path (1) will still push it to the worker. - Path (1) skips when
limit_and_reverse.is_some()but path (2) is gated onlimit_and_reverse.is_none()at logical time. The skip is conservative-correct but the intent overlaps confusingly with the logical gate.
Consider either removing path (1) (since the logical-level field already covers the worker case end-to-end), or document very clearly which path fires in which deployment topology. Right now this is the single hardest piece of the PR to reason about.
| .downcast_ref::<InlineAggregateExec>() | ||
| .map_or(false, |ia| *ia.mode() == InlineAggregateMode::Partial); | ||
| if !has_inline_partial { | ||
| return Ok(p); | ||
| } | ||
|
|
||
| // Build sort expressions for the worker (same column indices, same options) | ||
| let worker_schema = worker_input.schema(); | ||
| let worker_sort_exprs: Vec<_> = sort_exprs.iter().zip(translated_sort_exprs.iter()).map(|(se, &mapped_idx)| { | ||
| let idx = mapped_idx.unwrap(); | ||
| datafusion::physical_expr::PhysicalSortExpr { |
There was a problem hiding this comment.
Don't skip when limit_and_reverse is set — it can pessimize. limit_and_reverse says "the scan can return at most limit rows from the index in sorted order." That limit is on the scan input, not on the post-aggregate output, and it only matches the index sort prefix. If the router still has a SortExec(fetch=M) above the final aggregate (e.g. because the ORDER BY columns and the index prefix only overlap partially), bailing out here leaves the final aggregate consuming every group with no fetch bound.
Recommend: rather than return Ok(p) when limit_and_reverse.is_some(), just continue with the push-down — SortExec(fetch=M) is correct on top of a limit-scan worker input too.
| let limit_and_reverse = if let Some(limit) = limit { | ||
| Some((limit, !ctx.sort_is_asc)) | ||
| let reverse = if ctx.sort.is_some() | ||
| && !ctx.sort.as_ref().unwrap().is_empty() | ||
| { | ||
| !ctx.sort_is_asc | ||
| } else { | ||
| false | ||
| }; | ||
| Some((limit, reverse)) |
There was a problem hiding this comment.
Subtle behavior change in limit_and_reverse reversal. Previously this was unconditionally Some((limit, !ctx.sort_is_asc)). Now the reverse flag is forced to false when ctx.sort is empty. That's only reachable because get_limit_for_pushdown was simultaneously relaxed (further down) to return Some(limit) even when ctx.sort is None. Without a fast scan to confirm: please double-check that any code path consuming limit_and_reverse (e.g. CubeTable read path, replace_suboptimal_merge_sorts) correctly handles the (limit, false) shape when the worker is NOT performing a reverse scan and the index has its own natural order — there's a risk of returning a non-deterministic "first N rows" silently.
A test for SELECT … FROM data WHERE … LIMIT N (no ORDER BY) on a clustered setup would lock in the new behavior.
| pub enum InlineAggregateMode { | ||
| Partial, | ||
| Final, | ||
| Single, |
There was a problem hiding this comment.
InlineAggregateMode::Single is added end-to-end but I don't see anything in this PR that actually constructs a Single-mode AggregateExec — DataFusion only emits AggregateMode::Single for a one-pass aggregate which Cube's distributed planner typically rewrites into Partial+Final. Is this dead code today, or is there a path I'm missing? If it's anticipatory, a test that exercises it (or at least a comment noting which planner path produces it) would prevent the new branches in inline_aggregate_stream.rs from going untested.
| match &row.values()[1] { | ||
| TableValue::String(region) => { | ||
| assert_eq!(region, "East"); | ||
| } | ||
| _ => panic!("expected string"), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Test 4: ORDER BY 1 DESC with LIMIT on non-prefix column | ||
| { | ||
| let result = service | ||
| .exec_query( | ||
| "EXPLAIN ANALYZE SELECT region, sum(amount) \ | ||
| FROM foo.data GROUP BY 1 ORDER BY 1 DESC LIMIT 2", | ||
| ) | ||
| .await? | ||
| .collect() | ||
| .await?; | ||
|
|
||
| let worker_row = &result.get_rows()[1]; | ||
| let worker_plan = match &worker_row.values()[2] { | ||
| TableValue::String(s) => s.clone(), | ||
| _ => panic!("expected string"), | ||
| }; | ||
| assert!( | ||
| worker_plan.contains("Sort, fetch: 2"), | ||
| "Worker should have Sort with fetch=2 for DESC. Plan: {}", | ||
| worker_plan | ||
| ); | ||
| } | ||
|
|
||
| // Test 5: Verify correctness of ORDER BY 1 DESC LIMIT 2 | ||
| { | ||
| let result = service | ||
| .exec_query( | ||
| "SELECT region, sum(amount) \ | ||
| FROM foo.data GROUP BY 1 ORDER BY 1 DESC LIMIT 2", | ||
| ) | ||
| .await? | ||
| .collect() | ||
| .await?; | ||
|
|
||
| assert_eq!(result.len(), 2); | ||
| let regions: Vec<&str> = result | ||
| .get_rows() | ||
| .iter() | ||
| .map(|r| match &r.values()[0] { | ||
| TableValue::String(s) => s.as_str(), | ||
| _ => panic!("expected string"), | ||
| }) | ||
| .collect(); | ||
| assert_eq!(regions, vec!["West", "South"]); | ||
| } | ||
|
|
||
| Ok::<(), CubeError>(()) | ||
| }) | ||
| .await; | ||
| Ok::<(), CubeError>(()) | ||
| }) | ||
| .await; | ||
| Ok::<(), CubeError>(()) | ||
| }) | ||
| .await; |
There was a problem hiding this comment.
Tests are tightly coupled to plan-printer output. Asserting worker_plan.contains("Sort, fetch: 3") will break the next time pp_phys_plan_indented changes its format. Consider extracting a small helper that parses out the SortExec node (or matches Sort, fetch: \d+ with a regex) and asserting on its parameters, so a cosmetic printer change doesn't break correctness tests.
Also missing: a test for the ORDER BY with nulls_first specified explicitly path — given the hard-coded nulls_first = !ctx.sort_is_asc in planning.rs (see comment on that line), the absence of a NULL-handling test is a real coverage gap, not just a polish issue.
…column LIMIT queries
When a query has ORDER BY on GROUP BY columns with a LIMIT but doesn't match the index sort prefix (so the existing limit_and_reverse optimization cannot apply), propagate a worker_sort_and_limit through ClusterSendNode serialization to workers. Workers wrap their aggregate output with SortExec(fetch=N), bounding memory to O(limit) via DataFusion's heap-based sort instead of materializing all groups.
Check List