Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 150 additions & 4 deletions rust/lance/src/index/append.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::sync::Arc;

use futures::{FutureExt, TryStreamExt};
Expand Down Expand Up @@ -49,7 +50,15 @@ async fn build_stable_row_id_filter(
// Instead, we:
// 1) keep only fragments still considered "effective" for the old index, and
// 2) load their persisted row-id sequences from dataset metadata, then
// 3) build one exact allow-list used to retain only still-valid old rows.
// 3) drop rows tombstoned by each fragment's deletion vector, and
// 4) build one exact allow-list used to retain only still-valid old rows.
//
// Step 3 is load-bearing: a `merge_insert` UPDATE tombstones the old physical
// row but keeps its stable ID in the fragment's row-id sequence, while a fresh
// entry for that same stable ID arrives in the unindexed delta. Without masking
// the deleted positions the stale entry survives the optimize merge alongside
// the new one, leaving two rows with the same stable ID on the merged page and
// tripping the sorted-iter invariant in `FlatIndex::try_new`.
let retained_frags = dataset
.manifest
.fragments
Expand All @@ -66,10 +75,32 @@ async fn build_stable_row_id_filter(
.try_collect::<Vec<_>>()
.await?;

let row_id_maps = row_id_sequences
let fragments = dataset.get_fragments();
let fragment_map = fragments
.iter()
.map(|(_, seq)| RowAddrTreeMap::from(seq.as_ref()))
.collect::<Vec<_>>();
.map(|frag| (frag.id() as u32, frag))
.collect::<HashMap<_, _>>();

let mut row_id_maps = Vec::with_capacity(row_id_sequences.len());
for (fragment_id, sequence) in row_id_sequences {
let fragment = fragment_map
.get(&fragment_id)
.expect("retained fragment should exist in dataset");
let deletion_vector = if fragment.metadata().deletion_file.is_some() {
fragment.get_deletion_vector().await?
} else {
None
};
let map = match deletion_vector {
Some(dv) if !dv.is_empty() => {
let mut live_sequence = sequence.as_ref().clone();
live_sequence.mask(dv.to_sorted_iter())?;
RowAddrTreeMap::from(&live_sequence)
}
_ => RowAddrTreeMap::from(sequence.as_ref()),
};
row_id_maps.push(map);
}
let row_id_map_refs = row_id_maps.iter().collect::<Vec<_>>();

// Merge all fragment-local row-id sets into one exact membership structure.
Expand Down Expand Up @@ -1276,4 +1307,119 @@ mod tests {
let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
assert_eq!(query_id_count(&dataset, "song-42").await, 1);
}

#[tokio::test]
async fn test_optimize_btree_after_merge_insert_update_with_stable_row_ids() {
    // Regression scenario: with stable row IDs enabled, a merge_insert UPDATE
    // followed directly by optimize_indices (no compaction in between) used to
    // put two entries carrying the same stable row ID on the merged BTREE page,
    // which tripped `RowAddrTreeMap::from_sorted_iter` on the next read that
    // went through the index. The updated rows remain listed in the original
    // fragment's row-id sequence (only the deletion vector tombstones them),
    // while their replacements show up in the unindexed delta.
    let test_dir = TempStrDir::default();
    let test_uri = test_dir.as_str();

    // Two non-nullable UInt32 columns: the merge key `id` and a payload `v`.
    let id_field = Field::new("id", DataType::UInt32, false);
    let value_field = Field::new("v", DataType::UInt32, false);
    let schema = Arc::new(Schema::new(vec![id_field, value_field]));

    // Produces rows with `id` in `first..last` and `v = id + bump`.
    let build_batch = |first: u32, last: u32, bump: u32| {
        let ids = UInt32Array::from_iter_values(first..last);
        let values = UInt32Array::from_iter_values((first..last).map(|i| i + bump));
        RecordBatch::try_new(schema.clone(), vec![Arc::new(ids), Arc::new(values)]).unwrap()
    };

    // Seed the dataset with ids 0..1000, written with stable row IDs turned on.
    let seed_batch = build_batch(0, 1000, 0);
    let seed_reader = RecordBatchIterator::new(vec![Ok(seed_batch)], schema.clone());
    let write_params = WriteParams {
        max_rows_per_file: 1000,
        enable_stable_row_ids: true,
        ..Default::default()
    };
    let mut dataset = Dataset::write(seed_reader, test_uri, Some(write_params))
        .await
        .unwrap();
    assert!(dataset.manifest.uses_stable_row_ids());

    // Build the BTREE index on `id` before any update happens.
    let index_params = ScalarIndexParams::default();
    dataset
        .create_index(
            &["id"],
            IndexType::BTree,
            Some("id_idx".into()),
            &index_params,
            true,
        )
        .await
        .unwrap();

    // Rewrite the first 250 rows through merge_insert: identical ids, new `v`.
    let update_job = MergeInsertBuilder::try_new(Arc::new(dataset.clone()), vec!["id".to_string()])
        .unwrap()
        .when_matched(WhenMatched::UpdateAll)
        .when_not_matched(WhenNotMatched::InsertAll)
        .try_build()
        .unwrap();
    let update_batch = build_batch(0, 250, 1_000_000);
    let update_reader = Box::new(RecordBatchIterator::new(
        [Ok(update_batch)],
        schema.clone(),
    ));
    let update_stream = reader_to_stream(update_reader);
    let (_, stats) = update_job.execute(update_stream).await.unwrap();
    assert_eq!(stats.num_updated_rows, 250);
    assert_eq!(stats.num_inserted_rows, 0);

    // The trigger sequence: optimize the index straight away, WITHOUT compacting.
    let mut dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
    let optimize_options = OptimizeOptions::default();
    dataset.optimize_indices(&optimize_options).await.unwrap();

    // Reload and confirm the index no longer reports any unindexed fragment.
    let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap();
    let unindexed = dataset.unindexed_fragments("id_idx").await.unwrap();
    assert!(unindexed.is_empty());

    // A filtered read must neither panic nor miscount: ids 101..=999 is 899 rows.
    let rows_above_100 = dataset
        .count_rows(Some("id > 100".to_string()))
        .await
        .unwrap();
    assert_eq!(rows_above_100, 899);

    // Each probed id must resolve to exactly one row, i.e. no duplicate stable
    // ID survived the optimize merge. The probes cover updated rows, the
    // update boundary, and rows the update never touched.
    let probe_ids = [0u32, 1, 249, 250, 999];
    for id in probe_ids {
        let predicate = format!("id = {id}");
        let batch = dataset
            .scan()
            .filter(&predicate)
            .unwrap()
            .project(&["id"])
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        let matched = batch.num_rows();
        assert_eq!(matched, 1, "id {id} should match exactly one row");
    }
}
}
Loading