diff --git a/graph/src/graph/graph.rs b/graph/src/graph/graph.rs
index a8adb1f9..286a60e2 100644
--- a/graph/src/graph/graph.rs
+++ b/graph/src/graph/graph.rs
@@ -263,6 +263,8 @@ pub struct Graph {
     cache: Arc>>,
     /// Version counter (incremented on each write transaction)
     pub version: u64,
+    /// Schema version (incremented only on schema changes: new labels, relationship types, or attributes)
+    pub schema_version: u64,
 }
 
 /// Wrapper for plan trees to implement Send+Sync.
@@ -440,6 +442,7 @@ impl Graph {
                 NonZeroUsize::new(cache_size.max(1)).expect("cache_size.max(1) is always >= 1"),
             ))),
             version,
+            schema_version: 0,
         }
     }
 
@@ -476,6 +479,7 @@ impl Graph {
             relationship_types: self.relationship_types.clone(),
             cache: self.cache.clone(),
             version: self.version + 1,
+            schema_version: self.schema_version,
         }
     }
 
@@ -611,6 +615,32 @@ impl Graph {
             .map(TypeId)
     }
 
+    /// Get-or-create a relationship type by name, returning its `TypeId`.
+    ///
+    /// An existing type is found by a linear scan over the registered names.
+    /// Otherwise the name is appended and a fresh `node_cap` x `node_cap`
+    /// tensor is inserted at the same index, so both collections stay aligned.
+    pub fn get_type_id_mut(
+        &mut self,
+        relationship_type: &str,
+    ) -> TypeId {
+        if let Some(pos) = self
+            .relationship_types
+            .iter()
+            .position(|t| t.as_str() == relationship_type)
+        {
+            return TypeId(pos);
+        }
+
+        // The new type takes the next free slot; compute the index once.
+        let id = self.relationship_types.len();
+        self.relationship_types
+            .push(Arc::new(relationship_type.to_string()));
+        self.relationship_matrices
+            .insert(id, Tensor::new(self.node_cap, self.node_cap));
+        TypeId(id)
+    }
+
     pub fn get_plan(
         &self,
         query: &str,
@@ -782,6 +812,11 @@ impl Graph {
         NodeId(self.node_count + self.reserved_node_count - 1)
     }
 
+    /// Increment the reserved-node counter by one.
+    pub const fn inc_reserved_node_count(&mut self) {
+        self.reserved_node_count += 1;
+    }
+
     pub fn reserve_nodes(
         &mut self,
         count: usize,
@@ -832,6 +867,16 @@ impl Graph {
         self.node_count + self.deleted_nodes.len() - 1
     }
 
+    /// Returns `relationship_count + deleted_relationships.len() - 1`, or `0`
+    /// when `relationship_count` is zero.
+    #[must_use]
+    pub fn max_relationship_id(&self) -> u64 {
+        if self.relationship_count == 0 {
+            return 0;
+        }
+        self.relationship_count + self.deleted_relationships.len() - 1
+    }
+
     pub fn set_nodes_attributes(
         &mut self,
         attrs: &HashMap, Value>>,
@@ -1095,6 +1140,11 @@ impl Graph {
         self.node_attrs.get_attr_by_idx(id.0, attr_idx)
     }
 
+    /// Increment the reserved-relationship counter by one.
+    pub const fn inc_reserved_relationship_count(&mut self) {
+        self.reserved_relationship_count += 1;
+    }
+
     pub fn reserve_relationship(&mut self) -> RelationshipId {
         if self.reserved_relationship_count < self.deleted_relationships.len() {
             let id = self
@@ -1402,21 +1452,32 @@ impl Graph {
         self.relationship_attrs.get_attr(id.0, attr)
     }
 
+    /// Resize every node-dimensioned matrix to the current `node_cap`.
+    fn resize_node_matrices(&mut self) {
+        self.adjacancy_matrix.resize(self.node_cap, self.node_cap);
+        self.node_labels_matrix
+            .resize(self.node_cap, self.labels_matices.len() as u64);
+        self.all_nodes_matrix.resize(self.node_cap, self.node_cap);
+        for label_matrix in &mut self.labels_matices {
+            label_matrix.resize(self.node_cap, self.node_cap);
+        }
+        for relationship_matrix in &mut self.relationship_matrices {
+            relationship_matrix.resize(self.node_cap, self.node_cap);
+        }
+    }
+
+    /// Resize the relationship-type matrix to `relationship_cap` rows and one column per type.
+    fn resize_relationship_matrices(&mut self) {
+        self.relationship_type_matrix
+            .resize(self.relationship_cap, self.relationship_types.len() as u64);
+    }
+
     fn resize(&mut self) {
         if self.node_count > self.node_cap {
             while self.node_count > self.node_cap {
                 self.node_cap *= 2;
             }
-            self.adjacancy_matrix.resize(self.node_cap, self.node_cap);
-            self.node_labels_matrix
-                .resize(self.node_cap, self.labels_matices.len() as u64);
-            self.all_nodes_matrix.resize(self.node_cap, self.node_cap);
-            for label_matrix in &mut self.labels_matices {
-                label_matrix.resize(self.node_cap, self.node_cap);
-            }
-            for relationship_matrix in &mut self.relationship_matrices {
-                relationship_matrix.resize(self.node_cap, self.node_cap);
-            }
+            self.resize_node_matrices();
         }
 
         if self.labels_matices.len() as u64 > self.node_labels_matrix.ncols() {
@@ -1428,8 +1489,7 @@ impl Graph {
             while self.relationship_count > self.relationship_cap {
                 self.relationship_cap *= 2;
             }
-            self.relationship_type_matrix
-                .resize(self.relationship_cap, self.relationship_types.len() as u64);
+            self.resize_relationship_matrices();
         }
 
         if self.relationship_types.len() as u64 > self.relationship_type_matrix.ncols() {
@@ -1877,4 +1937,167 @@ impl Graph {
         }
         sz
     }
+
+    /// Number of entries in `deleted_nodes`.
+    #[must_use]
+    pub fn deleted_nodes_count(&self) -> u64 {
+        self.deleted_nodes.len()
+    }
+
+    /// Number of entries in `deleted_relationships`.
+    #[must_use]
+    pub fn deleted_relationships_count(&self) -> u64 {
+        self.deleted_relationships.len()
+    }
+
+    /// Borrow the set of deleted relationship ids.
+    #[must_use]
+    pub const fn deleted_relationships(&self) -> &RoaringTreemap {
+        &self.deleted_relationships
+    }
+
+    /// Borrow the label matrices as a slice.
+    #[must_use]
+    pub fn label_matrices(&self) -> &[VersionedMatrix] {
+        &self.labels_matices
+    }
+
+    /// Borrow the relationship tensors as a slice.
+    #[must_use]
+    pub fn relationship_tensors(&self) -> &[Tensor] {
+        &self.relationship_matrices
+    }
+
+    /// Number of registered node attribute names.
+    #[must_use]
+    pub fn node_attribute_count(&self) -> usize {
+        self.node_attrs.attrs_name.len()
+    }
+
+    /// Number of registered relationship attribute names.
+    #[must_use]
+    pub fn relationship_attribute_count(&self) -> usize {
+        self.relationship_attrs.attrs_name.len()
+    }
+
+    /// Synchronously create an index (without spawning background population).
+    ///
+    /// Only node indexes are created; for `EntityType::Relationship` the call
+    /// does nothing and still returns `Ok(())`.
+    ///
+    /// # Errors
+    /// Returns the error produced by `node_indexer.create_index`.
+    pub fn create_index_sync(
+        &mut self,
+        index_type: &IndexType,
+        entity_type: &EntityType,
+        label: &Arc,
+        attrs: &Vec>,
+        options: Option,
+    ) -> Result<(), String> {
+        match entity_type {
+            EntityType::Node => {
+                let len = self.get_label_matrix_mut(label).nvals();
+                self.node_indexer
+                    .create_index(index_type, label, attrs, len, options)?;
+            }
+            // NOTE(review): relationship indexes are silently skipped here —
+            // confirm callers do not expect one to exist after an `Ok(())`.
+            EntityType::Relationship => {}
+        }
+        Ok(())
+    }
+
+    /// Synchronously populate all pending indexes.
+    /// Used after RDB load when the graph is fully constructed.
+    pub fn populate_indexes_sync(&mut self) {
+        let fields_by_label = self.node_indexer.get_all_pending_fields();
+        for (label, attrs) in fields_by_label {
+            if let Some(lm) = self.get_label_matrix(&label) {
+                let resolved_attrs: Vec<(u16, Vec<_>)> = attrs
+                    .iter()
+                    .filter_map(|(attr, fields)| {
+                        self.get_node_attribute_id(attr)
+                            .map(|idx| (idx as u16, fields.clone()))
+                    })
+                    .collect();
+
+                let mut batch = Vec::new();
+                for (n, _) in lm.iter(0, u64::MAX) {
+                    let mut doc = Document::new(n);
+                    let mut has_fields = false;
+                    for (attr_idx, fields) in &resolved_attrs {
+                        let value = self.get_node_attribute_by_idx(NodeId(n), *attr_idx);
+                        if let Some(value) = value {
+                            for field in fields {
+                                doc.set(field, &value);
+                            }
+                            has_fields = true;
+                        }
+                    }
+                    if has_fields {
+                        batch.push(doc);
+                    }
+                }
+                if !batch.is_empty() {
+                    let mut add_docs = HashMap::new();
+                    add_docs.insert(label.clone(), batch);
+                    self.node_indexer.commit(&mut add_docs, &mut HashMap::new());
+                }
+                self.node_indexer.enable(&label);
+            }
+        }
+    }
+
+    /// Get node attribute names.
+    pub fn get_node_attribute_names(&self) -> Vec<Arc<String>> {
+        self.node_attrs.attrs_name.iter().cloned().collect()
+    }
+
+    /// Get relationship attribute names.
+    pub fn get_relationship_attribute_names(&self) -> Vec<Arc<String>> {
+        self.relationship_attrs.attrs_name.iter().cloned().collect()
+    }
+
+    /// Register a node attribute name (get-or-create).
+    ///
+    /// NOTE(review): relies on `attrs_name.insert` being a no-op for a name
+    /// that is already present (`indexmap::IndexSet` semantics) — confirm.
+    pub fn add_node_attribute_name(
+        &mut self,
+        name: &str,
+    ) {
+        self.node_attrs
+            .attrs_name
+            .insert(Arc::new(name.to_string()));
+    }
+
+    /// Register a relationship attribute name (get-or-create).
+    ///
+    /// NOTE(review): relies on `attrs_name.insert` being a no-op for a name
+    /// that is already present (`indexmap::IndexSet` semantics) — confirm.
+    pub fn add_rel_attribute_name(
+        &mut self,
+        name: &str,
+    ) {
+        self.relationship_attrs
+            .attrs_name
+            .insert(Arc::new(name.to_string()));
+    }
+
+    /// Build the unified global attribute list (node attrs ∪ relationship attrs, in order).
+    ///
+    /// Node attribute names come first, followed by relationship attribute
+    /// names not already emitted; each name appears once.
+    pub fn build_global_attrs(&self) -> Vec<Arc<String>> {
+        // Track emitted names by reference so only the returned entries are cloned.
+        let mut seen = std::collections::HashSet::new();
+        self.node_attrs
+            .attrs_name
+            .iter()
+            .chain(self.relationship_attrs.attrs_name.iter())
+            .filter(|name| seen.insert(*name))
+            .cloned()
+            .collect()
+    }
 }
diff --git a/graph/src/graph/mvcc_graph.rs b/graph/src/graph/mvcc_graph.rs
index f0960455..69cb2ac0 100644
--- a/graph/src/graph/mvcc_graph.rs
+++ b/graph/src/graph/mvcc_graph.rs
@@ -89,6 +89,16 @@ impl MvccGraph {
         }
     }
 
+    /// Create an `MvccGraph` from an already-constructed `Graph`.
+    /// Used by the RDB load path.
+    #[must_use]
+    pub fn from_graph(graph: Graph) -> Self {
+        Self {
+            graph: Arc::new(AtomicRefCell::new(graph)),
+            write: AtomicBool::new(false),
+        }
+    }
+
     #[must_use]
     pub fn read(&self) -> Arc<AtomicRefCell<Graph>> {
         self.graph.clone()
@@ -114,6 +124,27 @@ impl MvccGraph {
         new_graph: Arc<AtomicRefCell<Graph>>,
     ) {
         debug_assert_eq!(self.graph.borrow().version + 1, new_graph.borrow().version);
+
+        // Detect a schema change (new labels, relationship types, or attributes)
+        // by comparing collection sizes. Each graph is borrowed once, and the
+        // attribute sets are counted in place instead of being cloned into
+        // temporary vectors just to read their length.
+        let needs_schema_bump = {
+            let old = self.graph.borrow();
+            let new = new_graph.borrow();
+            let sizes_differ = old.get_labels().len() != new.get_labels().len()
+                || old.get_types().len() != new.get_types().len()
+                || old.node_attribute_count() != new.node_attribute_count()
+                || old.relationship_attribute_count() != new.relationship_attribute_count();
+            sizes_differ && new.schema_version == old.schema_version
+        };
+
+        // Bump only when the writer has not already advanced the version;
+        // the shared borrows above are released before borrowing mutably.
+        if needs_schema_bump {
+            new_graph.borrow_mut().schema_version += 1;
+        }
+
         new_graph.borrow_mut().set_indexer_graph(new_graph.clone());
         self.graph = new_graph;
         self.write.store(false, Ordering::Release);
diff --git a/graph/src/index/indexer.rs b/graph/src/index/indexer.rs
index 6af0b822..ba2b76b1 100644
--- a/graph/src/index/indexer.rs
+++ b/graph/src/index/indexer.rs
@@ -439,6 +439,19 @@ impl Indexer {
             .unwrap_or_default()
     }
 
+    /// Get fields for all labels with pending population.
+    #[must_use]
+    pub fn get_all_pending_fields(
+        &self
+    ) -> Vec<(Arc, HashMap, Vec>>)> {
+        self.index
+            .read()
+            .iter()
+            .filter(|(_, index)| index.pending_count() > 0)
+            .map(|(label, index)| (label.clone(), index.fields().clone()))
+            .collect()
+    }
+
     #[must_use]
     pub fn index_info(&self) -> Vec {
         self.index