Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
216d62a
Add a default FileStatisticsCache implementation for the ListingTable
mkleen Jan 18, 2026
81f868a
fixup! Add a default FileStatisticsCache implementation for the Listi…
mkleen Jan 28, 2026
a01c299
Adapt memory usage when removing entries
mkleen Feb 4, 2026
9907189
Adapt heapsize for &str
mkleen Feb 4, 2026
28cb372
Fix formatting
mkleen Feb 4, 2026
251e48a
Adapt heapsize for &str and add another scalarvalue
mkleen Feb 4, 2026
255896c
Add better error message
mkleen Feb 10, 2026
e073409
Add todo to add heapsize for ordering in CachedFileMetadata
mkleen Feb 10, 2026
a91effe
Fix comment/docs on DefaultFileStatisticsCache
mkleen Feb 10, 2026
c65b8fe
Simplify test data generation
mkleen Feb 10, 2026
d025b27
Remove potential stale entry, if entry is too large
mkleen Feb 10, 2026
ad8bc83
Fix typo in sql logic test comment
mkleen Feb 10, 2026
5773e88
Fix comment about default behaviour in cache manager
mkleen Feb 10, 2026
20f5998
Fix variable name in test
mkleen Feb 10, 2026
a8b83d8
Fix variable name in test
mkleen Feb 10, 2026
c27c2b8
Disable cache for sql logic test
mkleen Feb 10, 2026
99a312b
Include key into memory estimation
mkleen Feb 11, 2026
7694254
Fix fmt
mkleen Feb 11, 2026
774dc5f
Fix clippy
mkleen Feb 11, 2026
cb6b4d8
minor
mkleen Feb 11, 2026
e371582
Add more key memory accounting
mkleen Feb 12, 2026
50714d0
Fix Formatting
mkleen Feb 12, 2026
6b21bd1
Account path as string and remove dependency to object_store
mkleen Feb 12, 2026
996de78
Improve error handling
mkleen Feb 12, 2026
3af0ce1
Fix fmt
mkleen Feb 12, 2026
3445710
Remove path.clone
mkleen Feb 12, 2026
c97adfa
Simplify accounting for statistics
mkleen Feb 12, 2026
7f6340e
Adapt offset buffer
mkleen Feb 12, 2026
0a5d3c8
Fix heap size for Arc
mkleen Feb 12, 2026
673276e
Adapt estimate in test
mkleen Feb 12, 2026
03591ae
Fix sql logic test
mkleen Feb 12, 2026
497371c
Register cache from cachemanager at listing table
mkleen Apr 8, 2026
af71c2b
Revert slt
mkleen Apr 8, 2026
71720ca
Add tablescoping for file stats cache
mkleen Feb 18, 2026
7e64ad7
Adapt slt
mkleen Apr 9, 2026
4902bd2
Fix linter
mkleen Apr 9, 2026
ac90305
Remove unneeded clone
mkleen Apr 9, 2026
494143f
Rename cache_unit to file_statistics_cache
mkleen Apr 9, 2026
77705ad
Simplify heap size accounting
mkleen Apr 9, 2026
9a95a4b
Adapt comments in test
mkleen Apr 10, 2026
1851d59
Separate drop table clean-ups
mkleen Apr 10, 2026
e1d49a1
fixup! Separate drop table clean-ups
mkleen Apr 10, 2026
f069267
Increase default limit to 10 mb
mkleen Apr 15, 2026
3fca43e
Increase default limit to 20 mb
mkleen Apr 15, 2026
06a00c2
Fix comment
mkleen Apr 15, 2026
be61ff6
Fix deregister logic
mkleen Apr 15, 2026
74a8696
Fix slt
mkleen Apr 15, 2026
36c1e22
Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
b72a2fc
fixup! Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
dc74700
Fix comment
mkleen Apr 15, 2026
59c7cfe
Fix runtime_env entry
mkleen Apr 19, 2026
cd42155
Add cache for all benchmark runs
mkleen Apr 21, 2026
d6a5815
Add cache to listing table creation
mkleen Apr 21, 2026
a0b4186
fixup! Add cache to listing table creation
mkleen Apr 21, 2026
47df32a
Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
fc05625
fixup! Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
3e589cd
Fix linter
mkleen Apr 22, 2026
4cbb857
Add cache to listing table in _read_type()
mkleen Apr 22, 2026
494ab5e
Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
1987c73
fixup! Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
e0af041
Remove array.slt
mkleen Apr 22, 2026
c1fbe3a
Add table ref to ListingTableUrl
mkleen Apr 23, 2026
041b457
Add heapsize for table-scoped-path
mkleen Apr 23, 2026
758ac8a
Make list_entries table-scoped
mkleen Apr 23, 2026
a8bfdb2
fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
3a55cfb
fixup! fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
c40fa27
Improve heap size estimation for Arc
mkleen Apr 26, 2026
a871d2a
fixup! Improve heap size estimation for Arc
mkleen Apr 27, 2026
8f95d97
Update migration guide
mkleen Apr 27, 2026
49caa3b
fixup! Update migration guide
mkleen Apr 27, 2026
f82ba41
Improve heapsize estimation for TableReference
mkleen Apr 29, 2026
81c91f8
Improve memory handling when inserting
mkleen Apr 29, 2026
9b09a91
Fix comments in Cache Manager
mkleen Apr 29, 2026
aeda623
Improve upgrade guide
mkleen Apr 29, 2026
e767d62
Fix upgrade guide
mkleen Apr 29, 2026
3df09dd
Return stale entries from cache
mkleen May 4, 2026
e408763
Fix upgrade guide
mkleen May 4, 2026
9c0632d
Fix Arc<str> heapsize test
mkleen May 5, 2026
48ddc28
Remove const i32 cast from heapsize estimation
mkleen May 5, 2026
8239e72
Fix heapsize estimation for Arc<T>
mkleen May 5, 2026
7f65cdc
Fix comment in cache_manager
mkleen May 5, 2026
d0db342
Fix linter + clippy
mkleen May 5, 2026
f76ae44
Adapt test according to heapsize estimation changes
mkleen May 5, 2026
96845ec
Always add tableref to partitioned files
mkleen May 8, 2026
87ad810
fixup! Always add tableref to partitioned files
mkleen May 8, 2026
1f4539f
Add table to statistics_cache output
mkleen May 8, 2026
858e168
Adopt test to new output
mkleen May 8, 2026
e55503d
Adopt configuration with '0' value
mkleen May 8, 2026
abff629
Update configs.md
mkleen May 8, 2026
7c9c830
Merge branch 'main' into file-stats-cache
mkleen May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ impl ExternalAggrConfig {
let config = ListingTableConfig::new(table_path).with_listing_options(options);
let config = config.infer_schema(&state).await?;

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,9 @@ impl RunOpt {
_ => unreachable!(),
};

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
7 changes: 6 additions & 1 deletion datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {

let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new("table", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
Expand All @@ -649,6 +650,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {

// construct record batch from metadata
let mut path_arr = vec![];
let mut table_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut e_tag_arr = vec![];
Expand All @@ -661,7 +663,9 @@ impl TableFunctionImpl for StatisticsCacheFunc {
if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
{
for (path, entry) in file_statistics_cache.list_entries() {
path_arr.push(path.to_string());
path_arr.push(path.path.to_string());
table_arr
.push(path.table.map_or_else(|| "".to_string(), |t| t.to_string()));
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
Expand All @@ -678,6 +682,7 @@ impl TableFunctionImpl for StatisticsCacheFunc {
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(StringArray::from(table_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(e_tag_arr)),
Expand Down
74 changes: 9 additions & 65 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,7 @@ mod tests {
use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
Expand Down Expand Up @@ -656,8 +653,6 @@ mod tests {
Ok(())
}

/// Shows that the statistics cache is not enabled by default yet
/// See https://github.com/apache/datafusion/issues/19217
#[tokio::test]
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -687,68 +682,17 @@ mod tests {
.await?;
}

// When the cache manager creates a StatisticsCache by default,
// the contents will show up here
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let sql = "SELECT split_part(path, '/', -1) as filename, table, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");

Ok(())
}

// Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
#[tokio::test]
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
// Install a specific StatisticsCache implementation
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+--------------+-------------+------------------+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Absent |
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+-----------------+--------------+-------------+------------------+
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
| filename | table | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | alltypes_plain | 1851 | Exact(8) | 11 | Absent |
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the file statistics cache is now tablescoped, this should be reflected in the output.

| alltypes_tiny_pages.parquet | alltypes_tiny_pages | 454233 | Exact(7300) | 13 | Absent |
| lz4_raw_compressed_larger.parquet | lz4_raw_compressed_larger | 380836 | Exact(10000) | 1 | Absent |
+-----------------------------------+---------------------------+-----------------+--------------+-------------+------------------+
");

Ok(())
Expand Down
34 changes: 30 additions & 4 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use std::mem;
use std::sync::Arc;

use datafusion_catalog::Session;
use datafusion_common::{HashMap, Result, ScalarValue, assert_or_internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_common::{
HashMap, Result, ScalarValue, TableReference, assert_or_internal_err,
};
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::{FileExtensions, ListingTableUrl};
use datafusion_expr::{BinaryExpr, Operator, lit, utils};

use arrow::{
Expand Down Expand Up @@ -382,6 +384,7 @@ fn try_into_partitioned_file(

let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;
pf.table_reference.clone_from(table_path.get_table_ref());

Ok(Some(pf))
}
Expand Down Expand Up @@ -416,8 +419,15 @@ pub async fn pruned_partition_list<'a>(
table_path
);

// if no partition col => simply list all the files
Ok(objects.map_ok(|object_meta| object_meta.into()).boxed())
// if no partition col => list all the files
Ok(objects
.try_filter_map(|object_meta| {
futures::future::ready(object_meta_to_partitioned_file(
object_meta,
table_path.get_table_ref(),
Copy link
Copy Markdown
Contributor Author

@mkleen mkleen May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Table-reference needs to be always passed on the PartitionedFile because file statistics cache is now table-scoped and need the Table-reference for the caching for the key.

))
})
.boxed())
} else {
let df_schema = DFSchema::from_unqualified_fields(
partition_cols
Expand All @@ -442,6 +452,22 @@ pub async fn pruned_partition_list<'a>(
}
}

/// Wraps an [`ObjectMeta`] in a [`PartitionedFile`] carrying the given table
/// reference, with no partition values, range, statistics, or ordering.
///
/// The table reference is attached so the (table-scoped) file statistics
/// cache can use it as part of its cache key.
// NOTE(review): always returns `Ok(Some(..))`; the `Result<Option<..>>`
// shape presumably matches a caller's `try_filter_map` contract — confirm.
fn object_meta_to_partitioned_file(
    object_meta: ObjectMeta,
    table_ref: &Option<TableReference>,
) -> Result<Option<PartitionedFile>> {
    let file = PartitionedFile {
        object_meta,
        partition_values: Vec::new(),
        range: None,
        statistics: None,
        ordering: None,
        extensions: FileExtensions::new(),
        metadata_size_hint: None,
        table_reference: table_ref.clone(),
    };
    Ok(Some(file))
}

/// Extract the partition values for the given `file_path` (in the given `table_path`)
/// associated to the partitions defined by `table_partition_cols`
pub fn parse_partitions_for_path<'a, I>(
Expand Down
35 changes: 19 additions & 16 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use datafusion_datasource::{
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -187,7 +186,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: Arc<dyn FileStatisticsCache>,
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
Expand Down Expand Up @@ -231,7 +230,7 @@ impl ListingTable {
schema_source,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
collected_statistics: None,
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
Expand Down Expand Up @@ -260,10 +259,8 @@ impl ListingTable {
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self.collected_statistics = cache;
self
}

Expand Down Expand Up @@ -802,11 +799,15 @@ impl ListingTable {
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;

let path = &part_file.object_meta.location;
let path = TableScopedPath {
table: part_file.table_reference.clone(),
path: part_file.object_meta.location.clone(),
};
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cached) = self.collected_statistics.get(path)
if let Some(cache) = &self.collected_statistics
&& let Some(cached) = cache.get(&path)
&& cached.is_valid_for(meta)
{
// Return cached statistics and ordering
Expand All @@ -823,14 +824,16 @@ impl ListingTable {
let statistics = Arc::new(file_meta.statistics);

// Store in cache
self.collected_statistics.put(
path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
if let Some(cache) = &self.collected_statistics {
cache.put(
&path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
}

Ok((statistics, file_meta.ordering))
}
Expand Down
Loading
Loading