Skip to content

Commit 2f843ef

Browse files
committed
Add compute_statistics micro-benchmark
Coalesce chain and cross-join tree (#19795) benchmarks comparing cached vs non-shared-cache statistics computation.
1 parent bb09951 commit 2f843ef

2 files changed

Lines changed: 227 additions & 0 deletions

File tree

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,3 +108,7 @@ required-features = ["test_utils"]
108108
harness = false
109109
name = "aggregate_vectorized"
110110
required-features = ["test_utils"]
111+
112+
[[bench]]
113+
harness = false
114+
name = "compute_statistics"
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks for `compute_statistics` with `StatsCache`.
19+
//!
20+
//! Demonstrates that caching eliminates redundant subtree walks in plans
21+
//! containing partition-merging operators (CoalescePartitionsExec) and
22+
//! binary join trees (CrossJoinExec).
23+
//!
24+
//! The plan shapes here mirror the reproducers from the planning-speed
25+
//! EPIC (<https://github.com/apache/datafusion/issues/19795>):
26+
//! - Coalesce chain: deep linear plans (e.g. deeply nested subqueries)
27+
//! - Cross-join tree: balanced binary trees from multi-way joins
28+
//! (mirrors the `physical_many_self_joins` sql_planner benchmark)
29+
30+
use std::fmt;
31+
use std::sync::Arc;
32+
33+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
34+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
35+
use datafusion_common::tree_node::TreeNodeRecursion;
36+
use datafusion_common::{Result, Statistics};
37+
use datafusion_execution::TaskContext;
38+
use datafusion_physical_expr::EquivalenceProperties;
39+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
40+
use datafusion_physical_plan::execution_plan::{
41+
Boundedness, EmissionType, ExecutionPlan, PlanProperties,
42+
};
43+
use datafusion_physical_plan::joins::CrossJoinExec;
44+
use datafusion_physical_plan::statistics_context::{
45+
StatisticsContext, compute_statistics,
46+
};
47+
use datafusion_physical_plan::{
48+
DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream,
49+
};
50+
51+
/// Minimal leaf node for benchmarking
52+
#[derive(Debug)]
53+
struct BenchLeaf {
54+
schema: SchemaRef,
55+
cache: Arc<PlanProperties>,
56+
}
57+
58+
impl BenchLeaf {
59+
fn new(col_name: &str) -> Self {
60+
let schema = Arc::new(Schema::new(vec![Field::new(
61+
col_name,
62+
DataType::Int32,
63+
false,
64+
)]));
65+
let cache = Arc::new(PlanProperties::new(
66+
EquivalenceProperties::new(Arc::clone(&schema)),
67+
Partitioning::UnknownPartitioning(2),
68+
EmissionType::Incremental,
69+
Boundedness::Bounded,
70+
));
71+
Self { schema, cache }
72+
}
73+
}
74+
75+
impl DisplayAs for BenchLeaf {
76+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
77+
write!(f, "BenchLeaf")
78+
}
79+
}
80+
81+
impl ExecutionPlan for BenchLeaf {
82+
fn name(&self) -> &str {
83+
"BenchLeaf"
84+
}
85+
86+
fn schema(&self) -> SchemaRef {
87+
Arc::clone(&self.schema)
88+
}
89+
90+
fn properties(&self) -> &Arc<PlanProperties> {
91+
&self.cache
92+
}
93+
94+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
95+
vec![]
96+
}
97+
98+
fn with_new_children(
99+
self: Arc<Self>,
100+
_children: Vec<Arc<dyn ExecutionPlan>>,
101+
) -> Result<Arc<dyn ExecutionPlan>> {
102+
Ok(self)
103+
}
104+
105+
fn apply_expressions(
106+
&self,
107+
_f: &mut dyn FnMut(
108+
&dyn datafusion_physical_expr::PhysicalExpr,
109+
) -> Result<TreeNodeRecursion>,
110+
) -> Result<TreeNodeRecursion> {
111+
Ok(TreeNodeRecursion::Continue)
112+
}
113+
114+
fn execute(
115+
&self,
116+
_partition: usize,
117+
_context: Arc<TaskContext>,
118+
) -> Result<SendableRecordBatchStream> {
119+
unimplemented!()
120+
}
121+
122+
fn partition_statistics_with_context(
123+
&self,
124+
_partition: Option<usize>,
125+
_ctx: &StatisticsContext,
126+
) -> Result<Arc<Statistics>> {
127+
Ok(Arc::new(Statistics::new_unknown(&self.schema)))
128+
}
129+
}
130+
131+
/// Build: CoalescePartitions^depth -> BenchLeaf
132+
fn build_coalesce_chain(depth: usize) -> Arc<dyn ExecutionPlan> {
133+
let mut plan: Arc<dyn ExecutionPlan> = Arc::new(BenchLeaf::new("a"));
134+
for _ in 0..depth {
135+
plan = Arc::new(CoalescePartitionsExec::new(plan));
136+
}
137+
plan
138+
}
139+
140+
/// Build a balanced binary tree of CrossJoinExec with 2^depth leaves.
141+
/// Mirrors the plan shape produced by multi-way self-joins like the
142+
/// `physical_many_self_joins` benchmark in sql_planner.rs (#19795).
143+
fn build_cross_join_tree(depth: usize, next_col: &mut usize) -> Arc<dyn ExecutionPlan> {
144+
if depth == 0 {
145+
let col_name = format!("c{next_col}");
146+
*next_col += 1;
147+
return Arc::new(BenchLeaf::new(&col_name));
148+
}
149+
let left = build_cross_join_tree(depth - 1, next_col);
150+
let right = build_cross_join_tree(depth - 1, next_col);
151+
Arc::new(CrossJoinExec::new(left, right))
152+
}
153+
154+
/// Recursive walk without a shared cross-node cache, simulating pre-cache behavior.
155+
/// Each operator's internal `compute_child_statistics` call triggers a fresh
156+
/// subtree walk, resulting in O(n^2) total node visits for a chain of depth n.
157+
///
158+
/// Note: each `compute_child_statistics` re-walk still benefits from its own
159+
/// ephemeral cache; only the cross-node sharing is removed.
160+
fn compute_statistics_without_shared_cache(
161+
plan: &dyn ExecutionPlan,
162+
partition: Option<usize>,
163+
) -> Result<Arc<Statistics>> {
164+
let child_stats = plan
165+
.children()
166+
.iter()
167+
.map(|child| compute_statistics_without_shared_cache(child.as_ref(), partition))
168+
.collect::<Result<Vec<_>>>()?;
169+
let ctx = StatisticsContext::new(child_stats);
170+
plan.partition_statistics_with_context(partition, &ctx)
171+
}
172+
173+
fn bench_compute_statistics(c: &mut Criterion) {
174+
// --- Coalesce chain (linear plan) ---
175+
// Deep linear plans arise from deeply nested subqueries, CTEs, etc.
176+
let mut group = c.benchmark_group("compute_statistics_coalesce_chain");
177+
for depth in [10, 20, 50] {
178+
let plan = build_coalesce_chain(depth);
179+
group.bench_with_input(BenchmarkId::new("cached", depth), &plan, |b, plan| {
180+
b.iter(|| compute_statistics(plan.as_ref(), None).unwrap());
181+
});
182+
group.bench_with_input(
183+
BenchmarkId::new("no_shared_cache", depth),
184+
&plan,
185+
|b, plan| {
186+
b.iter(|| {
187+
compute_statistics_without_shared_cache(plan.as_ref(), None).unwrap()
188+
});
189+
},
190+
);
191+
}
192+
group.finish();
193+
194+
// --- Cross-join tree (balanced binary plan) ---
195+
// Binary trees arise from multi-way joins (e.g. physical_many_self_joins
196+
// in sql_planner.rs, see #19795). CrossJoinExec calls
197+
// compute_child_statistics for per-partition stats, re-walking the left
198+
// subtree at each node. The gap between cached/uncached is smaller than
199+
// the linear chain because only the left child triggers a re-walk.
200+
let mut group = c.benchmark_group("compute_statistics_cross_join_tree");
201+
for depth in [3, 5, 7] {
202+
let mut next_col = 0;
203+
let plan = build_cross_join_tree(depth, &mut next_col);
204+
let label = format!("depth={depth}_leaves={}", 1usize << depth);
205+
group.bench_with_input(BenchmarkId::new("cached", &label), &plan, |b, plan| {
206+
b.iter(|| compute_statistics(plan.as_ref(), Some(0)).unwrap());
207+
});
208+
group.bench_with_input(
209+
BenchmarkId::new("no_shared_cache", &label),
210+
&plan,
211+
|b, plan| {
212+
b.iter(|| {
213+
compute_statistics_without_shared_cache(plan.as_ref(), Some(0))
214+
.unwrap()
215+
});
216+
},
217+
);
218+
}
219+
group.finish();
220+
}
221+
222+
criterion_group!(benches, bench_compute_statistics);
223+
criterion_main!(benches);

0 commit comments

Comments
 (0)