-
-
Notifications
You must be signed in to change notification settings - Fork 216
feat: Add RebuildHNSWIndex command for index maintenance #833
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
ac2411c
c1c9514
01adafe
8e4aa54
2290341
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -487,6 +487,39 @@ impl VectorCore { | |||||||||||||
|
|
||||||||||||||
| Ok(vectors) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /// Get all vector IDs from the database without loading full vector data. | ||||||||||||||
| /// This is memory-efficient for operations that only need IDs. | ||||||||||||||
| pub fn get_all_vector_ids<'db, 'txn>( | ||||||||||||||
| &self, | ||||||||||||||
| txn: &'txn RoTxn<'db>, | ||||||||||||||
| ) -> Result<Vec<u128>, VectorError> { | ||||||||||||||
| let mut ids = Vec::new(); | ||||||||||||||
| let mut seen = std::collections::HashSet::new(); | ||||||||||||||
|
|
||||||||||||||
| // Iterate over all vectors in the database | ||||||||||||||
| let prefix_iter = self.vectors_db.prefix_iter(txn, VECTOR_PREFIX)?; | ||||||||||||||
|
|
||||||||||||||
| for result in prefix_iter { | ||||||||||||||
| let (key, _) = result?; | ||||||||||||||
|
|
||||||||||||||
| // Extract id from the key: v: (2 bytes) + id (16 bytes) + level (8 bytes) | ||||||||||||||
| if key.len() < VECTOR_PREFIX.len() + 16 { | ||||||||||||||
| continue; // Skip malformed keys | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| let mut id_bytes = [0u8; 16]; | ||||||||||||||
| id_bytes.copy_from_slice(&key[VECTOR_PREFIX.len()..VECTOR_PREFIX.len() + 16]); | ||||||||||||||
| let id = u128::from_be_bytes(id_bytes); | ||||||||||||||
|
|
||||||||||||||
| // Deduplicate (same ID may have multiple level entries) | ||||||||||||||
| if seen.insert(id) { | ||||||||||||||
| ids.push(id); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| Ok(ids) | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| impl HNSW for VectorCore { | ||||||||||||||
|
|
@@ -640,6 +673,84 @@ impl HNSW for VectorCore { | |||||||||||||
| Ok(query) | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
/// Reconnect an existing vector to the HNSW graph without changing its ID.
/// Used for index rebuilding after deletions cause graph fragmentation.
///
/// Mirrors the standard HNSW insertion procedure, except the vector's level
/// is reused (`vector.level`) rather than sampled, so the rebuilt graph keeps
/// the original level distribution.
fn reconnect_vector<'db, 'arena, 'txn, F>(
    &'db self,
    txn: &'txn mut RwTxn<'db>,
    vector: &HVector<'arena>,
    arena: &'arena bumpalo::Bump,
) -> Result<(), VectorError>
where
    F: Fn(&HVector<'arena>, &RoTxn<'db>) -> bool,
    'db: 'arena,
    'arena: 'txn,
{
    let new_level = vector.level; // Keep existing level
    let label = vector.label;

    // NOTE(review): any error from get_entry_point is treated as "no entry
    // point yet" — this also swallows genuine read failures. Presumably a
    // missing entry point surfaces as an Err here; confirm and narrow if so.
    let entry_point = match self.get_entry_point(txn, label, arena) {
        Ok(ep) => ep,
        Err(_) => {
            // First vector becomes entry point
            self.set_entry_point(txn, vector)?;
            return Ok(());
        }
    };

    let l = entry_point.level;
    let mut curr_ep = entry_point;

    // Greedy descent: from the entry point's level down to just above the
    // vector's level, move the entry point to the single closest neighbor.
    for level in (new_level + 1..=l).rev() {
        let mut nearest =
            self.search_level::<F>(txn, label, vector, &mut curr_ep, 1, level, None, arena)?;
        if let Some(closest) = nearest.pop() {
            curr_ep = closest;
        }
    }

    // Connect at each level from min(l, new_level) down to 0.
    for level in (0..=l.min(new_level)).rev() {
        let nearest = self.search_level::<F>(
            txn,
            label,
            vector,
            &mut curr_ep,
            self.config.ef_construct,
            level,
            None,
            arena,
        )?;

        // NOTE(review): if search_level returns an empty set, curr_ep is
        // silently left stale from the previous level (flagged by a reviewer
        // on this PR). Consider surfacing that case explicitly.
        if let Some(closest) = nearest.peek() {
            curr_ep = *closest;
        }

        let neighbors =
            self.select_neighbors::<F>(txn, label, vector, nearest, level, true, None, arena)?;
        self.set_neighbours(txn, vector.id, &neighbors, level)?;

        // Make the edges bidirectional: re-select and prune each neighbor's
        // connection list now that this vector is linked to it.
        for e in neighbors {
            let e_conns = BinaryHeap::from(
                arena,
                self.get_neighbors::<F>(txn, label, e.id, level, None, arena)?,
            );
            // NOTE(review): `vector` is passed as the query here rather than
            // `e`; if select_neighbors ranks candidates by distance to its
            // query argument, this prunes e's connections relative to the
            // wrong point — confirm against the insert() implementation.
            let e_new_conn =
                self.select_neighbors::<F>(txn, label, vector, e_conns, level, true, None, arena)?;
            self.set_neighbours(txn, e.id, &e_new_conn, level)?;
        }
    }

    // Update entry point if this vector has higher level
    if new_level > l {
        self.set_entry_point(txn, vector)?;
    }

    Ok(())
}
|
|
||||||||||||||
| fn delete(&self, txn: &mut RwTxn, id: u128, arena: &bumpalo::Bump) -> Result<(), VectorError> { | ||||||||||||||
| match self.get_vector_properties(txn, id, arena)? { | ||||||||||||||
| Some(mut properties) => { | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,127 @@ | ||
| use std::sync::Arc; | ||
|
|
||
| use sonic_rs::json; | ||
|
|
||
| use crate::helix_engine::types::GraphError; | ||
| use crate::helix_engine::vector_core::hnsw::HNSW; | ||
| use crate::helix_engine::vector_core::vector_core::ENTRY_POINT_KEY; | ||
| use crate::helix_gateway::router::router::{Handler, HandlerInput, HandlerSubmission}; | ||
| use crate::protocol; | ||
|
|
||
| const DEFAULT_BATCH_SIZE: usize = 5; | ||
|
|
||
| /// Rebuild the HNSW index by reconnecting all vectors. | ||
| /// This fixes graph fragmentation caused by deletions and re-insertions. | ||
| /// | ||
| /// Request body (optional): | ||
| /// - batch_size: Number of vectors to process per transaction (default: 5) | ||
| /// | ||
| /// Example: {"batch_size": 10} | ||
| pub fn rebuild_hnsw_index_inner(input: HandlerInput) -> Result<protocol::Response, GraphError> { | ||
| eprintln!("[RebuildHNSWIndex] Starting rebuild..."); | ||
|
|
||
| // Parse batch_size from request body (default to DEFAULT_BATCH_SIZE) | ||
| let batch_size: usize = if input.request.body.is_empty() { | ||
| DEFAULT_BATCH_SIZE | ||
| } else { | ||
| match sonic_rs::from_slice::<sonic_rs::Value>(&input.request.body) { | ||
| Ok(val) => val | ||
| .get("batch_size") | ||
| .and_then(|v| v.as_u64()) | ||
| .map(|v| v as usize) | ||
| .unwrap_or(DEFAULT_BATCH_SIZE), | ||
| Err(_) => DEFAULT_BATCH_SIZE, | ||
| } | ||
| }; | ||
|
|
||
| // Ensure batch_size is at least 1 | ||
| let batch_size = batch_size.max(1); | ||
| eprintln!("[RebuildHNSWIndex] Using batch size: {}", batch_size); | ||
|
|
||
| let db = Arc::clone(&input.graph.storage); | ||
|
|
||
| // Step 1: Get vector IDs only (memory-efficient, doesn't load vector data) | ||
| eprintln!("[RebuildHNSWIndex] Collecting vector IDs..."); | ||
| let vector_ids: Vec<u128> = { | ||
| let txn = db.graph_env.read_txn().map_err(GraphError::from)?; | ||
| db.vectors | ||
| .get_all_vector_ids(&txn) | ||
| .map_err(|e| GraphError::New(format!("Failed to get vector IDs: {}", e)))? | ||
| }; | ||
|
Comment on lines
+45
to
+50
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Loading all vectors into memory defeats the purpose of the batch processing mentioned in the PR description. Consider iterating over vector IDs directly from the database using a prefix iterator (similar to the existing implementation in |
||
| let vector_count = vector_ids.len(); | ||
| eprintln!("[RebuildHNSWIndex] Found {} vectors", vector_count); | ||
|
|
||
| if vector_count == 0 { | ||
| return Ok(protocol::Response { | ||
| body: sonic_rs::to_vec(&json!({ | ||
| "status": "success", | ||
| "message": "No vectors to rebuild", | ||
| "vectors_rebuilt": 0 | ||
| })) | ||
| .map_err(|e| GraphError::New(e.to_string()))?, | ||
| fmt: Default::default(), | ||
| }); | ||
| } | ||
|
|
||
| // Step 2: Clear all HNSW edges | ||
| { | ||
| let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?; | ||
| eprintln!("[RebuildHNSWIndex] Clearing HNSW edges..."); | ||
| db.vectors | ||
| .edges_db | ||
| .clear(&mut txn) | ||
| .map_err(|e| GraphError::New(format!("Failed to clear edges: {}", e)))?; | ||
|
|
||
| eprintln!("[RebuildHNSWIndex] Clearing entry point..."); | ||
| let _ = db.vectors.vectors_db.delete(&mut txn, ENTRY_POINT_KEY); | ||
| txn.commit().map_err(GraphError::from)?; | ||
| } | ||
|
|
||
| // Step 3: Reconnect vectors in batches (fresh arena per batch to control memory) | ||
| eprintln!("[RebuildHNSWIndex] Reconnecting {} vectors in batches of {}...", vector_count, batch_size); | ||
|
|
||
| for (batch_idx, chunk) in vector_ids.chunks(batch_size).enumerate() { | ||
| let processed = batch_idx * batch_size; | ||
| if processed % 500 == 0 { | ||
| eprintln!("[RebuildHNSWIndex] Progress: {}/{} ({:.1}%)", | ||
| processed, vector_count, (processed as f64 / vector_count as f64) * 100.0); | ||
| } | ||
|
|
||
| // Fresh arena for each batch to prevent memory growth | ||
| let arena = bumpalo::Bump::new(); | ||
| let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?; | ||
|
|
||
| for &vector_id in chunk { | ||
| match db.vectors.get_full_vector(&txn, vector_id, &arena) { | ||
| Ok(v) => { | ||
| db.vectors | ||
| .reconnect_vector::<fn(&_, &_) -> bool>(&mut txn, &v, &arena) | ||
| .map_err(|e| GraphError::New(format!("Failed to reconnect vector {}: {}", vector_id, e)))?; | ||
| } | ||
| Err(e) => { | ||
| eprintln!("[RebuildHNSWIndex] Warning: skipping vector {}: {}", vector_id, e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| txn.commit().map_err(GraphError::from)?; | ||
| // Arena dropped here, memory freed | ||
| } | ||
|
|
||
| eprintln!("[RebuildHNSWIndex] Rebuild complete! {} vectors reconnected", vector_count); | ||
| Ok(protocol::Response { | ||
| body: sonic_rs::to_vec(&json!({ | ||
| "status": "success", | ||
| "vectors_rebuilt": vector_count, | ||
| "batch_size": batch_size | ||
| })) | ||
| .map_err(|e| GraphError::New(e.to_string()))?, | ||
| fmt: Default::default(), | ||
| }) | ||
| } | ||
|
|
||
// Register this handler with the gateway router at startup via the
// `inventory` collection, under the endpoint name "RebuildHNSWIndex".
// NOTE(review): the trailing `true` flag's meaning isn't visible here —
// presumably it marks the handler as mutating/requiring a write transaction;
// confirm against Handler::new's signature.
inventory::submit! {
    HandlerSubmission(
        Handler::new("RebuildHNSWIndex", rebuild_hnsw_index_inner, true)
    )
}
Uh oh!
There was an error while loading. Please reload this page.