From 595c0bad4c2797c5ac38ef56c7096cc7ba32b8ce Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sun, 3 May 2026 07:13:25 -0500
Subject: [PATCH 1/2] catalog: query-aware statistics requests via ScanArgs / ScanResult
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds an opt-in handshake that lets callers ask a `TableProvider` for
specific stats by name and receive only what the provider can answer
cheaply, instead of the all-or-nothing dense `Statistics` we have today.

## What's new

* `datafusion-expr-common::statistics::StatisticsRequest` — enum of stat
  kinds that mirror `Statistics` / `ColumnStatistics` (Min, Max,
  NullCount, DistinctCount, Sum, ByteSize, RowCount, TotalByteSize).
  `Hash + Eq` so it can key a `HashMap`.
* `datafusion-expr-common::statistics::StatisticsValue` —
  `Scalar(Precision<ScalarValue>) | Distribution(Box<Distribution>) |
  Absent`. Whether a value is exact or estimated travels in the
  `Precision` wrapper, not the variant.
* `ScanArgs::with_statistics_requests` / `statistics_requests()` — the
  caller's question.
* `ScanResult::with_statistics` / `statistics()` / `into_parts()` — the
  provider's answer, paired 1:1 with the requests slice.
* `PartitionedFile::satisfied_stats` — sparse,
  `Option<Arc<SatisfiedStatistics>>` (a `HashMap<StatisticsRequest,
  StatisticsValue>`) for per-file answers. Memory scales with what was
  asked, not with table width. Providers that store stats out-of-band
  (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs) can
  populate this directly without rebuilding a full dense `Statistics`.
* `FilePruner` learns to consume the sparse map. Internally,
  `file_stats_pruning` is now `Box<dyn PruningStatistics>` so we can
  dispatch between the existing `PrunableStatistics` (dense) and a new
  `SparseFilePruningStats` adapter (sparse). The sparse adapter looks up
  each `StatisticsRequest` directly in the map and materializes
  single-row arrays only for the columns the pruning predicate touches —
  no densify-then-throw-away.
* `ListingTable::scan_with_args` populates `ScanResult.statistics` from
  the merged dense `Statistics` it already computed when
  `args.statistics_requests()` is set and `collect_statistics=true`.
  When `collect_statistics=false` it returns `Absent` for everything
  (the contract is "answer what's free"). `DistinctCount` / `Sum` /
  `ByteSize` are likewise `Absent` for parquet — those aren't in thrift
  footers; layered helpers (or richer providers) can fill the gaps.

## Backwards compat

All additions are opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly
  initializers; existing callers that don't use the new builders see no
  change.
* `FilePruner`'s field-type change is internal (private field).
* The only minor source-level break is a new pub field on
  `PartitionedFile` (`satisfied_stats`). Callers using
  `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders
  are unaffected. Direct struct literals — uncommon, none in-tree —
  need to add `satisfied_stats: None` (or use the new
  `with_satisfied_stats` builder).

## Tests

* `datafusion-expr-common::statistics::stats_request_tests::statistics_request_is_hashable_keyable`
  — round-trip a `StatisticsRequest` through a `HashMap`.
* `datafusion-pruning::file_pruner::tests` — three tests demonstrating
  end-to-end pruning against a sparse-only `PartitionedFile` (`x > 100`
  prunes a `[10, 20]` file, `x > 15` doesn't, no stats at all → no
  pruner).
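A minimal end-to-end sketch of the handshake (illustrative only, not part
of the diff; the hard-coded answer stands in for whatever a real provider
would read from its manifest):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion_catalog::ScanArgs;
use datafusion_common::{Column, ScalarValue};
use datafusion_datasource::PartitionedFile;
use datafusion_expr_common::statistics::{
    SatisfiedStatistics, StatisticsRequest, StatisticsValue,
};

// Caller side: ask for a row count plus Min/Max of column "x".
let requests = vec![
    StatisticsRequest::RowCount,
    StatisticsRequest::Min(Column::new_unqualified("x")),
    StatisticsRequest::Max(Column::new_unqualified("x")),
];
let args = ScanArgs::default().with_statistics_requests(&requests);

// Provider side: answer only what is already on hand and leave the
// rest off the map — a missing key reads as `Absent` downstream.
let mut answers: SatisfiedStatistics = HashMap::new();
answers.insert(
    StatisticsRequest::Min(Column::new_unqualified("x")),
    StatisticsValue::exact(ScalarValue::Int64(Some(10))),
);
let file = PartitionedFile::new("part-0.parquet", 1024)
    .with_satisfied_stats(Arc::new(answers));
```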
Co-Authored-By: Claude Opus 4.7 (1M context) --- datafusion/catalog-listing/src/table.rs | 215 +++++++++++++++++++- datafusion/catalog/src/table.rs | 28 +++ datafusion/datasource/src/mod.rs | 33 ++++ datafusion/expr-common/src/statistics.rs | 118 +++++++++++ datafusion/pruning/src/file_pruner.rs | 237 ++++++++++++++++++++++- 5 files changed, 623 insertions(+), 8 deletions(-) diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 06ba8c8113fac..f487d4b8e9037 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -23,7 +23,8 @@ use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::stats::Precision; use datafusion_common::{ - Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema, + Constraints, ScalarValue, SchemaExt, Statistics, internal_datafusion_err, plan_err, + project_schema, }; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_groups::FileGroup; @@ -39,6 +40,9 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::statistics::{ + SatisfiedStatistics, StatisticsRequest, StatisticsValue, +}; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::create_lex_ordering; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -515,6 +519,39 @@ impl TableProvider for ListingTable { .list_files_for_scan(state, &partition_filters, statistic_file_limit) .await?; + // If the caller asked for per-file stats and we already have dense + // per-file `Statistics` (i.e. `collect_statistics=true` populated + // them during file listing), project them into a sparse + // `satisfied_stats` map keyed by request. This is "free" — the IO + // / decode cost was already paid; we're just surfacing what we + // have in the request-driven shape that downstream consumers + // (e.g. `FilePruner` via `SparseFilePruningStats`) prefer. + if !args.statistics_requests().is_empty() && self.options.collect_stat { + partitioned_file_lists = partitioned_file_lists + .into_iter() + .map(|fg| { + let files = fg + .into_inner() + .into_iter() + .map(|pf| { + let sparse = pf.statistics.as_ref().map(|stats| { + project_satisfied_stats( + args.statistics_requests(), + &self.table_schema, + stats, + ) + }); + match sparse { + Some(s) => pf.with_satisfied_stats(Arc::new(s)), + None => pf, + } + }) + .collect(); + FileGroup::new(files) + }) + .collect(); + } + // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { let projected_schema = project_schema(&self.schema(), projection.as_ref())?; @@ -688,6 +725,108 @@ impl TableProvider for ListingTable { } } +/// Project a per-file dense [`Statistics`] into the sparse map shape +/// that downstream consumers (e.g. `FilePruner` via +/// `SparseFilePruningStats`) read. Only entries the request list asked +/// for are present; non-`Absent` slots are kept verbatim (preserves the +/// `Precision` already on the field). The dense `Statistics` itself is +/// untouched. 
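+///
+/// For example (a sketch of the mapping, not extra behavior): given
+/// requests `[RowCount, Min(a), Sum(a)]` and dense stats with
+/// `num_rows: Exact(42)`, `a.min_value: Exact(1)`, and `a.sum_value:
+/// Absent`, the output map holds exactly two entries —
+/// `RowCount → Scalar(Exact(UInt64(42)))` and `Min(a) → Scalar(Exact(1))`
+/// — and omits `Sum(a)` entirely.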
+///
+/// `DistinctCount` / `Sum` / `ByteSize` will only be present if the
+/// underlying file format actually populated them on `ColumnStatistics`
+/// — for parquet, only `null_count`, `min_value`, `max_value`, and
+/// `num_rows` are typically available from thrift footers; the rest
+/// stay `Absent` (so the entry is omitted).
+fn project_satisfied_stats(
+    requests: &[StatisticsRequest],
+    schema: &SchemaRef,
+    stats: &Statistics,
+) -> SatisfiedStatistics {
+    let mut out = SatisfiedStatistics::new();
+    for req in requests {
+        let value = match req {
+            StatisticsRequest::RowCount => precision_to_scalar(&stats.num_rows, |n| {
+                ScalarValue::UInt64(Some(n as u64))
+            }),
+            StatisticsRequest::TotalByteSize => {
+                precision_to_scalar(&stats.total_byte_size, |n| {
+                    ScalarValue::UInt64(Some(n as u64))
+                })
+            }
+            StatisticsRequest::Min(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.min_value)
+            }
+            StatisticsRequest::Max(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.max_value)
+            }
+            StatisticsRequest::Sum(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.sum_value)
+            }
+            StatisticsRequest::NullCount(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.null_count)
+            }
+            StatisticsRequest::DistinctCount(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.distinct_count)
+            }
+            StatisticsRequest::ByteSize(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.byte_size)
+            }
+        };
+        // Skip `Absent` entries — the sparse map's invariant is "only
+        // entries the provider answered". Consumers infer `Absent` from
+        // a missing key.
+        if !matches!(value, StatisticsValue::Absent) {
+            out.insert(req.clone(), value);
+        }
+    }
+    out
+}
+
+fn precision_to_scalar(
+    p: &Precision<usize>,
+    map: impl Fn(usize) -> ScalarValue,
+) -> StatisticsValue {
+    match p {
+        Precision::Exact(n) => StatisticsValue::Scalar(Precision::Exact(map(*n))),
+        Precision::Inexact(n) => StatisticsValue::Scalar(Precision::Inexact(map(*n))),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_scalar(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<ScalarValue>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    match pick(cs) {
+        Precision::Exact(v) => StatisticsValue::Scalar(Precision::Exact(v.clone())),
+        Precision::Inexact(v) => StatisticsValue::Scalar(Precision::Inexact(v.clone())),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_usize(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<usize>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    precision_to_scalar(pick(cs), |n| ScalarValue::UInt64(Some(n as u64)))
+}
+
 impl ListingTable {
     /// Get the list of files for a scan as well as the file level statistics.
     /// The list is grouped to let the execution plan know how the files should
@@ -1049,4 +1188,78 @@ mod tests {
         let result = derive_common_ordering_from_files(&file_groups);
         assert_eq!(result, Some(ordering));
     }
+
+    #[test]
+    fn project_satisfied_stats_picks_only_requested_and_present() {
+        use arrow::datatypes::{DataType, Field, Schema};
+        use datafusion_common::{Column, ColumnStatistics};
+        use datafusion_expr::statistics::{StatisticsRequest, StatisticsValue};
+
+        let schema: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int64, true),
+        ]));
+
+        let mut stats = Statistics::new_unknown(&schema);
+        stats.num_rows = Precision::Exact(42);
+        // a: min/max known, null_count Absent
+        stats.column_statistics[0] = ColumnStatistics {
+            null_count: Precision::Absent,
+            max_value: Precision::Exact(ScalarValue::Int64(Some(10))),
+            min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
+            sum_value: Precision::Absent,
+            distinct_count: Precision::Absent,
+            byte_size: Precision::Absent,
+        };
+        // b: only null_count known
+        stats.column_statistics[1] = ColumnStatistics {
+            null_count: Precision::Exact(3),
+            max_value: Precision::Absent,
+            min_value: Precision::Absent,
+            sum_value: Precision::Absent,
+            distinct_count: Precision::Absent,
+            byte_size: Precision::Absent,
+        };
+
+        let a = Column::from_name("a");
+        let b = Column::from_name("b");
+        let requests = vec![
+            StatisticsRequest::RowCount,
+            StatisticsRequest::Min(a.clone()),
+            StatisticsRequest::Max(a.clone()),
+            StatisticsRequest::NullCount(a.clone()), // Absent — should be omitted
+            StatisticsRequest::NullCount(b.clone()),
+            StatisticsRequest::Min(b.clone()), // Absent — should be omitted
+        ];
+
+        let out = project_satisfied_stats(&requests, &schema, &stats);
+
+        assert_eq!(out.len(), 4);
+        assert!(matches!(
+            out.get(&StatisticsRequest::RowCount),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::UInt64(Some(42))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::Min(a.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::Int64(Some(1))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::Max(a.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::Int64(Some(10))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::NullCount(b.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::UInt64(Some(3))
+            )))
+        ));
+        assert!(!out.contains_key(&StatisticsRequest::NullCount(a)));
+        assert!(!out.contains_key(&StatisticsRequest::Min(b)));
+    }
 }
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 5d1391bed1172..9543710887c16 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use datafusion_common::{Constraints, Statistics, not_impl_err};
 use datafusion_common::{Result, internal_err};
 use datafusion_expr::Expr;
+use datafusion_expr::statistics::StatisticsRequest;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
@@ -406,6 +407,7 @@ pub struct ScanArgs<'a> {
     filters: Option<&'a [Expr]>,
     projection: Option<&'a [usize]>,
     limit: Option<usize>,
+    statistics_requests: &'a [StatisticsRequest],
 }
 
 impl<'a> ScanArgs<'a> {
@@ -467,6 +469,32 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set a list of statistics the caller would like the provider to
+    /// answer if it can do so cheaply.
+    ///
+    /// Typical sources a provider may answer from:
+    /// * Parquet thrift footers (Min/Max/NullCount/RowCount, exact)
+    /// * An external metadata catalog (Delta/Iceberg/Hudi manifests,
+    ///   Hive-Metastore-style stats columns)
+    /// * Cached / materialized stats columns
+    ///
+    /// Providers surface their answers on a per-file basis via
+    /// `PartitionedFile::satisfied_stats` (a sparse map keyed by the
+    /// same `StatisticsRequest`). Anything the provider can't answer
+    /// is simply omitted — consumers treat a missing key as "absent".
+    /// The contract is "answer what's free, leave the rest off the
+    /// map": providers MUST NOT do expensive scans purely to satisfy
+    /// these requests.
+    pub fn with_statistics_requests(mut self, requests: &'a [StatisticsRequest]) -> Self {
+        self.statistics_requests = requests;
+        self
+    }
+
+    /// Get the statistics requests. Empty if none were set.
+    pub fn statistics_requests(&self) -> &'a [StatisticsRequest] {
+        self.statistics_requests
+    }
 }
 
 /// Result of a table scan operation from [`TableProvider::scan_with_args`].
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index a9600271c28ce..d06ce7ea34723 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -58,6 +58,7 @@ use chrono::TimeZone;
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
 use datafusion_common::{ScalarValue, Statistics};
+use datafusion_expr::statistics::SatisfiedStatistics;
 use datafusion_physical_expr::LexOrdering;
 use futures::{Stream, StreamExt};
 use object_store::{GetOptions, GetRange, ObjectStore};
@@ -138,6 +139,18 @@ pub struct PartitionedFile {
     /// When set via [`Self::with_statistics`], partition column statistics are automatically
     /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
     pub statistics: Option<Arc<Statistics>>,
+    /// Sparse, request-keyed stats answered by the provider for this file.
+    ///
+    /// Only entries for non-`Absent` answers are present, so memory scales
+    /// with the *count of stats actually requested* rather than the table's
+    /// column count. Used in tandem with — not in place of —
+    /// [`Self::statistics`]: existing consumers that read the dense
+    /// `Statistics` keep working; new consumers (e.g. `FilePruner` in
+    /// `datafusion-pruning`) read this sparse map when the dense one is
+    /// not set. Providers that store stats out-of-band (Delta/Iceberg/Hudi
+    /// manifests, Hive Metastore, custom catalogs) can populate this
+    /// directly without rebuilding a full dense `Statistics`.
+    pub satisfied_stats: Option<Arc<SatisfiedStatistics>>,
     /// The known lexicographical ordering of the rows in this file, if any.
     ///
     /// This describes how the data within the file is sorted with respect to one or more
@@ -168,6 +181,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -181,6 +195,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -200,6 +215,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: Some(FileRange { start, end }),
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -328,6 +344,21 @@ impl PartitionedFile {
         self.ordering = ordering;
         self
     }
+
+    /// Attach a sparse map of provider-answered statistics for this file.
+    ///
+    /// Used by table providers that store per-file stats out-of-band
+    /// (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs)
+    /// and want to surface them without reconstructing a full dense
+    /// [`Statistics`]. Consumers (e.g. `FilePruner`) prefer this map
+    /// when [`Self::statistics`] is not set.
+    pub fn with_satisfied_stats(
+        mut self,
+        satisfied_stats: Arc<SatisfiedStatistics>,
+    ) -> Self {
+        self.satisfied_stats = Some(satisfied_stats);
+        self
+    }
 }
 
@@ -337,6 +368,7 @@ impl From<ObjectMeta> for PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
@@ -534,6 +566,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
diff --git a/datafusion/expr-common/src/statistics.rs b/datafusion/expr-common/src/statistics.rs
--- a/datafusion/expr-common/src/statistics.rs
+++ b/datafusion/expr-common/src/statistics.rs
@@ ... @@
 use datafusion_common::stats::Precision;
+use datafusion_common::{Column, ScalarValue};
+
+/// A single, named statistic a caller can ask a `TableProvider` to
+/// answer. Mirrors the fields of `Statistics` / `ColumnStatistics`.
+/// `Hash + Eq` so it can key a `HashMap` (see [`SatisfiedStatistics`]).
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub enum StatisticsRequest {
+    /// Minimum value of a column.
+    Min(Column),
+    /// Maximum value of a column.
+    Max(Column),
+    /// Number of nulls in a column.
+    NullCount(Column),
+    /// Number of distinct values in a column.
+    DistinctCount(Column),
+    /// Sum of a column's values.
+    Sum(Column),
+    /// Total byte size of a column.
+    ByteSize(Column),
+    /// Number of rows.
+    RowCount,
+    /// Total byte size of the table or file.
+    TotalByteSize,
+}
+
+/// A provider's answer to a [`StatisticsRequest`]. Whether a value is
+/// exact or estimated travels in the [`Precision`] wrapper, not the
+/// variant.
+#[derive(Debug, Clone)]
+pub enum StatisticsValue {
+    /// A scalar answer (min/max/counts/sums).
+    Scalar(Precision<ScalarValue>),
+    /// A typed probability distribution. Boxed so the enum stays small —
+    /// `Distribution` is significantly larger than the other variants.
+    Distribution(Box<Distribution>),
+    /// Provider can't (or won't) answer this request. The caller
+    /// decides whether to fall back to another mechanism.
+    Absent,
+}
+
+impl StatisticsValue {
+    /// Convenience: an `Exact` scalar response.
+    pub fn exact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Exact(value))
+    }
+    /// Convenience: an `Inexact` scalar response.
+    pub fn inexact(value: ScalarValue) -> Self {
+        Self::Scalar(Precision::Inexact(value))
+    }
+}
+
+/// Sparse map of stats answers, keyed by request. Used as the storage for
+/// per-file answers (see `PartitionedFile::satisfied_stats`) and as the
+/// internal representation for adapters that consume request-driven
+/// stats. Only entries the provider actually answered are present, so
+/// memory scales with what was asked rather than with table width.
+pub type SatisfiedStatistics =
+    std::collections::HashMap<StatisticsRequest, StatisticsValue>;
+
+#[cfg(test)]
+mod stats_request_tests {
+    use super::*;
+    use std::collections::HashMap;
+
+    #[test]
+    fn statistics_request_is_hashable_keyable() {
+        // Sanity: two equal `StatisticsRequest`s hash equal and round-trip
+        // through a HashMap, so they can be used as keys (e.g. for the
+        // sparse `PartitionedFile::satisfied_stats` map).
+        let r1 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        let r2 = StatisticsRequest::Min(Column::new_unqualified("c"));
+        assert_eq!(r1, r2);
+        let mut map: HashMap<StatisticsRequest, StatisticsValue> = HashMap::new();
+        map.insert(
+            r1.clone(),
+            StatisticsValue::exact(ScalarValue::Int64(Some(7))),
+        );
+        match map.get(&r2) {
+            Some(StatisticsValue::Scalar(Precision::Exact(ScalarValue::Int64(
+                Some(7),
+            )))) => {}
+            other => panic!("unexpected lookup: {other:?}"),
+        }
+    }
+}
diff --git a/datafusion/pruning/src/file_pruner.rs b/datafusion/pruning/src/file_pruner.rs
index f850e0c0114fb..38f06873bc4b1 100644
--- a/datafusion/pruning/src/file_pruner.rs
+++ b/datafusion/pruning/src/file_pruner.rs
@@ -17,11 +17,20 @@
 //! File-level pruning based on partition values and file-level statistics
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
+use arrow::array::{ArrayRef, BooleanArray};
 use arrow::datatypes::{FieldRef, SchemaRef};
-use datafusion_common::{Result, internal_datafusion_err, pruning::PrunableStatistics};
+use datafusion_common::stats::Precision;
+use datafusion_common::{
+    Column as LogColumn, Result, ScalarValue, internal_datafusion_err,
+    pruning::{PrunableStatistics, PruningStatistics},
+};
 use datafusion_datasource::PartitionedFile;
+use datafusion_expr_common::statistics::{
+    SatisfiedStatistics, StatisticsRequest, StatisticsValue,
+};
 use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, snapshot_generation};
 use datafusion_physical_plan::metrics::Count;
 use log::debug;
@@ -38,7 +47,12 @@ pub struct FilePruner {
     predicate: Arc<dyn PhysicalExpr>,
     /// Schema used for pruning (the logical file schema).
     file_schema: SchemaRef,
-    file_stats_pruning: PrunableStatistics,
+    /// Pruning statistics adapter. Either a [`PrunableStatistics`] view
+    /// of the file's dense `Statistics`, or a [`SparseFilePruningStats`]
+    /// that hits the sparse [`PartitionedFile::satisfied_stats`] map
+    /// directly. Boxed so we can dispatch between the two without forcing
+    /// callers to densify a sparse map ahead of time.
+    file_stats_pruning: Box<dyn PruningStatistics>,
     predicate_creation_errors: Count,
 }
@@ -70,16 +84,34 @@ impl FilePruner {
     }
 
     /// Create a new file pruner if statistics are available.
-    /// Returns None if this file does not have statistics.
+    ///
+    /// Prefers the dense [`PartitionedFile::statistics`] when present, and
+    /// falls back to the sparse [`PartitionedFile::satisfied_stats`] map.
+    /// In the sparse case the accessors look up each
+    /// [`StatisticsRequest`] by key and lazily materialize the
+    /// single-row arrays the pruning predicate needs — no
+    /// densify-then-throw-away. Returns `None` if neither source has
+    /// any stats.
     pub fn try_new(
         predicate: Arc<dyn PhysicalExpr>,
         file_schema: &SchemaRef,
         partitioned_file: &PartitionedFile,
         predicate_creation_errors: Count,
     ) -> Option<Self> {
-        let file_stats = partitioned_file.statistics.as_ref()?;
-        let file_stats_pruning =
-            PrunableStatistics::new(vec![file_stats.clone()], Arc::clone(file_schema));
+        let file_stats_pruning: Box<dyn PruningStatistics> =
+            if let Some(s) = &partitioned_file.statistics {
+                Box::new(PrunableStatistics::new(
+                    vec![Arc::clone(s)],
+                    Arc::clone(file_schema),
+                ))
+            } else if let Some(sparse) = &partitioned_file.satisfied_stats {
+                Box::new(SparseFilePruningStats {
+                    sparse: Arc::clone(sparse),
+                    schema: Arc::clone(file_schema),
+                })
+            } else {
+                return None;
+            };
         Some(Self {
             predicate_generation: None,
             predicate,
@@ -114,7 +146,7 @@ impl FilePruner {
         let Some(pruning_predicate) = pruning_predicate else {
             return Ok(false);
         };
-        match pruning_predicate.prune(&self.file_stats_pruning) {
+        match pruning_predicate.prune(&*self.file_stats_pruning) {
             Ok(values) => {
                 assert!(values.len() == 1);
                 // We expect a single container -> if all containers are false skip this file
@@ -132,3 +164,194 @@ impl FilePruner {
         Ok(false)
     }
 }
+
+/// `PruningStatistics` adapter backed by a sparse
+/// `HashMap<StatisticsRequest, StatisticsValue>`.
+///
+/// Each accessor builds the corresponding [`StatisticsRequest`] for the
+/// column it was asked about, looks it up in the map, and converts the
+/// result into a single-element [`ArrayRef`].
+/// The 1-row arrays are only ever materialized for columns the pruning
+/// predicate actually touches — the long-lived footprint is the sparse
+/// map itself, which only contains entries the provider answered.
+struct SparseFilePruningStats {
+    sparse: Arc<SatisfiedStatistics>,
+    schema: SchemaRef,
+}
+
+impl SparseFilePruningStats {
+    fn lookup_scalar(&self, req: &StatisticsRequest) -> Option<ScalarValue> {
+        match self.sparse.get(req)? {
+            StatisticsValue::Scalar(Precision::Exact(v))
+            | StatisticsValue::Scalar(Precision::Inexact(v)) => Some(v.clone()),
+            _ => None,
+        }
+    }
+
+    /// Single-row Min/Max array. Returns `None` when the entry is
+    /// missing from the map; if the stored scalar can't be materialized
+    /// as an array, falls back to a typed null so the array type stays
+    /// stable across calls.
+    fn min_or_max(&self, field: &FieldRef, req: &StatisticsRequest) -> Option<ArrayRef> {
+        let v = self.lookup_scalar(req)?;
+        v.to_array_of_size(1).ok().or_else(|| {
+            ScalarValue::try_from(field.data_type())
+                .ok()
+                .and_then(|n| n.to_array_of_size(1).ok())
+        })
+    }
+}
+
+impl PruningStatistics for SparseFilePruningStats {
+    fn min_values(&self, column: &datafusion_common::Column) -> Option<ArrayRef> {
+        let field = self.schema.field_with_name(&column.name).ok()?;
+        let field = Arc::new(field.clone());
+        self.min_or_max(
+            &field,
+            &StatisticsRequest::Min(LogColumn::new_unqualified(&column.name)),
+        )
+    }
+
+    fn max_values(&self, column: &datafusion_common::Column) -> Option<ArrayRef> {
+        let field = self.schema.field_with_name(&column.name).ok()?;
+        let field = Arc::new(field.clone());
+        self.min_or_max(
+            &field,
+            &StatisticsRequest::Max(LogColumn::new_unqualified(&column.name)),
+        )
+    }
+
+    fn num_containers(&self) -> usize {
+        1
+    }
+
+    fn null_counts(&self, column: &datafusion_common::Column) -> Option<ArrayRef> {
+        let v = self.lookup_scalar(&StatisticsRequest::NullCount(
+            LogColumn::new_unqualified(&column.name),
+        ))?;
+        let n = scalar_to_u64(&v)?;
+        ScalarValue::UInt64(Some(n)).to_array_of_size(1).ok()
+    }
+
+    fn row_counts(&self) -> Option<ArrayRef> {
+        let v = self.lookup_scalar(&StatisticsRequest::RowCount)?;
+        let n = scalar_to_u64(&v)?;
+        ScalarValue::UInt64(Some(n)).to_array_of_size(1).ok()
+    }
+
+    fn contained(
+        &self,
+        _column: &datafusion_common::Column,
+        _values: &HashSet<ScalarValue>,
+    ) -> Option<BooleanArray> {
+        // Bloom-filter-style membership isn't representable in the
+        // current StatisticsRequest set. A `Contained(column, values)`
+        // request kind could be added later if a provider can answer it.
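+        // A sketch of what that could look like (hypothetical — not
+        // part of this patch):
+        //
+        //     StatisticsRequest::Contained(Column, Vec<ScalarValue>)
+        //
+        // answered per file with a boolean `StatisticsValue::Scalar`,
+        // which this accessor would turn into a 1-row BooleanArray.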
+        None
+    }
+}
+
+fn scalar_to_u64(v: &ScalarValue) -> Option<u64> {
+    match v {
+        ScalarValue::UInt64(Some(n)) => Some(*n),
+        ScalarValue::Int64(Some(n)) if *n >= 0 => Some(*n as u64),
+        ScalarValue::UInt32(Some(n)) => Some(*n as u64),
+        ScalarValue::Int32(Some(n)) if *n >= 0 => Some(*n as u64),
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Column as LogColumn;
+    use datafusion_physical_expr::expressions::lit;
+    use datafusion_physical_expr::expressions::{BinaryExpr, Column as PhysCol};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    fn make_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, false)]))
+    }
+
+    fn make_file_with_sparse_stats(min: i64, max: i64) -> PartitionedFile {
+        let mut map = HashMap::new();
+        let col = LogColumn::new_unqualified("x");
+        map.insert(
+            StatisticsRequest::Min(col.clone()),
+            StatisticsValue::Scalar(Precision::Exact(ScalarValue::Int64(Some(min)))),
+        );
+        map.insert(
+            StatisticsRequest::Max(col),
+            StatisticsValue::Scalar(Precision::Exact(ScalarValue::Int64(Some(max)))),
+        );
+        PartitionedFile::new("dummy", 100).with_satisfied_stats(Arc::new(map))
+    }
+
+    fn metric_count() -> Count {
+        datafusion_physical_plan::metrics::MetricBuilder::new(
+            &datafusion_physical_plan::metrics::ExecutionPlanMetricsSet::new(),
+        )
+        .global_counter("e")
+    }
+
+    #[test]
+    fn file_pruner_uses_sparse_stats_when_dense_absent() {
+        let schema = make_schema();
+        // file with x in [10, 20]
+        let file = make_file_with_sparse_stats(10, 20);
+
+        // predicate: x > 100 — file can be pruned
+        let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(PhysCol::new("x", 0)),
+            datafusion_expr::Operator::Gt,
+            lit(ScalarValue::Int64(Some(100))),
+        ));
+
+        let mut pruner = FilePruner::try_new(predicate, &schema, &file, metric_count())
+            .expect("should construct from satisfied_stats");
+
+        assert!(
+            pruner.should_prune().expect("prune ok"),
+            "x > 100 should prune a file whose x in [10, 20]"
+        );
+    }
+
+    #[test]
+    fn file_pruner_does_not_prune_when_predicate_overlaps() {
+        let schema = make_schema();
+        let file = make_file_with_sparse_stats(10, 20);
+
+        // predicate: x > 15 — overlap, should NOT prune
+        let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(PhysCol::new("x", 0)),
+            datafusion_expr::Operator::Gt,
+            lit(ScalarValue::Int64(Some(15))),
+        ));
+
+        let mut pruner = FilePruner::try_new(predicate, &schema, &file, metric_count())
+            .expect("should construct");
+
+        assert!(
+            !pruner.should_prune().expect("prune ok"),
+            "x > 15 should NOT prune — overlap with [10, 20]"
+        );
+    }
+
+    #[test]
+    fn file_pruner_returns_none_without_any_stats() {
+        let schema = make_schema();
+        let file = PartitionedFile::new("dummy", 100); // neither dense nor sparse
+
+        let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::new(PhysCol::new("x", 0)),
+            datafusion_expr::Operator::Gt,
+            lit(ScalarValue::Int64(Some(0))),
+        ));
+
+        assert!(
+            FilePruner::try_new(predicate, &schema, &file, metric_count()).is_none(),
+            "no stats at all -> no pruner"
+        );
+    }
+}

From 6514f89f700e75795f4f3201b539e8aead571993 Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Sun, 3 May 2026 09:02:40 -0500
Subject: [PATCH 2/2] optimizer: derive StatisticsRequests from logical plan, thread to scan
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit Stacked on top of the API-only commit. Adds the missing piece: a small optimizer rule that walks the optimized logical plan and populates `TableScan.statistics_requests` based on the surrounding plan shape, plus a physical-planner hook that threads those into `ScanArgs::with_statistics_requests`. * `TableScan` gains `statistics_requests: Vec` (default empty) and a `with_statistics_requests` builder. * New `RequestStatistics` `OptimizerRule` (registered last in the default pipeline). Walks the plan once, derives: Sort → Min / Max / NullCount on each sort key Filter → Min / Max / NullCount / DistinctCount on referenced cols Join → DistinctCount / NullCount on join keys (both sides) always → RowCount per scan Stable, deterministic ordering. Idempotent. Never reshapes the plan — only annotates `TableScan` nodes. * `DefaultPhysicalPlanner` reads `scan.statistics_requests` and threads them into `ScanArgs::with_statistics_requests` when calling `provider.scan_with_args`. * `ScanArgs::statistics_requests` field switched from `Option<&[StatisticsRequest]>` to `&[StatisticsRequest]` (empty slice = no requests; collapses two ways of saying the same thing). * `request_statistics::tests` (3 unit tests) — confirm RowCount per scan, filter-column requests, join-key DistinctCount. * `user_defined::statistics_requests` (2 e2e tests) — register a `RecordingTable` provider, run SQL through the full pipeline, assert the requests that reached `scan_with_args` match what the plan shape implies. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../core/src/optimizer_rule_reference.md | 53 +- datafusion/core/src/physical_planner.rs | 4 +- datafusion/core/tests/user_defined/mod.rs | 4 + .../tests/user_defined/statistics_requests.rs | 161 +++++ datafusion/expr/src/logical_plan/plan.rs | 19 + datafusion/expr/src/logical_plan/tree_node.rs | 2 + datafusion/optimizer/src/lib.rs | 1 + .../optimizer/src/optimize_projections/mod.rs | 4 +- datafusion/optimizer/src/optimizer.rs | 6 + datafusion/optimizer/src/push_down_filter.rs | 1 + .../optimizer/src/request_statistics.rs | 589 ++++++++++++++++++ datafusion/proto/src/logical_plan/mod.rs | 1 + .../sqllogictest/test_files/explain.slt | 4 + 13 files changed, 821 insertions(+), 28 deletions(-) create mode 100644 datafusion/core/tests/user_defined/statistics_requests.rs create mode 100644 datafusion/optimizer/src/request_statistics.rs diff --git a/datafusion/core/src/optimizer_rule_reference.md b/datafusion/core/src/optimizer_rule_reference.md index fcbb200c71624..f2c3bf9b76212 100644 --- a/datafusion/core/src/optimizer_rule_reference.md +++ b/datafusion/core/src/optimizer_rule_reference.md @@ -35,32 +35,33 @@ Rule order matters. The default pipeline may change between releases. ### Logical Optimizer Rules -| order | rule | summary | -| ----- | ----------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | -| 1 | `rewrite_set_comparison` | Rewrites `ANY` and `ALL` set-comparison subqueries into `EXISTS`-based boolean expressions with correct SQL NULL semantics. | -| 2 | `optimize_unions` | Flattens nested unions and removes unions with a single input. | -| 3 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. | -| 4 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. 
| -| 5 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. | -| 6 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. | -| 7 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. | -| 8 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. | -| 9 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. | -| 10 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. | -| 11 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. | -| 12 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. | -| 13 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. | -| 14 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. | -| 15 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. | -| 16 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. | -| 17 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. | -| 18 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. | -| 19 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. | -| 20 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. | -| 21 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. | -| 22 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. | -| 23 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. | -| 24 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. | +| order | rule | summary | +| ----- | ----------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ | +| 1 | `rewrite_set_comparison` | Rewrites `ANY` and `ALL` set-comparison subqueries into `EXISTS`-based boolean expressions with correct SQL NULL semantics. | +| 2 | `optimize_unions` | Flattens nested unions and removes unions with a single input. | +| 3 | `simplify_expressions` | Constant-folds and simplifies expressions while preserving output names. | +| 4 | `replace_distinct_aggregate` | Rewrites `DISTINCT` and `DISTINCT ON` operators into aggregate-based plans that later rules can optimize further. | +| 5 | `eliminate_join` | Replaces keyless inner joins with a literal `false` filter by an empty relation. | +| 6 | `decorrelate_predicate_subquery` | Converts eligible `IN` and `EXISTS` predicate subqueries into semi or anti joins. | +| 7 | `scalar_subquery_to_join` | Rewrites eligible scalar subqueries into joins and adds schema-preserving projections. | +| 8 | `decorrelate_lateral_join` | Rewrites eligible lateral joins into regular joins. 
| +| 9 | `extract_equijoin_predicate` | Splits join filters into equijoin keys and residual predicates. | +| 10 | `eliminate_duplicated_expr` | Removes duplicate expressions from projections, aggregates, and similar operators. | +| 11 | `eliminate_filter` | Drops always-true filters and replaces always-false or NULL filters with empty relations. | +| 12 | `eliminate_cross_join` | Uses filter predicates to replace cross joins with inner joins when join keys can be found. | +| 13 | `eliminate_limit` | Removes no-op limits and simplifies trivial limit shapes. | +| 14 | `propagate_empty_relation` | Pushes empty-relation knowledge upward so operators fed by no rows collapse early. | +| 15 | `filter_null_join_keys` | Adds `IS NOT NULL` filters to nullable equijoin keys that can never match. | +| 16 | `eliminate_outer_join` | Rewrites outer joins to inner joins when later filters reject the NULL-extended rows. | +| 17 | `push_down_limit` | Moves literal limits closer to scans and unions and merges adjacent limits. | +| 18 | `push_down_filter` | Moves filters as early as possible through filter-commutative operators. | +| 19 | `single_distinct_aggregation_to_group_by` | Rewrites single-column `DISTINCT` aggregations into two-stage `GROUP BY` plans. | +| 20 | `eliminate_group_by_constant` | Removes constant or functionally redundant expressions from `GROUP BY`. | +| 21 | `common_sub_expression_eliminate` | Computes repeated subexpressions once and reuses the result. | +| 22 | `extract_leaf_expressions` | Pulls cheap leaf expressions closer to data sources so later pruning and filter rules can act earlier. | +| 23 | `push_down_leaf_projections` | Pushes the helper projections created by leaf extraction toward leaf inputs. | +| 24 | `optimize_projections` | Prunes unused columns and removes unnecessary logical projections. | +| 25 | `request_statistics` | Walks the optimized plan once and attaches per-`TableScan` `StatisticsRequest`s describing what stats downstream nodes would benefit from. | ### Physical Optimizer Rules diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3b2c7a78e898e..6cd1ea319e57a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -645,6 +645,7 @@ impl DefaultPhysicalPlanner { filters, fetch, projected_schema, + statistics_requests, .. } = scan; @@ -657,7 +658,8 @@ impl DefaultPhysicalPlanner { let opts = ScanArgs::default() .with_projection(projection.as_deref()) .with_filters(Some(&filters_vec)) - .with_limit(*fetch); + .with_limit(*fetch) + .with_statistics_requests(statistics_requests); let res = source.scan_with_args(session_state, opts).await?; Arc::clone(res.plan()) } else { diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs index bc9949f5d681c..e9962dd7d8c2b 100644 --- a/datafusion/core/tests/user_defined/mod.rs +++ b/datafusion/core/tests/user_defined/mod.rs @@ -41,3 +41,7 @@ mod relation_planner; /// Tests for insert operations mod insert_operation; + +/// End-to-end tests for `StatisticsRequest`s flowing from the optimizer +/// rule through the physical planner into a custom `TableProvider`. 
+mod statistics_requests;
diff --git a/datafusion/core/tests/user_defined/statistics_requests.rs b/datafusion/core/tests/user_defined/statistics_requests.rs
new file mode 100644
index 0000000000000..0fdcdc570ea22
--- /dev/null
+++ b/datafusion/core/tests/user_defined/statistics_requests.rs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! End-to-end test that the optimizer-derived `StatisticsRequest`s
+//! reach a custom `TableProvider`'s `scan_with_args`.
+
+use std::sync::{Arc, Mutex};
+
+use arrow::array::{Int64Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{ScanArgs, ScanResult, Session, TableProvider};
+use datafusion::datasource::TableType;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::execution::context::SessionContext;
+use datafusion::logical_expr::Expr;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::Result;
+use datafusion_expr_common::statistics::StatisticsRequest;
+
+/// A `TableProvider` that records the last `statistics_requests` it was
+/// asked for, so the test can assert what reached it.
+#[derive(Debug)]
+struct RecordingTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+    last_requests: Arc<Mutex<Vec<StatisticsRequest>>>,
+}
+
+#[async_trait]
+impl TableProvider for RecordingTable {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            self.schema.clone(),
+            projection.cloned(),
+        )?)
+    }
+
+    async fn scan_with_args<'a>(
+        &self,
+        state: &dyn Session,
+        args: ScanArgs<'a>,
+    ) -> Result<ScanResult> {
+        // Record what reached us, then delegate to scan().
+        *self.last_requests.lock().unwrap() = args.statistics_requests().to_vec();
+        let plan = self
+            .scan(
+                state,
+                args.projection().map(|p| p.to_vec()).as_ref(),
+                args.filters().unwrap_or(&[]),
+                args.limit(),
+            )
+            .await?;
+        Ok(ScanResult::new(plan))
+    }
+}
+
+fn make_table() -> (Arc<RecordingTable>, Arc<Mutex<Vec<StatisticsRequest>>>) {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int64, false),
+        Field::new("b", DataType::Int64, false),
+    ]));
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int64Array::from(vec![1, 2, 3])),
+            Arc::new(Int64Array::from(vec![10, 20, 30])),
+        ],
+    )
+    .unwrap();
+    let last_requests = Arc::new(Mutex::new(Vec::new()));
+    let provider = Arc::new(RecordingTable {
+        schema,
+        batch,
+        last_requests: last_requests.clone(),
+    });
+    (provider, last_requests)
+}
+
+#[tokio::test]
+async fn requests_reach_provider_scan_with_args() -> Result<()> {
+    let (provider, last_requests) = make_table();
+    let ctx = SessionContext::new();
+    ctx.register_table("t", provider)?;
+
+    // Filter on `a` + sort on `b` should request Min/Max/NullCount on
+    // both, plus DistinctCount on `a` (filter), plus a RowCount.
+    let _ = ctx
+        .sql("SELECT a, b FROM t WHERE a > 0 ORDER BY b LIMIT 10")
+        .await?
+        .collect()
+        .await?;
+
+    let got = last_requests.lock().unwrap().clone();
+    assert!(!got.is_empty(), "expected non-empty requests, got {got:?}");
+
+    let has = |needle: &StatisticsRequest| got.iter().any(|r| r == needle);
+    use datafusion_common::Column;
+    use datafusion_expr_common::statistics::StatisticsRequest::*;
+    assert!(has(&RowCount), "expected RowCount, got {got:?}");
+    assert!(
+        has(&Min(Column::new_unqualified("a"))),
+        "expected Min(a), got {got:?}"
+    );
+    assert!(
+        has(&DistinctCount(Column::new_unqualified("a"))),
+        "expected DistinctCount(a), got {got:?}"
+    );
+    assert!(
+        has(&Min(Column::new_unqualified("b"))),
+        "expected Min(b) from ORDER BY, got {got:?}"
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn no_requests_when_plan_has_no_filter_sort_or_join() -> Result<()> {
+    let (provider, last_requests) = make_table();
+    let ctx = SessionContext::new();
+    ctx.register_table("t", provider)?;
+
+    // Plain `SELECT *` — only `RowCount` should be requested.
+    let _ = ctx.sql("SELECT a, b FROM t").await?.collect().await?;
+
+    let got = last_requests.lock().unwrap().clone();
+    use datafusion_expr_common::statistics::StatisticsRequest::*;
+    assert_eq!(got.len(), 1, "expected only RowCount, got {got:?}");
+    assert!(matches!(got[0], RowCount));
+
+    Ok(())
+}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index db8b82fe87a14..fdf0c6cb20614 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -2769,6 +2769,12 @@ pub struct TableScan {
     pub filters: Vec<Expr>,
     /// Optional number of rows to read
     pub fetch: Option<usize>,
+    /// Statistics the planner would like the provider to answer for this
+    /// scan (typically derived from the surrounding plan shape — e.g.
+    /// Min/Max for sort keys, DistinctCount for join keys). Threaded into
+    /// `ScanArgs::with_statistics_requests` at physical planning time.
+    /// Empty by default.
+    pub statistics_requests: Vec<StatisticsRequest>,
 }
 
 impl Debug for TableScan {
@@ -2890,8 +2896,19 @@ impl TableScan {
             projected_schema,
             filters,
             fetch,
+            statistics_requests: Vec::new(),
         })
    }
+
+    /// Attach a list of statistics requests for the optimizer-aware
+    /// stats-collection path. See [`Self::statistics_requests`].
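+    ///
+    /// A sketch of intended use (the column name is illustrative):
+    ///
+    /// ```ignore
+    /// let scan = scan.with_statistics_requests(vec![
+    ///     StatisticsRequest::RowCount,
+    ///     StatisticsRequest::Min(Column::new_unqualified("a")),
+    ///     StatisticsRequest::Max(Column::new_unqualified("a")),
+    /// ]);
+    /// ```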
+    pub fn with_statistics_requests(
+        mut self,
+        statistics_requests: Vec<StatisticsRequest>,
+    ) -> Self {
+        self.statistics_requests = statistics_requests;
+        self
+    }
 }
 
 // Repartition the plan based on a partitioning scheme.
@@ -5128,6 +5145,7 @@ mod tests {
             projected_schema: Arc::clone(&schema),
             filters: vec![],
             fetch: None,
+            statistics_requests: Vec::new(),
         }));
 
         let col = schema.field_names()[0].clone();
@@ -5158,6 +5176,7 @@ mod tests {
             projected_schema: Arc::clone(&unique_schema),
             filters: vec![],
             fetch: None,
+            statistics_requests: Vec::new(),
         }));
 
         let col = schema.field_names()[0].clone();
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index ef9382a57209a..34cb44f1f7b3e 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -615,6 +615,7 @@ impl LogicalPlan {
             projected_schema,
             filters,
             fetch,
+            statistics_requests,
         }) => filters.map_elements(f)?.update_data(|filters| {
             LogicalPlan::TableScan(TableScan {
                 table_name,
@@ -623,6 +624,7 @@ impl LogicalPlan {
                 projected_schema,
                 filters,
                 fetch,
+                statistics_requests,
             })
         }),
         LogicalPlan::Distinct(Distinct::On(DistinctOn {
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index e610091824092..66329b6d7f7b3 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -66,6 +66,7 @@ pub mod propagate_empty_relation;
 pub mod push_down_filter;
 pub mod push_down_limit;
 pub mod replace_distinct_aggregate;
+pub mod request_statistics;
 pub mod rewrite_set_comparison;
 pub mod scalar_subquery_to_join;
 pub mod simplify_expressions;
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index af944abc6f0b4..3a38e709086d5 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -276,6 +276,7 @@ fn optimize_projections(
         filters,
         fetch,
         projected_schema: _,
+        statistics_requests,
     } = table_scan;
 
     // Get indices referred to in the original (schema with all fields)
@@ -285,7 +286,8 @@ fn optimize_projections(
         None => indices.into_inner(),
     };
     let new_scan =
-        TableScan::try_new(table_name, source, Some(projection), filters, fetch)?;
+        TableScan::try_new(table_name, source, Some(projection), filters, fetch)?
+            .with_statistics_requests(statistics_requests);
 
     return Transformed::yes(LogicalPlan::TableScan(new_scan))
         .transform_data(|plan| optimize_subqueries(plan, config));
diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs
index d0fbb31414dab..56a5e2bf08b57 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -52,6 +52,7 @@ use crate::propagate_empty_relation::PropagateEmptyRelation;
 use crate::push_down_filter::PushDownFilter;
 use crate::push_down_limit::PushDownLimit;
 use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
+use crate::request_statistics::RequestStatistics;
 use crate::rewrite_set_comparison::RewriteSetComparison;
 use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
 use crate::simplify_expressions::SimplifyExpressions;
@@ -305,6 +306,11 @@ impl Optimizer {
             Arc::new(ExtractLeafExpressions::new()),
             Arc::new(PushDownLeafProjections::new()),
             Arc::new(OptimizeProjections::new()),
+            // Run last: annotates each `TableScan` with the stats the
+            // surrounding (now-stable) plan shape would benefit from.
+            // The physical planner threads these into
+            // `ScanArgs::with_statistics_requests`.
+            Arc::new(RequestStatistics::new()),
         ];
 
         Self::with_rules(rules)
diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs
index 9c69276fa1db3..f1abff6911321 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -3168,6 +3168,7 @@ mod tests {
             projection,
             source: Arc::new(test_provider),
             fetch: None,
+            statistics_requests: Vec::new(),
         });
 
         Ok(LogicalPlanBuilder::from(table_scan))
diff --git a/datafusion/optimizer/src/request_statistics.rs b/datafusion/optimizer/src/request_statistics.rs
new file mode 100644
index 0000000000000..150e10f790a7d
--- /dev/null
+++ b/datafusion/optimizer/src/request_statistics.rs
@@ -0,0 +1,589 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`RequestStatistics`] walks the optimized logical plan once and
+//! attaches a `Vec<StatisticsRequest>` to each `TableScan` describing
+//! what stats the surrounding plan shape would benefit from. The
+//! physical planner reads these and threads them into
+//! `ScanArgs::with_statistics_requests`, where the `TableProvider`
+//! decides what it can answer cheaply.
+//!
+//! This rule is meant to run **last** in the optimizer pipeline, after
+//! every other rule has finished rewriting — that way the requests
+//! reflect the plan shape the physical planner is actually going to
+//! plan, not an intermediate one that a later rule reshaped.
+//!
+//! The rule itself never changes plan structure; it only annotates
+//! `TableScan` nodes. Idempotent: running it twice yields the same
+//! result.
+
+use std::collections::{BTreeSet, HashMap, HashSet};
+
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
+use datafusion_common::{Column, Result, TableReference};
+use datafusion_expr::LogicalPlan;
+use datafusion_expr_common::statistics::StatisticsRequest;
+
+use crate::optimizer::ApplyOrder;
+use crate::{OptimizerConfig, OptimizerRule};
+
+/// Optimizer rule that annotates each `TableScan` with the statistics
+/// the optimizer / physical planner / table provider could benefit
+/// from, derived from the surrounding plan shape.
+///
+/// Heuristics (one entry per relevant node):
+/// - `Sort` → `Min` / `Max` / `NullCount` on each sort key.
+/// - `Filter` → `Min` / `Max` / `NullCount` / `DistinctCount` on every
+///   column referenced in the predicate.
+/// - `Join` → `DistinctCount` / `NullCount` on join keys (both sides).
+/// - Always → `RowCount` per scan.
+///
+/// Columns are attributed back to a source `TableScan` by walking each
+/// `TableScan`'s output schema; an unqualified column with a unique
+/// name in the plan resolves to its source.
+/// Ambiguous names (same
+/// column name in multiple TableScans) and renames through projections
+/// are accepted as a known POC limitation — the worst case is "we
+/// over-request" or "we miss a column", never incorrectness.
+#[derive(Default, Debug)]
+pub struct RequestStatistics;
+
+impl RequestStatistics {
+    #[expect(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for RequestStatistics {
+    fn name(&self) -> &str {
+        "request_statistics"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        // We need the whole plan to derive per-table requests, so we
+        // run our own walk inside `rewrite` instead of letting the
+        // framework descend.
+        None
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let requests = derive_requests(&plan);
+        if requests.is_empty() {
+            return Ok(Transformed::no(plan));
+        }
+
+        let plan = plan.transform_down(|node| {
+            if let LogicalPlan::TableScan(scan) = node {
+                let new_requests =
+                    requests.get(&scan.table_name).cloned().unwrap_or_default();
+                if new_requests == scan.statistics_requests {
+                    return Ok(Transformed::no(LogicalPlan::TableScan(scan)));
+                }
+                let mut scan = scan;
+                scan.statistics_requests = new_requests;
+                Ok(Transformed::yes(LogicalPlan::TableScan(scan)))
+            } else {
+                Ok(Transformed::no(node))
+            }
+        })?;
+        Ok(plan)
+    }
+}
+
+/// Walk the plan and build a `(TableReference -> Vec<StatisticsRequest>)`
+/// map. The result is sorted/de-duplicated for stability.
+fn derive_requests(
+    plan: &LogicalPlan,
+) -> HashMap<TableReference, Vec<StatisticsRequest>> {
+    // Per-table accumulators. We use a BTreeSet so requests come out in
+    // a stable, deterministic order regardless of plan-walk order.
+    let mut acc: HashMap<TableReference, BTreeSet<RequestKey>> = HashMap::new();
+
+    // Fallback name->table map for the rare case where an expression
+    // surfaces an unqualified `Column { relation: None, .. }` after
+    // analysis. By the time this rule runs (last in the pipeline), the
+    // analyzer has qualified almost every column reference; if a name
+    // were truly ambiguous the planner would have already errored. So
+    // this map is purely defensive.
+    //
+    // Ambiguity is tracked explicitly: if two distinct scans project
+    // the same column name (e.g. both sides of a join projecting `id`),
+    // we mark the entry `None` and `resolve()` will refuse to attribute
+    // unqualified refs to either side — losing some stats coverage is
+    // fine (these requests are advisory) but misattribution is not.
+    //
+    // Note that for join keys we use *per-side* origin maps below, so
+    // the global ambiguity guard isn't a coverage hazard there.
+    let origin = collect_origin(plan);
+
+    // RowCount request per scan.
+    let _ = plan.apply(|node| {
+        if let LogicalPlan::TableScan(scan) = node {
+            acc.entry(scan.table_name.clone())
+                .or_default()
+                .insert(RequestKey::RowCount);
+        }
+        Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+    });
+
+    // Pass 2: walk every interesting node and add per-column requests.
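+    // For example (sketch): `SELECT * FROM t WHERE a > 5 ORDER BY b`
+    // yields for `t`: RowCount (pass 1), Min/Max/NullCount/DistinctCount
+    // on `a` (Filter), and Min/Max/NullCount on `b` (Sort).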
+    let _ = plan.apply(|node| {
+        match node {
+            LogicalPlan::Sort(sort) => {
+                let mut cols = HashSet::new();
+                for s in &sort.expr {
+                    s.expr.add_column_refs(&mut cols);
+                }
+                for c in cols {
+                    if let Some(t) = resolve(c, &origin) {
+                        let entry = acc.entry(t).or_default();
+                        entry.insert(RequestKey::Min(c.name.clone()));
+                        entry.insert(RequestKey::Max(c.name.clone()));
+                        entry.insert(RequestKey::NullCount(c.name.clone()));
+                    }
+                }
+            }
+            LogicalPlan::Filter(filter) => {
+                let mut cols = HashSet::new();
+                filter.predicate.add_column_refs(&mut cols);
+                for c in cols {
+                    if let Some(t) = resolve(c, &origin) {
+                        let entry = acc.entry(t).or_default();
+                        entry.insert(RequestKey::Min(c.name.clone()));
+                        entry.insert(RequestKey::Max(c.name.clone()));
+                        entry.insert(RequestKey::NullCount(c.name.clone()));
+                        entry.insert(RequestKey::DistinctCount(c.name.clone()));
+                    }
+                }
+            }
+            LogicalPlan::Join(join) => {
+                // Each join-key pair `(l_expr, r_expr)` is structurally
+                // tied to a side: `l_expr`'s columns are provided by the
+                // left subtree, `r_expr`'s by the right. We use per-side
+                // origin maps so we ask each table only about the
+                // columns it itself projects — never asking the right
+                // side for a left-side key, or vice-versa, even when an
+                // unqualified `Column` survives analysis.
+                let left_origin = collect_origin(&join.left);
+                let right_origin = collect_origin(&join.right);
+
+                let mut add_join_key = |c: &Column,
+                                        side_origin: &HashMap<
+                    String,
+                    Option<TableReference>,
+                >| {
+                    if let Some(t) = resolve(c, side_origin) {
+                        let entry = acc.entry(t).or_default();
+                        entry.insert(RequestKey::DistinctCount(c.name.clone()));
+                        entry.insert(RequestKey::NullCount(c.name.clone()));
+                    }
+                };
+
+                for (l_expr, r_expr) in &join.on {
+                    let mut l_cols = HashSet::new();
+                    l_expr.add_column_refs(&mut l_cols);
+                    for c in l_cols {
+                        add_join_key(c, &left_origin);
+                    }
+                    let mut r_cols = HashSet::new();
+                    r_expr.add_column_refs(&mut r_cols);
+                    for c in r_cols {
+                        add_join_key(c, &right_origin);
+                    }
+                }
+
+                // `join.filter` spans both sides; route each column by
+                // which subtree's schema actually contains it. Anything
+                // that doesn't belong to a single side (e.g. a literal
+                // or a column from neither schema) is skipped.
+                if let Some(f) = &join.filter {
+                    let mut cols = HashSet::new();
+                    f.add_column_refs(&mut cols);
+                    for c in cols {
+                        let in_left = join.left.schema().has_column(c);
+                        let in_right = join.right.schema().has_column(c);
+                        let side_origin = match (in_left, in_right) {
+                            (true, false) => &left_origin,
+                            (false, true) => &right_origin,
+                            _ => continue,
+                        };
+                        add_join_key(c, side_origin);
+                    }
+                }
+            }
+            _ => {}
+        }
+        Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+    });
+
+    // Materialize: convert each table's BTreeSet of `RequestKey`s into
+    // a `Vec<StatisticsRequest>`.
+    acc.into_iter()
+        .map(|(t, keys)| {
+            let v = keys
+                .into_iter()
+                .map(|k| k.into_request())
+                .collect::<Vec<_>>();
+            (t, v)
+        })
+        .collect()
+}
+
+/// Walk `plan` and return a `name -> Option<TableReference>` map
+/// covering every column projected by a `TableScan` reachable from
+/// `plan`. `Some(t)` means the name unambiguously belongs to scan `t`
+/// within this subtree; `None` means at least two distinct scans in
+/// the subtree project the same name and the resolver should refuse
+/// to attribute unqualified refs.
+///
+/// Called once for the whole plan (defensive global fallback) and
+/// once per side of every Join (so join keys are scoped to the side
+/// that actually provides them).
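+///
+/// For example (sketch): in `t1 JOIN t2 ON t1.a = t2.a`, where both
+/// scans project a column `a`, the whole-plan map holds `"a" -> None`
+/// (ambiguous), while the per-side maps hold `"a" -> Some(t1)` and
+/// `"a" -> Some(t2)` respectively.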
+fn collect_origin(plan: &LogicalPlan) -> HashMap<String, Option<TableReference>> {
+    use std::collections::hash_map::Entry;
+    let mut origin: HashMap<String, Option<TableReference>> = HashMap::new();
+    let _ = plan.apply(|node| {
+        if let LogicalPlan::TableScan(scan) = node {
+            for f in scan.projected_schema.fields() {
+                match origin.entry(f.name().clone()) {
+                    Entry::Vacant(v) => {
+                        v.insert(Some(scan.table_name.clone()));
+                    }
+                    Entry::Occupied(mut o) => {
+                        if o.get().as_ref() != Some(&scan.table_name) {
+                            // A different scan in this subtree claims
+                            // the same name -> ambiguous.
+                            o.insert(None);
+                        }
+                    }
+                }
+            }
+        }
+        Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+    });
+    origin
+}
+
+/// Internal de-dup key. We can't `Hash + Ord` a `StatisticsRequest`
+/// directly (it carries a `Column` whose `Ord` isn't necessarily
+/// stable across `relation` shapes), so we project to a small enum
+/// keyed only by column name and kind.
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+enum RequestKey {
+    RowCount,
+    Min(String),
+    Max(String),
+    NullCount(String),
+    DistinctCount(String),
+}
+
+impl RequestKey {
+    fn into_request(self) -> StatisticsRequest {
+        match self {
+            RequestKey::RowCount => StatisticsRequest::RowCount,
+            RequestKey::Min(c) => StatisticsRequest::Min(Column::new_unqualified(c)),
+            RequestKey::Max(c) => StatisticsRequest::Max(Column::new_unqualified(c)),
+            RequestKey::NullCount(c) => {
+                StatisticsRequest::NullCount(Column::new_unqualified(c))
+            }
+            RequestKey::DistinctCount(c) => {
+                StatisticsRequest::DistinctCount(Column::new_unqualified(c))
+            }
+        }
+    }
+}
+
+fn resolve(
+    c: &Column,
+    origin: &HashMap<String, Option<TableReference>>,
+) -> Option<TableReference> {
+    // Prefer the fully-qualified ref the planner attached. Fall back to
+    // the origin map only for unqualified columns, and only when the
+    // name unambiguously points at a single scan.
+    c.relation
+        .clone()
+        .or_else(|| origin.get(&c.name).cloned().flatten())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::OptimizerContext;
+    use crate::test::test_table_scan_with_name;
+    use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
+    use datafusion_expr::{col, lit};
+
+    fn rule_apply(plan: LogicalPlan) -> LogicalPlan {
+        let cfg = OptimizerContext::new();
+        RequestStatistics::new()
+            .rewrite(plan, &cfg)
+            .expect("rule succeeded")
+            .data
+    }
+
+    /// Pick the requests off the (single) `TableScan` in `plan`.
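+    /// (If a plan contained several scans, the last one visited would
+    /// win; the tests that use this helper only ever build one scan.)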
+    fn requests_for(plan: &LogicalPlan) -> Vec<StatisticsRequest> {
+        let mut out = Vec::new();
+        let _ = plan.apply(|n| {
+            if let LogicalPlan::TableScan(s) = n {
+                out = s.statistics_requests.clone();
+            }
+            Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+        });
+        out
+    }
+
+    fn has_min(rs: &[StatisticsRequest], col: &str) -> bool {
+        rs.iter()
+            .any(|r| matches!(r, StatisticsRequest::Min(c) if c.name == col))
+    }
+    fn has_max(rs: &[StatisticsRequest], col: &str) -> bool {
+        rs.iter()
+            .any(|r| matches!(r, StatisticsRequest::Max(c) if c.name == col))
+    }
+    fn has_null_count(rs: &[StatisticsRequest], col: &str) -> bool {
+        rs.iter()
+            .any(|r| matches!(r, StatisticsRequest::NullCount(c) if c.name == col))
+    }
+    fn has_distinct(rs: &[StatisticsRequest], col: &str) -> bool {
+        rs.iter()
+            .any(|r| matches!(r, StatisticsRequest::DistinctCount(c) if c.name == col))
+    }
+    fn has_row_count(rs: &[StatisticsRequest]) -> bool {
+        rs.iter().any(|r| matches!(r, StatisticsRequest::RowCount))
+    }
+
+    #[test]
+    fn always_requests_row_count_per_scan() {
+        let scan = test_table_scan_with_name("t").unwrap();
+        let plan = LogicalPlanBuilder::from(scan).build().unwrap();
+        let rs = requests_for(&rule_apply(plan));
+        assert!(has_row_count(&rs), "expected RowCount, got {rs:?}");
+    }
+
+    #[test]
+    fn filter_columns_request_min_max_null_distinct() {
+        let scan = test_table_scan_with_name("t").unwrap();
+        let plan = LogicalPlanBuilder::from(scan)
+            .filter(col("a").gt(lit(5_i32)))
+            .unwrap()
+            .build()
+            .unwrap();
+        let rs = requests_for(&rule_apply(plan));
+        assert!(has_min(&rs, "a"), "expected Min(a), got {rs:?}");
+        assert!(has_max(&rs, "a"), "expected Max(a), got {rs:?}");
+        assert!(has_null_count(&rs, "a"), "expected NullCount(a), got {rs:?}");
+        assert!(
+            has_distinct(&rs, "a"),
+            "expected DistinctCount(a), got {rs:?}"
+        );
+    }
+
+    #[test]
+    fn join_keys_request_distinct_count() {
+        let l = test_table_scan_with_name("l").unwrap();
+        let r = test_table_scan_with_name("r").unwrap();
+        let plan = LogicalPlanBuilder::from(l)
+            .join_on(
+                r,
+                datafusion_expr::JoinType::Inner,
+                vec![col("l.a").eq(col("r.a"))],
+            )
+            .unwrap()
+            .build()
+            .unwrap();
+        let plan = rule_apply(plan);
+        // Collect the requests from both TableScans.
+        let mut by_name: HashMap<String, Vec<StatisticsRequest>> = HashMap::new();
+        let _ = plan.apply(|n| {
+            if let LogicalPlan::TableScan(s) = n {
+                by_name.insert(s.table_name.to_string(), s.statistics_requests.clone());
+            }
+            Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+        });
+        for (name, rs) in &by_name {
+            assert!(
+                has_distinct(rs, "a"),
+                "{name}: expected DistinctCount(a), got {rs:?}"
+            );
+        }
+    }
+
+    /// Symmetric join keys with the same column name on both sides
+    /// (`t1.a = t2.a`). Each table should be asked exactly once for
+    /// `DistinctCount(a)` / `NullCount(a)`, attributed to itself — not
+    /// to the other side, and not duplicated.
+    #[test]
+    fn join_on_same_name_routes_per_side_without_dupes() {
+        let l = test_table_scan_with_name("t1").unwrap();
+        let r = test_table_scan_with_name("t2").unwrap();
+        let plan = LogicalPlanBuilder::from(l)
+            .join_on(
+                r,
+                datafusion_expr::JoinType::Inner,
+                vec![col("t1.a").eq(col("t2.a"))],
+            )
+            .unwrap()
+            .build()
+            .unwrap();
+        let plan = rule_apply(plan);
+
+        let mut by_name: HashMap<String, Vec<StatisticsRequest>> = HashMap::new();
+        let _ = plan.apply(|n| {
+            if let LogicalPlan::TableScan(s) = n {
+                by_name.insert(s.table_name.to_string(), s.statistics_requests.clone());
+            }
+            Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+        });
+
+        // Both scans must be present.
+        assert!(by_name.contains_key("t1"));
+        assert!(by_name.contains_key("t2"));
+
+        // Each side gets DistinctCount(a) + NullCount(a) attributed to
+        // itself.
+        // The Column carried on the request itself is unqualified by
+        // design (the request is already keyed under the owning
+        // TableScan), so we count by (kind, name).
+        for (name, rs) in &by_name {
+            let distinct_a = rs
+                .iter()
+                .filter(
+                    |r| matches!(r, StatisticsRequest::DistinctCount(c) if c.name == "a"),
+                )
+                .count();
+            let null_a = rs
+                .iter()
+                .filter(|r| matches!(r, StatisticsRequest::NullCount(c) if c.name == "a"))
+                .count();
+            assert_eq!(
+                distinct_a, 1,
+                "{name}: expected exactly one DistinctCount(a), got {rs:?}"
+            );
+            assert_eq!(
+                null_a, 1,
+                "{name}: expected exactly one NullCount(a), got {rs:?}"
+            );
+
+            // Sanity: we should not be asking the same scan about the
+            // other side's columns. With identical schemas (`a`, `b`,
+            // `c`) on both scans this is the strict check that proves
+            // join keys did not bleed across sides — the only `a`-keyed
+            // requests must come from this side's own key.
+            let total_a = rs
+                .iter()
+                .filter(|r| match r {
+                    StatisticsRequest::DistinctCount(c)
+                    | StatisticsRequest::NullCount(c)
+                    | StatisticsRequest::Min(c)
+                    | StatisticsRequest::Max(c)
+                    | StatisticsRequest::Sum(c)
+                    | StatisticsRequest::ByteSize(c) => c.name == "a",
+                    _ => false,
+                })
+                .count();
+            assert_eq!(
+                total_a, 2,
+                "{name}: expected exactly DistinctCount(a) + NullCount(a), got {rs:?}"
+            );
+
+            // No requests on `b` or `c` — there is no other plan node
+            // referencing them, so the only way they'd appear is a
+            // bug in routing.
+            for col in ["b", "c"] {
+                let any = rs.iter().any(|r| match r {
+                    StatisticsRequest::DistinctCount(c)
+                    | StatisticsRequest::NullCount(c)
+                    | StatisticsRequest::Min(c)
+                    | StatisticsRequest::Max(c)
+                    | StatisticsRequest::Sum(c)
+                    | StatisticsRequest::ByteSize(c) => c.name == col,
+                    _ => false,
+                });
+                assert!(!any, "{name}: unexpected request on `{col}`, got {rs:?}");
+            }
+        }
+    }
+
+    /// Asymmetric join keys: `l.a = r.b`. The left table should be asked
+    /// about `a` and the right table about `b` — neither side gets the
+    /// other side's column.
+    #[test]
+    fn asymmetric_join_keys_attribute_per_side() {
+        let l = test_table_scan_with_name("l").unwrap();
+        let r = test_table_scan_with_name("r").unwrap();
+        let plan = LogicalPlanBuilder::from(l)
+            .join_on(
+                r,
+                datafusion_expr::JoinType::Inner,
+                vec![col("l.a").eq(col("r.b"))],
+            )
+            .unwrap()
+            .build()
+            .unwrap();
+        let plan = rule_apply(plan);
+        let mut by_name: HashMap<String, Vec<StatisticsRequest>> = HashMap::new();
+        let _ = plan.apply(|n| {
+            if let LogicalPlan::TableScan(s) = n {
+                by_name.insert(s.table_name.to_string(), s.statistics_requests.clone());
+            }
+            Ok::<_, datafusion_common::DataFusionError>(TreeNodeRecursion::Continue)
+        });
+        let l_rs = &by_name["l"];
+        let r_rs = &by_name["r"];
+        assert!(
+            has_distinct(l_rs, "a"),
+            "l: expected DistinctCount(a), got {l_rs:?}"
+        );
+        assert!(
+            !has_distinct(l_rs, "b"),
+            "l: must NOT have DistinctCount(b), got {l_rs:?}"
+        );
+        assert!(
+            has_distinct(r_rs, "b"),
+            "r: expected DistinctCount(b), got {r_rs:?}"
+        );
+        assert!(
+            !has_distinct(r_rs, "a"),
+            "r: must NOT have DistinctCount(a), got {r_rs:?}"
+        );
+    }
+
+    /// Direct test of the origin map's ambiguity guard: when two scans
+    /// project the same column name, an unqualified column reference
+    /// must NOT resolve to either side. (In real plans the planner
+    /// qualifies refs and this fallback rarely fires, but we still want
+    /// it to fail closed.)
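+    ///
+    /// The origin map below is hand-built to mimic what `collect_origin`
+    /// would produce for two scans (`users`, `orders`) that both project
+    /// `id`; we then probe `resolve` with an unqualified and a qualified
+    /// reference.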
+    #[test]
+    fn ambiguous_unqualified_column_does_not_misattribute() {
+        let mut origin: HashMap<String, Option<TableReference>> = HashMap::new();
+        origin.insert("id".to_string(), Some(TableReference::bare("users")));
+        // A second scan projects the same name -> mark it ambiguous.
+        let entry = origin.get_mut("id").unwrap();
+        if entry.as_ref() != Some(&TableReference::bare("orders")) {
+            *entry = None;
+        }
+        let unqualified = Column::new_unqualified("id");
+        assert_eq!(resolve(&unqualified, &origin), None);
+
+        // A qualified ref still resolves directly via `Column.relation`,
+        // bypassing the ambiguous origin entry.
+        let qualified = Column::new(Some(TableReference::bare("users")), "id");
+        assert_eq!(
+            resolve(&qualified, &origin),
+            Some(TableReference::bare("users"))
+        );
+    }
+}
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 7ae5cbeed3e53..3243f88026d26 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -378,6 +378,7 @@ fn from_table_source(
         projected_schema,
         filters: vec![],
         fetch: None,
+        statistics_requests: Vec::new(),
     });
 
     LogicalPlanNode::try_from_logical_plan(&r, extension_codec)
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 2e8a65385541e..7f98769591317 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -200,6 +200,7 @@ logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
 logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
 logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
+logical_plan after request_statistics SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
@@ -224,6 +225,7 @@ logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
 logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
 logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections SAME TEXT AS ABOVE
+logical_plan after request_statistics SAME TEXT AS ABOVE
 logical_plan TableScan: simple_explain_test projection=[a, b, c]
 initial_physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 initial_physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -575,6 +577,7 @@ logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
 logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
 logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c]
+logical_plan after request_statistics SAME TEXT AS ABOVE
 logical_plan after rewrite_set_comparison SAME TEXT AS ABOVE
 logical_plan after optimize_unions SAME TEXT AS ABOVE
 logical_plan after simplify_expressions SAME TEXT AS ABOVE
@@ -599,6 +602,7 @@ logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
 logical_plan after extract_leaf_expressions SAME TEXT AS ABOVE
 logical_plan after push_down_leaf_projections SAME TEXT AS ABOVE
 logical_plan after optimize_projections SAME TEXT AS ABOVE
+logical_plan after request_statistics SAME TEXT AS ABOVE
 logical_plan TableScan: simple_explain_test projection=[a, b, c]
 initial_physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
 initial_physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]