diff --git a/docs/src/format/table/transaction.md b/docs/src/format/table/transaction.md index d1a5191bf54..9f3b6f9e8db 100644 --- a/docs/src/format/table/transaction.md +++ b/docs/src/format/table/transaction.md @@ -410,7 +410,7 @@ are rebaseable conflicts with Update: ### UpdateConfig -Modifies table configuration, table metadata, schema metadata, or field metadata without changing data. +Modifies table configuration, table metadata, schema metadata, field metadata, or fragment metadata without changing data.
UpdateConfig protobuf message @@ -423,11 +423,14 @@ Modifies table configuration, table metadata, schema metadata, or field metadata #### UpdateConfig Compatibility -An UpdateConfig operation only modifies table config and tends to be compatible with other operations. Here +An UpdateConfig operation only modifies metadata and tends to be compatible with other operations. Here are the operations that conflict with UpdateConfig: -- Overwrite -- UpdateConfig (only if the two operations modify the same config) +- Overwrite (if the UpdateConfig modifies schema, field, or fragment metadata) +- Restore (if the UpdateConfig modifies schema, field, or fragment metadata) +- UpdateConfig (only if the two operations modify the same config or the same fragment/field/schema metadata) +- Delete (only if the UpdateConfig modifies metadata on a fragment that was deleted) +- Update (only if the UpdateConfig modifies metadata on a fragment that was removed) ### DataReplacement diff --git a/docs/src/format/table/versioning.md b/docs/src/format/table/versioning.md index 745dd1ccd87..57db14fb9cf 100644 --- a/docs/src/format/table/versioning.md +++ b/docs/src/format/table/versioning.md @@ -28,7 +28,9 @@ they should return an "unsupported" error on any read or write operation. | 4 | `FLAG_USE_V2_FORMAT_DEPRECATED` | No | No | Files are written with the new v2 format. This flag is deprecated and no longer used. | | 8 | `FLAG_TABLE_CONFIG` | No | Yes | Table config is present in the manifest. | | 16 | `FLAG_BASE_PATHS` | Yes | Yes | Dataset uses multiple base paths (for shallow clones or multi-base datasets). | +| 32 | `FLAG_DISABLE_TRANSACTION_FILE` | No | No | Inline transaction in manifest; no separate transaction file written. | +| 64 | `FLAG_FRAGMENT_METADATA` | No | Yes | Fragments may contain free-form key-value metadata. | -Flags with bit values 32 and above are unknown and will cause implementations to reject the dataset with an "unsupported" error. +Flags with bit values 128 and above are unknown and will cause implementations to reject the dataset with an "unsupported" error. diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..1898394d398 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -346,6 +346,10 @@ message DataFragment { // deletion tombstones. To compute the current number of rows, subtract // `deletion_file.num_deleted_rows` from this value. uint64 physical_rows = 4; + + // Free-form key-value metadata for the fragment. + // Intended for lightweight metadata such as creation time, process ID, etc. + map metadata = 11; } message DataFile { diff --git a/protos/transaction.proto b/protos/transaction.proto index e72e95025a4..2552945c18f 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -287,12 +287,13 @@ message Transaction { } // An operation that updates the table config, table metadata, schema metadata, - // or field metadata. + // field metadata, or fragment metadata. message UpdateConfig { UpdateMap config_updates = 6; UpdateMap table_metadata_updates = 7; UpdateMap schema_metadata_updates = 8; map field_metadata_updates = 9; + map fragment_metadata_updates = 10; // Deprecated ------------------------------- map upsert_values = 1; diff --git a/rust/lance-table/benches/manifest_intern.rs b/rust/lance-table/benches/manifest_intern.rs index 09ba9264b21..e700fb5224e 100644 --- a/rust/lance-table/benches/manifest_intern.rs +++ b/rust/lance-table/benches/manifest_intern.rs @@ -72,6 +72,7 @@ fn make_uniform_pb_fragments(n: u64, num_fields: usize) -> Vec version_bytes.clone(), ), ), + metadata: Default::default(), }) .collect() } @@ -148,6 +149,7 @@ fn make_diverse_pb_fragments( version_payloads[version_idx].clone(), ), ), + metadata: Default::default(), } }) .collect() diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index 096f0da79e5..506ad73bda3 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -20,8 +20,10 @@ pub const FLAG_TABLE_CONFIG: u64 = 8; pub const FLAG_BASE_PATHS: u64 = 16; /// Disable writing transaction file under _transaction/, this flag is set when we only want to write inline transaction in manifest pub const FLAG_DISABLE_TRANSACTION_FILE: u64 = 32; +/// Fragments may contain free-form key-value metadata +pub const FLAG_FRAGMENT_METADATA: u64 = 64; /// The first bit that is unknown as a feature flag -pub const FLAG_UNKNOWN: u64 = 64; +pub const FLAG_UNKNOWN: u64 = 128; /// Set the reader and writer feature flags in the manifest based on the contents of the manifest. pub fn apply_feature_flags( @@ -74,6 +76,15 @@ pub fn apply_feature_flags( if disable_transaction_file { manifest.writer_feature_flags |= FLAG_DISABLE_TRANSACTION_FILE; } + + let has_fragment_metadata = manifest + .fragments + .iter() + .any(|frag| !frag.metadata.is_empty()); + if has_fragment_metadata { + manifest.writer_feature_flags |= FLAG_FRAGMENT_METADATA; + } + Ok(()) } @@ -103,6 +114,7 @@ mod tests { assert!(can_read_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_read_dataset(super::FLAG_BASE_PATHS)); assert!(can_read_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_read_dataset(super::FLAG_FRAGMENT_METADATA)); assert!(can_read_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS @@ -120,12 +132,14 @@ mod tests { assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_write_dataset(super::FLAG_BASE_PATHS)); assert!(can_write_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_write_dataset(super::FLAG_FRAGMENT_METADATA)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED | super::FLAG_TABLE_CONFIG | super::FLAG_BASE_PATHS + | super::FLAG_FRAGMENT_METADATA )); assert!(!can_write_dataset(super::FLAG_UNKNOWN)); } @@ -181,4 +195,45 @@ mod tests { 0 ); } + + #[test] + fn test_fragment_metadata_feature_flag() { + use crate::format::{DataStorageFormat, Fragment, Manifest}; + use arrow_schema::{Field as ArrowField, Schema as ArrowSchema}; + use lance_core::datatypes::Schema; + use std::collections::HashMap; + use std::sync::Arc; + + let arrow_schema = ArrowSchema::new(vec![ArrowField::new( + "x", + arrow_schema::DataType::Int64, + false, + )]); + let schema = Schema::try_from(&arrow_schema).unwrap(); + + // No fragment metadata → no flag + let mut manifest = Manifest::new( + schema.clone(), + Arc::new(vec![Fragment::new(0)]), + DataStorageFormat::default(), + HashMap::new(), + ); + apply_feature_flags(&mut manifest, false, false).unwrap(); + assert_eq!(manifest.writer_feature_flags & FLAG_FRAGMENT_METADATA, 0); + assert_eq!(manifest.reader_feature_flags & FLAG_FRAGMENT_METADATA, 0); + + // With fragment metadata → writer flag only + let mut frag = Fragment::new(0); + frag.metadata + .insert("created_at".into(), "2026-05-22".into()); + let mut manifest = Manifest::new( + schema, + Arc::new(vec![frag]), + DataStorageFormat::default(), + HashMap::new(), + ); + apply_feature_flags(&mut manifest, false, false).unwrap(); + assert_ne!(manifest.writer_feature_flags & FLAG_FRAGMENT_METADATA, 0); + assert_eq!(manifest.reader_feature_flags & FLAG_FRAGMENT_METADATA, 0); + } } diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index dc5c94b388a..39b9da4c15b 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -380,6 +380,7 @@ impl DataFileFieldInterner { physical_rows, last_updated_at_version_meta, created_at_version_meta, + metadata: p.metadata, }) } } @@ -503,6 +504,10 @@ pub struct Fragment { /// Created at version metadata #[serde(skip_serializing_if = "Option::is_none")] pub created_at_version_meta: Option, + + /// Free-form key-value metadata for the fragment. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub metadata: HashMap, } impl Fragment { @@ -515,6 +520,7 @@ impl Fragment { physical_rows: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), } } @@ -554,6 +560,7 @@ impl Fragment { row_id_meta: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), } } @@ -680,6 +687,7 @@ impl TryFrom for Fragment { .created_at_version_sequence .map(RowDatasetVersionMeta::try_from) .transpose()?, + metadata: p.metadata, }) } } @@ -721,6 +729,7 @@ impl From<&Fragment> for pb::DataFragment { physical_rows: f.physical_rows.unwrap_or_default() as u64, last_updated_at_version_sequence, created_at_version_sequence, + metadata: f.metadata.clone(), } } } @@ -786,6 +795,43 @@ mod tests { assert_eq!(fragment, fragment2); } + #[test] + fn test_roundtrip_fragment_with_metadata() { + let mut fragment = Fragment::new(1); + let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]); + fragment.add_file_legacy("data.lance", &Schema::try_from(&schema).unwrap()); + fragment + .metadata + .insert("created_at".into(), "2026-05-22T00:00:00Z".into()); + fragment + .metadata + .insert("process_id".into(), "pid-42".into()); + + let proto = pb::DataFragment::from(&fragment); + assert_eq!(proto.metadata.len(), 2); + assert_eq!( + proto.metadata.get("created_at").unwrap(), + "2026-05-22T00:00:00Z" + ); + + let fragment2 = Fragment::try_from(proto).unwrap(); + assert_eq!(fragment, fragment2); + } + + #[test] + fn test_roundtrip_fragment_empty_metadata() { + let mut fragment = Fragment::new(2); + let schema = ArrowSchema::new(vec![ArrowField::new("x", DataType::Float16, true)]); + fragment.add_file_legacy("data.lance", &Schema::try_from(&schema).unwrap()); + + let proto = pb::DataFragment::from(&fragment); + assert!(proto.metadata.is_empty()); + + let fragment2 = Fragment::try_from(proto).unwrap(); + assert_eq!(fragment, fragment2); + assert!(fragment2.metadata.is_empty()); + } + #[test] fn test_to_json() { let mut fragment = Fragment::new(123); diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index d2b5f2d31c6..9b59251c728 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -1321,6 +1321,7 @@ mod tests { physical_rows: None, created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 1, @@ -1333,6 +1334,7 @@ mod tests { physical_rows: None, created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }, ]; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 30e43c5bb32..1514643dc17 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3288,6 +3288,7 @@ impl Dataset { table_metadata_updates: None, schema_metadata_updates: None, field_metadata_updates, + fragment_metadata_updates: HashMap::new(), }, ) .await diff --git a/rust/lance/src/dataset/metadata.rs b/rust/lance/src/dataset/metadata.rs index 21d92100871..61f214ae6fb 100644 --- a/rust/lance/src/dataset/metadata.rs +++ b/rust/lance/src/dataset/metadata.rs @@ -80,18 +80,21 @@ impl<'a> std::future::IntoFuture for UpdateMetadataBuilder<'a> { table_metadata_updates: None, schema_metadata_updates: None, field_metadata_updates: HashMap::new(), + fragment_metadata_updates: HashMap::new(), }, MetadataType::TableMetadata => Operation::UpdateConfig { config_updates: None, table_metadata_updates: Some(update_map), schema_metadata_updates: None, field_metadata_updates: HashMap::new(), + fragment_metadata_updates: HashMap::new(), }, MetadataType::SchemaMetadata => Operation::UpdateConfig { config_updates: None, table_metadata_updates: None, schema_metadata_updates: Some(update_map), field_metadata_updates: HashMap::new(), + fragment_metadata_updates: HashMap::new(), }, }; @@ -167,6 +170,7 @@ impl<'a> std::future::IntoFuture for UpdateFieldMetadataBuilder<'a> { table_metadata_updates: None, schema_metadata_updates: None, field_metadata_updates: self.field_metadata_updates, + fragment_metadata_updates: HashMap::new(), }, ) .await?; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index a042cf568ce..e598a52b1a5 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -1691,6 +1691,7 @@ mod tests { physical_rows: Some(0), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let single_bin = CandidateBin { fragments: vec![fragment.clone()], diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index f5d792979df..085268b3e22 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -1046,6 +1046,7 @@ mod test { physical_rows: Some(50), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), })) } else { Ok(None) diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 3f96b9964d5..9294e3be00f 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -431,6 +431,7 @@ pub enum Operation { table_metadata_updates: Option, schema_metadata_updates: Option, field_metadata_updates: HashMap, + fragment_metadata_updates: HashMap, }, /// Update merged generations in MemWAL index. /// This is used during merge-insert to atomically record which @@ -669,18 +670,21 @@ impl PartialEq for Operation { table_metadata_updates: a_table_metadata, schema_metadata_updates: a_schema, field_metadata_updates: a_field, + fragment_metadata_updates: a_frag, }, Self::UpdateConfig { config_updates: b_config, table_metadata_updates: b_table_metadata, schema_metadata_updates: b_schema, field_metadata_updates: b_field, + fragment_metadata_updates: b_frag, }, ) => { a_config == b_config && a_table_metadata == b_table_metadata && a_schema == b_schema && a_field == b_field + && a_frag == b_frag } ( Self::DataReplacement { replacements: a }, @@ -1442,12 +1446,14 @@ impl Operation { table_metadata_updates, schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates, .. }, Self::UpdateConfig { table_metadata_updates: other_table_metadata, schema_metadata_updates: other_schema_metadata, field_metadata_updates: other_field_metadata, + fragment_metadata_updates: other_fragment_metadata, .. }, ) => { @@ -1467,6 +1473,13 @@ impl Operation { } } } + if !fragment_metadata_updates.is_empty() && !other_fragment_metadata.is_empty() { + for frag_id in fragment_metadata_updates.keys() { + if other_fragment_metadata.contains_key(frag_id) { + return true; + } + } + } false } _ => false, @@ -2386,6 +2399,7 @@ impl Transaction { table_metadata_updates, schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates, } => { if let Some(config_updates) = config_updates { let mut config = manifest.config.clone(); @@ -2507,6 +2521,22 @@ impl Transaction { "the unenforced clustering key is a reserved key and cannot be set to an invalid value", )); } + if !fragment_metadata_updates.is_empty() { + let fragments = Arc::make_mut(&mut manifest.fragments); + for (frag_id, frag_metadata_update) in fragment_metadata_updates { + let fragment = + fragments + .iter_mut() + .find(|f| f.id == *frag_id) + .ok_or_else(|| { + Error::invalid_input(format!( + "Fragment with id {} does not exist", + frag_id + )) + })?; + apply_update_map(&mut fragment.metadata, frag_metadata_update); + } + } } _ => {} } @@ -3193,7 +3223,8 @@ impl TryFrom for Transaction { let has_new_fields = update_config.config_updates.is_some() || update_config.table_metadata_updates.is_some() || update_config.schema_metadata_updates.is_some() - || !update_config.field_metadata_updates.is_empty(); + || !update_config.field_metadata_updates.is_empty() + || !update_config.fragment_metadata_updates.is_empty(); // Check if old-style fields are present let has_old_fields = !update_config.upsert_values.is_empty() @@ -3245,6 +3276,7 @@ impl TryFrom for Transaction { table_metadata_updates: None, schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates: HashMap::new(), } } else { // Use new-style fields directly (convert from protobuf) @@ -3265,6 +3297,13 @@ impl TryFrom for Transaction { (*field_id, UpdateMap::from(pb_update_map)) }) .collect(), + fragment_metadata_updates: update_config + .fragment_metadata_updates + .iter() + .map(|(frag_id, pb_update_map)| { + (*frag_id, UpdateMap::from(pb_update_map)) + }) + .collect(), } } } @@ -3529,6 +3568,7 @@ impl From<&Transaction> for pb::Transaction { table_metadata_updates, schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates, } => pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig { config_updates: config_updates .as_ref() @@ -3545,6 +3585,12 @@ impl From<&Transaction> for pb::Transaction { (*field_id, pb::transaction::UpdateMap::from(update_map)) }) .collect(), + fragment_metadata_updates: fragment_metadata_updates + .iter() + .map(|(frag_id, update_map)| { + (*frag_id, pb::transaction::UpdateMap::from(update_map)) + }) + .collect(), // Leave old fields empty - we only write new-style fields upsert_values: Default::default(), delete_keys: Default::default(), @@ -4151,6 +4197,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }]; let mut next_row_id = 0; @@ -4183,6 +4230,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }]; let mut next_row_id = 100; @@ -4215,6 +4263,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }]; let mut next_row_id = 100; @@ -4250,6 +4299,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }]; let mut next_row_id = 100; @@ -4278,6 +4328,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 2, @@ -4287,6 +4338,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, ]; let mut next_row_id = 1000; @@ -4331,6 +4383,7 @@ mod tests { deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }]; let mut next_row_id = 0; @@ -4840,6 +4893,7 @@ mod tests { physical_rows: Some(5), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![fragment.clone()]); @@ -5108,6 +5162,7 @@ mod tests { row_id_meta: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let operation = Operation::Overwrite { @@ -5200,6 +5255,7 @@ mod tests { physical_rows: Some(5), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let mut manifest = Manifest::new( @@ -5272,6 +5328,7 @@ mod tests { physical_rows: Some(5), last_updated_at_version_meta: Some(meta_v1.clone()), created_at_version_meta: None, + metadata: HashMap::new(), }; let mut manifest = Manifest::new( @@ -5291,6 +5348,7 @@ mod tests { physical_rows: Some(5), last_updated_at_version_meta: Some(meta_v1), created_at_version_meta: None, + metadata: HashMap::new(), }; let tx = Transaction::new( @@ -5340,6 +5398,7 @@ mod tests { physical_rows: Some(5), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = Manifest::new( @@ -5404,6 +5463,7 @@ mod tests { physical_rows: Some(3), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let mut manifest = Manifest::new( @@ -5426,6 +5486,7 @@ mod tests { physical_rows: Some(4), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; let tx = Transaction::new( @@ -5490,6 +5551,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&created_at_seq).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let new_seq = RowIdSequence::from([100u64, 102].as_slice()); @@ -5501,6 +5563,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![existing_fragment]); @@ -5545,6 +5608,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&frag_a_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 2, @@ -5556,6 +5620,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&frag_b_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }, ]); @@ -5569,6 +5634,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let (result, _) = update_txn(vec![new_fragment]) @@ -5612,6 +5678,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&existing_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // New fragment has row 10 (UPDATE branch) and row 999 (INSERT branch) @@ -5624,6 +5691,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // update_txn uses read_version 4 → new_version is 5 @@ -5670,6 +5738,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&existing_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let new_seq = RowIdSequence::from([10u64, 500, 11, 501].as_slice()); @@ -5681,6 +5750,7 @@ mod tests { physical_rows: Some(4), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // update_txn uses read_version 4 → new_version is 5 @@ -5714,6 +5784,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let new_seq = RowIdSequence::from([50u64].as_slice()); @@ -5725,6 +5796,7 @@ mod tests { physical_rows: Some(1), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![existing_fragment]); @@ -5753,6 +5825,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let new_fragment = Fragment { @@ -5763,6 +5836,7 @@ mod tests { physical_rows: Some(3), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![existing_fragment]); @@ -5795,6 +5869,7 @@ mod tests { vec![0xFFu8; 8].as_slice(), ))), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let new_seq = RowIdSequence::from([10u64].as_slice()); @@ -5806,6 +5881,7 @@ mod tests { physical_rows: Some(1), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![existing_fragment]); @@ -5849,6 +5925,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&in_range_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // Fragment outside range – IDs [1000, 1001], created_at = 99 (must never appear) @@ -5869,6 +5946,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&out_of_range_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // New fragment rewrites both rows from the in-range fragment @@ -5881,6 +5959,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![in_range_frag, out_of_range_frag]); @@ -5919,6 +5998,7 @@ mod tests { physical_rows: Some(3), created_at_version_meta: Some(RowDatasetVersionMeta::from_sequence(&created).unwrap()), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // New fragment takes the boundary IDs: 10 (min) and 12 (max) @@ -5931,6 +6011,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![existing]); @@ -5981,6 +6062,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&src_created).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }; // New fragment rewrites all 100 rows preserving their stable IDs. @@ -5993,6 +6075,7 @@ mod tests { physical_rows: Some(100), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let manifest = make_stable_row_id_manifest(vec![src_frag]); @@ -6042,6 +6125,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&created_a).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 2, @@ -6053,6 +6137,7 @@ mod tests { RowDatasetVersionMeta::from_sequence(&created_b).unwrap(), ), last_updated_at_version_meta: None, + metadata: HashMap::new(), }, ]); @@ -6066,6 +6151,7 @@ mod tests { physical_rows: Some(2), created_at_version_meta: None, last_updated_at_version_meta: None, + metadata: HashMap::new(), }; let (result, _) = update_txn(vec![new_frag]) @@ -6113,6 +6199,7 @@ mod tests { }), schema_metadata_updates: None, field_metadata_updates: HashMap::new(), + fragment_metadata_updates: HashMap::new(), } } @@ -6127,4 +6214,40 @@ mod tests { assert!(!left.modifies_same_metadata(&different_key)); assert!(left.modifies_same_metadata(&replace)); } + + fn fragment_metadata_update(frag_id: u64, entries: Vec<(&str, Option<&str>)>) -> Operation { + let mut fragment_metadata_updates = HashMap::new(); + fragment_metadata_updates.insert( + frag_id, + UpdateMap { + update_entries: entries.into_iter().map(UpdateMapEntry::from).collect(), + replace: false, + }, + ); + Operation::UpdateConfig { + config_updates: None, + table_metadata_updates: None, + schema_metadata_updates: None, + field_metadata_updates: HashMap::new(), + fragment_metadata_updates, + } + } + + #[test] + fn test_fragment_metadata_conflicts_on_same_fragment() { + let left = fragment_metadata_update(1, vec![("key", Some("a"))]); + let same_frag = fragment_metadata_update(1, vec![("other", Some("b"))]); + let different_frag = fragment_metadata_update(2, vec![("key", Some("c"))]); + + assert!(left.modifies_same_metadata(&same_frag)); + assert!(!left.modifies_same_metadata(&different_frag)); + } + + #[test] + fn test_fragment_metadata_no_conflict_with_table_metadata() { + let frag_update = fragment_metadata_update(1, vec![("key", Some("a"))]); + let table_update = table_metadata_update(vec![("key", Some("b"))], false); + + assert!(!frag_update.modifies_same_metadata(&table_update)); + } } diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 45b78b48bcb..7bcebbf239d 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -499,6 +499,7 @@ mod tests { physical_rows: Some(10), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), } } diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index 67e57ed3320..d42fdf23f95 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -1696,6 +1696,7 @@ mod tests { physical_rows: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 1, @@ -1708,6 +1709,7 @@ mod tests { physical_rows: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, ]; @@ -1745,6 +1747,7 @@ mod tests { physical_rows: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, Fragment { id: 1, @@ -1757,6 +1760,7 @@ mod tests { physical_rows: None, last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }, ]; assert_eq!(manifest.fragments.as_ref(), &expected_fragments); @@ -1847,6 +1851,7 @@ mod tests { physical_rows: Some(100), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), }; Manifest::new( schema, diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 2e0a04be312..2e6c8784bde 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1100,15 +1100,17 @@ impl<'a> TransactionRebase<'a> { if let Operation::UpdateConfig { schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates, .. } = &self.transaction.operation { match &other_transaction.operation { - Operation::Overwrite { .. } => { - // Updates to schema metadata or field metadata conflict with any kind - // of overwrite. + Operation::Overwrite { .. } | Operation::Restore { .. } => { + // Updates to schema, field, or fragment metadata conflict with + // Overwrite/Restore since they replace all fragments and schema. if schema_metadata_updates.is_some() || !field_metadata_updates.is_empty() + || !fragment_metadata_updates.is_empty() || self .transaction .operation @@ -1134,16 +1136,39 @@ impl<'a> TransactionRebase<'a> { Ok(()) } } + Operation::Delete { + deleted_fragment_ids, + .. + } => { + if fragment_metadata_updates + .keys() + .any(|id| deleted_fragment_ids.contains(id)) + { + Err(self.incompatible_conflict_err(other_transaction, other_version)) + } else { + Ok(()) + } + } + Operation::Update { + removed_fragment_ids, + .. + } => { + if fragment_metadata_updates + .keys() + .any(|id| removed_fragment_ids.contains(id)) + { + Err(self.incompatible_conflict_err(other_transaction, other_version)) + } else { + Ok(()) + } + } Operation::Append { .. } | Operation::Clone { .. } - | Operation::Delete { .. } | Operation::CreateIndex { .. } | Operation::Rewrite { .. } | Operation::DataReplacement { .. } | Operation::Merge { .. } - | Operation::Restore { .. } | Operation::ReserveFragments { .. } - | Operation::Update { .. } | Operation::Project { .. } | Operation::UpdateMemWalState { .. } | Operation::UpdateBases { .. } => Ok(()), @@ -1800,6 +1825,7 @@ mod tests { table_metadata_updates: None, schema_metadata_updates, field_metadata_updates, + fragment_metadata_updates: HashMap::new(), } } diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index ae6d2a67d4b..d12bf048163 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashMap; use std::sync::Arc; use lance_core::utils::tempfile::{TempDir, TempStrDir}; @@ -246,6 +247,7 @@ impl TestDatasetGenerator { physical_rows: Some(batch.num_rows()), last_updated_at_version_meta: None, created_at_version_meta: None, + metadata: HashMap::new(), } } }