-
-
Notifications
You must be signed in to change notification settings - Fork 216
feat: Add RebuildHNSWIndex command for index maintenance #833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
ac2411c
c1c9514
01adafe
8e4aa54
2290341
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -640,6 +640,84 @@ impl HNSW for VectorCore { | |||||||||||||
| Ok(query) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /// Reconnect an existing vector to the HNSW graph without changing its ID. | ||||||||||||||
| /// Used for index rebuilding after deletions cause graph fragmentation. | ||||||||||||||
// Overview of the visible steps:
//   1. Reuse `vector.level` and `vector.label` as stored on the vector.
//   2. Look up the entry point for the label; on any error, make `vector` the
//      entry point and return.
//   3. For each level above `vector.level` (down from the entry point's level),
//      run `search_level` with a candidate count of 1 and move `curr_ep` to the
//      popped result.
//   4. For each level from `min(entry level, vector.level)` down to 0, search with
//      `self.config.ef_construct`, select neighbours, store them with
//      `set_neighbours`, then re-select and store each neighbour's own list.
//   5. If `vector.level` exceeds the entry point's level, promote `vector` to
//      entry point.
// `F` appears only as a turbofish argument to the helpers; every helper call here
// passes `None` in what is presumably the filter slot - TODO confirm.
// Errors from the helpers propagate via `?` as `VectorError`.
| fn reconnect_vector<'db, 'arena, 'txn, F>( | ||||||||||||||
| &'db self, | ||||||||||||||
| txn: &'txn mut RwTxn<'db>, | ||||||||||||||
| vector: &HVector<'arena>, | ||||||||||||||
| arena: &'arena bumpalo::Bump, | ||||||||||||||
| ) -> Result<(), VectorError> | ||||||||||||||
| where | ||||||||||||||
| F: Fn(&HVector<'arena>, &RoTxn<'db>) -> bool, | ||||||||||||||
| 'db: 'arena, | ||||||||||||||
| 'arena: 'txn, | ||||||||||||||
| { | ||||||||||||||
| let new_level = vector.level; // Keep existing level | ||||||||||||||
| let label = vector.label; | ||||||||||||||
|
|
||||||||||||||
// NOTE(review): `Err(_)` matches every failure of `get_entry_point`, not only a
// missing entry point, so any lookup error results in `vector` being written as
// the entry point. Confirm that this is intended.
| let entry_point = match self.get_entry_point(txn, label, arena) { | ||||||||||||||
| Ok(ep) => ep, | ||||||||||||||
| Err(_) => { | ||||||||||||||
| // First vector becomes entry point | ||||||||||||||
| self.set_entry_point(txn, vector)?; | ||||||||||||||
| return Ok(()); | ||||||||||||||
| } | ||||||||||||||
| }; | ||||||||||||||
|
|
||||||||||||||
| let l = entry_point.level; | ||||||||||||||
| let mut curr_ep = entry_point; | ||||||||||||||
|
|
||||||||||||||
// The range below is empty when `new_level >= l`, so this loop is skipped then.
| // Navigate to insertion level | ||||||||||||||
| for level in (new_level + 1..=l).rev() { | ||||||||||||||
| let mut nearest = | ||||||||||||||
| self.search_level::<F>(txn, label, vector, &mut curr_ep, 1, level, None, arena)?; | ||||||||||||||
| if let Some(closest) = nearest.pop() { | ||||||||||||||
| curr_ep = closest; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Connect at each level | ||||||||||||||
| for level in (0..=l.min(new_level)).rev() { | ||||||||||||||
| let nearest = self.search_level::<F>( | ||||||||||||||
| txn, | ||||||||||||||
| label, | ||||||||||||||
| vector, | ||||||||||||||
| &mut curr_ep, | ||||||||||||||
| self.config.ef_construct, | ||||||||||||||
| level, | ||||||||||||||
| None, | ||||||||||||||
| arena, | ||||||||||||||
| )?; | ||||||||||||||
|
|
||||||||||||||
// NOTE(review): when `nearest` is empty the `if let` below does nothing, so
// `curr_ep` keeps its previous value and the loop carries on without reporting it.
// The reviewer remark embedded after this statement appears to refer to this.
| if let Some(closest) = nearest.peek() { | ||||||||||||||
| curr_ep = *closest; | ||||||||||||||
| } | ||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Silent failure allows continuing with stale
Suggested change
|
||||||||||||||
|
|
||||||||||||||
| let neighbors = | ||||||||||||||
| self.select_neighbors::<F>(txn, label, vector, nearest, level, true, None, arena)?; | ||||||||||||||
| self.set_neighbours(txn, vector.id, &neighbors, level)?; | ||||||||||||||
|
|
||||||||||||||
// NOTE(review): the re-selection for each neighbour `e` passes `vector` (not `e`)
// as the query argument of `select_neighbors`, with `e`'s current neighbours as
// the candidate heap. Confirm that this matches the regular insert path.
| // Update neighbors' connections | ||||||||||||||
| for e in neighbors { | ||||||||||||||
| let e_conns = BinaryHeap::from( | ||||||||||||||
| arena, | ||||||||||||||
| self.get_neighbors::<F>(txn, label, e.id, level, None, arena)?, | ||||||||||||||
| ); | ||||||||||||||
| let e_new_conn = | ||||||||||||||
| self.select_neighbors::<F>(txn, label, vector, e_conns, level, true, None, arena)?; | ||||||||||||||
| self.set_neighbours(txn, e.id, &e_new_conn, level)?; | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Update entry point if this vector has higher level | ||||||||||||||
| if new_level > l { | ||||||||||||||
| self.set_entry_point(txn, vector)?; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Ok(()) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| fn delete(&self, txn: &mut RwTxn, id: u128, arena: &bumpalo::Bump) -> Result<(), VectorError> { | ||||||||||||||
| match self.get_vector_properties(txn, id, arena)? { | ||||||||||||||
| Some(mut properties) => { | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use sonic_rs::json; | ||
|
|
||
| use crate::helix_engine::types::GraphError; | ||
| use crate::helix_engine::vector_core::hnsw::HNSW; | ||
| use crate::helix_engine::vector_core::vector_core::ENTRY_POINT_KEY; | ||
| use crate::helix_gateway::router::router::{Handler, HandlerInput, HandlerSubmission}; | ||
| use crate::protocol; | ||
|
|
||
// NOTE(review): this constant is not referenced anywhere in the visible part of
// this file; the rebuild loop below opens one write transaction per vector.
| const BATCH_SIZE: usize = 50; | ||
|
|
||
| /// Rebuild the HNSW index by reconnecting all vectors. | ||
| /// This fixes graph fragmentation caused by deletions and re-insertions. | ||
| /// Processes in batches to avoid OOM. | ||
// Handler flow as written:
//   1. Collect every vector id inside a read transaction.
//   2. Return a "No vectors to rebuild" success response if there are none.
//   3. In one write transaction, clear `edges_db` and delete `ENTRY_POINT_KEY`
//      from `vectors_db`, then commit.
//   4. For each id, open a new write transaction and a new `Bump` arena, load the
//      vector with `get_full_vector`, call `reconnect_vector`, and commit.
//   5. Return a JSON success response carrying `vectors_rebuilt`.
// Returns `GraphError` on transaction failures, on a failed edge clear, on a
// failed `reconnect_vector`, or on JSON serialisation failure.
// NOTE(review): despite "Processes in batches" above, the loop commits once per
// vector and never reads `BATCH_SIZE`.
// NOTE(review): step 3 is committed before step 4 starts, so an error returned
// midway through step 4 leaves the edges cleared and only partly rebuilt.
| pub fn rebuild_hnsw_index_inner(input: HandlerInput) -> Result<protocol::Response, GraphError> { | ||
| eprintln!("[RebuildHNSWIndex] Starting rebuild..."); | ||
|
|
||
| let db = Arc::clone(&input.graph.storage); | ||
|
|
||
| // Step 1: Get vector IDs only (to avoid loading all vector data at once) | ||
| eprintln!("[RebuildHNSWIndex] Counting vectors..."); | ||
// The block scope drops the read transaction and the arena once the ids are
// copied out. `get_all_vectors` still returns the whole collection first, which
// is what the reviewer remark embedded below objects to.
| let vector_ids: Vec<u128> = { | ||
| let txn = db.graph_env.read_txn().map_err(GraphError::from)?; | ||
| let arena = bumpalo::Bump::new(); | ||
| let vectors = db | ||
| .vectors | ||
| .get_all_vectors(&txn, None, &arena) | ||
| .map_err(|e| GraphError::New(format!("Failed to get vectors: {}", e)))?; | ||
| vectors.iter().map(|v| v.id).collect() | ||
| }; | ||
|
Comment on lines
+45
to
+50
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Loading all vectors into memory defeats the purpose of the batch processing mentioned in the PR description. Consider iterating over vector IDs directly from the database using a prefix iterator (similar to the existing implementation in |
||
| let vector_count = vector_ids.len(); | ||
| eprintln!("[RebuildHNSWIndex] Found {} vectors", vector_count); | ||
|
|
||
| if vector_count == 0 { | ||
| return Ok(protocol::Response { | ||
| body: sonic_rs::to_vec(&json!({ | ||
| "status": "success", | ||
| "message": "No vectors to rebuild", | ||
| "vectors_rebuilt": 0 | ||
| })) | ||
| .map_err(|e| GraphError::New(e.to_string()))?, | ||
| fmt: Default::default(), | ||
| }); | ||
| } | ||
|
|
||
| // Step 2: Clear all HNSW edges | ||
| { | ||
| let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?; | ||
| eprintln!("[RebuildHNSWIndex] Clearing HNSW edges..."); | ||
| db.vectors | ||
| .edges_db | ||
| .clear(&mut txn) | ||
| .map_err(|e| GraphError::New(format!("Failed to clear edges: {}", e)))?; | ||
|
|
||
| eprintln!("[RebuildHNSWIndex] Clearing entry point..."); | ||
// NOTE(review): `let _` discards the result of the delete, so a failure to remove
// the old entry point goes unreported and the transaction is still committed.
| let _ = db.vectors.vectors_db.delete(&mut txn, ENTRY_POINT_KEY); | ||
| txn.commit().map_err(GraphError::from)?; | ||
| } | ||
|
|
||
| // Step 3: Reconnect vectors one at a time (fresh arena each to control memory) | ||
| eprintln!("[RebuildHNSWIndex] Reconnecting {} vectors...", vector_count); | ||
|
|
||
| for (i, &vector_id) in vector_ids.iter().enumerate() { | ||
| if i % 500 == 0 { | ||
| eprintln!("[RebuildHNSWIndex] Progress: {}/{} ({:.1}%)", | ||
| i, vector_count, (i as f64 / vector_count as f64) * 100.0); | ||
| } | ||
|
|
||
| // Fresh arena for each vector to prevent memory growth | ||
| let arena = bumpalo::Bump::new(); | ||
| let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?; | ||
|
|
||
// A load failure is logged and the vector is skipped; a reconnect failure aborts
// the whole handler with an error.
| match db.vectors.get_full_vector(&txn, vector_id, &arena) { | ||
| Ok(v) => { | ||
| db.vectors | ||
| .reconnect_vector::<fn(&_, &_) -> bool>(&mut txn, &v, &arena) | ||
| .map_err(|e| GraphError::New(format!("Failed to reconnect vector {}: {}", vector_id, e)))?; | ||
| } | ||
| Err(e) => { | ||
| eprintln!("[RebuildHNSWIndex] Warning: skipping vector {}: {}", vector_id, e); | ||
| } | ||
| } | ||
|
|
||
| txn.commit().map_err(GraphError::from)?; | ||
| // Arena dropped here, memory freed | ||
| } | ||
|
|
||
// NOTE(review): `vector_count` is the number of ids collected in step 1, so the
// log line and `vectors_rebuilt` below also count vectors that were skipped.
| eprintln!("[RebuildHNSWIndex] Rebuild complete! {} vectors reconnected", vector_count); | ||
| Ok(protocol::Response { | ||
| body: sonic_rs::to_vec(&json!({ | ||
| "status": "success", | ||
| "vectors_rebuilt": vector_count | ||
| })) | ||
| .map_err(|e| GraphError::New(e.to_string()))?, | ||
| fmt: Default::default(), | ||
| }) | ||
| } | ||
|
|
||
// Registers `rebuild_hnsw_index_inner` with the `inventory` collection as a
// `Handler` named "RebuildHNSWIndex".
// NOTE(review): the meaning of the third argument (`true`) is not visible here;
// presumably it marks the handler as a write operation - confirm against
// `Handler::new`.
| inventory::submit! { | ||
| HandlerSubmission( | ||
| Handler::new("RebuildHNSWIndex", rebuild_hnsw_index_inner, true) | ||
| ) | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.