
Commit 7c25d5a

adriangb and claude committed
optimizer: derive StatisticsRequests from logical plan, thread to scan
Stacked on top of the API-only commit. Adds the missing piece: a small optimizer rule that walks the optimized logical plan and populates `TableScan.statistics_requests` based on the surrounding plan shape, plus a physical-planner hook that threads those into `ScanArgs::with_statistics_requests`.

* `TableScan` gains `statistics_requests: Vec<StatisticsRequest>` (default empty) and a `with_statistics_requests` builder.
* New `RequestStatistics` `OptimizerRule` (registered last in the default pipeline). Walks the plan once and derives:
  * Sort → Min / Max / NullCount on each sort key
  * Filter → Min / Max / NullCount / DistinctCount on referenced cols
  * Join → DistinctCount / NullCount on join keys (both sides)
  * always → RowCount per scan

  Stable, deterministic ordering. Idempotent. Never reshapes the plan — only annotates `TableScan` nodes.
* `DefaultPhysicalPlanner` reads `scan.statistics_requests` and threads them into `ScanArgs::with_statistics_requests` when calling `provider.scan_with_args`.
* `ScanArgs::statistics_requests` field switched from `Option<&[StatisticsRequest]>` to `&[StatisticsRequest]` (empty slice = no requests; collapses two ways of saying the same thing).
* `request_statistics::tests` (3 unit tests): confirm RowCount per scan, filter-column requests, join-key DistinctCount.
* `user_defined::statistics_requests` (2 e2e tests): register a `RecordingTable` provider, run SQL through the full pipeline, assert the requests that reached `scan_with_args` match what the plan shape implies.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
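The derivation rules described above (Sort, Filter, and Join nodes each requesting different statistics, plus an unconditional RowCount, with stable ordering and idempotence) can be sketched as a self-contained function. The `StatisticsRequest` and `PlanNode` types below are simplified stand-ins for illustration, not the real DataFusion types:

```rust
// Simplified stand-ins for the commit's types, for illustration only.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum StatisticsRequest {
    RowCount,
    Min(String),
    Max(String),
    NullCount(String),
    DistinctCount(String),
}

enum PlanNode {
    Sort { keys: Vec<String> },
    Filter { columns: Vec<String> },
    Join { left_keys: Vec<String>, right_keys: Vec<String> },
}

/// Derive the statistics requests implied by the plan nodes above a scan.
fn derive_requests(nodes: &[PlanNode]) -> Vec<StatisticsRequest> {
    use StatisticsRequest::*;
    // RowCount is always requested for every scan.
    let mut out = vec![RowCount];
    for node in nodes {
        match node {
            // Sort keys benefit from Min / Max / NullCount.
            PlanNode::Sort { keys } => {
                for k in keys {
                    out.extend([Min(k.clone()), Max(k.clone()), NullCount(k.clone())]);
                }
            }
            // Filter columns additionally benefit from DistinctCount.
            PlanNode::Filter { columns } => {
                for c in columns {
                    out.extend([
                        Min(c.clone()),
                        Max(c.clone()),
                        NullCount(c.clone()),
                        DistinctCount(c.clone()),
                    ]);
                }
            }
            // Join keys on both sides get DistinctCount / NullCount.
            PlanNode::Join { left_keys, right_keys } => {
                for k in left_keys.iter().chain(right_keys) {
                    out.extend([DistinctCount(k.clone()), NullCount(k.clone())]);
                }
            }
        }
    }
    // Sorting and deduplicating gives a stable, deterministic result,
    // which also makes re-running the rule a no-op (idempotence).
    out.sort();
    out.dedup();
    out
}
```

This is only a sketch of the derivation table in the commit message; the real rule walks a `LogicalPlan` tree rather than a flat slice of nodes.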
1 parent 595c0ba commit 7c25d5a

13 files changed

Lines changed: 575 additions & 28 deletions

File tree

datafusion/core/src/optimizer_rule_reference.md

Lines changed: 27 additions & 26 deletions
@@ -35,32 +35,33 @@ Rule order matters. The default pipeline may change between releases.

### Logical Optimizer Rules

The diff reformats the table's column widths (rows 1-24 otherwise unchanged) and adds row 25:

| order | rule | summary |
| ----- | ----------------------------------------- | ------- |
| 1 | `rewrite_set_comparison` | Rewrites `ANY` and `ALL` set-comparison subqueries into `EXISTS`-based boolean expressions with correct SQL NULL semantics. |
| 2 | `optimize_unions` | Flattens nested unions and removes unions with a single input. |
| 3 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. |
| 4 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. |
| 5 | `eliminate_join` | Replaces keyless inner joins that have a literal `false` filter with an empty relation. |
| 6 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. |
| 7 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. |
| 8 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. |
| 9 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. |
| 10 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. |
| 11 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. |
| 12 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. |
| 13 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. |
| 14 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. |
| 15 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. |
| 16 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. |
| 17 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. |
| 18 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. |
| 19 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. |
| 20 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. |
| 21 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. |
| 22 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. |
| 23 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. |
| 24 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. |
| 25 | `request_statistics` | Walks the optimized plan once and attaches per-`TableScan` `StatisticsRequest`s describing what stats downstream nodes would benefit from. |

### Physical Optimizer Rules

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
@@ -645,6 +645,7 @@ impl DefaultPhysicalPlanner {
             filters,
             fetch,
             projected_schema,
+            statistics_requests,
             ..
         } = scan;

@@ -657,7 +658,8 @@
             let opts = ScanArgs::default()
                 .with_projection(projection.as_deref())
                 .with_filters(Some(&filters_vec))
-                .with_limit(*fetch);
+                .with_limit(*fetch)
+                .with_statistics_requests(statistics_requests);
             let res = source.scan_with_args(session_state, opts).await?;
             Arc::clone(res.plan())
         } else {
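The `Option<&[StatisticsRequest]>` to `&[StatisticsRequest]` collapse mentioned in the commit message (empty slice = no requests) can be illustrated with a stand-in builder. The struct below is a simplified sketch, not the real `ScanArgs`, and the element type is a placeholder:

```rust
// Stand-in for the real ScanArgs: the statistics_requests field is a
// plain slice, so "no requests" is just the empty slice instead of None.
// `&[T]` implements Default (as the empty slice), so derive(Default) works.
#[derive(Default)]
struct ScanArgs<'a> {
    statistics_requests: &'a [String], // element type simplified for the sketch
}

impl<'a> ScanArgs<'a> {
    /// Builder-style setter, mirroring `with_statistics_requests`.
    fn with_statistics_requests(mut self, requests: &'a [String]) -> Self {
        self.statistics_requests = requests;
        self
    }

    /// Accessor: callers can iterate unconditionally; an empty slice
    /// simply yields nothing, so there is no `None` case to handle.
    fn statistics_requests(&self) -> &'a [String] {
        self.statistics_requests
    }
}
```

The design choice here is that `Option<&[T]>` has two representations of "nothing" (`None` and `Some(&[])`); using a bare slice removes the ambiguity and spares every provider a `match`.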

datafusion/core/tests/user_defined/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -41,3 +41,7 @@ mod relation_planner;

 /// Tests for insert operations
 mod insert_operation;
+
+/// End-to-end tests for `StatisticsRequest`s flowing from the optimizer
+/// rule through the physical planner into a custom `TableProvider`.
+mod statistics_requests;
Lines changed: 161 additions & 0 deletions
@@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! End-to-end test that the optimizer-derived `StatisticsRequest`s
//! reach a custom `TableProvider`'s `scan_with_args`.

use std::sync::{Arc, Mutex};

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion::datasource::TableType;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::Result;
use datafusion_expr_common::statistics::StatisticsRequest;

/// A `TableProvider` that records the last `statistics_requests` it was
/// asked for, so the test can assert what reached it.
#[derive(Debug)]
struct RecordingTable {
    schema: SchemaRef,
    batch: RecordBatch,
    last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
}

#[async_trait]
impl TableProvider for RecordingTable {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(MemorySourceConfig::try_new_exec(
            &[vec![self.batch.clone()]],
            self.schema.clone(),
            projection.cloned(),
        )?)
    }

    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> Result<ScanResult> {
        // Record what reached us, then delegate to scan().
        *self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
        let plan = self
            .scan(
                state,
                args.projection().map(|p| p.to_vec()).as_ref(),
                args.filters().unwrap_or(&[]),
                args.limit(),
            )
            .await?;
        Ok(ScanResult::new(plan))
    }
}

fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(Int64Array::from(vec![10, 20, 30])),
        ],
    )
    .unwrap();
    let last_requests = Arc::new(Mutex::new(Vec::new()));
    let provider = Arc::new(RecordingTable {
        schema,
        batch,
        last_requests: last_requests.clone(),
    });
    (provider, last_requests)
}

#[tokio::test]
async fn requests_reach_provider_scan_with_args() -> Result<()> {
    let (provider, last_requests) = make_table();
    let ctx = SessionContext::new();
    ctx.register_table("t", provider)?;

    // Filter on `a` + sort on `b` should request Min/Max/NullCount on
    // both, plus DistinctCount on `a` (filter), plus a RowCount.
    let _ = ctx
        .sql("SELECT a, b FROM t WHERE a > 0 ORDER BY b LIMIT 10")
        .await?
        .collect()
        .await?;

    let got = last_requests.lock().unwrap().clone();
    assert!(!got.is_empty(), "expected non-empty requests, got {got:?}");

    let has = |needle: &StatisticsRequest| got.iter().any(|r| r == needle);
    use datafusion_common::Column;
    use datafusion_expr_common::statistics::StatisticsRequest::*;
    assert!(has(&RowCount), "expected RowCount, got {got:?}");
    assert!(
        has(&Min(Column::new_unqualified("a"))),
        "expected Min(a), got {got:?}"
    );
    assert!(
        has(&DistinctCount(Column::new_unqualified("a"))),
        "expected DistinctCount(a), got {got:?}"
    );
    assert!(
        has(&Min(Column::new_unqualified("b"))),
        "expected Min(b) from ORDER BY, got {got:?}"
    );

    Ok(())
}

#[tokio::test]
async fn no_requests_when_plan_has_no_filter_sort_or_join() -> Result<()> {
    let (provider, last_requests) = make_table();
    let ctx = SessionContext::new();
    ctx.register_table("t", provider)?;

    // Plain `SELECT *` — only `RowCount` should be requested.
    let _ = ctx.sql("SELECT a, b FROM t").await?.collect().await?;

    let got = last_requests.lock().unwrap().clone();
    use datafusion_expr_common::statistics::StatisticsRequest::*;
    assert_eq!(got.len(), 1, "expected only RowCount, got {got:?}");
    assert!(matches!(got[0], RowCount));

    Ok(())
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 19 additions & 0 deletions
@@ -2769,6 +2769,12 @@ pub struct TableScan {
     pub filters: Vec<Expr>,
     /// Optional number of rows to read
     pub fetch: Option<usize>,
+    /// Statistics the planner would like the provider to answer for this
+    /// scan (typically derived from the surrounding plan shape — e.g.
+    /// Min/Max for sort keys, DistinctCount for join keys). Threaded into
+    /// `ScanArgs::with_statistics_requests` at physical planning time.
+    /// Empty by default.
+    pub statistics_requests: Vec<datafusion_expr_common::statistics::StatisticsRequest>,
 }

 impl Debug for TableScan {

@@ -2890,8 +2896,19 @@ impl TableScan {
             projected_schema,
             filters,
             fetch,
+            statistics_requests: Vec::new(),
         })
     }
+
+    /// Attach a list of statistics requests for the optimizer-aware
+    /// stats-collection path. See [`Self::statistics_requests`].
+    pub fn with_statistics_requests(
+        mut self,
+        statistics_requests: Vec<datafusion_expr_common::statistics::StatisticsRequest>,
+    ) -> Self {
+        self.statistics_requests = statistics_requests;
+        self
+    }
 }

 // Repartition the plan based on a partitioning scheme.

@@ -5128,6 +5145,7 @@ mod tests {
             projected_schema: Arc::clone(&schema),
             filters: vec![],
             fetch: None,
+            statistics_requests: Vec::new(),
         }));
         let col = schema.field_names()[0].clone();

@@ -5158,6 +5176,7 @@ mod tests {
             projected_schema: Arc::clone(&unique_schema),
             filters: vec![],
             fetch: None,
+            statistics_requests: Vec::new(),
         }));
         let col = schema.field_names()[0].clone();

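The hunk above follows a common Rust pattern: the new field defaults to empty in the fallible constructor, so existing call sites keep working, and a consuming builder attaches values afterwards. A miniature sketch with stand-in types (not the real `TableScan`):

```rust
// Simplified stand-ins for illustration; not the real DataFusion types.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum StatisticsRequest {
    RowCount,
    Min(String),
}

#[allow(dead_code)]
struct TableScan {
    table_name: String,
    statistics_requests: Vec<StatisticsRequest>,
}

impl TableScan {
    /// Fallible constructor; error type simplified to String for the sketch.
    fn try_new(table_name: &str) -> Result<Self, String> {
        Ok(Self {
            table_name: table_name.to_string(),
            // Existing call sites keep compiling: the field defaults to empty.
            statistics_requests: Vec::new(),
        })
    }

    /// Consuming builder, mirroring the commit's `with_statistics_requests`.
    fn with_statistics_requests(mut self, requests: Vec<StatisticsRequest>) -> Self {
        self.statistics_requests = requests;
        self
    }
}
```

A usage sketch in the commit's style would be `TableScan::try_new("t")?.with_statistics_requests(reqs)`, which is exactly the shape `optimize_projections` uses when it rebuilds a scan.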
datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 0 deletions
@@ -615,6 +615,7 @@ impl LogicalPlan {
             projected_schema,
             filters,
             fetch,
+            statistics_requests,
         }) => filters.map_elements(f)?.update_data(|filters| {
             LogicalPlan::TableScan(TableScan {
                 table_name,

@@ -623,6 +624,7 @@
                 projected_schema,
                 filters,
                 fetch,
+                statistics_requests,
             })
         }),
         LogicalPlan::Distinct(Distinct::On(DistinctOn {

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ pub mod propagate_empty_relation;
 pub mod push_down_filter;
 pub mod push_down_limit;
 pub mod replace_distinct_aggregate;
+pub mod request_statistics;
 pub mod rewrite_set_comparison;
 pub mod scalar_subquery_to_join;
 pub mod simplify_expressions;

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 3 additions & 1 deletion
@@ -276,6 +276,7 @@ fn optimize_projections(
             filters,
             fetch,
             projected_schema: _,
+            statistics_requests,
         } = table_scan;

         // Get indices referred to in the original (schema with all fields)

@@ -285,7 +286,8 @@
             None => indices.into_inner(),
         };
         let new_scan =
-            TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;
+            TableScan::try_new(table_name, source, Some(projection), filters, fetch)?
+                .with_statistics_requests(statistics_requests);

         return Transformed::yes(LogicalPlan::TableScan(new_scan))
             .transform_data(|plan| optimize_subqueries(plan, config));

0 commit comments

Comments
 (0)