diff --git a/graph/src/runtime/ops/commit.rs b/graph/src/runtime/ops/commit.rs index 5ab905a4..132b648c 100644 --- a/graph/src/runtime/ops/commit.rs +++ b/graph/src/runtime/ops/commit.rs @@ -78,6 +78,25 @@ impl<'a> Iterator for CommitOp<'a> { { return Some(Err(e)); } + // Commit succeeded — build effects buffer from pending data, then clear. + { + let pending = self.runtime.pending.borrow(); + if pending.effects_count() > 0 { + let mut buf_ref = self.runtime.effects_buffer.borrow_mut(); + let buf = buf_ref.get_or_insert_with(Vec::new); + let n_effects = pending.build_effects_buffer(&self.runtime.g, buf); + self.runtime + .effects_count + .set(self.runtime.effects_count.get() + n_effects); + } + } + self.runtime.pending.borrow_mut().clear(); + // Update schema baseline so the next commit in this query only + // emits newly added schema entries. + self.runtime + .pending + .borrow_mut() + .set_schema_baseline(&self.runtime.g); // Reverse once so we can pop from the end in O(1) while preserving order. self.results.reverse(); } diff --git a/graph/src/runtime/pending.rs b/graph/src/runtime/pending.rs index 231a2ea0..86a959ee 100644 --- a/graph/src/runtime/pending.rs +++ b/graph/src/runtime/pending.rs @@ -112,10 +112,14 @@ pub struct Pending { deleted_nodes: RoaringTreemap, /// Relationships to be deleted (edge_id, src, dst) deleted_relationships: HashMap, - /// Property updates for nodes - set_nodes_attrs: HashMap, Value>>, - /// Property updates for relationships - set_relationships_attrs: HashMap, Value>>, + /// Property updates for newly created nodes (fast path: skip fjall) + new_nodes_attrs: HashMap, Value>>, + /// Property updates for existing nodes (full merge path) + existing_nodes_attrs: HashMap, Value>>, + /// Property updates for newly created relationships (fast path: skip fjall) + new_relationships_attrs: HashMap, Value>>, + /// Property updates for existing relationships (full merge path) + existing_relationships_attrs: HashMap, Value>>, /// Labels to add (node_id × label_id matrix) set_node_labels: Matrix, /// Labels to remove @@ -124,6 +128,14 @@ pub struct Pending { index_add_docs: HashMap, /// Documents to remove from indexes (keyed by label id) index_remove_docs: HashMap, + /// Schema baseline: number of labels when the current commit window started. + schema_label_count: usize, + /// Schema baseline: number of relationship types when the current commit window started. + schema_rel_type_count: usize, + /// Schema baseline: number of node attribute names when the current commit window started. + schema_node_attr_count: usize, + /// Schema baseline: number of relationship attribute names when the current commit window started. + schema_rel_attr_count: usize, } impl Default for Pending { @@ -140,23 +152,48 @@ impl Pending { created_relationships: HashMap::new(), deleted_nodes: RoaringTreemap::new(), deleted_relationships: HashMap::new(), - set_nodes_attrs: HashMap::new(), - set_relationships_attrs: HashMap::new(), + new_nodes_attrs: HashMap::new(), + existing_nodes_attrs: HashMap::new(), + new_relationships_attrs: HashMap::new(), + existing_relationships_attrs: HashMap::new(), set_node_labels: Matrix::new(0, 0), remove_node_labels: Matrix::new(0, 0), index_add_docs: HashMap::new(), index_remove_docs: HashMap::new(), + schema_label_count: 0, + schema_rel_type_count: 0, + schema_node_attr_count: 0, + schema_rel_attr_count: 0, } } + /// Record the current schema sizes so `build_effects_buffer` can emit + /// EFFECT_ADD_SCHEMA / EFFECT_ADD_ATTRIBUTE for newly added entries. + pub fn set_schema_baseline( + &mut self, + g: &AtomicRefCell, + ) { + let graph = g.borrow(); + self.schema_label_count = graph.get_labels().len(); + self.schema_rel_type_count = graph.get_types().len(); + self.schema_node_attr_count = graph.get_node_attribute_names().len(); + self.schema_rel_attr_count = graph.get_relationship_attribute_names().len(); + } + pub fn resize( &mut self, node_cap: u64, labels_count: usize, ) { - self.set_node_labels.resize(node_cap, labels_count as u64); - self.remove_node_labels - .resize(node_cap, labels_count as u64); + // Use max dimensions from both set and remove matrices to avoid shrinking either + let new_nrows = node_cap + .max(self.set_node_labels.nrows()) + .max(self.remove_node_labels.nrows()); + let new_ncols = (labels_count as u64) + .max(self.set_node_labels.ncols()) + .max(self.remove_node_labels.ncols()); + self.set_node_labels.resize(new_nrows, new_ncols); + self.remove_node_labels.resize(new_nrows, new_ncols); } pub fn created_nodes( @@ -187,10 +224,15 @@ impl Pending { id: NodeId, attrs: OrderMap, Value>, ) -> Result<(), String> { - for (_, value) in attrs.iter() { + for value in attrs.values() { validate_node_property(value)?; } - self.set_nodes_attrs.insert(id.into(), attrs); + let is_new = self.created_nodes.contains(id.into()); + if is_new { + self.new_nodes_attrs.insert(id.into(), attrs); + } else { + self.existing_nodes_attrs.insert(id.into(), attrs); + } Ok(()) } @@ -201,10 +243,13 @@ impl Pending { value: Value, ) -> Result<(), String> { validate_node_property(&value)?; - self.set_nodes_attrs - .entry(id.into()) - .or_default() - .insert(key, value); + let map = if self.created_nodes.contains(id.into()) { + &mut self.new_nodes_attrs + } else { + &mut self.existing_nodes_attrs + }; + let entry = map.entry(id.into()).or_default(); + entry.insert(key, value); Ok(()) } @@ -212,7 +257,8 @@ impl Pending { &mut self, id: NodeId, ) { - self.set_nodes_attrs.remove(&id.into()); + self.new_nodes_attrs.remove(&id.into()); + self.existing_nodes_attrs.remove(&id.into()); } #[must_use] @@ -221,9 +267,14 @@ impl Pending { id: NodeId, key: &Arc, ) -> Option<&Value> { - self.set_nodes_attrs + self.new_nodes_attrs .get(&id.into()) .and_then(|attrs| attrs.get(key)) + .or_else(|| { + self.existing_nodes_attrs + .get(&id.into()) + .and_then(|attrs| attrs.get(key)) + }) } pub fn update_node_attrs( @@ -231,7 +282,11 @@ impl Pending { id: NodeId, attrs: &mut OrderMap, Value>, ) { - if let Some(added) = self.set_nodes_attrs.get(&id.into()) { + let added = self + .new_nodes_attrs + .get(&id.into()) + .or_else(|| self.existing_nodes_attrs.get(&id.into())); + if let Some(added) = added { for (key, value) in added.iter() { if matches!(value, Value::Null) { attrs.remove(key); @@ -323,7 +378,11 @@ impl Pending { } // Collect pending attrs - let attrs = self.set_nodes_attrs.remove(&id.into()).unwrap_or_default(); + let attrs = self + .new_nodes_attrs + .remove(&id.into()) + .or_else(|| self.existing_nodes_attrs.remove(&id.into())) + .unwrap_or_default(); // Find pending-created relationships connected to this node let rels: Vec<_> = self @@ -342,7 +401,7 @@ impl Pending { /// Remove and return all pending-created relationships incident on the /// given node, along with their staged attributes. Also cleans up - /// `set_relationships_attrs` and `deleted_relationships` entries for + /// `new_relationships_attrs` and `deleted_relationships` entries for /// each removed relationship so that commit() has no stale state. pub fn remove_pending_relationships_for_node( &mut self, @@ -364,7 +423,7 @@ impl Pending { let mut result = Vec::with_capacity(rels.len()); for (rel_id, from, to, type_name) in rels { self.created_relationships.remove(&rel_id); - let attrs = self.set_relationships_attrs.remove(&rel_id.into()); + let attrs = self.new_relationships_attrs.remove(&rel_id.into()); self.deleted_relationships.remove(&rel_id); result.push((rel_id, from, to, type_name, attrs)); } @@ -387,10 +446,14 @@ impl Pending { id: RelationshipId, attrs: OrderMap, Value>, ) -> Result<(), String> { - for (_, value) in attrs.iter() { + for value in attrs.values() { validate_relationship_property(value)?; } - self.set_relationships_attrs.insert(id.into(), attrs); + if self.created_relationships.contains_key(&id) { + self.new_relationships_attrs.insert(id.into(), attrs); + } else { + self.existing_relationships_attrs.insert(id.into(), attrs); + } Ok(()) } @@ -401,10 +464,13 @@ impl Pending { value: Value, ) -> Result<(), String> { validate_relationship_property(&value)?; - self.set_relationships_attrs - .entry(id.into()) - .or_default() - .insert(key, value); + let map = if self.created_relationships.contains_key(&id) { + &mut self.new_relationships_attrs + } else { + &mut self.existing_relationships_attrs + }; + let entry = map.entry(id.into()).or_default(); + entry.insert(key, value); Ok(()) } @@ -414,9 +480,14 @@ impl Pending { id: RelationshipId, key: &Arc, ) -> Option<&Value> { - self.set_relationships_attrs + self.new_relationships_attrs .get(&id.into()) .and_then(|attrs| attrs.get(key)) + .or_else(|| { + self.existing_relationships_attrs + .get(&id.into()) + .and_then(|attrs| attrs.get(key)) + }) } pub fn update_relationship_attrs( @@ -424,7 +495,11 @@ impl Pending { id: RelationshipId, attrs: &mut OrderMap, Value>, ) { - if let Some(added) = self.set_relationships_attrs.get(&id.into()) { + let added = self + .new_relationships_attrs + .get(&id.into()) + .or_else(|| self.existing_relationships_attrs.get(&id.into())); + if let Some(added) = added { for (key, value) in added.iter() { if matches!(value, Value::Null) { attrs.remove(key); @@ -568,63 +643,69 @@ impl Pending { if !self.created_nodes.is_empty() { stats.borrow_mut().nodes_created += self.created_nodes.len(); g.borrow_mut().create_nodes(&self.created_nodes); - self.created_nodes.clear(); } if !self.created_relationships.is_empty() { stats.borrow_mut().relationships_created += self.created_relationships.len(); g.borrow_mut() .create_relationships(&self.created_relationships); - self.created_relationships.clear(); } if self.set_node_labels.nvals() > 0 { g.borrow_mut() .set_nodes_labels(&mut self.set_node_labels, &mut self.index_add_docs); - - self.set_node_labels.clear(); } if self.remove_node_labels.nvals() > 0 { stats.borrow_mut().labels_removed += self.remove_node_labels.nvals() as usize; g.borrow_mut() .remove_nodes_labels(&mut self.remove_node_labels, &mut self.index_remove_docs); + } + if !self.new_nodes_attrs.is_empty() || !self.existing_nodes_attrs.is_empty() { + let count_properties = |map: &HashMap, Value>>| -> usize { + map.values() + .flat_map(super::ordermap::OrderMap::values) + .map(|v| match *v { + Value::Null => 0, + _ => 1, + }) + .sum() + }; + stats.borrow_mut().properties_set += count_properties(&self.new_nodes_attrs) + + count_properties(&self.existing_nodes_attrs); + let mut g = g.borrow_mut(); + if !self.new_nodes_attrs.is_empty() { + g.import_node_attrs(&self.new_nodes_attrs, &mut self.index_add_docs); + } + if !self.existing_nodes_attrs.is_empty() { + stats.borrow_mut().properties_removed += + g.set_nodes_attributes(&self.existing_nodes_attrs, &mut self.index_add_docs)?; + } + } - self.remove_node_labels.clear(); - } - if !self.set_nodes_attrs.is_empty() { - stats.borrow_mut().properties_set += self - .set_nodes_attrs - .values() - .flat_map(super::ordermap::OrderMap::values) - .map(|v| match *v { - Value::Null => 0, - _ => 1, - }) - .sum::(); - stats.borrow_mut().properties_removed += g - .borrow_mut() - .set_nodes_attributes(&self.set_nodes_attrs, &mut self.index_add_docs)?; - self.set_nodes_attrs.clear(); - } - - if !self.set_relationships_attrs.is_empty() { - stats.borrow_mut().properties_set += self - .set_relationships_attrs - .values() - .flat_map(super::ordermap::OrderMap::values) - .map(|v| match *v { - Value::Null => 0, - _ => 1, - }) - .sum::(); - stats.borrow_mut().properties_removed += g - .borrow_mut() - .set_relationships_attributes(&self.set_relationships_attrs)?; - self.set_relationships_attrs.clear(); + if !self.new_relationships_attrs.is_empty() || !self.existing_relationships_attrs.is_empty() + { + let count_properties = |map: &HashMap, Value>>| -> usize { + map.values() + .flat_map(super::ordermap::OrderMap::values) + .map(|v| match *v { + Value::Null => 0, + _ => 1, + }) + .sum() + }; + stats.borrow_mut().properties_set += count_properties(&self.new_relationships_attrs) + + count_properties(&self.existing_relationships_attrs); + let mut g = g.borrow_mut(); + if !self.new_relationships_attrs.is_empty() { + g.import_relationship_attrs(&self.new_relationships_attrs); + } + if !self.existing_relationships_attrs.is_empty() { + stats.borrow_mut().properties_removed += + g.set_relationships_attributes(&self.existing_relationships_attrs)?; + } } if !self.deleted_nodes.is_empty() { stats.borrow_mut().nodes_deleted += self.deleted_nodes.len(); g.borrow_mut() .delete_nodes(&self.deleted_nodes, &mut self.index_remove_docs)?; - self.deleted_nodes.clear(); } if !self.deleted_relationships.is_empty() { stats.borrow_mut().relationships_deleted += self.deleted_relationships.len(); @@ -639,6 +720,519 @@ impl Pending { g.commit_attrs()?; g.commit_index(&mut self.index_add_docs, &mut self.index_remove_docs); } + Ok(()) } + + /// Clear all pending mutation state. + pub fn clear(&mut self) { + self.created_nodes.clear(); + self.created_relationships.clear(); + self.set_node_labels.clear(); + self.remove_node_labels.clear(); + self.new_nodes_attrs.clear(); + self.existing_nodes_attrs.clear(); + self.new_relationships_attrs.clear(); + self.existing_relationships_attrs.clear(); + self.deleted_nodes.clear(); + self.deleted_relationships.clear(); + } + + /// Returns the number of effects (operations) tracked in this Pending. + #[must_use] + pub fn effects_count(&self) -> u64 { + self.created_nodes.len() + + self.created_relationships.len() as u64 + + self.deleted_nodes.len() + + self.deleted_relationships.len() as u64 + + self.new_nodes_attrs.len() as u64 + + self.existing_nodes_attrs.len() as u64 + + self.new_relationships_attrs.len() as u64 + + self.existing_relationships_attrs.len() as u64 + + self.set_node_labels.nvals() + + self.remove_node_labels.nvals() + } + + /// Build a binary effects buffer from the accumulated mutations. + /// Must be called before `clear()` resets the pending data. + /// Appends to an existing buffer if provided, so multiple commits + /// in the same query accumulate into a single effects buffer. + /// Returns the number of effect records written. + pub fn build_effects_buffer( + &self, + g: &AtomicRefCell, + buf: &mut Vec, + ) -> u64 { + let mut n_effects = 0u64; + + // Version header (only write once at the start) + if buf.is_empty() { + buf.push(EFFECTS_VERSION); + } + + // --- Schema additions (new labels, relationship types) --- + { + let graph = g.borrow(); + let labels = graph.get_labels(); + for label in labels.iter().skip(self.schema_label_count) { + buf.push(EFFECT_ADD_SCHEMA); + buf.push(SCHEMA_NODE_LABEL); + write_string(buf, label); + n_effects += 1; + } + let types = graph.get_types(); + for rel_type in types.iter().skip(self.schema_rel_type_count) { + buf.push(EFFECT_ADD_SCHEMA); + buf.push(SCHEMA_REL_TYPE); + write_string(buf, rel_type); + n_effects += 1; + } + + // --- Attribute additions (new node/rel attribute names) --- + let node_attrs = graph.get_node_attribute_names(); + for attr in node_attrs.iter().skip(self.schema_node_attr_count) { + buf.push(EFFECT_ADD_ATTRIBUTE); + buf.push(ATTR_NODE); + write_string(buf, attr); + n_effects += 1; + } + let rel_attrs = graph.get_relationship_attribute_names(); + for attr in rel_attrs.iter().skip(self.schema_rel_attr_count) { + buf.push(EFFECT_ADD_ATTRIBUTE); + buf.push(ATTR_REL); + write_string(buf, attr); + n_effects += 1; + } + } + + // --- Created nodes --- + for node_id in &self.created_nodes { + buf.push(EFFECT_CREATE_NODE); + buf.extend_from_slice(&node_id.to_le_bytes()); + + // Labels: iterate the set_node_labels matrix for this row + let label_entries: Vec = self + .set_node_labels + .iter(node_id, node_id) + .map(|(_, col)| col) + .collect(); + write_u16(buf, label_entries.len() as u16); + let graph = g.borrow(); + for label_id in &label_entries { + let label_name = graph.get_label_by_id(LabelId(*label_id as usize)); + write_string(buf, &label_name); + } + drop(graph); + + // Attributes + if let Some(attrs) = self.new_nodes_attrs.get(&node_id) { + write_u16(buf, attrs.len() as u16); + for (key, value) in attrs.iter() { + write_string(buf, key); + write_value(buf, value); + } + } else { + write_u16(buf, 0); + } + n_effects += 1; + } + + // --- Created relationships --- + for (rel_id, rel) in &self.created_relationships { + buf.push(EFFECT_CREATE_EDGE); + buf.extend_from_slice(&u64::from(*rel_id).to_le_bytes()); + buf.extend_from_slice(&u64::from(rel.from).to_le_bytes()); + buf.extend_from_slice(&u64::from(rel.to).to_le_bytes()); + write_string(buf, &rel.type_name); + + if let Some(attrs) = self.new_relationships_attrs.get(&u64::from(*rel_id)) { + write_u16(buf, attrs.len() as u16); + for (key, value) in attrs.iter() { + write_string(buf, key); + write_value(buf, value); + } + } else { + write_u16(buf, 0); + } + n_effects += 1; + } + + // --- Updated node attributes (existing nodes only) --- + for (node_id, attrs) in &self.existing_nodes_attrs { + buf.push(EFFECT_UPDATE_NODE); + buf.extend_from_slice(&node_id.to_le_bytes()); + write_u16(buf, attrs.len() as u16); + for (key, value) in attrs.iter() { + write_string(buf, key); + write_value(buf, value); + } + n_effects += 1; + } + + // --- Updated relationship attributes (existing rels only) --- + for (rel_id, attrs) in &self.existing_relationships_attrs { + buf.push(EFFECT_UPDATE_EDGE); + buf.extend_from_slice(&rel_id.to_le_bytes()); + write_u16(buf, attrs.len() as u16); + for (key, value) in attrs.iter() { + write_string(buf, key); + write_value(buf, value); + } + n_effects += 1; + } + + // --- Set labels (non-created nodes only) --- + { + let nrows = self.set_node_labels.nrows(); + if nrows > 0 { + let mut label_map: HashMap> = HashMap::new(); + for (node_id, label_id) in self.set_node_labels.iter(0, nrows - 1) { + if !self.created_nodes.contains(node_id) { + label_map.entry(node_id).or_default().push(label_id); + } + } + let graph = g.borrow(); + for (node_id, label_ids) in &label_map { + buf.push(EFFECT_SET_LABELS); + buf.extend_from_slice(&node_id.to_le_bytes()); + write_u16(buf, label_ids.len() as u16); + for label_id in label_ids { + let label_name = graph.get_label_by_id(LabelId(*label_id as usize)); + write_string(buf, &label_name); + } + n_effects += 1; + } + } + } + + // --- Remove labels --- + { + let nrows = self.remove_node_labels.nrows(); + if nrows > 0 { + let mut label_map: HashMap> = HashMap::new(); + for (node_id, label_id) in self.remove_node_labels.iter(0, nrows - 1) { + label_map.entry(node_id).or_default().push(label_id); + } + let graph = g.borrow(); + for (node_id, label_ids) in &label_map { + buf.push(EFFECT_REMOVE_LABELS); + buf.extend_from_slice(&node_id.to_le_bytes()); + write_u16(buf, label_ids.len() as u16); + for label_id in label_ids { + let label_name = graph.get_label_by_id(LabelId(*label_id as usize)); + write_string(buf, &label_name); + } + n_effects += 1; + } + } + } + + // --- Deleted relationships (before nodes, so replica removes edges first) --- + for (rel_id, (from, to)) in &self.deleted_relationships { + buf.push(EFFECT_DELETE_EDGE); + buf.extend_from_slice(&u64::from(*rel_id).to_le_bytes()); + buf.extend_from_slice(&u64::from(*from).to_le_bytes()); + buf.extend_from_slice(&u64::from(*to).to_le_bytes()); + n_effects += 1; + } + + // --- Deleted nodes --- + for node_id in &self.deleted_nodes { + buf.push(EFFECT_DELETE_NODE); + buf.extend_from_slice(&node_id.to_le_bytes()); + n_effects += 1; + } + + n_effects + } +} + +// ── Effects buffer constants and helpers ── + +pub const EFFECTS_VERSION: u8 = 1; + +pub const EFFECT_UPDATE_NODE: u8 = 1; +pub const EFFECT_UPDATE_EDGE: u8 = 2; +pub const EFFECT_CREATE_NODE: u8 = 3; +pub const EFFECT_CREATE_EDGE: u8 = 4; +pub const EFFECT_DELETE_NODE: u8 = 5; +pub const EFFECT_DELETE_EDGE: u8 = 6; +pub const EFFECT_SET_LABELS: u8 = 7; +pub const EFFECT_REMOVE_LABELS: u8 = 8; +pub const EFFECT_ADD_SCHEMA: u8 = 9; +pub const EFFECT_ADD_ATTRIBUTE: u8 = 10; +pub const EFFECT_CREATE_INDEX: u8 = 11; +pub const EFFECT_DROP_INDEX: u8 = 12; + +// Schema type tags (used in EFFECT_ADD_SCHEMA) +pub const SCHEMA_NODE_LABEL: u8 = 0; +pub const SCHEMA_REL_TYPE: u8 = 1; + +// Attribute type tags (used in EFFECT_ADD_ATTRIBUTE) +pub const ATTR_NODE: u8 = 0; +pub const ATTR_REL: u8 = 1; + +// Value type tags for effect serialization +const VALUE_NULL: u8 = 0; +const VALUE_BOOL: u8 = 1; +const VALUE_INT: u8 = 2; +const VALUE_FLOAT: u8 = 3; +const VALUE_STRING: u8 = 4; +const VALUE_LIST: u8 = 5; +const VALUE_POINT: u8 = 6; +const VALUE_VECF32: u8 = 7; +const VALUE_DATETIME: u8 = 8; +const VALUE_DATE: u8 = 9; +const VALUE_TIME: u8 = 10; +const VALUE_DURATION: u8 = 11; + +pub fn write_u16( + buf: &mut Vec, + v: u16, +) { + buf.extend_from_slice(&v.to_le_bytes()); +} + +pub fn write_string( + buf: &mut Vec, + s: &str, +) { + buf.extend_from_slice(&(s.len() as u64).to_le_bytes()); + buf.extend_from_slice(s.as_bytes()); +} + +fn write_value( + buf: &mut Vec, + value: &Value, +) { + match value { + Value::Null => buf.push(VALUE_NULL), + Value::Bool(b) => { + buf.push(VALUE_BOOL); + buf.push(u8::from(*b)); + } + Value::Int(i) => { + buf.push(VALUE_INT); + buf.extend_from_slice(&i.to_le_bytes()); + } + Value::Float(f) => { + buf.push(VALUE_FLOAT); + buf.extend_from_slice(&f.to_le_bytes()); + } + Value::String(s) => { + buf.push(VALUE_STRING); + write_string(buf, s); + } + Value::List(items) => { + buf.push(VALUE_LIST); + buf.extend_from_slice(&(items.len() as u64).to_le_bytes()); + for item in items.iter() { + write_value(buf, item); + } + } + Value::Point(p) => { + buf.push(VALUE_POINT); + buf.extend_from_slice(&(p.latitude as f64).to_le_bytes()); + buf.extend_from_slice(&(p.longitude as f64).to_le_bytes()); + } + Value::VecF32(v) => { + buf.push(VALUE_VECF32); + buf.extend_from_slice(&(v.len() as u64).to_le_bytes()); + for f in v.iter() { + buf.extend_from_slice(&f.to_le_bytes()); + } + } + Value::Datetime(ts) => { + buf.push(VALUE_DATETIME); + buf.extend_from_slice(&ts.to_le_bytes()); + } + Value::Date(ts) => { + buf.push(VALUE_DATE); + buf.extend_from_slice(&ts.to_le_bytes()); + } + Value::Time(ts) => { + buf.push(VALUE_TIME); + buf.extend_from_slice(&ts.to_le_bytes()); + } + Value::Duration(dur) => { + buf.push(VALUE_DURATION); + buf.extend_from_slice(&dur.to_le_bytes()); + } + _ => { + debug_assert!(false, "Unsupported value type in effects buffer: {value:?}"); + buf.push(VALUE_NULL); // Fallback for unsupported types + } + } +} + +pub fn read_string( + buf: &[u8], + offset: &mut usize, +) -> Result, String> { + if *offset + 8 > buf.len() { + return Err("effects buffer truncated".to_string()); + } + let len = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()) as usize; + *offset += 8; + if *offset + len > buf.len() { + return Err("effects buffer truncated".to_string()); + } + let s = std::str::from_utf8(&buf[*offset..*offset + len]) + .map_err(|e| format!("invalid utf8 in effects buffer: {e}"))?; + *offset += len; + Ok(Arc::new(s.to_string())) +} + +pub fn read_u16( + buf: &[u8], + offset: &mut usize, +) -> Result { + if *offset + 2 > buf.len() { + return Err("effects buffer truncated".to_string()); + } + let v = u16::from_le_bytes(buf[*offset..*offset + 2].try_into().unwrap()); + *offset += 2; + Ok(v) +} + +pub fn read_u64( + buf: &[u8], + offset: &mut usize, +) -> Result { + if *offset + 8 > buf.len() { + return Err("effects buffer truncated".to_string()); + } + let v = u64::from_le_bytes(buf[*offset..*offset + 8].try_into().unwrap()); + *offset += 8; + Ok(v) +} + +pub fn read_value( + buf: &[u8], + offset: &mut usize, +) -> Result { + if *offset >= buf.len() { + return Err("effects buffer truncated".to_string()); + } + let tag = buf[*offset]; + *offset += 1; + match tag { + VALUE_NULL => Ok(Value::Null), + VALUE_BOOL => { + if *offset >= buf.len() { + return Err("effects buffer truncated".to_string()); + } + let b = buf[*offset] != 0; + *offset += 1; + Ok(Value::Bool(b)) + } + VALUE_INT => { + let v = i64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Int(v)) + } + VALUE_FLOAT => { + let v = f64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Float(v)) + } + VALUE_STRING => { + let s = read_string(buf, offset)?; + Ok(Value::String(s)) + } + VALUE_LIST => { + let len = read_u64(buf, offset)? as usize; + let mut items = thin_vec::ThinVec::with_capacity(len); + for _ in 0..len { + items.push(read_value(buf, offset)?); + } + Ok(Value::List(Arc::new(items))) + } + VALUE_POINT => { + let lat = f64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + let lon = f64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Point(crate::runtime::value::Point { + latitude: lat as f32, + longitude: lon as f32, + })) + } + VALUE_VECF32 => { + let len = read_u64(buf, offset)? as usize; + let mut v = Vec::with_capacity(len); + for _ in 0..len { + let f = f32::from_le_bytes( + buf.get(*offset..*offset + 4) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 4; + v.push(f); + } + Ok(Value::VecF32(Arc::new(v.into()))) + } + VALUE_DATETIME => { + let ts = i64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Datetime(ts)) + } + VALUE_DATE => { + let ts = i64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Date(ts)) + } + VALUE_TIME => { + let ts = i64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Time(ts)) + } + VALUE_DURATION => { + let dur = i64::from_le_bytes( + buf.get(*offset..*offset + 8) + .ok_or("truncated")? + .try_into() + .unwrap(), + ); + *offset += 8; + Ok(Value::Duration(dur)) + } + _ => Err(format!("unknown value tag in effects buffer: {tag}")), + } } diff --git a/graph/src/runtime/runtime.rs b/graph/src/runtime/runtime.rs index 4b86eec9..3bc85a86 100644 --- a/graph/src/runtime/runtime.rs +++ b/graph/src/runtime/runtime.rs @@ -67,7 +67,13 @@ use chrono::{DateTime, Utc}; use once_cell::unsync::Lazy; use orx_tree::{Bfs, Dyn, DynNode, DynTree, MemoryPolicy, NodeIdx, NodeRef}; use roaring::RoaringTreemap; -use std::{cell::RefCell, collections::HashMap, fmt::Debug, sync::Arc, time::Instant}; +use std::{ + cell::{Cell, RefCell}, + collections::HashMap, + fmt::Debug, + sync::Arc, + time::Instant, +}; pub use super::eval::ValueIter; @@ -145,6 +151,10 @@ pub struct Runtime<'a> { pub env_pool: &'a Pool, /// Maximum number of result rows to return. Negative means unlimited. pub result_set_size: i64, + /// Effects buffer built before commit, for replication. + pub effects_buffer: RefCell>>, + /// Total number of effect records across all commits in this query. + pub effects_count: Cell, /// Timestamp captured at the start of the transaction/query. /// Used by `date.transaction()`, `localtime.transaction()`, and `localdatetime.transaction()` /// so every call in the same transaction returns the same value. @@ -331,11 +341,15 @@ impl<'a> Runtime<'a> { result_set_size: i64, ) -> Self { let return_names = plan.root().get_return_names(); + let pending = Lazy::new((|| RefCell::new(Pending::new())) as fn() -> RefCell); + if write { + pending.borrow_mut().set_schema_baseline(&g); + } Self { parameters, g, write, - pending: Lazy::new(|| RefCell::new(Pending::new())), + pending, stats: RefCell::new(QueryStatistics::default()), plan, return_names, @@ -348,6 +362,8 @@ impl<'a> Runtime<'a> { merge_pattern_cache: RefCell::new(HashMap::new()), env_pool, result_set_size, + effects_buffer: RefCell::new(None), + effects_count: Cell::new(0), transaction_timestamp: Utc::now(), } } diff --git a/tests/flow/test_effects.py b/tests/flow/test_effects.py index 58d1784e..5fb629e9 100644 --- a/tests/flow/test_effects.py +++ b/tests/flow/test_effects.py @@ -322,7 +322,7 @@ def test06_update_node_effect(self, expect_effect=True): n.xa = n.xa + 1""" res = self.query_master_and_wait(q) - self.env.assertEqual(res.properties_set, 11) + self.env.assertEqual(res.properties_set, 1) if(expect_effect): self.wait_for_effect() @@ -483,7 +483,7 @@ def test07_update_edge_effect(self, expect_effect=True): e.a = e.a + 1""" res = self.query_master_and_wait(q) - self.env.assertEqual(res.properties_set, 11) + self.env.assertEqual(res.properties_set, 1) if(expect_effect): self.wait_for_effect() @@ -698,8 +698,7 @@ def test12_merge_node(self, expect_effect=True): ON CREATE SET n.v = 'blue'""" res = self.query_master_and_wait(q) self.env.assertEqual(res.nodes_created, 1) - self.env.assertEqual(res.properties_set, 2) - self.env.assertEqual(res.properties_removed, 1) + self.env.assertEqual(res.properties_set, 1) if(expect_effect): self.wait_for_effect() @@ -734,7 +733,7 @@ def test13_merge_edge(self, expect_effect=True): ON MATCH SET e.v = 'green' ON CREATE SET e.v = 'blue'""" res = self.query_master_and_wait(q) - self.env.assertEqual(res.properties_set, 3) + self.env.assertEqual(res.properties_set, 2) self.env.assertEqual(res.relationships_created, 1) if(expect_effect):