Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl InlineAggregateStream {
aggregate_expressions(&agg.aggr_expr, &agg.mode, agg_group_by.num_group_exprs())?;

let filter_expressions = match agg.mode {
InlineAggregateMode::Partial => agg_filter_expr,
InlineAggregateMode::Partial | InlineAggregateMode::Single => agg_filter_expr,
InlineAggregateMode::Final => {
vec![None; agg.aggr_expr.len()]
}
Expand Down Expand Up @@ -113,7 +113,7 @@ fn aggregate_expressions(
col_idx_base: usize,
) -> DFResult<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
match mode {
InlineAggregateMode::Partial => Ok(aggr_expr
InlineAggregateMode::Partial | InlineAggregateMode::Single => Ok(aggr_expr
.iter()
.map(|agg| {
let mut result = agg.expressions();
Expand Down Expand Up @@ -225,7 +225,6 @@ impl Stream for InlineAggregateStream {
ExecutionState::ProducingOutput(batch) => {
let batch = batch.clone();

// Determine next state
self.exec_state = if self.input_done {
ExecutionState::Done
} else {
Expand Down Expand Up @@ -267,7 +266,7 @@ impl InlineAggregateStream {
let state = acc.state(emit_to)?;
aggr_arrays.extend(state);
}
InlineAggregateMode::Final => {
InlineAggregateMode::Final | InlineAggregateMode::Single => {
// Emit final aggregated values
aggr_arrays.push(acc.evaluate(emit_to)?);
}
Expand Down Expand Up @@ -333,7 +332,7 @@ impl InlineAggregateStream {
// Call the appropriate method on each aggregator with
// the entire input row and the relevant group indexes
match self.mode {
InlineAggregateMode::Partial => {
InlineAggregateMode::Partial | InlineAggregateMode::Single => {
acc.update_batch(values, group_indices, opt_filter, total_num_groups)?;
}
_ => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::sync::Arc;
pub enum InlineAggregateMode {
Partial,
Final,
Single,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InlineAggregateMode::Single is added end-to-end but I don't see anything in this PR that actually constructs a Single-mode AggregateExec — DataFusion only emits AggregateMode::Single for a one-pass aggregate which Cube's distributed planner typically rewrites into Partial+Final. Is this dead code today, or is there a path I'm missing? If it's anticipatory, a test that exercises it (or at least a comment noting which planner path produces it) would prevent the new branches in inline_aggregate_stream.rs from going untested.

}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -66,10 +67,11 @@ impl InlineAggregateExec {
return None;
}

// Only support Partial and Final modes
// Only support Partial, Final, and Single modes
let mode = match aggregate.mode() {
AggregateMode::Partial => InlineAggregateMode::Partial,
AggregateMode::Final => InlineAggregateMode::Final,
AggregateMode::Single => InlineAggregateMode::Single,
_ => return None,
};

Expand Down Expand Up @@ -111,6 +113,11 @@ impl InlineAggregateExec {
self.limit
}

pub fn with_limit(mut self, limit: Option<usize>) -> Self {
self.limit = limit;
self
}

pub fn aggr_expr(&self) -> &[Arc<AggregateFunctionExpr>] {
&self.aggr_expr
}
Expand Down Expand Up @@ -151,7 +158,7 @@ impl ExecutionPlan for InlineAggregateExec {

fn required_input_distribution(&self) -> Vec<Distribution> {
match &self.mode {
InlineAggregateMode::Partial => {
InlineAggregateMode::Partial | InlineAggregateMode::Single => {
vec![Distribution::UnspecifiedDistribution]
}
InlineAggregateMode::Final => {
Expand Down Expand Up @@ -181,7 +188,7 @@ impl ExecutionPlan for InlineAggregateExec {
group_by: self.group_by.clone(),
aggr_expr: self.aggr_expr.clone(),
filter_expr: self.filter_expr.clone(),
limit: self.limit.clone(),
limit: self.limit,
input: children[0].clone(),
schema: self.schema.clone(),
input_schema: self.input_schema.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub fn push_aggregate_to_workers(
let worker_input = p_partial.clone().with_new_children(vec![w.input.clone()])?;

// Worker plan, execute partial aggregate inside the worker.
Arc::new(WorkerExec::new(
let new_worker = WorkerExec::new(
worker_input,
w.max_batch_rows,
// TODO upgrade DF: WorkerExec limit_and_reverse must be wrong here. Should be
Expand All @@ -98,7 +98,9 @@ pub fn push_aggregate_to_workers(
WorkerPlanningParams {
worker_partition_count: w.properties().output_partitioning().partition_count(),
},
))
w.worker_sort_and_limit.clone(),
);
Arc::new(new_worker)
} else {
return Ok(p_final);
};
Expand Down
190 changes: 190 additions & 0 deletions rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
pub mod rewrite_plan;
pub mod rolling_optimizer;
mod trace_data_loaded;

Check warning on line 8 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 8 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
use super::serialized_plan::PreSerializedPlan;
use crate::cluster::{Cluster, WorkerPlanningParams};
use crate::queryplanner::optimizations::distributed_partial_aggregate::{
add_limit_to_workers, ensure_partition_merge, push_aggregate_to_workers,
replace_suboptimal_merge_sorts,
};

Check warning on line 14 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 14 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
use crate::queryplanner::optimizations::inline_aggregate_rewriter::replace_with_inline_aggregate;
use crate::queryplanner::check_memory::CheckMemoryExec;
use crate::queryplanner::inline_aggregate::{InlineAggregateExec, InlineAggregateMode};
use crate::queryplanner::planning::CubeExtensionPlanner;
use crate::queryplanner::pretty_printers::{pp_phys_plan_ext, PPOptions};
use crate::queryplanner::query_executor::ClusterSendExec;
use crate::queryplanner::rolling::RollingWindowPlanner;
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::util::memory::MemoryHandler;
Expand All @@ -25,7 +28,10 @@
use datafusion::execution::context::QueryPlanner;
use datafusion::execution::SessionState;
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_expr::LexOrdering;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use distributed_partial_aggregate::ensure_partition_merge_with_acceptable_parent;
Expand Down Expand Up @@ -148,6 +154,10 @@
// Replace sorted AggregateExec with InlineAggregateExec for better performance
let p = rewrite_physical_plan(p, &mut |p| replace_with_inline_aggregate(p))?;

// Apply worker_sort_and_limit AFTER aggregate restructuring, so the SortExec
// wraps the aggregate output (not the raw scan input).
let p = rewrite_physical_plan(p, &mut |p| apply_worker_sort_and_limit(p))?;

Ok(p)
}

Expand Down Expand Up @@ -177,10 +187,190 @@
"Rewrote physical plan by add_limit_to_workers:\n{}",
pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta())
);
let p = push_sort_to_workers(p)?;
log::trace!(
"Rewrote physical plan by push_sort_to_workers:\n{}",
pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta())
);
let p = rewrite_physical_plan(p, &mut |p| replace_suboptimal_merge_sorts(p))?;
log::trace!(
"Rewrote physical plan by replace_suboptimal_merge_sorts:\n{}",
pp_phys_plan_ext(p.as_ref(), &PPOptions::show_nonmeta())
);
Ok(p)
}

/// When the router plan has `SortExec(fetch=N)` sorting by GROUP BY columns,
/// and the worker uses `InlineAggregateExec` (streaming aggregate where groups don't overlap),
/// push a matching `SortExec(fetch=N)` to the worker. DataFusion's SortExec with fetch uses
/// a bounded heap, so this limits worker output to N rows with O(N) memory.
fn push_sort_to_workers(
p: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let Some(sort) = p.as_any().downcast_ref::<SortExec>() else {
return Ok(p);
};
let Some(fetch) = sort.fetch() else {
return Ok(p);
};

let sort_exprs = sort.expr().clone();

Check warning on line 218 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 218 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
// Walk down through Projection → InlineAggregate(Final) → ClusterSend
let sort_input = sort.input();
let (below_proj, col_mapping) = if let Some(proj) = sort_input.as_any().downcast_ref::<ProjectionExec>() {
// Map sort column indices through the projection
let mapping: Vec<Option<usize>> = proj.expr().iter().map(|(expr, _name)| {
expr.as_any()
.downcast_ref::<datafusion::physical_plan::expressions::Column>()
.map(|c| c.index())
}).collect();
(proj.input().clone(), Some(mapping))
} else {
(sort_input.clone(), None)
};

let Some(inline_final) = below_proj.as_any().downcast_ref::<InlineAggregateExec>() else {
return Ok(p);
};
if *inline_final.mode() != InlineAggregateMode::Final {
return Ok(p);
}
let group_key_len = inline_final.group_expr().expr().len();

Check warning on line 239 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 239 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

// Translate sort expressions to pre-projection column indices
let translated_sort_exprs: Vec<_> = sort_exprs.iter().map(|se| {
if let Some(col) = se.expr.as_any().downcast_ref::<datafusion::physical_plan::expressions::Column>() {
let actual_idx = if let Some(ref mapping) = col_mapping {
mapping.get(col.index()).copied().flatten()
} else {
Some(col.index())
};
actual_idx.filter(|&idx| idx < group_key_len)
} else {
None
}
}).collect();

// All sort columns must be GROUP BY columns
if translated_sort_exprs.iter().any(|x| x.is_none()) {
return Ok(p);
}

// Find ClusterSendExec below InlineAggregate(Final), possibly through CheckMemoryExec
let final_input = inline_final.input();
let (cluster_send, through_check_memory) =
if let Some(cs) = final_input.as_any().downcast_ref::<ClusterSendExec>() {
(cs, false)
} else if let Some(cm) = final_input.as_any().downcast_ref::<CheckMemoryExec>() {
if let Some(cs) = cm.input.as_any().downcast_ref::<ClusterSendExec>() {
(cs, true)
} else {
return Ok(p);
}
} else {
return Ok(p);
};

// Don't override if limit_and_reverse is already set
if cluster_send.limit_and_reverse.is_some() {
return Ok(p);
}

let worker_input = &cluster_send.input_for_optimizations;

// Verify the worker has InlineAggregateExec(Partial) - confirms groups don't overlap
let has_inline_partial = worker_input
.as_any()
.downcast_ref::<InlineAggregateExec>()
.map_or(false, |ia| *ia.mode() == InlineAggregateMode::Partial);
if !has_inline_partial {
return Ok(p);
}

Check warning on line 290 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 290 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
// Build sort expressions for the worker (same column indices, same options)
let worker_schema = worker_input.schema();
let worker_sort_exprs: Vec<_> = sort_exprs.iter().zip(translated_sort_exprs.iter()).map(|(se, &mapped_idx)| {
let idx = mapped_idx.unwrap();
datafusion::physical_expr::PhysicalSortExpr {
Comment on lines +285 to +295

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't skip when limit_and_reverse is set — it can pessimize. limit_and_reverse says "the scan can return at most limit rows from the index in sorted order." That limit is on the scan input, not on the post-aggregate output, and it only matches the index sort prefix. If the router still has a SortExec(fetch=M) above the final aggregate (e.g. because the ORDER BY columns and the index prefix only overlap partially), bailing out here leaves the final aggregate consuming every group with no fetch bound.

Recommend: rather than return Ok(p) when limit_and_reverse.is_some(), just continue with the push-down — SortExec(fetch=M) is correct on top of a limit-scan worker input too.

expr: Arc::new(datafusion::physical_plan::expressions::Column::new(
worker_schema.field(idx).name(),
idx,
)),
options: se.options,
}
}).collect();

// Wrap the worker plan: SortExec(fetch=N) → InlinePartialAggregate
let new_worker_input: Arc<dyn ExecutionPlan> = Arc::new(
SortExec::new(LexOrdering::new(worker_sort_exprs), worker_input.clone())
.with_fetch(Some(fetch)),
);

Check warning on line 308 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 308 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

// Rebuild ClusterSendExec with the new worker input
let new_cluster_send: Arc<dyn ExecutionPlan> = Arc::new(
cluster_send.with_changed_schema(new_worker_input, cluster_send.required_input_ordering.clone()),
);

// Re-wrap with CheckMemoryExec if it was present
let new_final_child: Arc<dyn ExecutionPlan> = if through_check_memory {
final_input.clone().with_new_children(vec![new_cluster_send])?

Check warning on line 317 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 317 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
} else {
new_cluster_send
};

// Rebuild InlineAggregate(Final) with new child
let new_inline_final: Arc<dyn ExecutionPlan> =
Arc::clone(&below_proj).with_new_children(vec![new_final_child])?;

Check warning on line 324 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 324 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

// Rebuild Projection if present
let new_sort_input = if sort_input.as_any().downcast_ref::<ProjectionExec>().is_some() {
sort_input.clone().with_new_children(vec![new_inline_final])?
} else {
new_inline_final
};

// Rebuild SortExec with the new subtree
p.with_new_children(vec![new_sort_input])
}

/// Apply worker_sort_and_limit on a WorkerExec by wrapping its child
/// (the partial aggregate) with SortExec(fetch=N). Must run AFTER
/// push_aggregate_to_workers and replace_with_inline_aggregate.
fn apply_worker_sort_and_limit(
p: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
use crate::queryplanner::planning::WorkerExec;

let Some(w) = p.as_any().downcast_ref::<WorkerExec>() else {
return Ok(p);
};
let Some((sort_cols, limit)) = w.worker_sort_and_limit.as_ref() else {
return Ok(p);
};

let input = &w.input;
let schema = input.schema();

Check warning on line 353 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 353 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
let sort_exprs: Vec<_> = sort_cols
.iter()
.map(|(col_idx, asc, nulls_first)| {
datafusion::physical_expr::PhysicalSortExpr {
expr: Arc::new(
datafusion::physical_plan::expressions::Column::new(
schema.field(*col_idx).name(),
*col_idx,
),
),
options: datafusion::arrow::compute::SortOptions {
descending: !asc,
nulls_first: *nulls_first,
},

Check warning on line 367 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Check warning on line 367 in rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

View workflow job for this annotation

GitHub Actions / Debian Rust nightly-2025-08-01

Diff in /__w/cube/cube/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs
}
})
.collect();
let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
SortExec::new(LexOrdering::new(sort_exprs), input.clone())
.with_fetch(Some(*limit)),
Comment on lines +203 to +373

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two parallel mechanisms for the same optimization — please clarify or consolidate.

This PR introduces two distinct paths that both wrap the worker plan with SortExec(fetch=N):

  1. push_sort_to_workers (this file, run from finalize_physical_plan): walks the router's already-built physical plan (SortExec → [Projection] → InlineAggregate(Final) → [CheckMemory] → ClusterSendExec) and rebuilds ClusterSendExec with a SortExec-wrapped input_for_optimizations.

  2. worker_sort_and_limit (logical field on ClusterSendNode, serialized to workers, applied in apply_worker_sort_and_limit against WorkerExec): the worker re-plans the logical plan and wraps with SortExec when WorkerExec is built.

Workers re-plan from the serialized logical plan rather than receiving the router's physical plan, so path (1) doesn't actually reach a remote worker — it only affects local execution. Meanwhile (2) is the path that actually reaches remote workers via serialization.

A few concrete concerns:

  • Coverage gap: path (2) only fires when ChooseIndex builds the ClusterSendNode. Path (1) fires whenever the post-finalize plan matches the SortExec→…→ClusterSendExec shape. These shouldn't disagree, but they can.
  • Path (1) over-matches: there's no check that ctx.sort ever existed at logical-planning time. If a later physical-plan rule introduces a SortExec(fetch=N) for an internal reason (e.g. a TopK lowering), path (1) will still push it to the worker.
  • Path (1) skips when limit_and_reverse.is_some() but path (2) is gated on limit_and_reverse.is_none() at logical time. The skip is conservative-correct but the intent overlaps confusingly with the logical gate.

Consider either removing path (1) (since the logical-level field already covers the worker case end-to-end), or document very clearly which path fires in which deployment topology. Right now this is the single hardest piece of the PR to reason about.

);
p.with_new_children(vec![sort_exec])
}
1 change: 1 addition & 0 deletions rust/cubestore/cubestore/src/queryplanner/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,5 +186,6 @@ pub fn plan_panic_worker() -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
WorkerPlanningParams {
worker_partition_count: 1,
},
/* worker_sort_and_limit */ None,
)))
}
Loading
Loading