Skip to content

Commit 68cab2b

Browse files
adriangb and claude committed
optimizer: derive StatisticsRequests from logical plan, thread to scan
Stacked on top of the API-only commit. Adds the missing piece: a small optimizer rule that walks the optimized logical plan and populates `TableScan.statistics_requests` based on the surrounding plan shape, plus a physical-planner hook that threads those into `ScanArgs::with_statistics_requests`.

* `TableScan` gains `statistics_requests: Vec<StatisticsRequest>` (default empty) and a `with_statistics_requests` builder.
* New `RequestStatistics` `OptimizerRule` (registered last in the default pipeline). Walks the plan once and derives:

      Sort   → Min / Max / NullCount on each sort key
      Filter → Min / Max / NullCount / DistinctCount on referenced cols
      Join   → DistinctCount / NullCount on join keys (both sides)
      always → RowCount per scan

  Stable, deterministic ordering. Idempotent. Never reshapes the plan — only annotates `TableScan` nodes.
* `DefaultPhysicalPlanner` reads `scan.statistics_requests` and threads them into `ScanArgs::with_statistics_requests` when calling `provider.scan_with_args`.
* `ScanArgs::statistics_requests` field switched from `Option<&[StatisticsRequest]>` to `&[StatisticsRequest]` (empty slice = no requests; collapses two ways of saying the same thing).
* `request_statistics::tests` (3 unit tests) — confirm RowCount per scan, filter-column requests, join-key DistinctCount.
* `user_defined::statistics_requests` (2 e2e tests) — register a `RecordingTable` provider, run SQL through the full pipeline, assert the requests that reached `scan_with_args` match what the plan shape implies.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 7b04c0f commit 68cab2b

11 files changed

Lines changed: 537 additions & 2 deletions

File tree

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ impl DefaultPhysicalPlanner {
645645
filters,
646646
fetch,
647647
projected_schema,
648+
statistics_requests,
648649
..
649650
} = scan;
650651

@@ -657,7 +658,8 @@ impl DefaultPhysicalPlanner {
657658
let opts = ScanArgs::default()
658659
.with_projection(projection.as_deref())
659660
.with_filters(Some(&filters_vec))
660-
.with_limit(*fetch);
661+
.with_limit(*fetch)
662+
.with_statistics_requests(statistics_requests);
661663
let res = source.scan_with_args(session_state, opts).await?;
662664
Arc::clone(res.plan())
663665
} else {

datafusion/core/tests/user_defined/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,7 @@ mod relation_planner;
4141

4242
/// Tests for insert operations
4343
mod insert_operation;
44+
45+
/// End-to-end tests for `StatisticsRequest`s flowing from the optimizer
46+
/// rule through the physical planner into a custom `TableProvider`.
47+
mod statistics_requests;

datafusion/core/tests/user_defined/statistics_requests.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
11+
//! End-to-end tests checking that the optimizer-derived `StatisticsRequest`s
12+
//! reach a custom `TableProvider`'s `scan_with_args`.
13+
14+
use std::sync::{Arc, Mutex};
15+
16+
use arrow::array::{Int64Array, RecordBatch};
17+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
18+
use async_trait::async_trait;
19+
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
20+
use datafusion::datasource::TableType;
21+
use datafusion::datasource::memory::MemorySourceConfig;
22+
use datafusion::execution::context::SessionContext;
23+
use datafusion::logical_expr::Expr;
24+
use datafusion::physical_plan::ExecutionPlan;
25+
use datafusion_common::Result;
26+
use datafusion_expr_common::statistics::StatisticsRequest;
27+
28+
/// A `TableProvider` that records the last `statistics_requests` it was
29+
/// asked for, so the test can assert what reached it.
30+
#[derive(Debug)]
31+
struct RecordingTable {
32+
schema: SchemaRef,
33+
batch: RecordBatch,
34+
last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
35+
}
36+
37+
#[async_trait]
38+
impl TableProvider for RecordingTable {
39+
fn schema(&self) -> SchemaRef {
40+
self.schema.clone()
41+
}
42+
43+
fn table_type(&self) -> TableType {
44+
TableType::Base
45+
}
46+
47+
async fn scan(
48+
&self,
49+
_state: &dyn Session,
50+
projection: Option<&Vec<usize>>,
51+
_filters: &[Expr],
52+
_limit: Option<usize>,
53+
) -> Result<Arc<dyn ExecutionPlan>> {
54+
Ok(MemorySourceConfig::try_new_exec(
55+
&[vec![self.batch.clone()]],
56+
self.schema.clone(),
57+
projection.cloned(),
58+
)?)
59+
}
60+
61+
async fn scan_with_args<'a>(
62+
&self,
63+
state: &dyn Session,
64+
args: ScanArgs<'a>,
65+
) -> Result<ScanResult> {
66+
// Record what reached us, then delegate to scan().
67+
*self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
68+
let plan = self
69+
.scan(
70+
state,
71+
args.projection().map(|p| p.to_vec()).as_ref(),
72+
args.filters().unwrap_or(&[]),
73+
args.limit(),
74+
)
75+
.await?;
76+
Ok(ScanResult::new(plan))
77+
}
78+
}
79+
80+
fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
81+
let schema = Arc::new(Schema::new(vec![
82+
Field::new("a", DataType::Int64, false),
83+
Field::new("b", DataType::Int64, false),
84+
]));
85+
let batch = RecordBatch::try_new(
86+
schema.clone(),
87+
vec![
88+
Arc::new(Int64Array::from(vec![1, 2, 3])),
89+
Arc::new(Int64Array::from(vec![10, 20, 30])),
90+
],
91+
)
92+
.unwrap();
93+
let last_requests = Arc::new(Mutex::new(Vec::new()));
94+
let provider = Arc::new(RecordingTable {
95+
schema,
96+
batch,
97+
last_requests: last_requests.clone(),
98+
});
99+
(provider, last_requests)
100+
}
101+
102+
#[tokio::test]
103+
async fn requests_reach_provider_scan_with_args() -> Result<()> {
104+
let (provider, last_requests) = make_table();
105+
let ctx = SessionContext::new();
106+
ctx.register_table("t", provider)?;
107+
108+
// Filter on `a` + sort on `b` should request Min/Max/NullCount on
109+
// both, plus DistinctCount on `a` (filter), plus a RowCount.
110+
let _ = ctx
111+
.sql("SELECT a, b FROM t WHERE a > 0 ORDER BY b LIMIT 10")
112+
.await?
113+
.collect()
114+
.await?;
115+
116+
let got = last_requests.lock().unwrap().clone();
117+
assert!(!got.is_empty(), "expected non-empty requests, got {got:?}");
118+
119+
let has = |needle: &StatisticsRequest| got.iter().any(|r| r == needle);
120+
use datafusion_common::Column;
121+
use datafusion_expr_common::statistics::StatisticsRequest::*;
122+
assert!(has(&RowCount), "expected RowCount, got {got:?}");
123+
assert!(
124+
has(&Min(Column::new_unqualified("a"))),
125+
"expected Min(a), got {got:?}"
126+
);
127+
assert!(
128+
has(&DistinctCount(Column::new_unqualified("a"))),
129+
"expected DistinctCount(a), got {got:?}"
130+
);
131+
assert!(
132+
has(&Min(Column::new_unqualified("b"))),
133+
"expected Min(b) from ORDER BY, got {got:?}"
134+
);
135+
136+
Ok(())
137+
}
138+
139+
#[tokio::test]
140+
async fn no_requests_when_plan_has_no_filter_sort_or_join() -> Result<()> {
141+
let (provider, last_requests) = make_table();
142+
let ctx = SessionContext::new();
143+
ctx.register_table("t", provider)?;
144+
145+
// Plain projection (`SELECT a, b`) with no filter, sort, or join — only `RowCount` should be requested.
146+
let _ = ctx.sql("SELECT a, b FROM t").await?.collect().await?;
147+
148+
let got = last_requests.lock().unwrap().clone();
149+
use datafusion_expr_common::statistics::StatisticsRequest::*;
150+
assert_eq!(got.len(), 1, "expected only RowCount, got {got:?}");
151+
assert!(matches!(got[0], RowCount));
152+
153+
Ok(())
154+
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,6 +2769,12 @@ pub struct TableScan {
27692769
pub filters: Vec<Expr>,
27702770
/// Optional number of rows to read
27712771
pub fetch: Option<usize>,
2772+
/// Statistics the planner would like the provider to answer for this
2773+
/// scan (typically derived from the surrounding plan shape — e.g.
2774+
/// Min/Max for sort keys, DistinctCount for join keys). Threaded into
2775+
/// `ScanArgs::with_statistics_requests` at physical planning time.
2776+
/// Empty by default.
2777+
pub statistics_requests: Vec<datafusion_expr_common::statistics::StatisticsRequest>,
27722778
}
27732779

27742780
impl Debug for TableScan {
@@ -2890,8 +2896,19 @@ impl TableScan {
28902896
projected_schema,
28912897
filters,
28922898
fetch,
2899+
statistics_requests: Vec::new(),
28932900
})
28942901
}
2902+
2903+
/// Attach a list of statistics requests for the optimizer-aware
2904+
/// stats-collection path. See [`Self::statistics_requests`].
2905+
pub fn with_statistics_requests(
2906+
mut self,
2907+
statistics_requests: Vec<datafusion_expr_common::statistics::StatisticsRequest>,
2908+
) -> Self {
2909+
self.statistics_requests = statistics_requests;
2910+
self
2911+
}
28952912
}
28962913

28972914
// Repartition the plan based on a partitioning scheme.
@@ -5128,6 +5145,7 @@ mod tests {
51285145
projected_schema: Arc::clone(&schema),
51295146
filters: vec![],
51305147
fetch: None,
5148+
statistics_requests: Vec::new(),
51315149
}));
51325150
let col = schema.field_names()[0].clone();
51335151

@@ -5158,6 +5176,7 @@ mod tests {
51585176
projected_schema: Arc::clone(&unique_schema),
51595177
filters: vec![],
51605178
fetch: None,
5179+
statistics_requests: Vec::new(),
51615180
}));
51625181
let col = schema.field_names()[0].clone();
51635182

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ impl LogicalPlan {
615615
projected_schema,
616616
filters,
617617
fetch,
618+
statistics_requests,
618619
}) => filters.map_elements(f)?.update_data(|filters| {
619620
LogicalPlan::TableScan(TableScan {
620621
table_name,
@@ -623,6 +624,7 @@ impl LogicalPlan {
623624
projected_schema,
624625
filters,
625626
fetch,
627+
statistics_requests,
626628
})
627629
}),
628630
LogicalPlan::Distinct(Distinct::On(DistinctOn {

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub mod propagate_empty_relation;
6666
pub mod push_down_filter;
6767
pub mod push_down_limit;
6868
pub mod replace_distinct_aggregate;
69+
pub mod request_statistics;
6970
pub mod rewrite_set_comparison;
7071
pub mod scalar_subquery_to_join;
7172
pub mod simplify_expressions;

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ fn optimize_projections(
276276
filters,
277277
fetch,
278278
projected_schema: _,
279+
statistics_requests,
279280
} = table_scan;
280281

281282
// Get indices referred to in the original (schema with all fields)
@@ -285,7 +286,8 @@ fn optimize_projections(
285286
None => indices.into_inner(),
286287
};
287288
let new_scan =
288-
TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;
289+
TableScan::try_new(table_name, source, Some(projection), filters, fetch)?
290+
.with_statistics_requests(statistics_requests);
289291

290292
return Transformed::yes(LogicalPlan::TableScan(new_scan))
291293
.transform_data(|plan| optimize_subqueries(plan, config));

datafusion/optimizer/src/optimizer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ use crate::propagate_empty_relation::PropagateEmptyRelation;
5252
use crate::push_down_filter::PushDownFilter;
5353
use crate::push_down_limit::PushDownLimit;
5454
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
55+
use crate::request_statistics::RequestStatistics;
5556
use crate::rewrite_set_comparison::RewriteSetComparison;
5657
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
5758
use crate::simplify_expressions::SimplifyExpressions;
@@ -305,6 +306,11 @@ impl Optimizer {
305306
Arc::new(ExtractLeafExpressions::new()),
306307
Arc::new(PushDownLeafProjections::new()),
307308
Arc::new(OptimizeProjections::new()),
309+
// Run last: annotates each `TableScan` with the stats the
310+
// surrounding (now-stable) plan shape would benefit from.
311+
// The physical planner threads these into
312+
// `ScanArgs::with_statistics_requests`.
313+
Arc::new(RequestStatistics::new()),
308314
];
309315

310316
Self::with_rules(rules)

datafusion/optimizer/src/push_down_filter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3168,6 +3168,7 @@ mod tests {
31683168
projection,
31693169
source: Arc::new(test_provider),
31703170
fetch: None,
3171+
statistics_requests: Vec::new(),
31713172
});
31723173

31733174
Ok(LogicalPlanBuilder::from(table_scan))

0 commit comments

Comments
 (0)