Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 247 additions & 0 deletions rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8589,6 +8589,131 @@ MergeInsert: on=[id], when_matched=DoNothing, when_not_matched=InsertAll, when_n
}
}

// Regression test for GitHub issue #6877.
//
// Two sequential full-schema merge_insert UpdateAll calls against the same
// target row, on a dataset with stable_row_ids enabled and a BTREE scalar
// index on the join column, used to fail on the second call with
// "Ambiguous merge inserts are prohibited" — even though each call's
// source had exactly one row per key.
//
// Mechanism: with stable row ids the BTREE stores stable_row_ids (not
// physical addresses). After the first merge_insert, A's stable_row_id is
// preserved but its physical home moves to an unindexed fragment. The
// BTREE-side TakeExec resolves the stable_row_id to A's new location and
// emits a row; the unindexed-fragments scan also covers the new fragment
// and emits the same logical row. Both surface the same `_rowid`, so the
// merge_insert source-dedup HashSet sees a duplicate and aborts.
//
// Fix: thread `restrict_to_fragments` into `do_create_deletion_mask_row_id`
// so the allow-list only contains stable_row_ids whose current physical
// home is inside the index's fragment_bitmap.
#[tokio::test]
async fn test_issue_6877_repeated_merge_insert_stable_row_ids() {
use arrow_array::Int32Array;

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]));

let initial = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["A", "B", "C"])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
],
)
.unwrap();

let mut ds = Dataset::write(
Box::new(RecordBatchIterator::new([Ok(initial)], schema.clone())),
"memory://test_6877",
Some(WriteParams {
mode: WriteMode::Overwrite,
enable_stable_row_ids: true,
..Default::default()
}),
)
.await
.unwrap();

ds.create_index(
&["id"],
IndexType::Scalar,
None,
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();

// First merge_insert: A 1 -> 11.
let update_a = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["A"])),
Arc::new(Int32Array::from(vec![11])),
],
)
.unwrap();
let (ds, _) = MergeInsertBuilder::try_new(Arc::new(ds), vec!["id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::DoNothing)
.try_build()
.unwrap()
.execute_reader(Box::new(RecordBatchIterator::new(
[Ok(update_a)],
schema.clone(),
)))
.await
.unwrap();

// Second merge_insert: A 11 -> 22. Used to fail before the fix.
let update_a_again = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(vec!["A"])),
Arc::new(Int32Array::from(vec![22])),
],
)
.unwrap();
let (ds, _) = MergeInsertBuilder::try_new(ds, vec!["id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::DoNothing)
.try_build()
.unwrap()
.execute_reader(Box::new(RecordBatchIterator::new(
[Ok(update_a_again)],
schema.clone(),
)))
.await
.unwrap();

// Sanity check: A's value is now 22.
let batches = ds
.scan()
.filter("id = 'A'")
.unwrap()
.try_into_stream()
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
let combined = concat_batches(&schema, &batches).unwrap();
assert_eq!(combined.num_rows(), 1);
let values = combined
.column_by_name("value")
.unwrap()
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(values.value(0), 22);
}

// Regression test: partial-schema merge_insert followed by update (deleting all rows
// in a fragment) followed by partial merge_insert should not produce
// "fragment id N does not exist" errors.
Expand Down Expand Up @@ -8945,6 +9070,128 @@ MergeInsert: on=[id], when_matched=DoNothing, when_not_matched=InsertAll, when_n
);
}

// Companion regression test for issue #6877 on the FTS path.
//
// The FTS prefilter shares `do_create_deletion_mask_row_id` with the
// scalar-index path, so the same stable-row-id bypass that produced
// duplicate rows in merge_insert can produce duplicate hits in FTS search
// after a merge_insert moves rows to unindexed fragments. This test pins
// the contract for the FTS consumer.
#[tokio::test]
async fn test_issue_6877_fts_no_duplicates_stable_row_ids() {
let rows_per_frag = 10usize;
let num_frags = 3usize;

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("text", DataType::Utf8, false),
]));

let make_batch = |frag_idx: usize| {
let start = frag_idx * rows_per_frag;
let ids: Vec<String> = (start..start + rows_per_frag)
.map(|j| format!("id-{j:04}"))
.collect();
let texts: Vec<String> = (start..start + rows_per_frag)
.map(|j| format!("common unique{j:04}"))
.collect();
RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(texts)),
],
)
.unwrap()
};

let batch0 = make_batch(0);
let reader = Box::new(RecordBatchIterator::new([Ok(batch0)], schema.clone()));
let mut ds = Dataset::write(
reader,
"memory://fts_stable_row_id_test",
Some(WriteParams {
mode: WriteMode::Overwrite,
enable_stable_row_ids: true,
..Default::default()
}),
)
.await
.unwrap();
for frag_idx in 1..num_frags {
let batch = make_batch(frag_idx);
let reader = Box::new(RecordBatchIterator::new([Ok(batch)], schema.clone()));
ds.append(reader, None).await.unwrap();
}

let params = InvertedIndexParams::default();
ds.create_index(&["text"], IndexType::Inverted, None, &params, true)
.await
.unwrap();

// Full-schema merge_insert rewriting fragment 1's rows. After this,
// the original locations are tombstoned and the new locations live in
// a new (unindexed) fragment; the stable_row_ids are preserved.
let frag1_start = rows_per_frag;
let ids: Vec<String> = (frag1_start..frag1_start + rows_per_frag)
.map(|j| format!("id-{j:04}"))
.collect();
let texts: Vec<String> = (frag1_start..frag1_start + rows_per_frag)
.map(|j| format!("common updated{j:04}"))
.collect();
let update_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(texts)),
],
)
.unwrap();
let reader = Box::new(RecordBatchIterator::new([Ok(update_batch)], schema.clone()));
let (ds, _) = MergeInsertBuilder::try_new(Arc::new(ds), vec!["id".to_string()])
.unwrap()
.when_matched(WhenMatched::UpdateAll)
.when_not_matched(WhenNotMatched::DoNothing)
.try_build()
.unwrap()
.execute_reader(reader)
.await
.unwrap();

// FTS search for "common" — every row should match exactly once.
let query = FullTextSearchQuery::new("common".to_string());
let results = ds
.scan()
.full_text_search(query)
.unwrap()
.try_into_batch()
.await
.unwrap();

let ids = results
.column_by_name("id")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let unique_ids: std::collections::HashSet<&str> =
(0..ids.len()).map(|i| ids.value(i)).collect();
assert_eq!(
unique_ids.len(),
ids.len(),
"Found duplicate ids in FTS results: {} unique out of {} total",
unique_ids.len(),
ids.len()
);
assert_eq!(
unique_ids.len(),
rows_per_frag * num_frags,
"Expected {} rows but got {}",
rows_per_frag * num_frags,
unique_ids.len()
);
}

// Regression test: after a partial-schema merge_insert invalidates a fragment,
// compaction should succeed and subsequent searches should return correct results.
//
Expand Down
101 changes: 95 additions & 6 deletions rust/lance/src/index/prefilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,33 @@ impl DatasetPreFilter {
}

#[instrument(level = "debug", skip_all)]
async fn do_create_deletion_mask_row_id(dataset: Arc<Dataset>) -> Result<Arc<RowAddrMask>> {
// This can only be computed as an allow list, since we have no idea
// what the row ids were in the missing fragments.
async fn do_create_deletion_mask_row_id(
dataset: Arc<Dataset>,
restrict_to: Option<RoaringBitmap>,
) -> Result<Arc<RowAddrMask>> {
// The mask is an allow-list of stable row ids. When `restrict_to` is
// set the iteration is limited to the listed fragments, so the
// resulting list excludes stable row ids whose *current* physical home
// is outside the restriction. This is the missing piece for the
// stable-row-id branch of #6563: without it, the merge_insert UNION
// (indexed scan ∪ unindexed-fragments scan) sees the same logical row
// twice — once via the BTREE (which holds the row's stable_row_id) and
// once via the unindexed scan (which holds the fragment the row now
// lives in). See issue #6877.
async fn load_row_ids_and_deletions(
dataset: &Dataset,
restrict_to: Option<&RoaringBitmap>,
) -> Result<Vec<(Arc<RowIdSequence>, Option<Arc<DeletionVector>>)>> {
stream::iter(dataset.get_fragments())
let frags: Vec<_> = dataset
.get_fragments()
.into_iter()
.filter(|f| {
restrict_to
.map(|allow| allow.contains(f.id() as u32))
.unwrap_or(true)
})
.collect();
stream::iter(frags)
.map(|frag| async move {
let row_ids = load_row_id_sequence(dataset, frag.metadata());
let deletion_vector = frag.get_deletion_vector();
Expand All @@ -145,16 +165,31 @@ impl DatasetPreFilter {
.await
}

let restrict_hash = restrict_to.as_ref().map(|b| {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut h = DefaultHasher::new();
// RoaringBitmap doesn't implement Hash; serialize the sorted u32s.
for v in b.iter() {
v.hash(&mut h);
}
h.finish()
});

let dataset_clone = dataset.clone();
let restrict_for_load = restrict_to.clone();
let key = crate::session::caches::RowAddrMaskKey {
version: dataset.manifest().version,
restrict_hash,
};
dataset
.metadata_cache
.as_ref()
.get_or_insert_with_key(key, move || {
async move {
let row_ids_and_deletions = load_row_ids_and_deletions(&dataset_clone).await?;
let row_ids_and_deletions =
load_row_ids_and_deletions(&dataset_clone, restrict_for_load.as_ref())
.await?;

// The process of computing the final mask is CPU-bound, so we spawn it
// on a blocking thread.
Expand Down Expand Up @@ -268,7 +303,12 @@ impl DatasetPreFilter {
if missing_frags.is_empty() && frags_with_deletion_files.is_empty() && !needs_allow_list {
None
} else if dataset.manifest.uses_stable_row_ids() {
Some(Self::do_create_deletion_mask_row_id(dataset.clone()).boxed())
let restrict_to = if restrict_to_fragments {
Some(fragments)
} else {
None
};
Some(Self::do_create_deletion_mask_row_id(dataset.clone(), restrict_to).boxed())
} else if missing_frags.is_empty() && frags_with_deletion_files.is_empty() {
// No deletions to load, but the dataset has fragments outside the
// index bitmap. Return a synchronous allow-list mask.
Expand Down Expand Up @@ -529,4 +569,53 @@ mod test {
let mask = mask.unwrap().await.unwrap();
assert_eq!(mask.allow_list().and_then(|x| x.len()), Some(3)); // There were three rows left over;
}

// Regression test for issue #6877.
//
// `create_restricted_deletion_mask` on a stable-row-id dataset must honor
// the bitmap restriction by excluding stable row ids whose *current*
// physical home is outside the bitmap. Without this, the merge_insert
// UNION (indexed-scan ∪ unindexed-fragments scan) emits the same logical
// row twice — once via the BTREE (which holds the row's stable_row_id)
// and once via the unindexed scan.
#[tokio::test]
async fn test_restricted_deletion_mask_stable_row_id_honors_bitmap() {
// Dataset with three fragments, 3 rows each, stable_row_ids = {0..9}.
// Row x=8 is deleted, so the live stable_row_ids are {0..7}.
let datasets = test_datasets(true).await;
let ds = datasets.deletions_no_missing_frags.clone();

// Full bitmap: allow-list covers all currently-live stable row ids.
let mask = DatasetPreFilter::create_restricted_deletion_mask(
ds.clone(),
RoaringBitmap::from_iter(0..3),
)
.expect("full-bitmap mask present on stable-row-id dataset with deletions")
.await
.unwrap();
let expected_all = RowAddrTreeMap::from_iter(0..8);
assert_eq!(mask.allow_list(), Some(&expected_all));

// Restricted to fragments {0, 1}: allow-list must exclude stable row
// ids whose current home is in fragment 2 (rows 6, 7 — 8 was deleted).
let mask = DatasetPreFilter::create_restricted_deletion_mask(
ds.clone(),
RoaringBitmap::from_iter(0..2),
)
.expect("restricted mask present")
.await
.unwrap();
let expected_restricted = RowAddrTreeMap::from_iter(0..6);
assert_eq!(mask.allow_list(), Some(&expected_restricted));

// Restricted to empty bitmap: every BTREE-returned address is filtered
// out. Empty allow-list is the correct semantic ("no row's current home
// is in the restriction").
let mask =
DatasetPreFilter::create_restricted_deletion_mask(ds.clone(), RoaringBitmap::new())
.expect("empty-restriction mask present")
.await
.unwrap();
assert_eq!(mask.allow_list().and_then(|x| x.len()), Some(0));
}
}
Loading
Loading