19 changes: 19 additions & 0 deletions helix-db/src/helix_engine/vector_core/hnsw.rs
@@ -60,4 +60,23 @@ pub trait HNSW {
/// * `txn` - The transaction to use
/// * `id` - The id of the vector
fn delete(&self, txn: &mut RwTxn, id: u128, arena: &bumpalo::Bump) -> Result<(), VectorError>;

/// Reconnect an existing vector to the HNSW graph without changing its ID.
/// Used for index rebuilding after deletions cause graph fragmentation.
///
/// # Arguments
///
/// * `txn` - The write transaction to use
/// * `vector` - The vector to reconnect (with existing ID and level)
/// * `arena` - The bump allocator for temporary allocations
fn reconnect_vector<'db, 'arena, 'txn, F>(
&'db self,
txn: &'txn mut RwTxn<'db>,
vector: &HVector<'arena>,
arena: &'arena bumpalo::Bump,
) -> Result<(), VectorError>
where
F: Fn(&HVector<'arena>, &RoTxn<'db>) -> bool,
'db: 'arena,
'arena: 'txn;
}
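For orientation, a minimal sketch of how a caller might invoke the new trait method, modeled on the handler added later in this PR. `storage` and `vector_id` are placeholder names, and the turbofish is needed because the filter type `F` is not tied to any argument:

// Sketch only: assumes a storage handle exposing the vector index as `vectors`,
// with an LMDB-style environment for transactions.
let arena = bumpalo::Bump::new();
let mut txn = storage.graph_env.write_txn()?;
let v = storage.vectors.get_full_vector(&txn, vector_id, &arena)?;
storage
    .vectors
    .reconnect_vector::<fn(&_, &_) -> bool>(&mut txn, &v, &arena)?;
txn.commit()?;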
111 changes: 111 additions & 0 deletions helix-db/src/helix_engine/vector_core/vector_core.rs
@@ -487,6 +487,39 @@ impl VectorCore {

Ok(vectors)
}

/// Get all vector IDs from the database without loading full vector data.
/// This is memory-efficient for operations that only need IDs.
pub fn get_all_vector_ids<'db, 'txn>(
&self,
txn: &'txn RoTxn<'db>,
) -> Result<Vec<u128>, VectorError> {
let mut ids = Vec::new();
let mut seen = std::collections::HashSet::new();

// Iterate over all vectors in the database
let prefix_iter = self.vectors_db.prefix_iter(txn, VECTOR_PREFIX)?;

for result in prefix_iter {
let (key, _) = result?;

// Extract id from the key: v: (2 bytes) + id (16 bytes) + level (8 bytes)
if key.len() < VECTOR_PREFIX.len() + 16 {
continue; // Skip malformed keys
}

let mut id_bytes = [0u8; 16];
id_bytes.copy_from_slice(&key[VECTOR_PREFIX.len()..VECTOR_PREFIX.len() + 16]);
let id = u128::from_be_bytes(id_bytes);

// Deduplicate (same ID may have multiple level entries)
if seen.insert(id) {
ids.push(id);
}
}

Ok(ids)
}
}

impl HNSW for VectorCore {
@@ -640,6 +673,84 @@ impl HNSW for VectorCore {
Ok(query)
}

/// Reconnect an existing vector to the HNSW graph without changing its ID.
/// Used for index rebuilding after deletions cause graph fragmentation.
fn reconnect_vector<'db, 'arena, 'txn, F>(
&'db self,
txn: &'txn mut RwTxn<'db>,
vector: &HVector<'arena>,
arena: &'arena bumpalo::Bump,
) -> Result<(), VectorError>
where
F: Fn(&HVector<'arena>, &RoTxn<'db>) -> bool,
'db: 'arena,
'arena: 'txn,
{
let new_level = vector.level; // Keep existing level
let label = vector.label;

let entry_point = match self.get_entry_point(txn, label, arena) {
Ok(ep) => ep,
Err(_) => {
// First vector becomes entry point
self.set_entry_point(txn, vector)?;
return Ok(());
}
};

let l = entry_point.level;
let mut curr_ep = entry_point;

// Navigate to insertion level
for level in (new_level + 1..=l).rev() {
let mut nearest =
self.search_level::<F>(txn, label, vector, &mut curr_ep, 1, level, None, arena)?;
curr_ep = nearest.pop().ok_or(VectorError::VectorCoreError(
"empty search result".to_string(),
))?;
}

// Connect at each level
for level in (0..=l.min(new_level)).rev() {
let nearest = self.search_level::<F>(
txn,
label,
vector,
&mut curr_ep,
self.config.ef_construct,
level,
None,
arena,
)?;

curr_ep = *nearest.peek().ok_or(VectorError::VectorCoreError(
"empty search result".to_string(),
))?;

let neighbors =
self.select_neighbors::<F>(txn, label, vector, nearest, level, true, None, arena)?;
self.set_neighbours(txn, vector.id, &neighbors, level)?;

// Update neighbors' connections
for e in neighbors {
let e_conns = BinaryHeap::from(
arena,
self.get_neighbors::<F>(txn, label, e.id, level, None, arena)?,
);
let e_new_conn =
self.select_neighbors::<F>(txn, label, vector, e_conns, level, true, None, arena)?;
self.set_neighbours(txn, e.id, &e_new_conn, level)?;
}
}

// Update entry point if this vector has higher level
if new_level > l {
self.set_entry_point(txn, vector)?;
}

Ok(())
}

fn delete(&self, txn: &mut RwTxn, id: u128, arena: &bumpalo::Bump) -> Result<(), VectorError> {
match self.get_vector_properties(txn, id, arena)? {
Some(mut properties) => {
1 change: 1 addition & 0 deletions helix-db/src/helix_gateway/builtin/mod.rs
@@ -2,3 +2,4 @@ pub mod all_nodes_and_edges;
pub mod node_by_id;
pub mod node_connections;
pub mod nodes_by_label;
pub mod rebuild_hnsw_index;
127 changes: 127 additions & 0 deletions helix-db/src/helix_gateway/builtin/rebuild_hnsw_index.rs
@@ -0,0 +1,127 @@
use std::sync::Arc;

use sonic_rs::json;

use crate::helix_engine::types::GraphError;
use crate::helix_engine::vector_core::hnsw::HNSW;
use crate::helix_engine::vector_core::vector_core::ENTRY_POINT_KEY;
use crate::helix_gateway::router::router::{Handler, HandlerInput, HandlerSubmission};
use crate::protocol;

const DEFAULT_BATCH_SIZE: usize = 5;

/// Rebuild the HNSW index by reconnecting all vectors.
/// This fixes graph fragmentation caused by deletions and re-insertions.
///
/// Request body (optional):
/// - batch_size: Number of vectors to process per transaction (default: 5)
///
/// Example: {"batch_size": 10}
pub fn rebuild_hnsw_index_inner(input: HandlerInput) -> Result<protocol::Response, GraphError> {
eprintln!("[RebuildHNSWIndex] Starting rebuild...");

// Parse batch_size from request body (default to DEFAULT_BATCH_SIZE)
let batch_size: usize = if input.request.body.is_empty() {
DEFAULT_BATCH_SIZE
} else {
match sonic_rs::from_slice::<sonic_rs::Value>(&input.request.body) {
Ok(val) => val
.get("batch_size")
.and_then(|v| v.as_u64())
.map(|v| v as usize)
.unwrap_or(DEFAULT_BATCH_SIZE),
Err(_) => DEFAULT_BATCH_SIZE,
}
};

// Ensure batch_size is at least 1
let batch_size = batch_size.max(1);
eprintln!("[RebuildHNSWIndex] Using batch size: {}", batch_size);

let db = Arc::clone(&input.graph.storage);

// Step 1: Get vector IDs only (memory-efficient, doesn't load vector data)
eprintln!("[RebuildHNSWIndex] Collecting vector IDs...");
let vector_ids: Vec<u128> = {
let txn = db.graph_env.read_txn().map_err(GraphError::from)?;
db.vectors
.get_all_vector_ids(&txn)
.map_err(|e| GraphError::New(format!("Failed to get vector IDs: {}", e)))?
};
Comment on lines +45 to +50 (Contributor):
Loading all vectors into memory defeats the purpose of the batch processing mentioned in the PR description. get_all_vectors loads full vector data for all vectors, which could cause OOM on large datasets.

Consider iterating over vector IDs directly from the database using a prefix iterator (similar to the existing implementation in get_all_vectors at vector_core.rs:446-489) to truly avoid loading all data at once.
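For illustration, the ID-only scan the comment asks for could look roughly like the get_all_vector_ids helper added above: iterate the keys under VECTOR_PREFIX and decode the 16-byte id without deserializing the vector payloads (deduplication across level entries omitted for brevity):

// Sketch mirroring get_all_vector_ids: recover ids from keys only.
let mut ids = Vec::new();
for result in self.vectors_db.prefix_iter(txn, VECTOR_PREFIX)? {
    let (key, _) = result?;
    if key.len() >= VECTOR_PREFIX.len() + 16 {
        let mut id_bytes = [0u8; 16];
        id_bytes.copy_from_slice(&key[VECTOR_PREFIX.len()..VECTOR_PREFIX.len() + 16]);
        ids.push(u128::from_be_bytes(id_bytes));
    }
}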

let vector_count = vector_ids.len();
eprintln!("[RebuildHNSWIndex] Found {} vectors", vector_count);

if vector_count == 0 {
return Ok(protocol::Response {
body: sonic_rs::to_vec(&json!({
"status": "success",
"message": "No vectors to rebuild",
"vectors_rebuilt": 0
}))
.map_err(|e| GraphError::New(e.to_string()))?,
fmt: Default::default(),
});
}

// Step 2: Clear all HNSW edges
{
let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?;
eprintln!("[RebuildHNSWIndex] Clearing HNSW edges...");
db.vectors
.edges_db
.clear(&mut txn)
.map_err(|e| GraphError::New(format!("Failed to clear edges: {}", e)))?;

eprintln!("[RebuildHNSWIndex] Clearing entry point...");
let _ = db.vectors.vectors_db.delete(&mut txn, ENTRY_POINT_KEY);
txn.commit().map_err(GraphError::from)?;
}

// Step 3: Reconnect vectors in batches (fresh arena per batch to control memory)
eprintln!("[RebuildHNSWIndex] Reconnecting {} vectors in batches of {}...", vector_count, batch_size);

for (batch_idx, chunk) in vector_ids.chunks(batch_size).enumerate() {
let processed = batch_idx * batch_size;
if processed % 500 == 0 {
eprintln!("[RebuildHNSWIndex] Progress: {}/{} ({:.1}%)",
processed, vector_count, (processed as f64 / vector_count as f64) * 100.0);
}

// Fresh arena for each batch to prevent memory growth
let arena = bumpalo::Bump::new();
let mut txn = db.graph_env.write_txn().map_err(GraphError::from)?;

for &vector_id in chunk {
match db.vectors.get_full_vector(&txn, vector_id, &arena) {
Ok(v) => {
db.vectors
.reconnect_vector::<fn(&_, &_) -> bool>(&mut txn, &v, &arena)
.map_err(|e| GraphError::New(format!("Failed to reconnect vector {}: {}", vector_id, e)))?;
}
Err(e) => {
eprintln!("[RebuildHNSWIndex] Warning: skipping vector {}: {}", vector_id, e);
}
}
}

txn.commit().map_err(GraphError::from)?;
// Arena dropped here, memory freed
}

eprintln!("[RebuildHNSWIndex] Rebuild complete! {} vectors reconnected", vector_count);
Ok(protocol::Response {
body: sonic_rs::to_vec(&json!({
"status": "success",
"vectors_rebuilt": vector_count,
"batch_size": batch_size
}))
.map_err(|e| GraphError::New(e.to_string()))?,
fmt: Default::default(),
})
}

inventory::submit! {
HandlerSubmission(
Handler::new("RebuildHNSWIndex", rebuild_hnsw_index_inner, true)
)
}
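For reference, a successful rebuild returns a JSON body of the form {"status": "success", "vectors_rebuilt": <count>, "batch_size": <n>}, or a "No vectors to rebuild" message when the index is empty, matching the responses constructed in the handler above.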