Skip to content

Commit 189287f

Browse files
committed
test(expression_analyzer): add StatisticsTable and end-to-end SLT for OR selectivity
Add a reusable StatisticsTable (TableProvider + ExecutionPlan with user-supplied statistics) to the sqllogictest harness, and use it in expression_analyzer.slt
1 parent 8aa003b commit 189287f

6 files changed

Lines changed: 288 additions & 5 deletions

File tree

datafusion/physical-expr/src/expression_analyzer/default.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ impl DefaultExpressionAnalyzer {
7070
_ => None,
7171
}
7272
}
73-
7473
}
7574

7675
impl ExpressionAnalyzer for DefaultExpressionAnalyzer {

datafusion/physical-plan/src/filter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2658,7 +2658,7 @@ mod tests {
26582658
schema.clone(),
26592659
));
26602660
// (a = 42 OR b = 5): OR is not expressible as a single interval
2661-
let predicate = Arc::new(BinaryExpr::new(
2661+
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
26622662
Arc::new(BinaryExpr::new(
26632663
Arc::new(Column::new("a", 0)),
26642664
Operator::Eq,
@@ -2673,7 +2673,7 @@ mod tests {
26732673
));
26742674

26752675
// Without ExpressionAnalyzer: default 20% selectivity -> 200 rows
2676-
let filter = Arc::new(FilterExec::try_new(predicate.clone(), input as _)?);
2676+
let filter = Arc::new(FilterExec::try_new(Arc::clone(&predicate), input as _)?);
26772677
let stats = filter.partition_statistics(None)?;
26782678
assert_eq!(stats.num_rows, Precision::Inexact(200));
26792679

datafusion/sqllogictest/src/test_context.rs

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,10 @@ impl TestContext {
177177
info!("Registering dummy async udf");
178178
register_async_abs_udf(test_ctx.session_ctx())
179179
}
180+
"expression_analyzer.slt" => {
181+
info!("Registering tables with controlled statistics");
182+
statistics_table::register_statistics_tables(test_ctx.session_ctx());
183+
}
180184
_ => {
181185
info!("Using default SessionContext");
182186
}
@@ -615,3 +619,165 @@ fn register_async_abs_udf(ctx: &SessionContext) {
615619
let udf = AsyncScalarUDF::new(Arc::new(async_abs));
616620
ctx.register_udf(udf.into_scalar_udf());
617621
}
622+
623+
/// A table provider with fully controlled statistics for testing
624+
/// statistics-dependent optimizer and planner behaviors.
625+
///
626+
/// Unlike [`MemTable`] (which derives statistics from actual data), this
627+
/// provider returns whatever `Statistics` you supply, letting tests exercise
628+
/// code paths that depend on specific column NDV, min/max, or row-count values
629+
/// without needing real data files.
630+
pub mod statistics_table {
631+
use std::sync::Arc;
632+
633+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
634+
use async_trait::async_trait;
635+
use datafusion::catalog::Session;
636+
use datafusion::common::tree_node::TreeNodeRecursion;
637+
use datafusion::common::{Result, stats::Precision};
638+
use datafusion::datasource::{TableProvider, TableType};
639+
use datafusion::execution::TaskContext;
640+
use datafusion::logical_expr::Expr;
641+
use datafusion::physical_expr::{EquivalenceProperties, PhysicalExpr};
642+
use datafusion::physical_plan::{
643+
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
644+
PlanProperties, SendableRecordBatchStream, Statistics,
645+
execution_plan::{Boundedness, EmissionType},
646+
};
647+
use datafusion::prelude::SessionContext;
648+
649+
/// A [`TableProvider`] and [`ExecutionPlan`] that returns user-supplied
650+
/// statistics. Useful for testing code paths that depend on specific column
651+
/// NDV, min/max, or row counts without requiring real data files.
652+
#[derive(Debug, Clone)]
653+
pub struct StatisticsTable {
654+
schema: SchemaRef,
655+
stats: Statistics,
656+
cache: Arc<PlanProperties>,
657+
}
658+
659+
impl StatisticsTable {
660+
pub fn new(schema: SchemaRef, stats: Statistics) -> Self {
661+
assert_eq!(
662+
schema.fields().len(),
663+
stats.column_statistics.len(),
664+
"column_statistics length must match schema field count"
665+
);
666+
let cache = Arc::new(PlanProperties::new(
667+
EquivalenceProperties::new(Arc::clone(&schema)),
668+
Partitioning::UnknownPartitioning(1),
669+
EmissionType::Incremental,
670+
Boundedness::Bounded,
671+
));
672+
Self {
673+
schema,
674+
stats,
675+
cache,
676+
}
677+
}
678+
}
679+
680+
impl DisplayAs for StatisticsTable {
681+
fn fmt_as(
682+
&self,
683+
_t: DisplayFormatType,
684+
f: &mut std::fmt::Formatter,
685+
) -> std::fmt::Result {
686+
write!(f, "StatisticsTable")
687+
}
688+
}
689+
690+
impl ExecutionPlan for StatisticsTable {
691+
fn name(&self) -> &'static str {
692+
"StatisticsTable"
693+
}
694+
695+
fn properties(&self) -> &Arc<PlanProperties> {
696+
&self.cache
697+
}
698+
699+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
700+
vec![]
701+
}
702+
703+
fn apply_expressions(
704+
&self,
705+
_f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
706+
) -> Result<TreeNodeRecursion> {
707+
Ok(TreeNodeRecursion::Continue)
708+
}
709+
710+
fn with_new_children(
711+
self: Arc<Self>,
712+
_children: Vec<Arc<dyn ExecutionPlan>>,
713+
) -> Result<Arc<dyn ExecutionPlan>> {
714+
Ok(self)
715+
}
716+
717+
fn execute(
718+
&self,
719+
_partition: usize,
720+
_context: Arc<TaskContext>,
721+
) -> Result<SendableRecordBatchStream> {
722+
datafusion::common::not_impl_err!(
723+
"StatisticsTable is for statistics testing only"
724+
)
725+
}
726+
727+
fn partition_statistics(
728+
&self,
729+
_partition: Option<usize>,
730+
) -> Result<Arc<Statistics>> {
731+
Ok(Arc::new(self.stats.clone()))
732+
}
733+
}
734+
735+
#[async_trait]
736+
impl TableProvider for StatisticsTable {
737+
fn schema(&self) -> SchemaRef {
738+
Arc::clone(&self.schema)
739+
}
740+
741+
fn table_type(&self) -> TableType {
742+
TableType::Base
743+
}
744+
745+
async fn scan(
746+
&self,
747+
_state: &dyn Session,
748+
projection: Option<&Vec<usize>>,
749+
_filters: &[Expr],
750+
_limit: Option<usize>,
751+
) -> Result<Arc<dyn ExecutionPlan>> {
752+
let schema = datafusion::common::project_schema(&self.schema, projection)?;
753+
let stats = self.stats.clone().project(projection);
754+
Ok(Arc::new(StatisticsTable::new(schema, stats)))
755+
}
756+
}
757+
758+
/// Registers named [`StatisticsTable`] instances needed by SLT tests
759+
/// that require controlled statistics (NDV, row count, min/max).
760+
pub fn register_statistics_tables(ctx: &SessionContext) {
761+
// t_ndv: 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5).
762+
let schema = Arc::new(Schema::new(vec![
763+
Field::new("a", DataType::Int64, false),
764+
Field::new("b", DataType::Int64, false),
765+
]));
766+
let stats = Statistics {
767+
num_rows: Precision::Inexact(1000),
768+
total_byte_size: Precision::Absent,
769+
column_statistics: vec![
770+
ColumnStatistics {
771+
distinct_count: Precision::Inexact(10),
772+
..Default::default()
773+
},
774+
ColumnStatistics {
775+
distinct_count: Precision::Inexact(5),
776+
..Default::default()
777+
},
778+
],
779+
};
780+
ctx.register_table("t_ndv", Arc::new(StatisticsTable::new(schema, stats)))
781+
.expect("registering t_ndv should succeed");
782+
}
783+
}
datafusion/sqllogictest/test_files/expression_analyzer.slt

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# Tests for ExpressionAnalyzerRegistry end-to-end integration.
19+
#
20+
# t_ndv is a StatisticsTable with controlled statistics:
21+
# 1000 rows, column a (Int64, NDV=10), column b (Int64, NDV=5).
22+
#
23+
# OR predicates are not expressible as a single interval, so the built-in
24+
# interval-arithmetic path always falls back to the default selectivity (20%).
25+
# ExpressionAnalyzerRegistry applies inclusion-exclusion instead:
26+
# P(a=42 OR b=5) = P(a=42) + P(b=5) - P(a=42)*P(b=5)
27+
# = 1/10 + 1/5 - 1/50
28+
# = 0.1 + 0.2 - 0.02 = 0.28
29+
# Expected rows = round(1000 * 0.28) = 280.
30+
31+
statement ok
32+
SET datafusion.execution.target_partitions = 1;
33+
34+
statement ok
35+
SET datafusion.explain.show_statistics = true;
36+
37+
statement ok
38+
SET datafusion.explain.physical_plan_only = true;
39+
40+
# Without ExpressionAnalyzerRegistry: OR predicate falls back to 20% default selectivity
41+
# → FilterExec estimated rows = 1000 * 0.20 = 200
42+
query TT
43+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5;
44+
----
45+
physical_plan
46+
01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(200), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
47+
02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
48+
03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
49+
50+
# Enable ExpressionAnalyzerRegistry so inclusion-exclusion applies to OR predicates
51+
statement ok
52+
SET datafusion.optimizer.use_expression_analyzer = true;
53+
54+
# With ExpressionAnalyzerRegistry: OR uses inclusion-exclusion
55+
# P(a=42 OR b=5) = 1/10 + 1/5 - (1/10 * 1/5) = 0.28 → 280 rows
56+
query TT
57+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5;
58+
----
59+
physical_plan
60+
01)FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
61+
02)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
62+
03)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
63+
64+
# Verify the registry survives physical optimizer rules: ORDER BY + LIMIT triggers the
65+
# TopK sort rule which rewrites the plan above FilterExec. The FilterExec row estimate
66+
# must still reflect inclusion-exclusion (280), not the 20% default.
67+
query TT
68+
EXPLAIN SELECT * FROM t_ndv WHERE a = 42 OR b = 5 ORDER BY a LIMIT 100;
69+
----
70+
physical_plan
71+
01)SortExec: TopK(fetch=100), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false], statistics=[Rows=Inexact(100), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
72+
02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
73+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
74+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
75+
76+
# Verify the registry reaches FilterExec nodes created by optimizer rules: the
77+
# filter_pushdown rule running on UnionExec creates fresh FilterExec nodes (one per
78+
# branch) that never existed when the registry was first injected. Both must show
79+
# 280 rows, confirming re-injection after each rule reaches newly created nodes.
80+
# The UnionExec row count (560 = 2 * 280) and doubled NDVs (20, 10) also confirm
81+
# that distinct-count propagation through UnionExec is correct.
82+
query TT
83+
EXPLAIN SELECT * FROM (SELECT * FROM t_ndv UNION ALL SELECT * FROM t_ndv) WHERE a = 42 OR b = 5;
84+
----
85+
physical_plan
86+
01)UnionExec, statistics=[Rows=Inexact(560), Bytes=Absent, [(Col[0]: Distinct=Inexact(20)),(Col[1]: Distinct=Inexact(10))]]
87+
02)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
88+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
89+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
90+
05)--FilterExec: a@0 = 42 OR b@1 = 5, statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
91+
06)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
92+
07)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
93+
94+
# Verify the registry reaches a FilterExec pushed through a join: filter_pushdown
95+
# moves the WHERE clause filter to the left side of the HashJoinExec, creating a
96+
# FilterExec that was not present in the plan at initial injection time.
97+
query TT
98+
EXPLAIN SELECT l.a, r.b FROM t_ndv l JOIN t_ndv r ON l.a = r.a WHERE l.a = 42 OR l.b = 5;
99+
----
100+
physical_plan
101+
01)HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)], projection=[a@0, b@2], statistics=[Rows=Inexact(28000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
102+
02)--FilterExec: a@0 = 42 OR b@1 = 5, projection=[a@0], statistics=[Rows=Inexact(280), Bytes=Absent, [(Col[0]: Distinct=Inexact(10))]]
103+
03)----CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
104+
04)------StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
105+
05)--CooperativeExec, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
106+
06)----StatisticsTable, statistics=[Rows=Inexact(1000), Bytes=Absent, [(Col[0]: Distinct=Inexact(10)),(Col[1]: Distinct=Inexact(5))]]
107+
108+
statement ok
109+
SET datafusion.optimizer.use_expression_analyzer = false;
110+
111+
statement ok
112+
SET datafusion.explain.show_statistics = false;
113+
114+
statement ok
115+
SET datafusion.explain.physical_plan_only = false;
116+
117+
statement ok
118+
SET datafusion.execution.target_partitions = 4;

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ datafusion.optimizer.repartition_windows true Should DataFusion repartition data
478478
datafusion.optimizer.skip_failed_rules false When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail
479479
datafusion.optimizer.subset_repartition_threshold 4 Partition count threshold for subset satisfaction optimization. When the current partition count is >= this threshold, DataFusion will skip repartitioning if the required partitioning expression is a subset of the current partition expression such as Hash(a) satisfies Hash(a, b). When the current partition count is < this threshold, DataFusion will repartition to increase parallelism even when subset satisfaction applies. Set to 0 to always repartition (disable subset satisfaction optimization). Set to a high value to always use subset satisfaction. Example (subset_repartition_threshold = 4): ```text Hash([a]) satisfies Hash([a, b]) because (Hash([a, b]) is subset of Hash([a]) If current partitions (3) < threshold (4), repartition: AggregateExec: mode=FinalPartitioned, gby=[a, b], aggr=[SUM(x)] RepartitionExec: partitioning=Hash([a, b], 8), input_partitions=3 AggregateExec: mode=Partial, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 3) If current partitions (8) >= threshold (4), use subset satisfaction: AggregateExec: mode=SinglePartitioned, gby=[a, b], aggr=[SUM(x)] DataSourceExec: file_groups={...}, output_partitioning=Hash([a], 8) ```
480480
datafusion.optimizer.top_down_join_key_reordering true When set to true, the physical plan optimizer will run a top down process to reorder the join keys
481-
datafusion.optimizer.use_expression_analyzer false When set to true, the pluggable `ExpressionAnalyzerRegistry` from `SessionState` is injected into exec nodes that use expression-level statistics (`FilterExec`, `ProjectionExec`, `AggregateExec`, join nodes) and re-injected after each physical optimizer rule so rebuilt nodes always carry it. Custom analyzers then influence `partition_statistics` in those operators.
481+
datafusion.optimizer.use_expression_analyzer false When set to true, the pluggable `ExpressionAnalyzerRegistry` from `SessionState` is used for expression-level statistics estimation (NDV, selectivity, min/max, null fraction) in physical plan operators.
482482
datafusion.optimizer.use_statistics_registry false When set to true, the physical plan optimizer uses the pluggable `StatisticsRegistry` for a bottom-up statistics walk across operators, enabling more accurate cardinality estimates. Enabling `use_expression_analyzer` alongside this flag gives built-in providers access to custom expression-level analyzers (NDV, selectivity) for the operators they process.
483483
datafusion.runtime.list_files_cache_limit 1M Maximum memory to use for list files cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.
484484
datafusion.runtime.list_files_cache_ttl NULL TTL (time-to-live) of the entries in the list file cache. Supports units m (minutes), and s (seconds). Example: '2m' for 2 minutes.

0 commit comments

Comments
 (0)