Skip to content

Commit bb09951

Browse files
committed
Add per-call StatsCache to eliminate redundant subtree walks
Memoize results within a single compute_statistics invocation using pointer-based cache keys. Operators now use ctx.compute_child_statistics instead of calling compute_statistics directly, so partition-merging and asymmetric join operators hit the cache for subtrees already walked.
1 parent b380893 commit bb09951

9 files changed

Lines changed: 182 additions & 44 deletions

File tree

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
3030
use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
3131
use crate::projection::{ProjectionExec, make_with_child};
3232
use crate::sort_pushdown::SortOrderPushdownResult;
33-
use crate::statistics_context::{StatisticsContext, compute_statistics};
33+
use crate::statistics_context::StatisticsContext;
3434
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties};
3535
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3636

@@ -243,11 +243,13 @@ impl ExecutionPlan for CoalescePartitionsExec {
243243
fn partition_statistics_with_context(
244244
&self,
245245
_partition: Option<usize>,
246-
_ctx: &StatisticsContext,
246+
ctx: &StatisticsContext,
247247
) -> Result<Arc<Statistics>> {
248248
// CoalescePartitions merges all input partitions into one,
249249
// so it always needs overall (None) input stats
250-
let stats = Arc::unwrap_or_clone(compute_statistics(self.input.as_ref(), None)?);
250+
let stats = Arc::unwrap_or_clone(
251+
ctx.compute_child_statistics(self.input.as_ref(), None)?,
252+
);
251253
Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
252254
}
253255

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::projection::{
3131
ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
3232
physical_to_column_exprs,
3333
};
34-
use crate::statistics_context::{StatisticsContext, compute_statistics};
34+
use crate::statistics_context::StatisticsContext;
3535
use crate::{
3636
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
3737
ExecutionPlanProperties, PlanProperties, RecordBatchStream,
@@ -392,7 +392,7 @@ impl ExecutionPlan for CrossJoinExec {
392392
// Right side can have multiple partitions, so the context's
393393
// partition-specific or overall stats are correct as-is.
394394
let left_stats = match partition {
395-
Some(_) => compute_statistics(self.left.as_ref(), None)?,
395+
Some(_) => ctx.compute_child_statistics(self.left.as_ref(), None)?,
396396
None => Arc::clone(&ctx.child_stats()[0]),
397397
};
398398
let left_stats = Arc::unwrap_or_clone(left_stats);

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::projection::{
5353
};
5454
use crate::repartition::REPARTITION_RANDOM_STATE;
5555
use crate::spill::get_record_batch_memory_size;
56-
use crate::statistics_context::{StatisticsContext, compute_statistics};
56+
use crate::statistics_context::StatisticsContext;
5757
use crate::{
5858
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
5959
PlanProperties, SendableRecordBatchStream, Statistics,
@@ -1452,7 +1452,8 @@ impl ExecutionPlan for HashJoinExec {
14521452
// partition-specific stats which would be incorrect.
14531453
// Right side is partitioned, so the context stats are correct.
14541454
(Some(_), PartitionMode::CollectLeft) => {
1455-
let left_stats = compute_statistics(self.left.as_ref(), None)?;
1455+
let left_stats =
1456+
ctx.compute_child_statistics(self.left.as_ref(), None)?;
14561457
let right_stats = Arc::clone(&ctx.child_stats()[1]);
14571458

14581459
estimate_join_statistics(
@@ -1495,8 +1496,10 @@ impl ExecutionPlan for HashJoinExec {
14951496
// Auto mode hasn't decided partitioning yet, so it needs
14961497
// overall stats from both sides.
14971498
(Some(_), PartitionMode::Auto) => {
1498-
let left_stats = compute_statistics(self.left.as_ref(), None)?;
1499-
let right_stats = compute_statistics(self.right.as_ref(), None)?;
1499+
let left_stats =
1500+
ctx.compute_child_statistics(self.left.as_ref(), None)?;
1501+
let right_stats =
1502+
ctx.compute_child_statistics(self.right.as_ref(), None)?;
15001503
estimate_join_statistics(
15011504
Arc::unwrap_or_clone(left_stats),
15021505
Arc::unwrap_or_clone(right_stats),

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ use crate::projection::{
4242
EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection,
4343
try_pushdown_through_join,
4444
};
45-
use crate::statistics_context::{StatisticsContext, compute_statistics};
45+
use crate::statistics_context::StatisticsContext;
4646
use crate::{
4747
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
4848
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
@@ -708,7 +708,7 @@ impl ExecutionPlan for NestedLoopJoinExec {
708708
// Right side can have multiple partitions, so the context's
709709
// partition-specific or overall stats are correct as-is.
710710
let left_stats = match partition {
711-
Some(_) => compute_statistics(self.left.as_ref(), None)?,
711+
Some(_) => ctx.compute_child_statistics(self.left.as_ref(), None)?,
712712
None => Arc::clone(&ctx.child_stats()[0]),
713713
};
714714
let left_stats = Arc::unwrap_or_clone(left_stats);
@@ -2769,6 +2769,7 @@ fn build_unmatched_batch(
27692769
#[cfg(test)]
27702770
pub(crate) mod tests {
27712771
use super::*;
2772+
use crate::statistics_context::compute_statistics;
27722773
use crate::test::{TestMemoryExec, assert_join_metrics};
27732774
use crate::{
27742775
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::projection::{ProjectionExec, all_columns, make_with_child, update_exp
3838
use crate::sorts::streaming_merge::StreamingMergeBuilder;
3939
use crate::spill::spill_manager::SpillManager;
4040
use crate::spill::spill_pool::{self, SpillPoolWriter};
41-
use crate::statistics_context::{StatisticsContext, compute_statistics};
41+
use crate::statistics_context::StatisticsContext;
4242
use crate::stream::RecordBatchStreamAdapter;
4343
use crate::{
4444
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics,
@@ -1205,7 +1205,7 @@ impl ExecutionPlan for RepartitionExec {
12051205

12061206
// Repartition needs overall input stats to divide across
12071207
// output partitions, not partition-specific child stats
1208-
let overall = compute_statistics(self.input.as_ref(), None)?;
1208+
let overall = ctx.compute_child_statistics(self.input.as_ref(), None)?;
12091209
let mut stats = Arc::unwrap_or_clone(overall);
12101210

12111211
// Distribute statistics across partitions

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
4545
use crate::spill::get_record_batch_memory_size;
4646
use crate::spill::in_progress_spill_file::InProgressSpillFile;
4747
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
48-
use crate::statistics_context::{StatisticsContext, compute_statistics};
48+
use crate::statistics_context::StatisticsContext;
4949
use crate::stream::RecordBatchStreamAdapter;
5050
use crate::stream::ReservationStream;
5151
use crate::topk::TopK;
@@ -1287,7 +1287,7 @@ impl ExecutionPlan for SortExec {
12871287
let child_stats = if self.preserve_partitioning() {
12881288
Arc::clone(&ctx.child_stats()[0])
12891289
} else {
1290-
compute_statistics(self.input.as_ref(), None)?
1290+
ctx.compute_child_statistics(self.input.as_ref(), None)?
12911291
};
12921292
let stats = Arc::unwrap_or_clone(child_stats);
12931293
Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))

datafusion/physical-plan/src/sorts/sort_preserving_merge.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::limit::LimitStream;
2424
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
2525
use crate::projection::{ProjectionExec, make_with_child, update_ordering};
2626
use crate::sorts::streaming_merge::StreamingMergeBuilder;
27-
use crate::statistics_context::{StatisticsContext, compute_statistics};
27+
use crate::statistics_context::StatisticsContext;
2828
use crate::{
2929
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
3030
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
@@ -397,11 +397,11 @@ impl ExecutionPlan for SortPreservingMergeExec {
397397
fn partition_statistics_with_context(
398398
&self,
399399
_partition: Option<usize>,
400-
_ctx: &StatisticsContext,
400+
ctx: &StatisticsContext,
401401
) -> Result<Arc<Statistics>> {
402402
// SortPreservingMergeExec merges all input partitions into one,
403403
// so it always needs overall (None) input stats
404-
compute_statistics(self.input.as_ref(), None)
404+
ctx.compute_child_statistics(self.input.as_ref(), None)
405405
}
406406

407407
fn supports_limit_pushdown(&self) -> bool {

datafusion/physical-plan/src/statistics_context.rs

Lines changed: 156 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,49 +25,112 @@
2525
use crate::ExecutionPlan;
2626
use datafusion_common::Result;
2727
use datafusion_common::Statistics;
28+
use std::cell::RefCell;
29+
use std::collections::HashMap;
30+
use std::rc::Rc;
2831
use std::sync::Arc;
2932

33+
/// Per-call memoization cache for [`compute_statistics`].
34+
///
35+
/// Keyed by `(plan node pointer address, partition)`. Created once per
36+
/// top-level [`compute_statistics`] call and shared across all recursive
37+
/// and operator-internal calls via [`StatisticsContext`].
38+
///
39+
/// The pointer-based key is safe within a single synchronous
40+
/// `compute_statistics` call: all `Arc<dyn ExecutionPlan>` nodes are held
41+
/// by the plan tree for the duration of the walk, so addresses cannot be
42+
/// reused.
43+
#[derive(Debug, Default)]
44+
struct StatsCache(HashMap<(usize, Option<usize>), Arc<Statistics>>);
45+
46+
impl StatsCache {
47+
fn get(
48+
&self,
49+
plan: &dyn ExecutionPlan,
50+
partition: Option<usize>,
51+
) -> Option<&Arc<Statistics>> {
52+
let key = (
53+
plan as *const dyn ExecutionPlan as *const () as usize,
54+
partition,
55+
);
56+
self.0.get(&key)
57+
}
58+
59+
fn insert(
60+
&mut self,
61+
plan: &dyn ExecutionPlan,
62+
partition: Option<usize>,
63+
stats: Arc<Statistics>,
64+
) {
65+
let key = (
66+
plan as *const dyn ExecutionPlan as *const () as usize,
67+
partition,
68+
);
69+
self.0.insert(key, stats);
70+
}
71+
}
72+
3073
/// Context passed to [`ExecutionPlan::partition_statistics_with_context`]
3174
/// carrying external information that operators can use when computing
3275
/// their statistics.
33-
///
34-
/// # Example
35-
///
36-
/// ```ignore
37-
/// use datafusion_physical_plan::statistics_context::StatisticsContext;
38-
///
39-
/// // Leaf node: no children
40-
/// let leaf_ctx = StatisticsContext::empty();
41-
///
42-
/// // Parent node: receives pre-computed child stats
43-
/// let child_stats = vec![child1_stats, child2_stats];
44-
/// let parent_ctx = StatisticsContext::new(child_stats);
45-
/// ```
46-
#[derive(Debug, Clone)]
76+
#[derive(Debug)]
4777
pub struct StatisticsContext {
4878
/// Pre-computed statistics for each child of the current node,
4979
/// in the same order as [`ExecutionPlan::children`].
5080
child_stats: Vec<Arc<Statistics>>,
81+
/// Shared memoization cache for the current `compute_statistics` walk (`None` when constructed via `new` or `empty`).
82+
cache: Option<Rc<RefCell<StatsCache>>>,
5183
}
5284

5385
impl StatisticsContext {
5486
/// Creates a new context with pre-computed child statistics.
5587
pub fn new(child_stats: Vec<Arc<Statistics>>) -> Self {
56-
Self { child_stats }
88+
Self {
89+
child_stats,
90+
cache: None,
91+
}
5792
}
5893

5994
/// Creates an empty context (for leaf nodes or when child stats
6095
/// are not available).
6196
pub fn empty() -> Self {
6297
Self {
6398
child_stats: Vec::new(),
99+
cache: None,
64100
}
65101
}
66102

67103
/// Returns the pre-computed statistics for each child node.
68104
pub fn child_stats(&self) -> &[Arc<Statistics>] {
69105
&self.child_stats
70106
}
107+
108+
/// Computes statistics for a child plan, using the shared cache
109+
/// from the current [`compute_statistics`] walk (or a fresh walk when this context carries no cache).
110+
///
111+
/// Use this when [`Self::child_stats`] does not provide the right
112+
/// granularity: partition-preserving operators needing per-partition
113+
/// child stats (via `Some(partition)`), or partition-merging operators
114+
/// needing overall stats (via `None`).
115+
pub fn compute_child_statistics(
116+
&self,
117+
plan: &dyn ExecutionPlan,
118+
partition: Option<usize>,
119+
) -> Result<Arc<Statistics>> {
120+
match &self.cache {
121+
Some(cache) => compute_statistics_inner(plan, partition, cache),
122+
None => compute_statistics(plan, partition),
123+
}
124+
}
125+
}
126+
127+
impl Clone for StatisticsContext {
128+
fn clone(&self) -> Self {
129+
Self {
130+
child_stats: self.child_stats.clone(),
131+
cache: self.cache.clone(),
132+
}
133+
}
71134
}
72135

73136
impl Default for StatisticsContext {
@@ -78,25 +141,96 @@ impl Default for StatisticsContext {
78141

79142
/// Computes statistics for a plan node by first recursively computing
80143
/// statistics for all children, then calling
81-
/// [`ExecutionPlan::partition_statistics_with_context`] with the pre-computed child
82-
/// statistics.
144+
/// [`ExecutionPlan::partition_statistics_with_context`] with the pre-computed
145+
/// child statistics.
146+
///
147+
/// Results are memoized within a single call: operators that internally
148+
/// call [`StatisticsContext::compute_child_statistics`] (e.g., partition-merging
149+
/// operators requesting overall child stats with `None`) will hit the
150+
/// cache instead of re-walking subtrees.
83151
///
84152
/// The `partition` parameter is forwarded to children. This is correct
85153
/// for partition-preserving operators (filter, projection, sort, etc.),
86154
/// but operators that need overall child stats regardless of the
87155
/// requested partition (e.g., repartition, coalesce, asymmetric joins)
88-
/// must handle this internally by calling `compute_statistics` with
89-
/// `None` on the relevant children.
156+
/// must handle this internally by calling
157+
/// [`StatisticsContext::compute_child_statistics`] with `None`.
90158
pub fn compute_statistics(
91159
plan: &dyn ExecutionPlan,
92160
partition: Option<usize>,
93161
) -> Result<Arc<Statistics>> {
162+
let cache = Rc::new(RefCell::new(StatsCache::default()));
163+
compute_statistics_inner(plan, partition, &cache)
164+
}
165+
166+
fn compute_statistics_inner(
167+
plan: &dyn ExecutionPlan,
168+
partition: Option<usize>,
169+
cache: &Rc<RefCell<StatsCache>>,
170+
) -> Result<Arc<Statistics>> {
171+
if let Some(cached) = cache.borrow().get(plan, partition) {
172+
return Ok(Arc::clone(cached));
173+
}
174+
94175
let child_stats = plan
95176
.children()
96177
.iter()
97-
.map(|child| compute_statistics(child.as_ref(), partition))
178+
.map(|child| compute_statistics_inner(child.as_ref(), partition, cache))
98179
.collect::<Result<Vec<_>>>()?;
99180

100-
let ctx = StatisticsContext::new(child_stats);
101-
plan.partition_statistics_with_context(partition, &ctx)
181+
let ctx = StatisticsContext {
182+
child_stats,
183+
cache: Some(Rc::clone(cache)),
184+
};
185+
let result = plan.partition_statistics_with_context(partition, &ctx)?;
186+
187+
cache
188+
.borrow_mut()
189+
.insert(plan, partition, Arc::clone(&result));
190+
Ok(result)
191+
}
192+
193+
#[cfg(all(test, feature = "test_utils"))]
194+
mod tests {
195+
use super::*;
196+
use crate::coalesce_partitions::CoalescePartitionsExec;
197+
use crate::test::exec::StatisticsExec;
198+
use arrow::datatypes::{DataType, Field, Schema};
199+
use datafusion_common::{ColumnStatistics, stats::Precision};
200+
201+
fn make_stats_leaf(num_rows: usize) -> Arc<dyn ExecutionPlan> {
202+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
203+
let col_stats = vec![ColumnStatistics {
204+
null_count: Precision::Exact(0),
205+
max_value: Precision::Absent,
206+
min_value: Precision::Absent,
207+
sum_value: Precision::Absent,
208+
distinct_count: Precision::Absent,
209+
byte_size: Precision::Absent,
210+
}];
211+
Arc::new(StatisticsExec::new(
212+
Statistics {
213+
num_rows: Precision::Exact(num_rows),
214+
total_byte_size: Precision::Absent,
215+
column_statistics: col_stats,
216+
},
217+
schema,
218+
))
219+
}
220+
221+
#[test]
222+
fn child_stats_always_returns_overall_stats() {
223+
// CoalescePartitionsExec merges partitions, so when called with
224+
// Some(0) it must still request overall (None) stats from its child
225+
let leaf = make_stats_leaf(100);
226+
let plan: Arc<dyn ExecutionPlan> = Arc::new(CoalescePartitionsExec::new(leaf));
227+
228+
// Calling with Some(0) should still work and return correct stats
229+
let stats = compute_statistics(plan.as_ref(), Some(0)).unwrap();
230+
assert_eq!(stats.num_rows, Precision::Exact(100));
231+
232+
// Calling with None should return the same
233+
let stats_none = compute_statistics(plan.as_ref(), None).unwrap();
234+
assert_eq!(stats_none.num_rows, Precision::Exact(100));
235+
}
102236
}

0 commit comments

Comments
 (0)