Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 203 additions & 12 deletions graph/src/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ pub struct Graph {
cache: Arc<Mutex<LruCache<String, PlanTree>>>,
/// Version counter (incremented on each write transaction)
pub version: u64,
/// Schema version (incremented only on schema changes: new labels, relationship types, or attributes)
pub schema_version: u64,
}

/// Wrapper for plan trees to implement Send+Sync.
Expand Down Expand Up @@ -440,6 +442,7 @@ impl Graph {
NonZeroUsize::new(cache_size.max(1)).expect("cache_size.max(1) is always >= 1"),
))),
version,
schema_version: 0,
}
}

Expand Down Expand Up @@ -476,6 +479,7 @@ impl Graph {
relationship_types: self.relationship_types.clone(),
cache: self.cache.clone(),
version: self.version + 1,
schema_version: self.schema_version,
}
}

Expand Down Expand Up @@ -611,6 +615,29 @@ impl Graph {
.map(TypeId)
}

/// Get-or-create a relationship type by name, returning its `TypeId`.
pub fn get_type_id_mut(
&mut self,
relationship_type: &str,
) -> TypeId {
if let Some(pos) = self
.relationship_types
.iter()
.position(|t| t.as_str() == relationship_type)
.map(TypeId)
{
return pos;
}

self.relationship_types
.push(Arc::new(relationship_type.to_string()));
self.relationship_matrices.insert(
self.relationship_types.len() - 1,
Tensor::new(self.node_cap, self.node_cap),
);
TypeId(self.relationship_types.len() - 1)
}

pub fn get_plan(
&self,
query: &str,
Expand Down Expand Up @@ -782,6 +809,10 @@ impl Graph {
NodeId(self.node_count + self.reserved_node_count - 1)
}

pub const fn inc_reserved_node_count(&mut self) {
self.reserved_node_count += 1;
}

pub fn reserve_nodes(
&mut self,
count: usize,
Expand Down Expand Up @@ -832,6 +863,14 @@ impl Graph {
self.node_count + self.deleted_nodes.len() - 1
}

#[must_use]
pub fn max_relationship_id(&self) -> u64 {
if self.relationship_count == 0 {
return 0;
}
self.relationship_count + self.deleted_relationships.len() - 1
}

pub fn set_nodes_attributes(
&mut self,
attrs: &HashMap<u64, OrderMap<Arc<String>, Value>>,
Expand Down Expand Up @@ -1095,6 +1134,10 @@ impl Graph {
self.node_attrs.get_attr_by_idx(id.0, attr_idx)
}

pub const fn inc_reserved_relationship_count(&mut self) {
self.reserved_relationship_count += 1;
}

pub fn reserve_relationship(&mut self) -> RelationshipId {
if self.reserved_relationship_count < self.deleted_relationships.len() {
let id = self
Expand Down Expand Up @@ -1402,21 +1445,30 @@ impl Graph {
self.relationship_attrs.get_attr(id.0, attr)
}

fn resize_node_matrices(&mut self) {
self.adjacancy_matrix.resize(self.node_cap, self.node_cap);
self.node_labels_matrix
.resize(self.node_cap, self.labels_matices.len() as u64);
self.all_nodes_matrix.resize(self.node_cap, self.node_cap);
for label_matrix in &mut self.labels_matices {
label_matrix.resize(self.node_cap, self.node_cap);
}
for relationship_matrix in &mut self.relationship_matrices {
relationship_matrix.resize(self.node_cap, self.node_cap);
}
}

fn resize_relationship_matrices(&mut self) {
self.relationship_type_matrix
.resize(self.relationship_cap, self.relationship_types.len() as u64);
}

fn resize(&mut self) {
if self.node_count > self.node_cap {
while self.node_count > self.node_cap {
self.node_cap *= 2;
}
self.adjacancy_matrix.resize(self.node_cap, self.node_cap);
self.node_labels_matrix
.resize(self.node_cap, self.labels_matices.len() as u64);
self.all_nodes_matrix.resize(self.node_cap, self.node_cap);
for label_matrix in &mut self.labels_matices {
label_matrix.resize(self.node_cap, self.node_cap);
}
for relationship_matrix in &mut self.relationship_matrices {
relationship_matrix.resize(self.node_cap, self.node_cap);
}
self.resize_node_matrices();
}

if self.labels_matices.len() as u64 > self.node_labels_matrix.ncols() {
Expand All @@ -1428,8 +1480,7 @@ impl Graph {
while self.relationship_count > self.relationship_cap {
self.relationship_cap *= 2;
}
self.relationship_type_matrix
.resize(self.relationship_cap, self.relationship_types.len() as u64);
self.resize_relationship_matrices();
}

if self.relationship_types.len() as u64 > self.relationship_type_matrix.ncols() {
Expand Down Expand Up @@ -1877,4 +1928,144 @@ impl Graph {
}
sz
}

#[must_use]
pub fn deleted_nodes_count(&self) -> u64 {
self.deleted_nodes.len()
}

#[must_use]
pub fn deleted_relationships_count(&self) -> u64 {
self.deleted_relationships.len()
}

#[must_use]
pub const fn deleted_relationships(&self) -> &RoaringTreemap {
&self.deleted_relationships
}

#[must_use]
pub fn label_matrices(&self) -> &[VersionedMatrix] {
&self.labels_matices
}

#[must_use]
pub fn relationship_tensors(&self) -> &[Tensor] {
&self.relationship_matrices
}

/// Synchronously create an index (without spawning background population).
pub fn create_index_sync(
&mut self,
index_type: &IndexType,
entity_type: &EntityType,
label: &Arc<String>,
attrs: &Vec<Arc<String>>,
options: Option<IndexOptions>,
) -> Result<(), String> {
match entity_type {
EntityType::Node => {
let len = self.get_label_matrix_mut(label).nvals();
self.node_indexer
.create_index(index_type, label, attrs, len, options)?;
}
EntityType::Relationship => {}
}
Ok(())
}

/// Synchronously populate all pending indexes.
/// Used after RDB load when the graph is fully constructed.
pub fn populate_indexes_sync(&mut self) {
let fields_by_label = self.node_indexer.get_all_pending_fields();
for (label, attrs) in fields_by_label {
if let Some(lm) = self.get_label_matrix(&label) {
let resolved_attrs: Vec<(u16, Vec<_>)> = attrs
.iter()
.filter_map(|(attr, fields)| {
self.get_node_attribute_id(attr)
.map(|idx| (idx as u16, fields.clone()))
})
.collect();

let mut batch = Vec::new();
for (n, _) in lm.iter(0, u64::MAX) {
let mut doc = Document::new(n);
let mut has_fields = false;
for (attr_idx, fields) in &resolved_attrs {
let value = self.get_node_attribute_by_idx(NodeId(n), *attr_idx);
if let Some(value) = value {
for field in fields {
doc.set(field, &value);
}
has_fields = true;
}
}
if has_fields {
batch.push(doc);
}
}
if !batch.is_empty() {
let mut add_docs = HashMap::new();
add_docs.insert(label.clone(), batch);
self.node_indexer.commit(&mut add_docs, &mut HashMap::new());
}
self.node_indexer.enable(&label);
}
}
}

/// Get node attribute names.
pub fn get_node_attribute_names(&self) -> Vec<Arc<String>> {
self.node_attrs.attrs_name.iter().cloned().collect()
}

/// Get relationship attribute names.
pub fn get_relationship_attribute_names(&self) -> Vec<Arc<String>> {
self.relationship_attrs.attrs_name.iter().cloned().collect()
}

/// Register a node attribute name (get-or-create).
pub fn add_node_attribute_name(
&mut self,
name: &str,
) {
let arc = Arc::new(name.to_string());
if self.node_attrs.attrs_name.get_index_of(&arc).is_none() {
self.node_attrs.attrs_name.insert(arc);
}
}

/// Register a relationship attribute name (get-or-create).
pub fn add_rel_attribute_name(
&mut self,
name: &str,
) {
let arc = Arc::new(name.to_string());
if self
.relationship_attrs
.attrs_name
.get_index_of(&arc)
.is_none()
{
self.relationship_attrs.attrs_name.insert(arc);
}
}

/// Build the unified global attribute list (node attrs ∪ relationship attrs, in order).
pub fn build_global_attrs(&self) -> Vec<Arc<String>> {
let mut attrs = Vec::new();
let mut seen = std::collections::HashSet::new();
for name in self.node_attrs.attrs_name.iter() {
if seen.insert(name.clone()) {
attrs.push(name.clone());
}
}
for name in self.relationship_attrs.attrs_name.iter() {
if seen.insert(name.clone()) {
attrs.push(name.clone());
}
}
attrs
}
}
32 changes: 32 additions & 0 deletions graph/src/graph/mvcc_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ impl MvccGraph {
}
}

/// Create an `MvccGraph` from an already-constructed `Graph`.
/// Used by the RDB load path.
#[must_use]
pub fn from_graph(graph: Graph) -> Self {
Self {
graph: Arc::new(AtomicRefCell::new(graph)),
write: AtomicBool::new(false),
}
}

#[must_use]
pub fn read(&self) -> Arc<AtomicRefCell<Graph>> {
self.graph.clone()
Expand All @@ -114,6 +124,28 @@ impl MvccGraph {
new_graph: Arc<AtomicRefCell<Graph>>,
) {
debug_assert_eq!(self.graph.borrow().version + 1, new_graph.borrow().version);

// Check if schema changed (new labels, relationship types, or attributes)
let old_labels = self.graph.borrow().get_labels().len();
let old_types = self.graph.borrow().get_types().len();
let old_node_attrs = self.graph.borrow().get_node_attribute_names().len();
let old_rel_attrs = self.graph.borrow().get_relationship_attribute_names().len();

let new_labels = new_graph.borrow().get_labels().len();
let new_types = new_graph.borrow().get_types().len();
let new_node_attrs = new_graph.borrow().get_node_attribute_names().len();
let new_rel_attrs = new_graph.borrow().get_relationship_attribute_names().len();

// If schema changed, ensure schema_version is incremented
if (old_labels != new_labels
|| old_types != new_types
|| old_node_attrs != new_node_attrs
|| old_rel_attrs != new_rel_attrs)
&& new_graph.borrow().schema_version == self.graph.borrow().schema_version
{
new_graph.borrow_mut().schema_version += 1;
}

new_graph.borrow_mut().set_indexer_graph(new_graph.clone());
self.graph = new_graph;
self.write.store(false, Ordering::Release);
Expand Down
13 changes: 13 additions & 0 deletions graph/src/index/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,19 @@ impl Indexer {
.unwrap_or_default()
}

/// Get fields for all labels with pending population.
#[must_use]
pub fn get_all_pending_fields(
&self
) -> Vec<(Arc<String>, HashMap<Arc<String>, Vec<Arc<Field>>>)> {
self.index
.read()
.iter()
.filter(|(_, index)| index.pending_count() > 0)
.map(|(label, index)| (label.clone(), index.fields().clone()))
.collect()
}

#[must_use]
pub fn index_info(&self) -> Vec<IndexInfo> {
self.index
Expand Down
Loading