diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 5d0b1da712..f9658fffc5 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -601,7 +601,7 @@ mod tests { use super::*; use crate::arrow::delete_filter::tests::setup; - use crate::scan::FileScanTaskDeleteFile; + use crate::scan::{BaseFileScanTask, FileScanTaskDeleteFile}; use crate::spec::{DataContentType, Schema}; #[tokio::test] @@ -927,19 +927,21 @@ mod tests { }; let file_scan_task = FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()), - data_file_format: DataFileFormat::Parquet, - schema: data_file_schema.clone(), - project_field_ids: vec![2, 3], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/data-1.parquet", table_location.to_str().unwrap()), + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![2, 3], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![pos_del, eq_del], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; // Load the deletes - should handle both types without error diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index ed32b74ac0..fef565d2a3 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -240,8 +240,10 @@ impl DeleteFilter { return Ok(None); } - let bound_predicate = combined_predicate - .bind(file_scan_task.schema.clone(), file_scan_task.case_sensitive)?; + let bound_predicate = combined_predicate.bind( + file_scan_task.base.schema.clone(), + file_scan_task.base.case_sensitive, + )?; Ok(Some(bound_predicate)) } @@ -312,6 +314,7 @@ pub(crate) mod tests { use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::expr::Reference; use crate::io::FileIO; + use crate::scan::task::BaseFileScanTask; use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; type ArrowSchemaRef = Arc; @@ -432,34 +435,38 @@ pub(crate) mod tests { let file_scan_tasks = vec![ FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()), - data_file_format: DataFileFormat::Parquet, - schema: data_file_schema.clone(), - project_field_ids: vec![], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location.to_str().unwrap()), + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![pos_del_1, pos_del_2.clone()], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }, FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{}/2.parquet", table_location.to_str().unwrap()), - data_file_format: DataFileFormat::Parquet, - schema: data_file_schema.clone(), - project_field_ids: vec![], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: 
format!("{}/2.parquet", table_location.to_str().unwrap()), + data_file_format: DataFileFormat::Parquet, + schema: data_file_schema.clone(), + project_field_ids: vec![], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![pos_del_3], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }, ]; @@ -497,24 +504,26 @@ pub(crate) mod tests { // ---------- fake FileScanTask ---------- let task = FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: "data.parquet".to_string(), - data_file_format: crate::spec::DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: "data.parquet".to_string(), + data_file_format: crate::spec::DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + }, deletes: vec![FileScanTaskDeleteFile { file_path: "eq-del.parquet".to_string(), file_type: DataContentType::EqualityDeletes, partition_spec_id: 0, equality_ids: None, }], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: true, }; let filter = DeleteFilter::default(); diff --git a/crates/iceberg/src/arrow/incremental.rs b/crates/iceberg/src/arrow/incremental.rs index 7c9ed0bd56..cb397047ae 100644 --- a/crates/iceberg/src/arrow/incremental.rs +++ b/crates/iceberg/src/arrow/incremental.rs @@ -23,20 +23,16 @@ use arrow_schema::Schema as ArrowSchema; use futures::channel::mpsc::channel; use futures::stream::select; use futures::{Stream, StreamExt, TryStreamExt}; -use parquet::arrow::arrow_reader::ArrowReaderOptions; use crate::arrow::reader::process_record_batch_stream; -use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::{ArrowReader, StreamsInto}; use crate::delete_vector::DeleteVector; use crate::io::FileIO; -use crate::metadata_columns::{RESERVED_FIELD_ID_POS, row_pos_field}; use crate::runtime::spawn; use crate::scan::ArrowRecordBatchStream; use crate::scan::incremental::{ AppendedFileScanTask, DeleteScanTask, IncrementalFileScanTaskStreams, }; -use crate::spec::{Datum, PrimitiveType}; use crate::{Error, ErrorKind, Result}; /// Default batch size for incremental delete operations. @@ -63,87 +59,19 @@ async fn process_incremental_append_task( batch_size: Option, file_io: FileIO, ) -> Result { - let mut virtual_columns = Vec::new(); - - // Check if _pos column is requested and add it as a virtual column - let has_pos_column = task.base.project_field_ids.contains(&RESERVED_FIELD_ID_POS); - if has_pos_column { - // Add _pos as a virtual column to be produced by the Parquet reader - virtual_columns.push(Arc::clone(row_pos_field())); - } - - let arrow_reader_options = if !virtual_columns.is_empty() { - Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?) 
- } else { - None - }; - - let mut record_batch_stream_builder = ArrowReader::create_parquet_record_batch_stream_builder( - &task.base.data_file_path, + // Call the unified file scanning method with pre-loaded positional deletes + // Incremental scans don't use predicates or row group filtering, only positional deletes + ArrowReader::process_base_file_scan_task( + task.base, + batch_size, file_io, - true, - arrow_reader_options, + task.positional_deletes, + None, // No byte range filtering + None, // No pre-loaded row selection + false, // No row group filtering for incremental scans + false, // No row selection filtering beyond positional deletes ) - .await?; - - // Create a projection mask for the batch stream to select which columns in the - // Parquet file that we want in the response - let projection_mask = ArrowReader::get_arrow_projection_mask( - &task.base.project_field_ids, - &task.schema_ref(), - record_batch_stream_builder.parquet_schema(), - record_batch_stream_builder.schema(), - false, // use_fallback - )?; - record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); - - // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion, - // column re-ordering, and virtual field addition (like _file) - let datum = Datum::new( - PrimitiveType::String, - crate::spec::PrimitiveLiteral::String(task.base.data_file_path.clone()), - ); - let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), &task.base.project_field_ids) - .with_constant(crate::metadata_columns::RESERVED_FIELD_ID_FILE, datum); - - if has_pos_column { - record_batch_transformer_builder = - record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?; - } - - let mut record_batch_transformer = record_batch_transformer_builder.build(); - - if let Some(batch_size) = batch_size { - record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); - } - - // Apply positional deletes as row selections. - let row_selection = if let Some(positional_delete_indexes) = task.positional_deletes { - Some(ArrowReader::build_deletes_row_selection( - record_batch_stream_builder.metadata().row_groups(), - &None, - &positional_delete_indexes.lock().unwrap(), - )?) - } else { - None - }; - - if let Some(row_selection) = row_selection { - record_batch_stream_builder = record_batch_stream_builder.with_row_selection(row_selection); - } - - // Build the batch stream and send all the RecordBatches that it generates - // to the requester. - let record_batch_stream = record_batch_stream_builder - .build()? - .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), - Err(err) => Err(err.into()), - }); - - Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) + .await } /// Helper function to create a RecordBatch from a chunk of position values. 
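Note on the call above: `process_incremental_append_task` now simply forwards to the shared `ArrowReader::process_base_file_scan_task`, passing `None`/`false` for the inputs an incremental append scan never computes. To make those positional arguments easier to read against their parameter names, here is a reference sketch of the shared helper introduced in `reader.rs` below; the concrete generic parameters are inferred from the call sites in this patch and should be checked against the actual definition rather than taken verbatim.

```rust
// Reference sketch only: parameter names come from the reader.rs hunk below,
// but the generic types (usize, Arc<Mutex<DeleteVector>>, Vec<usize>,
// RowSelection) are inferred from usage, not copied from the source.
pub(crate) async fn process_base_file_scan_task(
    base_task: BaseFileScanTask,
    batch_size: Option<usize>,
    file_io: FileIO,
    preloaded_positional_deletes: Option<Arc<Mutex<DeleteVector>>>,
    preloaded_row_group_indices: Option<Vec<usize>>,
    preloaded_row_selection: Option<RowSelection>,
    row_group_filtering_enabled: bool,
    row_selection_enabled: bool,
) -> Result<ArrowRecordBatchStream> {
    todo!()
}
```

Both the regular scan path (`process_file_scan_task`) and the incremental append path funnel into this one method; they differ only in which of the `preloaded_*` inputs they compute before delegating.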
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 50e2b45390..21e22973cc 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use std::ops::Range; use std::str::FromStr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene}; use arrow_array::{Array, ArrayRef, BooleanArray, Datum as ArrowDatum, RecordBatch, Scalar}; @@ -59,6 +59,7 @@ use crate::metadata_columns::{ RESERVED_FIELD_ID_FILE, RESERVED_FIELD_ID_POS, is_metadata_field, row_pos_field, }; use crate::runtime::spawn; +use crate::scan::task::BaseFileScanTask; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; @@ -256,25 +257,28 @@ impl ArrowReader { } } + /// Common file scanning logic for both regular and incremental scans. + /// + /// This method handles the core logic of reading Parquet files, applying projections, + /// transformations, and positional deletes. #[allow(clippy::too_many_arguments)] - async fn process_file_scan_task( - task: FileScanTask, + pub(crate) async fn process_base_file_scan_task( + base_task: BaseFileScanTask, batch_size: Option, file_io: FileIO, - delete_file_loader: CachingDeleteFileLoader, + preloaded_positional_deletes: Option>>, + preloaded_row_group_indices: Option>, + preloaded_row_selection: Option, row_group_filtering_enabled: bool, row_selection_enabled: bool, ) -> Result { - let should_load_page_index = - (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - - let delete_filter_rx = - delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema)); + let should_load_page_index = (row_selection_enabled && base_task.predicate.is_some()) + || preloaded_positional_deletes.is_some(); let mut virtual_columns = Vec::new(); // Check if _pos column is requested and prepare virtual columns - let has_pos_column = task.project_field_ids.contains(&RESERVED_FIELD_ID_POS); + let has_pos_column = base_task.project_field_ids.contains(&RESERVED_FIELD_ID_POS); if has_pos_column { // Add _pos as a virtual column to be produced by the Parquet reader virtual_columns.push(Arc::clone(row_pos_field())); @@ -283,7 +287,7 @@ impl ArrowReader { // Migrated tables lack field IDs, requiring us to inspect the schema to choose // between field-ID-based or position-based projection let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( - &task.data_file_path, + &base_task.data_file_path, file_io.clone(), should_load_page_index, Some(ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?), @@ -291,8 +295,6 @@ impl ArrowReader { .await?; // Check if Parquet file has embedded field IDs - // Corresponds to Java's ParquetSchemaUtil.hasIds() - // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118 let missing_field_ids = initial_stream_builder .schema() .fields() @@ -300,35 +302,17 @@ impl ArrowReader { .next() .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); - // Three-branch schema resolution strategy matching Java's ReadConf constructor - // - // Per Iceberg spec Column Projection rules: - // "Columns in Iceberg data files are selected by field id. 
The table schema's column - // names and order may change after a data file is written, and projection must be done - // using field ids." - // https://iceberg.apache.org/spec/#column-projection - // - // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files), - // we must assign field IDs BEFORE reading data to enable correct projection. - // - // Java's ReadConf determines field ID strategy: - // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns() - // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns() - // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback() + // Determine which schema resolution strategy to use let mut record_batch_stream_builder = if missing_field_ids { // Parquet file lacks field IDs - must assign them before reading - let arrow_schema = if let Some(name_mapping) = &task.name_mapping { - // Branch 2: Apply name mapping to assign correct Iceberg field IDs - // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id - // to columns without field id" - // Corresponds to Java's ParquetSchemaUtil.applyNameMapping() + let arrow_schema = if let Some(name_mapping) = &base_task.name_mapping { + // Apply name mapping to assign correct Iceberg field IDs apply_name_mapping_to_arrow_schema( Arc::clone(initial_stream_builder.schema()), name_mapping, )? } else { - // Branch 3: No name mapping - use position-based fallback IDs - // Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + // Use position-based fallback IDs add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()) }; @@ -337,19 +321,19 @@ impl ArrowReader { .with_virtual_columns(virtual_columns.clone())?; Self::create_parquet_record_batch_stream_builder( - &task.data_file_path, + &base_task.data_file_path, file_io.clone(), should_load_page_index, Some(options), ) .await? 
} else { - // Branch 1: File has embedded field IDs - trust them + // File has embedded field IDs - trust them initial_stream_builder }; - // Filter out metadata fields for Parquet projection (they don't exist in files) - let project_field_ids_without_metadata: Vec = task + // Filter out metadata fields for Parquet projection + let project_field_ids_without_metadata: Vec = base_task .project_field_ids .iter() .filter(|&&id| !is_metadata_field(id)) @@ -357,42 +341,44 @@ impl ArrowReader { .collect(); // Create projection mask based on field IDs - // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) - // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) - // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( &project_field_ids_without_metadata, - &task.schema, + &base_task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), - missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection + missing_field_ids, )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask.clone()); - // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion, - // column re-ordering, partition constants, and virtual field addition (like _file) - let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + // Build RecordBatchTransformer for post-read transformations + let mut record_batch_transformer_builder = RecordBatchTransformerBuilder::new( + base_task.schema_ref(), + base_task.project_field_ids(), + ); // Add the _file metadata column if it's in the projected fields - if task.project_field_ids().contains(&RESERVED_FIELD_ID_FILE) { - let file_datum = Datum::string(task.data_file_path.clone()); + if base_task + .project_field_ids() + .contains(&RESERVED_FIELD_ID_FILE) + { + let file_datum = Datum::string(base_task.data_file_path.clone()); record_batch_transformer_builder = record_batch_transformer_builder.with_constant(RESERVED_FIELD_ID_FILE, file_datum); } - // Add the _pos virtual column if it's requested and produced by Parquet reader + // Add the _pos virtual column if requested if has_pos_column { record_batch_transformer_builder = record_batch_transformer_builder.with_virtual_field(Arc::clone(row_pos_field()))?; } - if let (Some(partition_spec), Some(partition_data)) = - (task.partition_spec.clone(), task.partition.clone()) - { + // Add partition constants if available + if let (Some(partition_spec), Some(partition_data)) = ( + base_task.partition_spec.clone(), + base_task.partition.clone(), + ) { record_batch_transformer_builder = record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } @@ -403,59 +389,18 @@ impl ArrowReader { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } - let delete_filter = delete_filter_rx.await.unwrap()?; - let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; + // Apply predicate-based filtering if present + let mut selected_row_group_indices = preloaded_row_group_indices; + let mut row_selection = preloaded_row_selection; - // In addition to the optional predicate supplied in the `FileScanTask`, - // we also have an optional predicate resulting from 
equality delete files. - // If both are present, we logical-AND them together to form a single filter - // predicate that we can pass to the `RecordBatchStreamBuilder`. - let final_predicate = match (&task.predicate, delete_predicate) { - (None, None) => None, - (Some(predicate), None) => Some(predicate.clone()), - (None, Some(ref predicate)) => Some(predicate.clone()), - (Some(filter_predicate), Some(delete_predicate)) => { - Some(filter_predicate.clone().and(delete_predicate)) - } - }; - - // There are three possible sources for potential lists of selected RowGroup indices, - // and two for `RowSelection`s. - // Selected RowGroup index lists can come from three sources: - // * When task.start and task.length specify a byte range (file splitting); - // * When there are equality delete files that are applicable; - // * When there is a scan predicate and row_group_filtering_enabled = true. - // `RowSelection`s can be created in either or both of the following cases: - // * When there are positional delete files that are applicable; - // * When there is a scan predicate and row_selection_enabled = true - // Note that row group filtering from predicates only happens when - // there is a scan predicate AND row_group_filtering_enabled = true, - // but we perform row selection filtering if there are applicable - // equality delete files OR (there is a scan predicate AND row_selection_enabled), - // since the only implemented method of applying positional deletes is - // by using a `RowSelection`. - let mut selected_row_group_indices = None; - let mut row_selection = None; - - // Filter row groups based on byte range from task.start and task.length. - // If both start and length are 0, read the entire file (backwards compatibility). - if task.start != 0 || task.length != 0 { - let byte_range_filtered_row_groups = Self::filter_row_groups_by_byte_range( - record_batch_stream_builder.metadata(), - task.start, - task.length, - )?; - selected_row_group_indices = Some(byte_range_filtered_row_groups); - } - - if let Some(predicate) = final_predicate { + if let Some(predicate) = &base_task.predicate { let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map( record_batch_stream_builder.parquet_schema(), - &predicate, + predicate, )?; let row_filter = Self::get_row_filter( - &predicate, + predicate, record_batch_stream_builder.parquet_schema(), &iceberg_field_ids, &field_id_map, @@ -464,18 +409,16 @@ impl ArrowReader { if row_group_filtering_enabled { let predicate_filtered_row_groups = Self::get_selected_row_group_indices( - &predicate, + predicate, record_batch_stream_builder.metadata(), &field_id_map, - &task.schema, + &base_task.schema, )?; - // Merge predicate-based filtering with byte range filtering (if present) - // by taking the intersection of both filters + // Merge with pre-loaded filtering selected_row_group_indices = match selected_row_group_indices { - Some(byte_range_filtered) => { - // Keep only row groups that are in both filters - let intersection: Vec = byte_range_filtered + Some(preloaded) => { + let intersection: Vec = preloaded .into_iter() .filter(|idx| predicate_filtered_row_groups.contains(idx)) .collect(); @@ -485,23 +428,21 @@ impl ArrowReader { }; } - if row_selection_enabled { + if row_selection_enabled && row_selection.is_none() { row_selection = Some(Self::get_row_selection_for_filter_predicate( - &predicate, + predicate, record_batch_stream_builder.metadata(), &selected_row_group_indices, &field_id_map, - &task.schema, + &base_task.schema, )?); } } - let 
positional_delete_indexes = delete_filter.get_delete_vector(&task); - - if let Some(positional_delete_indexes) = positional_delete_indexes { + // Apply positional deletes if present + if let Some(positional_delete_indexes) = preloaded_positional_deletes { let delete_row_selection = { let positional_delete_indexes = positional_delete_indexes.lock().unwrap(); - Self::build_deletes_row_selection( record_batch_stream_builder.metadata().row_groups(), &selected_row_group_indices, @@ -509,8 +450,7 @@ impl ArrowReader { ) }?; - // merge the row selection from the delete files with the row selection - // from the filter predicate, if there is one from the filter predicate + // Merge row selections row_selection = match row_selection { None => Some(delete_row_selection), Some(filter_row_selection) => { @@ -529,22 +469,91 @@ impl ArrowReader { record_batch_stream_builder.with_row_groups(selected_row_group_indices); } - // Build the batch stream and send all the RecordBatches that it generates - // to the requester. + // Build the batch stream let record_batch_stream = record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => { - // Process the record batch (type promotion, column reordering, virtual fields, etc.) - record_batch_transformer.process_record_batch(batch) - } + Ok(batch) => record_batch_transformer.process_record_batch(batch), Err(err) => Err(err.into()), }); Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } + async fn process_file_scan_task( + task: FileScanTask, + batch_size: Option, + file_io: FileIO, + delete_file_loader: CachingDeleteFileLoader, + row_group_filtering_enabled: bool, + row_selection_enabled: bool, + ) -> Result { + // Load delete files asynchronously + let delete_filter_rx = + delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.base.schema)); + + let delete_filter = delete_filter_rx.await.unwrap()?; + + // Build equality delete predicate from delete files + let delete_predicate = delete_filter.build_equality_delete_predicate(&task).await?; + + // Merge task predicate with equality delete predicate (AND logic) + let final_predicate = match (&task.base.predicate, delete_predicate) { + (None, None) => None, + (Some(predicate), None) => Some(predicate.clone()), + (None, Some(ref predicate)) => Some(predicate.clone()), + (Some(filter_predicate), Some(delete_predicate)) => { + Some(filter_predicate.clone().and(delete_predicate)) + } + }; + + // Prepare base task with merged predicate + let mut base_with_predicate = task.base.clone(); + if final_predicate.is_some() { + base_with_predicate.predicate = final_predicate; + } + + // Get pre-loaded positional deletes + let preloaded_positional_deletes = delete_filter.get_delete_vector(&task); + + // Handle byte range filtering (file splitting) + let preloaded_row_group_indices = if task.base.start != 0 || task.base.length != 0 { + // We need to load the Parquet metadata to filter row groups by byte range + // Create a temporary builder just for metadata access + let should_load_page_index = row_selection_enabled || !task.deletes.is_empty(); + let temp_builder = Self::create_parquet_record_batch_stream_builder( + &task.base.data_file_path, + file_io.clone(), + should_load_page_index, + None, + ) + .await?; + + let byte_range_filtered = Self::filter_row_groups_by_byte_range( + temp_builder.metadata(), + task.base.start, + task.base.length, + )?; + Some(byte_range_filtered) + } else { + None + }; + + // Call unified processing method + Self::process_base_file_scan_task( + 
base_with_predicate, + batch_size, + file_io, + preloaded_positional_deletes, + preloaded_row_group_indices, + None, + row_group_filtering_enabled, + row_selection_enabled, + ) + .await + } + pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, @@ -1878,7 +1887,7 @@ mod tests { use crate::expr::visitors::bound_predicate_visitor::visit; use crate::expr::{Bind, Predicate, Reference}; use crate::io::FileIO; - use crate::scan::{FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; + use crate::scan::{BaseFileScanTask, FileScanTask, FileScanTaskDeleteFile, FileScanTaskStream}; use crate::spec::{ DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type, }; @@ -2173,19 +2182,21 @@ message schema { ) -> Vec> { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1], - predicate: Some(predicate.bind(schema, true).unwrap()), + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: Some(predicate.bind(schema, true).unwrap()), + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2495,36 +2506,40 @@ message schema { // Task 1: read only the first row group let task1 = FileScanTask { - start: rg0_start, - length: row_group_0.compressed_size() as u64, - record_count: Some(100), - data_file_path: file_path.clone(), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1], - predicate: None, + base: BaseFileScanTask { + start: rg0_start, + length: row_group_0.compressed_size() as u64, + record_count: Some(100), + data_file_path: file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; // Task 2: read the second and third row groups let task2 = FileScanTask { - start: rg1_start, - length: file_end - rg1_start, - record_count: Some(200), - data_file_path: file_path.clone(), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1], - predicate: None, + base: BaseFileScanTask { + start: rg1_start, + length: file_end - rg1_start, + record_count: Some(200), + data_file_path: file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2640,19 +2655,21 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - 
length: 0, - record_count: None, - data_file_path: format!("{table_location}/old_file.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: new_schema.clone(), - project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/old_file.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: new_schema.clone(), + project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -2807,24 +2824,26 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let task = FileScanTask { - start: 0, - length: 0, - record_count: Some(200), - data_file_path: data_file_path.clone(), - data_file_format: DataFileFormat::Parquet, - schema: table_schema.clone(), - project_field_ids: vec![1], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: Some(200), + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, }], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3025,24 +3044,26 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { - start: rg1_start, - length: rg1_length, - record_count: Some(100), // Row group 1 has 100 rows - data_file_path: data_file_path.clone(), - data_file_format: DataFileFormat::Parquet, - schema: table_schema.clone(), - project_field_ids: vec![1], - predicate: None, + base: BaseFileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, }], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3236,24 +3257,26 @@ message schema { // Create FileScanTask that reads ONLY row group 1 via byte range filtering let task = FileScanTask { - start: rg1_start, - length: rg1_length, - record_count: Some(100), // Row group 1 has 100 rows - data_file_path: data_file_path.clone(), - data_file_format: DataFileFormat::Parquet, - schema: table_schema.clone(), - project_field_ids: vec![1], - predicate: None, + base: BaseFileScanTask { + start: rg1_start, + length: rg1_length, + record_count: Some(100), // Row group 1 has 100 rows + data_file_path: data_file_path.clone(), + data_file_format: 
DataFileFormat::Parquet, + schema: table_schema.clone(), + project_field_ids: vec![1], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![FileScanTaskDeleteFile { file_path: delete_file_path, file_type: DataContentType::PositionDeletes, partition_spec_id: 0, equality_ids: None, }], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3345,19 +3368,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3443,19 +3468,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 3], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 3], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3530,19 +3557,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2, 3], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3631,19 +3660,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: 
DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3761,19 +3792,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3858,19 +3891,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 5, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 5, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -3968,19 +4003,21 @@ message schema { let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2, 3], - predicate: Some(predicate.bind(schema, true).unwrap()), + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: Some(predicate.bind(schema, true).unwrap()), + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; @@ -4059,49 +4096,55 @@ message schema { // Create tasks in a specific order: file_0, file_1, file_2 let tasks = vec![ Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/file_0.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/file_0.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + 
predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }), Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/file_1.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/file_1.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }), Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/file_2.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/file_2.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }), ]; @@ -4268,19 +4311,21 @@ message schema { let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( vec![Ok(FileScanTask { - start: 0, - length: 0, - record_count: None, - data_file_path: format!("{table_location}/data.parquet"), - data_file_format: DataFileFormat::Parquet, - schema: schema.clone(), - project_field_ids: vec![1, 2], - predicate: None, + base: BaseFileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{table_location}/data.parquet"), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + partition: Some(partition_data), + partition_spec: Some(partition_spec), + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: Some(partition_data), - partition_spec: Some(partition_spec), - name_mapping: None, - case_sensitive: false, })] .into_iter(), )) as FileScanTaskStream; diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 3e7b87b099..756e5d48fc 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -124,28 +124,29 @@ impl ManifestEntryContext { .await; Ok(FileScanTask { - start: 0, - length: self.manifest_entry.file_size_in_bytes(), - record_count: Some(self.manifest_entry.record_count()), - - data_file_path: self.manifest_entry.file_path().to_string(), - data_file_format: self.manifest_entry.file_format(), - - schema: self.snapshot_schema, - project_field_ids: self.field_ids.to_vec(), - predicate: self - .bound_predicates - .map(|x| x.as_ref().snapshot_bound_predicate.clone()), - + base: crate::scan::task::BaseFileScanTask { + start: 0, + length: self.manifest_entry.file_size_in_bytes(), + record_count: Some(self.manifest_entry.record_count()), + + data_file_path: self.manifest_entry.file_path().to_string(), + 
data_file_format: self.manifest_entry.file_format(), + + schema: self.snapshot_schema, + project_field_ids: self.field_ids.to_vec(), + predicate: self + .bound_predicates + .map(|x| x.as_ref().snapshot_bound_predicate.clone()), + + // Include partition data and spec from manifest entry + partition: Some(self.manifest_entry.data_file.partition.clone()), + // TODO: Pass actual PartitionSpec through context chain for native flow + partition_spec: None, + // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" + name_mapping: None, + case_sensitive: self.case_sensitive, + }, deletes, - - // Include partition data and spec from manifest entry - partition: Some(self.manifest_entry.data_file.partition.clone()), - // TODO: Pass actual PartitionSpec through context chain for native flow - partition_spec: None, - // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" - name_mapping: None, - case_sensitive: self.case_sensitive, }) } } diff --git a/crates/iceberg/src/scan/incremental/mod.rs b/crates/iceberg/src/scan/incremental/mod.rs index 81f37b70cf..6a8ccfd2c7 100644 --- a/crates/iceberg/src/scan/incremental/mod.rs +++ b/crates/iceberg/src/scan/incremental/mod.rs @@ -34,6 +34,7 @@ use crate::metadata_columns::{ use crate::scan::DeleteFileContext; use crate::scan::cache::ExpressionEvaluatorCache; use crate::scan::context::ManifestEntryContext; +use crate::scan::task::BaseFileScanTask; use crate::spec::{DataContentType, ManifestStatus, Snapshot, SnapshotRef}; use crate::table::Table; use crate::util::snapshot::ancestors_between; @@ -767,7 +768,7 @@ impl IncrementalTableScan { let data_file_path = manifest_entry_context.manifest_entry.file_path(); let file_scan_task = IncrementalFileScanTask::Delete(DeletedFileScanTask { - base: BaseIncrementalFileScanTask { + base: BaseFileScanTask { start: 0, length: manifest_entry_context.manifest_entry.file_size_in_bytes(), record_count: Some(manifest_entry_context.manifest_entry.record_count()), @@ -775,6 +776,11 @@ impl IncrementalTableScan { data_file_format: manifest_entry_context.manifest_entry.file_format(), schema: manifest_entry_context.snapshot_schema.clone(), project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: manifest_entry_context.case_sensitive, }, }); diff --git a/crates/iceberg/src/scan/incremental/task.rs b/crates/iceberg/src/scan/incremental/task.rs index 793333f869..557595e563 100644 --- a/crates/iceberg/src/scan/incremental/task.rs +++ b/crates/iceberg/src/scan/incremental/task.rs @@ -23,49 +23,14 @@ use crate::Result; use crate::arrow::delete_filter::DeleteFilter; use crate::delete_vector::DeleteVector; use crate::scan::context::ManifestEntryContext; -use crate::spec::{DataFileFormat, Schema, SchemaRef}; - -/// Base file scan task containing common attributes for incremental scan tasks. -#[derive(Debug, Clone)] -pub struct BaseIncrementalFileScanTask { - /// The start offset of the file to scan. - pub start: u64, - /// The length of the file to scan. - pub length: u64, - /// The number of records in the file. - pub record_count: Option, - /// The path to the data file to scan. - pub data_file_path: String, - /// The format of the data file to scan. - pub data_file_format: DataFileFormat, - /// The schema of the data file to scan. - pub schema: SchemaRef, - /// The field ids to project. 
- pub project_field_ids: Vec, -} - -impl BaseIncrementalFileScanTask { - /// Returns the data file path of this file scan task. - pub fn data_file_path(&self) -> &str { - &self.data_file_path - } - - /// Returns the schema of this file scan task as a reference - pub fn schema(&self) -> &Schema { - &self.schema - } - - /// Returns the schema of this file scan task as a SchemaRef - pub fn schema_ref(&self) -> SchemaRef { - self.schema.clone() - } -} +use crate::scan::task::BaseFileScanTask; +use crate::spec::Schema; /// A file scan task for appended data files in an incremental scan. #[derive(Debug, Clone)] pub struct AppendedFileScanTask { /// The base file scan task attributes. - pub base: BaseIncrementalFileScanTask, + pub base: BaseFileScanTask, /// The optional positional deletes associated with this data file. pub positional_deletes: Option>>, } @@ -82,7 +47,7 @@ impl AppendedFileScanTask { } /// Returns the schema of this file scan task as a SchemaRef - pub fn schema_ref(&self) -> SchemaRef { + pub fn schema_ref(&self) -> crate::spec::SchemaRef { self.base.schema_ref() } } @@ -91,7 +56,7 @@ impl AppendedFileScanTask { #[derive(Debug, Clone)] pub struct DeletedFileScanTask { /// The base file scan task attributes. - pub base: BaseIncrementalFileScanTask, + pub base: BaseFileScanTask, } impl DeletedFileScanTask { @@ -106,7 +71,7 @@ impl DeletedFileScanTask { } /// Returns the schema of this file scan task as a SchemaRef - pub fn schema_ref(&self) -> SchemaRef { + pub fn schema_ref(&self) -> crate::spec::SchemaRef { self.base.schema_ref() } } @@ -148,7 +113,7 @@ impl IncrementalFileScanTask { ) -> Self { let data_file_path = manifest_entry_context.manifest_entry.file_path(); IncrementalFileScanTask::Append(AppendedFileScanTask { - base: BaseIncrementalFileScanTask { + base: BaseFileScanTask { start: 0, length: manifest_entry_context.manifest_entry.file_size_in_bytes(), record_count: Some(manifest_entry_context.manifest_entry.record_count()), @@ -156,6 +121,11 @@ impl IncrementalFileScanTask { data_file_format: manifest_entry_context.manifest_entry.file_format(), schema: manifest_entry_context.snapshot_schema.clone(), project_field_ids: manifest_entry_context.field_ids.as_ref().clone(), + predicate: None, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: manifest_entry_context.case_sensitive, }, positional_deletes: delete_filter.get_delete_vector_for_path(data_file_path), }) diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 1e44ed6ee8..cd1a80ea52 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -22,7 +22,8 @@ use cache::*; mod context; use context::*; pub mod incremental; -mod task; +/// File scan task types used in table scans. 
+pub mod task; use std::sync::Arc; @@ -30,7 +31,7 @@ use arrow_array::RecordBatch; use futures::channel::mpsc::{Sender, channel}; use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; -pub use task::*; +pub use task::{BaseFileScanTask, *}; use crate::arrow::ArrowReaderBuilder; use crate::delete_file_index::DeleteFileIndex; @@ -679,7 +680,7 @@ pub mod tests { use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_POS}; - use crate::scan::FileScanTask; + use crate::scan::{BaseFileScanTask, FileScanTask}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, ManifestListWriter, ManifestStatus, ManifestWriterBuilder, NestedField, PartitionSpec, @@ -1457,17 +1458,17 @@ pub mod tests { assert_eq!(tasks.len(), 2); - tasks.sort_by_key(|t| t.data_file_path.to_string()); + tasks.sort_by_key(|t| t.data_file_path().to_string()); // Check first task is added data file assert_eq!( - tasks[0].data_file_path, + tasks[0].data_file_path(), format!("{}/1.parquet", &fixture.table_location) ); // Check second task is existing data file assert_eq!( - tasks[1].data_file_path, + tasks[1].data_file_path(), format!("{}/3.parquet", &fixture.table_location) ); } @@ -1943,12 +1944,12 @@ pub mod tests { let serialized = serde_json::to_string(&task).unwrap(); let deserialized: FileScanTask = serde_json::from_str(&serialized).unwrap(); - assert_eq!(task.data_file_path, deserialized.data_file_path); - assert_eq!(task.start, deserialized.start); - assert_eq!(task.length, deserialized.length); - assert_eq!(task.project_field_ids, deserialized.project_field_ids); - assert_eq!(task.predicate, deserialized.predicate); - assert_eq!(task.schema, deserialized.schema); + assert_eq!(task.data_file_path(), deserialized.data_file_path()); + assert_eq!(task.base.start, deserialized.base.start); + assert_eq!(task.base.length, deserialized.base.length); + assert_eq!(task.project_field_ids(), deserialized.project_field_ids()); + assert_eq!(task.predicate(), deserialized.predicate()); + assert_eq!(task.schema(), deserialized.schema()); }; // without predicate @@ -1963,37 +1964,41 @@ pub mod tests { .unwrap(), ); let task = FileScanTask { - data_file_path: "data_file_path".to_string(), - start: 0, - length: 100, - project_field_ids: vec![1, 2, 3], - predicate: None, - schema: schema.clone(), - record_count: Some(100), - data_file_format: DataFileFormat::Parquet, + base: BaseFileScanTask { + data_file_path: "data_file_path".to_string(), + start: 0, + length: 100, + project_field_ids: vec![1, 2, 3], + predicate: None, + schema: schema.clone(), + record_count: Some(100), + data_file_format: DataFileFormat::Parquet, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; test_fn(task); // with predicate let task = FileScanTask { - data_file_path: "data_file_path".to_string(), - start: 0, - length: 100, - project_field_ids: vec![1, 2, 3], - predicate: Some(BoundPredicate::AlwaysTrue), - schema, - record_count: None, - data_file_format: DataFileFormat::Avro, + base: BaseFileScanTask { + data_file_path: "data_file_path".to_string(), + start: 0, + length: 100, + project_field_ids: vec![1, 2, 3], + predicate: Some(BoundPredicate::AlwaysTrue), + schema, + record_count: None, + data_file_format: DataFileFormat::Avro, + partition: 
None, + partition_spec: None, + name_mapping: None, + case_sensitive: false, + }, deletes: vec![], - partition: None, - partition_spec: None, - name_mapping: None, - case_sensitive: false, }; test_fn(task); } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 5349a9bdd2..1b2f5a5253 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -48,9 +48,9 @@ where D: serde::Deserializer<'de> { )) } -/// A task to scan part of file. +/// Common fields for both regular and incremental file scan tasks. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct FileScanTask { +pub struct BaseFileScanTask { /// The start offset of the file to scan. pub start: u64, /// The length of the file to scan. @@ -75,9 +75,6 @@ pub struct FileScanTask { #[serde(skip_serializing_if = "Option::is_none")] pub predicate: Option, - /// The list of delete files that may need to be applied to this data file - pub deletes: Vec, - /// Partition data from the manifest entry, used to identify which columns can use /// constant values from partition metadata vs. reading from the data file. /// Per the Iceberg spec, only identity-transformed partition fields should use constants. @@ -109,7 +106,7 @@ pub struct FileScanTask { pub case_sensitive: bool, } -impl FileScanTask { +impl BaseFileScanTask { /// Returns the data file path of this file scan task. pub fn data_file_path(&self) -> &str { &self.data_file_path @@ -136,6 +133,44 @@ impl FileScanTask { } } +/// A task to scan part of file. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FileScanTask { + /// Common file scan task attributes + #[serde(flatten)] + pub base: BaseFileScanTask, + + /// The list of delete files that may need to be applied to this data file + pub deletes: Vec, +} + +impl FileScanTask { + /// Returns the data file path of this file scan task. + pub fn data_file_path(&self) -> &str { + self.base.data_file_path() + } + + /// Returns the project field id of this file scan task. + pub fn project_field_ids(&self) -> &[i32] { + self.base.project_field_ids() + } + + /// Returns the predicate of this file scan task. + pub fn predicate(&self) -> Option<&BoundPredicate> { + self.base.predicate() + } + + /// Returns the schema of this file scan task as a reference + pub fn schema(&self) -> &Schema { + self.base.schema() + } + + /// Returns the schema of this file scan task as a SchemaRef + pub fn schema_ref(&self) -> SchemaRef { + self.base.schema_ref() + } +} + #[derive(Debug)] pub(crate) struct DeleteFileContext { pub(crate) manifest_entry: ManifestEntryRef,
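The new `FileScanTask` keeps its public surface through delegating accessors, and `#[serde(flatten)]` on `base` splices the nested fields into the same flat object layout the struct had before the split, which is presumably why the serialization round-trip test in `scan/mod.rs` only needed accessor updates. A minimal standalone sketch of that flattening behavior, using simplified stand-in structs rather than the real task types:

```rust
use serde::{Deserialize, Serialize};
use serde_json::json;

// Simplified stand-ins for BaseFileScanTask / FileScanTask; illustration only.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Base {
    data_file_path: String,
    start: u64,
    length: u64,
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct Task {
    #[serde(flatten)]
    base: Base,
    deletes: Vec<String>,
}

fn main() {
    let task = Task {
        base: Base {
            data_file_path: "data_file_path".to_string(),
            start: 0,
            length: 100,
        },
        deletes: vec![],
    };

    // `flatten` lifts the nested fields to the top level: there is no `base`
    // wrapper key in the serialized form.
    let value = serde_json::to_value(&task).unwrap();
    assert_eq!(
        value,
        json!({
            "data_file_path": "data_file_path",
            "start": 0,
            "length": 100,
            "deletes": []
        })
    );

    // A flat document (the pre-split layout) deserializes into the nested struct.
    let restored: Task = serde_json::from_value(value).unwrap();
    assert_eq!(restored, task);
}
```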