Commit 7b04c0f

adriangb and claude committed
catalog: query-aware statistics requests via ScanArgs / ScanResult
Adds an opt-in handshake that lets callers ask a `TableProvider` for specific stats by name and receive only what the provider can answer cheaply, instead of the all-or-nothing dense `Statistics` we have today.

## What's new

* `datafusion-common::stats::StatisticsRequest`: an enum of stat kinds that mirrors `Statistics` / `ColumnStatistics` (Min, Max, NullCount, DistinctCount, Sum, ByteSize, RowCount, TotalByteSize). It is `Hash + Eq` so it can key a `HashMap`.
* `datafusion-common::stats::StatisticsValue`: `Scalar(Precision<...>) | Distribution(Arc<dyn Any>) | Sketch(Arc<dyn Any>) | Absent`. Whether a value is exact or estimated travels in the `Precision` wrapper, not the variant.
* `ScanArgs::with_statistics_requests` / `statistics_requests()`: the caller's question.
* `ScanResult::with_statistics` / `statistics()` / `into_parts()`: the provider's answer, paired 1:1 with the requests slice.
* `PartitionedFile::satisfied_stats`: a sparse `Arc<HashMap<StatisticsRequest, StatisticsValue>>` of per-file answers. Memory scales with what was asked, not with table width. Providers that store stats out-of-band (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs) can populate this directly without rebuilding a full dense `Statistics`.
* `FilePruner` learns to consume the sparse map. Internally, `file_stats_pruning` is now `Box<dyn PruningStatistics + Send + Sync>` so we can dispatch between the existing `PrunableStatistics` (dense) and a new `SparseFilePruningStats` adapter (sparse). The sparse adapter looks up each `StatisticsRequest` directly in the map and materializes single-row arrays only for the columns the pruning predicate touches, with no densify-then-throw-away step.
* `ListingTable::scan_with_args` populates `ScanResult.statistics` from the merged dense `Statistics` it already computed when `args.statistics_requests()` is set and `collect_statistics=true`. When `collect_statistics=false` it returns `Absent` for everything (the contract is "answer what's free"). `DistinctCount` / `Sum` / `ByteSize` are likewise `Absent` for parquet, since those aren't in thrift footers; layered helpers (or richer providers) can fill the gaps.

## Backwards compat

All additions are opt-in:

* `ScanArgs` / `ScanResult` gain new fields with `Default`-friendly initializers; existing callers that don't use the new builders see no change.
* `FilePruner`'s field-type change is internal (a private field).
* The only minor source-level break is a new pub field on `PartitionedFile` (`satisfied_stats`). Callers using `PartitionedFile::new` / `From<ObjectMeta>` / the existing builders are unaffected. Direct struct literals (uncommon; none in-tree) need to add `satisfied_stats: None` or use the new `with_satisfied_stats` builder.

## Tests

* `datafusion-common::stats::tests::statistics_request_is_hashable_keyable`: round-trips a `StatisticsRequest` through a `HashMap`.
* `datafusion-pruning::file_pruner::tests`: three tests demonstrating end-to-end pruning against a sparse-only `PartitionedFile` (`x > 100` prunes a `[10, 20]` file, `x > 15` doesn't, and no stats at all means no pruner is built).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
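The request/answer handshake described above can be sketched with plain standard-library types. Everything below is an illustrative stand-in rather than the actual datafusion API: the variant names mirror this commit, but `Column` is simplified to a `String` and the `Precision` wrapper is collapsed into a two-variant value enum.

```rust
use std::collections::HashMap;

// Stand-in for `StatisticsRequest`: `Hash + Eq` so it can key a map.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum StatisticsRequest {
    RowCount,
    Min(String),       // column name stands in for `Column`
    NullCount(String),
}

// Stand-in for `StatisticsValue`: exactness travels in the value, and
// `Absent` answers are simply omitted from the sparse map.
#[derive(Clone, Debug, PartialEq)]
enum StatisticsValue {
    Exact(u64),
    Inexact(u64),
}

// A provider answers only what it can answer cheaply; unanswerable
// requests produce no entry at all.
fn answer_cheaply(
    requests: &[StatisticsRequest],
) -> HashMap<StatisticsRequest, StatisticsValue> {
    let mut out = HashMap::new();
    for req in requests {
        match req {
            StatisticsRequest::RowCount => {
                out.insert(req.clone(), StatisticsValue::Exact(42));
            }
            StatisticsRequest::Min(col) if col == "a" => {
                out.insert(req.clone(), StatisticsValue::Exact(1));
            }
            _ => {} // "answer what's free": no entry for the rest
        }
    }
    out
}

fn main() {
    let requests = vec![
        StatisticsRequest::RowCount,
        StatisticsRequest::Min("a".into()),
        StatisticsRequest::NullCount("a".into()), // provider can't answer this
    ];
    let answers = answer_cheaply(&requests);
    // Memory scales with what was answered, not with table width.
    assert_eq!(answers.len(), 2);
    assert!(!answers.contains_key(&StatisticsRequest::NullCount("a".into())));
    println!("answered {} of {} requests", answers.len(), requests.len());
}
```

The design choice this models is the commit's "Absent by omission" invariant: consumers infer an unanswered request from a missing key instead of storing a dense placeholder per column.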
1 parent 9a29e33 commit 7b04c0f

5 files changed

Lines changed: 626 additions & 8 deletions

datafusion/catalog-listing/src/table.rs

Lines changed: 218 additions & 1 deletion
```diff
@@ -23,7 +23,8 @@ use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
+    Constraints, ScalarValue, SchemaExt, Statistics, internal_datafusion_err, plan_err,
+    project_schema,
 };
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
```
```diff
@@ -39,6 +40,9 @@ use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::execution_props::ExecutionProps;
+use datafusion_expr::statistics::{
+    SatisfiedStatistics, StatisticsRequest, StatisticsValue,
+};
 use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion_physical_expr::create_lex_ordering;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
```
```diff
@@ -515,6 +519,39 @@ impl TableProvider for ListingTable {
             .list_files_for_scan(state, &partition_filters, statistic_file_limit)
             .await?;
 
+        // If the caller asked for per-file stats and we already have dense
+        // per-file `Statistics` (i.e. `collect_statistics=true` populated
+        // them during file listing), project them into a sparse
+        // `satisfied_stats` map keyed by request. This is "free": the IO /
+        // decode cost was already paid; we're just surfacing what we have
+        // in the request-driven shape that downstream consumers (e.g.
+        // `FilePruner` via `SparseFilePruningStats`) prefer.
+        if !args.statistics_requests().is_empty() && self.options.collect_stat {
+            partitioned_file_lists = partitioned_file_lists
+                .into_iter()
+                .map(|fg| {
+                    let files = fg
+                        .into_inner()
+                        .into_iter()
+                        .map(|pf| {
+                            let sparse = pf.statistics.as_ref().map(|stats| {
+                                project_satisfied_stats(
+                                    args.statistics_requests(),
+                                    &self.table_schema,
+                                    stats,
+                                )
+                            });
+                            match sparse {
+                                Some(s) => pf.with_satisfied_stats(Arc::new(s)),
+                                None => pf,
+                            }
+                        })
+                        .collect();
+                    FileGroup::new(files)
+                })
+                .collect();
+        }
+
         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
             let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
```
```diff
@@ -688,6 +725,112 @@ impl TableProvider for ListingTable {
     }
 }
 
+/// Project a per-file dense [`Statistics`] into the sparse map shape
+/// that downstream consumers (e.g. [`FilePruner`] via
+/// [`SparseFilePruningStats`]) read. Only entries the request list asked
+/// for are present; non-`Absent` slots are kept verbatim (preserving the
+/// `Precision` already on the field). The dense `Statistics` itself is
+/// untouched.
+///
+/// `DistinctCount` / `Sum` / `ByteSize` will only be present if the
+/// underlying file format actually populated them on
+/// [`ColumnStatistics`]. For parquet, only `null_count`, `min_value`,
+/// `max_value`, and `num_rows` are typically available from thrift
+/// footers; the rest stay `Absent` (so the entry is omitted).
+///
+/// [`FilePruner`]: datafusion_pruning::FilePruner
+/// [`SparseFilePruningStats`]: datafusion_pruning::SparseFilePruningStats
+/// [`ColumnStatistics`]: datafusion_common::ColumnStatistics
+fn project_satisfied_stats(
+    requests: &[StatisticsRequest],
+    schema: &SchemaRef,
+    stats: &Statistics,
+) -> SatisfiedStatistics {
+    let mut out = SatisfiedStatistics::new();
+    for req in requests {
+        let value = match req {
+            StatisticsRequest::RowCount => precision_to_scalar(&stats.num_rows, |n| {
+                ScalarValue::UInt64(Some(n as u64))
+            }),
+            StatisticsRequest::TotalByteSize => {
+                precision_to_scalar(&stats.total_byte_size, |n| {
+                    ScalarValue::UInt64(Some(n as u64))
+                })
+            }
+            StatisticsRequest::Min(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.min_value)
+            }
+            StatisticsRequest::Max(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.max_value)
+            }
+            StatisticsRequest::Sum(c) => {
+                column_scalar(schema, stats, &c.name, |cs| &cs.sum_value)
+            }
+            StatisticsRequest::NullCount(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.null_count)
+            }
+            StatisticsRequest::DistinctCount(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.distinct_count)
+            }
+            StatisticsRequest::ByteSize(c) => {
+                column_usize(schema, stats, &c.name, |cs| &cs.byte_size)
+            }
+        };
+        // Skip `Absent` entries: the sparse map's invariant is "only
+        // entries the provider answered". Consumers infer `Absent` from
+        // a missing key.
+        if !matches!(value, StatisticsValue::Absent) {
+            out.insert(req.clone(), value);
+        }
+    }
+    out
+}
+
+fn precision_to_scalar(
+    p: &Precision<usize>,
+    map: impl Fn(usize) -> ScalarValue,
+) -> StatisticsValue {
+    match p {
+        Precision::Exact(n) => StatisticsValue::Scalar(Precision::Exact(map(*n))),
+        Precision::Inexact(n) => StatisticsValue::Scalar(Precision::Inexact(map(*n))),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_scalar(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<ScalarValue>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    match pick(cs) {
+        Precision::Exact(v) => StatisticsValue::Scalar(Precision::Exact(v.clone())),
+        Precision::Inexact(v) => StatisticsValue::Scalar(Precision::Inexact(v.clone())),
+        Precision::Absent => StatisticsValue::Absent,
+    }
+}
+
+fn column_usize(
+    schema: &SchemaRef,
+    stats: &Statistics,
+    name: &str,
+    pick: impl Fn(&datafusion_common::ColumnStatistics) -> &Precision<usize>,
+) -> StatisticsValue {
+    let Ok(idx) = schema.index_of(name) else {
+        return StatisticsValue::Absent;
+    };
+    let Some(cs) = stats.column_statistics.get(idx) else {
+        return StatisticsValue::Absent;
+    };
+    precision_to_scalar(pick(cs), |n| ScalarValue::UInt64(Some(n as u64)))
+}
+
 impl ListingTable {
     /// Get the list of files for a scan as well as the file level statistics.
     /// The list is grouped to let the execution plan know how the files should
```
```diff
@@ -1049,4 +1192,78 @@ mod tests {
         let result = derive_common_ordering_from_files(&file_groups);
         assert_eq!(result, Some(ordering));
     }
+
+    #[test]
+    fn project_satisfied_stats_picks_only_requested_and_present() {
+        use arrow::datatypes::{DataType, Field, Schema};
+        use datafusion_common::{Column, ColumnStatistics};
+        use datafusion_expr::statistics::{StatisticsRequest, StatisticsValue};
+
+        let schema: SchemaRef = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int64, true),
+            Field::new("b", DataType::Int64, true),
+        ]));
+
+        let mut stats = Statistics::new_unknown(&schema);
+        stats.num_rows = Precision::Exact(42);
+        // a: min/max known, null_count Absent
+        stats.column_statistics[0] = ColumnStatistics {
+            null_count: Precision::Absent,
+            max_value: Precision::Exact(ScalarValue::Int64(Some(10))),
+            min_value: Precision::Exact(ScalarValue::Int64(Some(1))),
+            sum_value: Precision::Absent,
+            distinct_count: Precision::Absent,
+            byte_size: Precision::Absent,
+        };
+        // b: only null_count known
+        stats.column_statistics[1] = ColumnStatistics {
+            null_count: Precision::Exact(3),
+            max_value: Precision::Absent,
+            min_value: Precision::Absent,
+            sum_value: Precision::Absent,
+            distinct_count: Precision::Absent,
+            byte_size: Precision::Absent,
+        };
+
+        let a = Column::from_name("a");
+        let b = Column::from_name("b");
+        let requests = vec![
+            StatisticsRequest::RowCount,
+            StatisticsRequest::Min(a.clone()),
+            StatisticsRequest::Max(a.clone()),
+            StatisticsRequest::NullCount(a.clone()), // Absent: should be omitted
+            StatisticsRequest::NullCount(b.clone()),
+            StatisticsRequest::Min(b.clone()), // Absent: should be omitted
+        ];
+
+        let out = project_satisfied_stats(&requests, &schema, &stats);
+
+        assert_eq!(out.len(), 4);
+        assert!(matches!(
+            out.get(&StatisticsRequest::RowCount),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::UInt64(Some(42))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::Min(a.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::Int64(Some(1))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::Max(a.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::Int64(Some(10))
+            )))
+        ));
+        assert!(matches!(
+            out.get(&StatisticsRequest::NullCount(b.clone())),
+            Some(StatisticsValue::Scalar(Precision::Exact(
+                ScalarValue::UInt64(Some(3))
+            )))
+        ));
+        assert!(!out.contains_key(&StatisticsRequest::NullCount(a)));
+        assert!(!out.contains_key(&StatisticsRequest::Min(b)));
+    }
 }
```
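The precision-preserving conversion in `precision_to_scalar` above can be modeled standalone. The types here are simplified stand-ins for the real `datafusion-common` ones (no `ScalarValue`; counts collapse to `u64`); the point is that exact vs. estimated travels in the `Precision` wrapper, and `Absent` stays absent so the sparse map can omit the entry entirely.

```rust
// Simplified stand-ins for datafusion's `Precision` and `StatisticsValue`.
#[derive(Debug, PartialEq)]
enum Precision<T> {
    Exact(T),
    Inexact(T),
    Absent,
}

#[derive(Debug, PartialEq)]
enum StatisticsValue {
    Scalar(Precision<u64>),
    Absent,
}

// Mirrors the shape of `precision_to_scalar`: the Exact/Inexact
// distinction is carried through unchanged, never flattened.
fn precision_to_scalar(p: &Precision<usize>) -> StatisticsValue {
    match p {
        Precision::Exact(n) => StatisticsValue::Scalar(Precision::Exact(*n as u64)),
        Precision::Inexact(n) => StatisticsValue::Scalar(Precision::Inexact(*n as u64)),
        Precision::Absent => StatisticsValue::Absent,
    }
}

fn main() {
    assert_eq!(
        precision_to_scalar(&Precision::Exact(3)),
        StatisticsValue::Scalar(Precision::Exact(3))
    );
    assert_eq!(
        precision_to_scalar(&Precision::Inexact(7)),
        StatisticsValue::Scalar(Precision::Inexact(7))
    );
    assert_eq!(precision_to_scalar(&Precision::Absent), StatisticsValue::Absent);
}
```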

datafusion/catalog/src/table.rs

Lines changed: 27 additions & 0 deletions
```diff
@@ -26,6 +26,7 @@ use async_trait::async_trait;
 use datafusion_common::{Constraints, Statistics, not_impl_err};
 use datafusion_common::{Result, internal_err};
 use datafusion_expr::Expr;
+use datafusion_expr::statistics::StatisticsRequest;
 
 use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
```
```diff
@@ -406,6 +407,7 @@ pub struct ScanArgs<'a> {
     filters: Option<&'a [Expr]>,
     projection: Option<&'a [usize]>,
     limit: Option<usize>,
+    statistics_requests: &'a [StatisticsRequest],
 }
 
 impl<'a> ScanArgs<'a> {
```
```diff
@@ -467,6 +469,31 @@ impl<'a> ScanArgs<'a> {
     pub fn limit(&self) -> Option<usize> {
         self.limit
     }
+
+    /// Set a list of statistics the caller would like the provider to
+    /// answer if it can do so cheaply.
+    ///
+    /// Typical sources a provider may answer from:
+    /// * Parquet thrift footers (Min/Max/NullCount/RowCount, exact)
+    /// * An external metadata catalog (Delta/Iceberg/Hudi manifests,
+    ///   Hive-Metastore-style stats columns)
+    /// * Cached / materialized stats columns
+    ///
+    /// The provider returns its answers on
+    /// [`ScanResult::statistics`] paired 1:1 with `requests`. Anything
+    /// not answerable should come back as [`StatisticsValue::Absent`];
+    /// the caller decides what to do with the gaps. The contract is
+    /// "answer what's free, leave the rest as `Absent`": providers MUST
+    /// NOT do expensive scans purely to satisfy these requests.
+    pub fn with_statistics_requests(mut self, requests: &'a [StatisticsRequest]) -> Self {
+        self.statistics_requests = requests;
+        self
+    }
+
+    /// Get the statistics requests. Empty if none were set.
+    pub fn statistics_requests(&self) -> &'a [StatisticsRequest] {
+        self.statistics_requests
+    }
 }
 
 /// Result of a table scan operation from [`TableProvider::scan_with_args`].
```
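The 1:1 pairing contract between `statistics_requests()` and the answers on `ScanResult` can be sketched as follows. The types are simplified stand-ins for the real `datafusion-catalog` API (column names as `String`, a two-variant value enum), and `scan_answers` is a hypothetical provider:

```rust
// Stand-ins for the request and answer types.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum StatisticsRequest {
    RowCount,
    DistinctCount(String),
}

#[derive(Clone, Debug, PartialEq)]
enum StatisticsValue {
    Exact(u64),
    Absent,
}

// A provider fills one answer slot per request, in request order.
// Anything it cannot answer cheaply comes back `Absent` rather than
// triggering an expensive scan.
fn scan_answers(requests: &[StatisticsRequest]) -> Vec<StatisticsValue> {
    requests
        .iter()
        .map(|req| match req {
            StatisticsRequest::RowCount => StatisticsValue::Exact(100),
            StatisticsRequest::DistinctCount(_) => StatisticsValue::Absent,
        })
        .collect()
}

fn main() {
    let requests = vec![
        StatisticsRequest::RowCount,
        StatisticsRequest::DistinctCount("a".to_string()),
    ];
    let answers = scan_answers(&requests);
    assert_eq!(answers.len(), requests.len()); // paired 1:1 by index
    assert_eq!(answers[0], StatisticsValue::Exact(100));
    assert_eq!(answers[1], StatisticsValue::Absent);
}
```

Because the slices are correlated by index, callers never need to re-derive which answer belongs to which request.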

datafusion/datasource/src/mod.rs

Lines changed: 33 additions & 0 deletions
```diff
@@ -58,6 +58,7 @@ use chrono::TimeZone;
 use datafusion_common::stats::Precision;
 use datafusion_common::{ColumnStatistics, Result, exec_datafusion_err};
 use datafusion_common::{ScalarValue, Statistics};
+use datafusion_expr::statistics::SatisfiedStatistics;
 use datafusion_physical_expr::LexOrdering;
 use futures::{Stream, StreamExt};
 use object_store::{GetOptions, GetRange, ObjectStore};
```
```diff
@@ -138,6 +139,18 @@ pub struct PartitionedFile {
     /// When set via [`Self::with_statistics`], partition column statistics are automatically
     /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
     pub statistics: Option<Arc<Statistics>>,
+    /// Sparse, request-keyed stats answered by the provider for this file.
+    ///
+    /// Only entries for non-`Absent` answers are present, so memory scales
+    /// with the *count of stats actually requested* rather than the table's
+    /// column count. Used in tandem with, not in place of,
+    /// [`Self::statistics`]: existing consumers that read the dense
+    /// `Statistics` keep working; new consumers (e.g. `FilePruner` in
+    /// `datafusion-pruning`) prefer this sparse map when it's
+    /// populated. Providers that store stats out-of-band (Delta/Iceberg/Hudi
+    /// manifests, Hive Metastore, custom catalogs) can populate this
+    /// directly without rebuilding a full dense `Statistics`.
+    pub satisfied_stats: Option<Arc<SatisfiedStatistics>>,
     /// The known lexicographical ordering of the rows in this file, if any.
     ///
     /// This describes how the data within the file is sorted with respect to one or more
```
```diff
@@ -168,6 +181,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
```
```diff
@@ -181,6 +195,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
```
```diff
@@ -200,6 +215,7 @@ impl PartitionedFile {
             partition_values: vec![],
             range: Some(FileRange { start, end }),
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
```
```diff
@@ -328,6 +344,21 @@ impl PartitionedFile {
         self.ordering = ordering;
         self
     }
+
+    /// Attach a sparse map of provider-answered statistics for this file.
+    ///
+    /// Used by table providers that store per-file stats out-of-band
+    /// (Delta/Iceberg/Hudi manifests, Hive Metastore, custom catalogs)
+    /// and want to surface them without reconstructing a full dense
+    /// [`Statistics`]. Consumers (e.g. `FilePruner`) prefer this map
+    /// when [`Self::statistics`] is not set.
+    pub fn with_satisfied_stats(
+        mut self,
+        satisfied_stats: Arc<SatisfiedStatistics>,
+    ) -> Self {
+        self.satisfied_stats = Some(satisfied_stats);
+        self
+    }
 }
 
 impl From<ObjectMeta> for PartitionedFile {
```
```diff
@@ -337,6 +368,7 @@ impl From<ObjectMeta> for PartitionedFile {
             partition_values: vec![],
             range: None,
             statistics: None,
+            satisfied_stats: None,
             ordering: None,
             extensions: None,
             metadata_size_hint: None,
```
```diff
@@ -534,6 +566,7 @@ pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
                     byte_size: Precision::Absent,
                 }],
             })),
+            satisfied_stats: None,
            ordering: None,
            extensions: None,
            metadata_size_hint: None,
```