Skip to content

Commit a8a3d6c

Browse files
committed
Pre-compute children with overall stats to avoid wasted partition-specific walks
1 parent 2f843ef commit a8a3d6c

24 files changed

Lines changed: 183 additions & 114 deletions

File tree

datafusion/physical-optimizer/src/output_requirements.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,13 @@ impl ExecutionPlan for OutputRequirementExec {
245245

246246
fn partition_statistics_with_context(
247247
&self,
248-
_partition: Option<usize>,
248+
partition: Option<usize>,
249249
ctx: &StatisticsContext,
250250
) -> Result<Arc<Statistics>> {
251-
Ok(Arc::clone(&ctx.child_stats()[0]))
251+
match partition {
252+
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition),
253+
None => Ok(Arc::clone(&ctx.child_stats()[0])),
254+
}
252255
}
253256

254257
fn try_swapping_with_projection(

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,10 +1558,13 @@ impl ExecutionPlan for AggregateExec {
15581558

15591559
fn partition_statistics_with_context(
15601560
&self,
1561-
_partition: Option<usize>,
1561+
partition: Option<usize>,
15621562
ctx: &StatisticsContext,
15631563
) -> Result<Arc<Statistics>> {
1564-
let child_statistics = Arc::clone(&ctx.child_stats()[0]);
1564+
let child_statistics = match partition {
1565+
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition)?,
1566+
None => Arc::clone(&ctx.child_stats()[0]),
1567+
};
15651568
Ok(Arc::new(self.statistics_inner(&child_statistics)?))
15661569
}
15671570

datafusion/physical-plan/src/buffer.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,13 @@ impl ExecutionPlan for BufferExec {
247247

248248
fn partition_statistics_with_context(
249249
&self,
250-
_partition: Option<usize>,
250+
partition: Option<usize>,
251251
ctx: &StatisticsContext,
252252
) -> Result<Arc<Statistics>> {
253-
Ok(Arc::clone(&ctx.child_stats()[0]))
253+
match partition {
254+
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition),
255+
None => Ok(Arc::clone(&ctx.child_stats()[0])),
256+
}
254257
}
255258

256259
fn supports_limit_pushdown(&self) -> bool {

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,15 @@ impl ExecutionPlan for CoalesceBatchesExec {
225225

226226
fn partition_statistics_with_context(
227227
&self,
228-
_partition: Option<usize>,
228+
partition: Option<usize>,
229229
ctx: &StatisticsContext,
230230
) -> Result<Arc<Statistics>> {
231-
let stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
231+
let stats = match partition {
232+
Some(_) => Arc::unwrap_or_clone(
233+
ctx.compute_child_statistics(self.input.as_ref(), partition)?,
234+
),
235+
None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0])),
236+
};
232237
Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
233238
}
234239

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
245245
_partition: Option<usize>,
246246
ctx: &StatisticsContext,
247247
) -> Result<Arc<Statistics>> {
248-
// CoalescePartitions merges all input partitions into one,
249-
// so it always needs overall (None) input stats
250-
let stats = Arc::unwrap_or_clone(
251-
ctx.compute_child_statistics(self.input.as_ref(), None)?,
252-
);
248+
let stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
253249
Ok(Arc::new(stats.with_fetch(self.fetch, 0, 1)?))
254250
}
255251

datafusion/physical-plan/src/coop.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,13 @@ impl ExecutionPlan for CooperativeExec {
309309

310310
fn partition_statistics_with_context(
311311
&self,
312-
_partition: Option<usize>,
312+
partition: Option<usize>,
313313
ctx: &StatisticsContext,
314314
) -> Result<Arc<Statistics>> {
315-
Ok(Arc::clone(&ctx.child_stats()[0]))
315+
match partition {
316+
Some(_) => ctx.compute_child_statistics(self.input.as_ref(), partition),
317+
None => Ok(Arc::clone(&ctx.child_stats()[0])),
318+
}
316319
}
317320

318321
fn supports_limit_pushdown(&self) -> bool {

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -574,12 +574,15 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
574574
/// (the default), not an error.
575575
/// If `partition` is `None`, it returns statistics for the entire plan.
576576
///
577-
/// The [`StatisticsContext`] carries pre-computed child statistics and
578-
/// additional context for statistics computation. Use
579-
/// [`compute_statistics`] to compute statistics bottom-up, threading
580-
/// child statistics through the context automatically.
577+
/// The [`StatisticsContext`] carries pre-computed overall (`None`) child
578+
/// statistics via [`StatisticsContext::child_stats`] and a shared cache
579+
/// via [`StatisticsContext::compute_child_statistics`]. Operators that
580+
/// need per-partition child stats should call
581+
/// `ctx.compute_child_statistics(child, partition)`.
581582
///
582583
/// [`StatisticsContext`]: crate::statistics_context::StatisticsContext
584+
/// [`StatisticsContext::child_stats`]: crate::statistics_context::StatisticsContext::child_stats
585+
/// [`StatisticsContext::compute_child_statistics`]: crate::statistics_context::StatisticsContext::compute_child_statistics
583586
/// [`compute_statistics`]: crate::statistics_context::compute_statistics
584587
fn partition_statistics_with_context(
585588
&self,

datafusion/physical-plan/src/filter.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -579,10 +579,15 @@ impl ExecutionPlan for FilterExec {
579579
/// predicate's selectivity value can be determined for the incoming data.
580580
fn partition_statistics_with_context(
581581
&self,
582-
_partition: Option<usize>,
582+
partition: Option<usize>,
583583
ctx: &StatisticsContext,
584584
) -> Result<Arc<Statistics>> {
585-
let input_stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
585+
let input_stats = match partition {
586+
Some(_) => Arc::unwrap_or_clone(
587+
ctx.compute_child_statistics(self.input.as_ref(), partition)?,
588+
),
589+
None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0])),
590+
};
586591
let stats = Self::statistics_helper(
587592
&self.input.schema(),
588593
input_stats,

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -387,16 +387,16 @@ impl ExecutionPlan for CrossJoinExec {
387387
ctx: &StatisticsContext,
388388
) -> Result<Arc<Statistics>> {
389389
// Left side is always broadcast (collected into a single partition),
390-
// so it needs overall stats. For the Some case, the context has
391-
// partition-specific stats which would be incorrect.
392-
// Right side can have multiple partitions, so the context's
393-
// partition-specific or overall stats are correct as-is.
394-
let left_stats = match partition {
395-
Some(_) => ctx.compute_child_statistics(self.left.as_ref(), None)?,
396-
None => Arc::clone(&ctx.child_stats()[0]),
390+
// so it always needs overall stats (child_stats provides these).
391+
// Right side can have multiple partitions, so it needs per-partition
392+
// stats when a specific partition is requested.
393+
let left_stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[0]));
394+
let right_stats = match partition {
395+
Some(_) => Arc::unwrap_or_clone(
396+
ctx.compute_child_statistics(self.right.as_ref(), partition)?,
397+
),
398+
None => Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[1])),
397399
};
398-
let left_stats = Arc::unwrap_or_clone(left_stats);
399-
let right_stats = Arc::unwrap_or_clone(Arc::clone(&ctx.child_stats()[1]));
400400

401401
Ok(Arc::new(stats_cartesian_product(left_stats, right_stats)))
402402
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1448,13 +1448,12 @@ impl ExecutionPlan for HashJoinExec {
14481448
) -> Result<Arc<Statistics>> {
14491449
let stats = match (partition, self.mode) {
14501450
// For CollectLeft mode, the left side is broadcast (collected into
1451-
// a single partition), so it needs overall stats. The context has
1452-
// partition-specific stats which would be incorrect.
1453-
// Right side is partitioned, so the context stats are correct.
1451+
// a single partition), so it needs overall stats (child_stats).
1452+
// Right side is partitioned, so it needs per-partition stats.
14541453
(Some(_), PartitionMode::CollectLeft) => {
1455-
let left_stats =
1456-
ctx.compute_child_statistics(self.left.as_ref(), None)?;
1457-
let right_stats = Arc::clone(&ctx.child_stats()[1]);
1454+
let left_stats = Arc::clone(&ctx.child_stats()[0]);
1455+
let right_stats =
1456+
ctx.compute_child_statistics(self.right.as_ref(), partition)?;
14581457

14591458
estimate_join_statistics(
14601459
Arc::unwrap_or_clone(left_stats),
@@ -1468,8 +1467,10 @@ impl ExecutionPlan for HashJoinExec {
14681467
// For Partitioned mode, both sides are hash-partitioned symmetrically,
14691468
// so each output partition uses the matching partition from both sides.
14701469
(Some(_), PartitionMode::Partitioned) => {
1471-
let left_stats = Arc::clone(&ctx.child_stats()[0]);
1472-
let right_stats = Arc::clone(&ctx.child_stats()[1]);
1470+
let left_stats =
1471+
ctx.compute_child_statistics(self.left.as_ref(), partition)?;
1472+
let right_stats =
1473+
ctx.compute_child_statistics(self.right.as_ref(), partition)?;
14731474

14741475
estimate_join_statistics(
14751476
Arc::unwrap_or_clone(left_stats),
@@ -1485,8 +1486,8 @@ impl ExecutionPlan for HashJoinExec {
14851486
let left_stats = Arc::clone(&ctx.child_stats()[0]);
14861487
let right_stats = Arc::clone(&ctx.child_stats()[1]);
14871488
estimate_join_statistics(
1488-
(*left_stats).clone(),
1489-
(*right_stats).clone(),
1489+
Arc::unwrap_or_clone(left_stats),
1490+
Arc::unwrap_or_clone(right_stats),
14901491
&self.on,
14911492
&self.join_type,
14921493
&self.join_schema,
@@ -1496,10 +1497,8 @@ impl ExecutionPlan for HashJoinExec {
14961497
// Auto mode hasn't decided partitioning yet, so it needs
14971498
// overall stats from both sides.
14981499
(Some(_), PartitionMode::Auto) => {
1499-
let left_stats =
1500-
ctx.compute_child_statistics(self.left.as_ref(), None)?;
1501-
let right_stats =
1502-
ctx.compute_child_statistics(self.right.as_ref(), None)?;
1500+
let left_stats = Arc::clone(&ctx.child_stats()[0]);
1501+
let right_stats = Arc::clone(&ctx.child_stats()[1]);
15031502
estimate_join_statistics(
15041503
Arc::unwrap_or_clone(left_stats),
15051504
Arc::unwrap_or_clone(right_stats),

0 commit comments

Comments
 (0)