diff --git a/src/commands/effect.rs b/src/commands/effect.rs new file mode 100644 index 00000000..45af3dcf --- /dev/null +++ b/src/commands/effect.rs @@ -0,0 +1,354 @@ +//! `GRAPH.EFFECT` command handler. +//! +//! Applies serialized effects (mutations) received from the primary to +//! maintain replica consistency. The binary effects buffer is produced +//! by `Pending::build_effects_buffer()` on the primary and contains the +//! exact mutations that occurred during query execution. +//! +//! ## Command syntax +//! ```text +//! GRAPH.EFFECT +//! ``` + +use crate::{config::CONFIGURATION_CACHE_SIZE, graph_core::ThreadedGraph, redis_type::GRAPH_TYPE}; +use graph::{ + entity_type::EntityType, + graph::graph::{Graph, NodeId, RelationshipId}, + graph::graphblas::matrix::{Matrix, New, Set}, + graph::graphblas::tensor::GrB_INDEX_MAX, + index::IndexType, + runtime::{ + ordermap::OrderMap, + pending::{ + ATTR_NODE, ATTR_REL, EFFECT_ADD_ATTRIBUTE, EFFECT_ADD_SCHEMA, EFFECT_CREATE_EDGE, + EFFECT_CREATE_INDEX, EFFECT_CREATE_NODE, EFFECT_DELETE_EDGE, EFFECT_DELETE_NODE, + EFFECT_DROP_INDEX, EFFECT_REMOVE_LABELS, EFFECT_SET_LABELS, EFFECT_UPDATE_EDGE, + EFFECT_UPDATE_NODE, EFFECTS_VERSION, PendingRelationship, SCHEMA_NODE_LABEL, + SCHEMA_REL_TYPE, read_string, read_u16, read_u64, read_value, + }, + value::Value, + }, +}; +use parking_lot::RwLock; +use redis_module::{Context, NextArg, RedisResult, RedisString, RedisValue}; +use roaring::RoaringTreemap; +use std::{collections::HashMap, sync::Arc}; + +pub fn graph_effect( + ctx: &Context, + args: Vec, +) -> RedisResult { + let mut args = args.into_iter().skip(1); + let key_str = args.next_arg()?; + let effects_buf = args.next_arg()?; + + let buf = effects_buf.as_slice(); + if buf.is_empty() { + return Ok(RedisValue::SimpleStringStatic("OK")); + } + + // Open existing graph or create a new one + let key = ctx.open_key_writable(&key_str); + let graph = if let Some(g) = key.get_value::>>(&GRAPH_TYPE)? { + g.clone() + } else { + let g = Arc::new(RwLock::new(ThreadedGraph::new( + *CONFIGURATION_CACHE_SIZE.lock(ctx) as usize, + &key_str.to_string(), + ))); + key.set_value(&GRAPH_TYPE, g.clone())?; + g + }; + + let mut tg = graph.write(); + let Some(g_arc) = tg.graph.write() else { + return Err(redis_module::RedisError::String( + "ERR write lock unavailable".to_string(), + )); + }; + + let result = { + let mut g = g_arc.borrow_mut(); + apply_effects(&mut g, buf) + }; + + match result { + Ok(()) => { + tg.graph.commit(g_arc); + ctx.replicate_verbatim(); + let value = tg.graph.read().borrow().maybe_flush_caches(); + if let Err(e) = value { + eprintln!("FalkorDB: cache flush failed: {e}"); + } + Ok(RedisValue::SimpleStringStatic("OK")) + } + Err(e) => { + tg.graph.rollback(); + Err(redis_module::RedisError::String(format!( + "ERR effect apply failed: {e}" + ))) + } + } +} + +#[allow(clippy::too_many_lines)] +fn apply_effects( + g: &mut Graph, + buf: &[u8], +) -> Result<(), String> { + let mut offset = 0; + + if offset >= buf.len() { + return Err("empty effects buffer".to_string()); + } + let version = buf[offset]; + offset += 1; + if version != EFFECTS_VERSION { + return Err(format!("unsupported effects version: {version}")); + } + + let mut index_add_docs: HashMap = HashMap::new(); + let mut index_remove_docs: HashMap = HashMap::new(); + let mut has_index_ops = false; + + while offset < buf.len() { + let effect_type = buf[offset]; + offset += 1; + + match effect_type { + EFFECT_CREATE_NODE => { + let node_id_raw = read_u64(buf, &mut offset)?; + g.inc_reserved_node_count(); + + // Labels + let label_count = read_u16(buf, &mut offset)?; + let mut set_labels = Matrix::new(GrB_INDEX_MAX, GrB_INDEX_MAX); + for _ in 0..label_count { + let label_name = read_string(buf, &mut offset)?; + let label_id = g.get_label_id_mut(&label_name); + set_labels.set(node_id_raw, label_id.0 as u64, true); + } + + // Create the node + let mut nodes = RoaringTreemap::new(); + nodes.insert(node_id_raw); + g.create_nodes(&nodes); + + // Apply labels + if label_count > 0 { + g.set_nodes_labels(&mut set_labels, &mut index_add_docs); + } + + // Attributes + let attr_count = read_u16(buf, &mut offset)?; + if attr_count > 0 { + let attrs = read_attrs(buf, &mut offset, attr_count)?; + let mut attr_map = HashMap::new(); + attr_map.insert(node_id_raw, attrs); + g.set_nodes_attributes(&attr_map, &mut index_add_docs)?; + } + } + + EFFECT_CREATE_EDGE => { + let rel_id_raw = read_u64(buf, &mut offset)?; + let src_id = read_u64(buf, &mut offset)?; + let dst_id = read_u64(buf, &mut offset)?; + let type_name = read_string(buf, &mut offset)?; + + g.inc_reserved_relationship_count(); + + let pending_rel = + PendingRelationship::new(NodeId::from(src_id), NodeId::from(dst_id), type_name); + let mut rels = HashMap::new(); + rels.insert(RelationshipId::from(rel_id_raw), pending_rel); + g.create_relationships(&rels); + + // Attributes + let attr_count = read_u16(buf, &mut offset)?; + if attr_count > 0 { + let attrs = read_attrs(buf, &mut offset, attr_count)?; + let mut attr_map = HashMap::new(); + attr_map.insert(rel_id_raw, attrs); + g.set_relationships_attributes(&attr_map)?; + } + } + + EFFECT_UPDATE_NODE => { + let node_id = read_u64(buf, &mut offset)?; + let attr_count = read_u16(buf, &mut offset)?; + let attrs = read_attrs(buf, &mut offset, attr_count)?; + let mut attr_map = HashMap::new(); + attr_map.insert(node_id, attrs); + g.set_nodes_attributes(&attr_map, &mut index_add_docs)?; + } + + EFFECT_UPDATE_EDGE => { + let rel_id = read_u64(buf, &mut offset)?; + let attr_count = read_u16(buf, &mut offset)?; + let attrs = read_attrs(buf, &mut offset, attr_count)?; + let mut attr_map = HashMap::new(); + attr_map.insert(rel_id, attrs); + g.set_relationships_attributes(&attr_map)?; + } + + EFFECT_SET_LABELS => { + let node_id = read_u64(buf, &mut offset)?; + let label_count = read_u16(buf, &mut offset)?; + let mut set_labels = Matrix::new(GrB_INDEX_MAX, GrB_INDEX_MAX); + for _ in 0..label_count { + let label_name = read_string(buf, &mut offset)?; + let label_id = g.get_label_id_mut(&label_name); + set_labels.set(node_id, label_id.0 as u64, true); + } + g.set_nodes_labels(&mut set_labels, &mut index_add_docs); + } + + EFFECT_REMOVE_LABELS => { + let node_id = read_u64(buf, &mut offset)?; + let label_count = read_u16(buf, &mut offset)?; + let mut remove_labels = Matrix::new(GrB_INDEX_MAX, GrB_INDEX_MAX); + for _ in 0..label_count { + let label_name = read_string(buf, &mut offset)?; + if let Some(label_id) = g.get_label_id(&label_name) { + remove_labels.set(node_id, label_id.0 as u64, true); + } + } + g.remove_nodes_labels(&mut remove_labels, &mut index_remove_docs); + } + + EFFECT_DELETE_NODE => { + let node_id = read_u64(buf, &mut offset)?; + let mut nodes = RoaringTreemap::new(); + nodes.insert(node_id); + g.delete_nodes(&nodes, &mut index_remove_docs)?; + } + + EFFECT_DELETE_EDGE => { + let rel_id = read_u64(buf, &mut offset)?; + let src_id = read_u64(buf, &mut offset)?; + let dst_id = read_u64(buf, &mut offset)?; + let rel = RelationshipId::from(rel_id); + let mut rels = HashMap::new(); + rels.insert(rel, (NodeId::from(src_id), NodeId::from(dst_id))); + g.delete_relationships(rels)?; + } + + EFFECT_ADD_SCHEMA => { + if offset >= buf.len() { + return Err("truncated EFFECT_ADD_SCHEMA".to_string()); + } + let schema_type = buf[offset]; + offset += 1; + let name = read_string(buf, &mut offset)?; + match schema_type { + SCHEMA_NODE_LABEL => { + g.get_label_id_mut(&name); + } + SCHEMA_REL_TYPE => { + g.get_type_id_mut(&name); + } + _ => return Err(format!("unknown schema type: {schema_type}")), + } + } + + EFFECT_ADD_ATTRIBUTE => { + if offset >= buf.len() { + return Err("truncated EFFECT_ADD_ATTRIBUTE".to_string()); + } + let attr_type = buf[offset]; + offset += 1; + let name = read_string(buf, &mut offset)?; + match attr_type { + ATTR_NODE => g.add_node_attribute_name(&name), + ATTR_REL => g.add_rel_attribute_name(&name), + _ => return Err(format!("unknown attribute type: {attr_type}")), + } + } + + EFFECT_CREATE_INDEX => { + let index_type = read_index_type(buf, &mut offset)?; + let entity_type = read_entity_type(buf, &mut offset)?; + let label = read_string(buf, &mut offset)?; + let attr_count = read_u16(buf, &mut offset)?; + let mut attrs = Vec::with_capacity(attr_count as usize); + for _ in 0..attr_count { + attrs.push(read_string(buf, &mut offset)?); + } + // Use sync variant to avoid spawning async population threads on the replica + g.create_index_sync(&index_type, &entity_type, &label, &attrs, None)?; + has_index_ops = true; + } + + EFFECT_DROP_INDEX => { + let index_type = read_index_type(buf, &mut offset)?; + let entity_type = read_entity_type(buf, &mut offset)?; + let label = read_string(buf, &mut offset)?; + let attr_count = read_u16(buf, &mut offset)?; + let mut attrs = Vec::with_capacity(attr_count as usize); + for _ in 0..attr_count { + attrs.push(read_string(buf, &mut offset)?); + } + g.drop_index(&index_type, &entity_type, &label, &attrs)?; + } + + _ => return Err(format!("unknown effect type: {effect_type}")), + } + } + + g.commit_attrs()?; + g.commit_index(&mut index_add_docs, &mut index_remove_docs); + + if has_index_ops { + g.populate_indexes_sync(); + } + + Ok(()) +} + +fn read_index_type( + buf: &[u8], + offset: &mut usize, +) -> Result { + if *offset >= buf.len() { + return Err("effects buffer truncated".to_string()); + } + let tag = buf[*offset]; + *offset += 1; + match tag { + 0 => Ok(IndexType::Range), + 1 => Ok(IndexType::Fulltext), + 2 => Ok(IndexType::Vector), + _ => Err(format!("unknown index type tag: {tag}")), + } +} + +fn read_entity_type( + buf: &[u8], + offset: &mut usize, +) -> Result { + if *offset >= buf.len() { + return Err("effects buffer truncated".to_string()); + } + let tag = buf[*offset]; + *offset += 1; + match tag { + 0 => Ok(EntityType::Node), + 1 => Ok(EntityType::Relationship), + _ => Err(format!("unknown entity type tag: {tag}")), + } +} + +fn read_attrs( + buf: &[u8], + offset: &mut usize, + count: u16, +) -> Result, Value>, String> { + let pairs: Vec<_> = (0..count) + .map(|_| { + let key = read_string(buf, offset)?; + let value = read_value(buf, offset)?; + Ok((key, value)) + }) + .collect::, String>>()?; + Ok(OrderMap::from_vec(pairs)) +} diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 41bc603a..d54a4731 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -21,6 +21,7 @@ use redis_module::{RedisError, RedisResult}; pub mod config_cmd; pub mod delete; +pub mod effect; pub mod explain; pub mod list; pub mod memory; @@ -31,6 +32,7 @@ pub mod udf; pub use config_cmd::graph_config; pub use delete::graph_delete; +pub use effect::graph_effect; pub use explain::graph_explain; pub use list::graph_list; pub use memory::graph_memory; diff --git a/src/graph_core.rs b/src/graph_core.rs index 54996fae..e64ea2ca 100644 --- a/src/graph_core.rs +++ b/src/graph_core.rs @@ -28,7 +28,10 @@ //! queue guarded by `write_loop`. use crate::{ - config::{CONFIGURATION_IMPORT_FOLDER, MAX_QUEUED_QUERIES, RESULTSET_SIZE, TIMEOUT_DEFAULT}, + config::{ + CONFIGURATION_IMPORT_FOLDER, EFFECTS_THRESHOLD, MAX_QUEUED_QUERIES, RESULTSET_SIZE, + TIMEOUT_DEFAULT, + }, reply::{reply_compact, reply_verbose}, }; use atomic_refcell::AtomicRefCell; @@ -41,8 +44,15 @@ use graph::{ graph::{Graph, Plan}, mvcc_graph::MvccGraph, }, - planner::IR, - runtime::{eval::evaluate_param, pool::Pool, runtime::Runtime}, + planner::{IR, plan_is_non_deterministic}, + runtime::{ + eval::evaluate_param, + pending::{ + EFFECT_CREATE_INDEX, EFFECT_DROP_INDEX, EFFECTS_VERSION, write_string, write_u16, + }, + pool::Pool, + runtime::Runtime, + }, threadpool::{pending_count, spawn}, }; use orx_tree::Collection; @@ -62,6 +72,7 @@ use std::{ use crate::allocator::{current_thread_usage, disable_tracking, enable_tracking, reset_counter}; type WriteMessage = (BlockedClient, Arc, bool, bool, Arc); +type WriteQueryResult = Result<(Arc>, Option>, bool), String>; pub struct ThreadedGraph { pub graph: MvccGraph, @@ -87,6 +98,24 @@ impl ThreadedGraph { } } + /// Create a `ThreadedGraph` from an existing `MvccGraph`. + /// Used by the RDB load path. + pub fn from_mvcc(graph: MvccGraph) -> Self { + let (sender, receiver) = bounded_blocking(1024); + Self { + graph, + sender, + receiver, + write_loop: AtomicBool::new(false), + } + } + + /// Returns the graph name. + pub fn name(&self) -> String { + let g = self.graph.read(); + g.borrow().name().to_string() + } + pub fn execute_query( &self, ctx: &Context, @@ -149,7 +178,7 @@ impl ThreadedGraph { query: &str, compact: bool, first_cached: bool, - ) -> Result>, String> { + ) -> WriteQueryResult { let Plan { plan, parameters, .. } = self.graph.read().borrow().get_plan(query)?; @@ -162,6 +191,9 @@ impl ThreadedGraph { n, IR::Commit | IR::CreateIndex { .. } | IR::DropIndex { .. } ))); + + let is_non_deterministic = plan_is_non_deterministic(&plan); + let g = self.graph.write().unwrap(); let env_pool = Pool::new(); let runtime = Runtime::new( @@ -182,13 +214,31 @@ impl ThreadedGraph { return Err(err); } }; + + // Capture effects buffer before replying (pending data is still available) + let mut effects_buffer = + should_use_effects(is_non_deterministic, &runtime, result.stats.execution_time); + + // Build index effects for CreateIndex / DropIndex IR nodes (not tracked by Pending) + effects_buffer = build_index_effects(&runtime, effects_buffer); + result.stats.cached = cached; if compact { reply_compact(ctx, &runtime, &result); } else { reply_verbose(ctx, &runtime, &result); } - Ok(g) + let modified = result.stats.nodes_created > 0 + || result.stats.nodes_deleted > 0 + || result.stats.relationships_created > 0 + || result.stats.relationships_deleted > 0 + || result.stats.properties_set > 0 + || result.stats.properties_removed > 0 + || result.stats.labels_added > 0 + || result.stats.labels_removed > 0 + || result.stats.indexes_created > 0 + || result.stats.indexes_dropped > 0; + Ok((g, effects_buffer, modified)) } } @@ -217,7 +267,7 @@ pub fn query_mut( ) -> RedisResult { // Inside MULTI/EXEC: execute synchronously (blocking commands not allowed). if ctx.get_flags().contains(ContextFlags::MULTI) { - return query_sync(ctx, graph, query, compact, write); + return query_sync(ctx, graph, query, compact, write, &key_name); } // Check pending queries limit before dispatching. @@ -307,6 +357,7 @@ fn query_sync( query: &str, compact: bool, write: bool, + key_name: &Arc, ) -> RedisResult { // First pass: parse + detect if write, execute reads inline. // Sync query timeout to UDF JS runtime @@ -324,8 +375,11 @@ fn query_sync( let mut g = graph.write(); let res = g.execute_query_write(ctx, query, compact, cached); match res { - Ok(new_graph) => { + Ok((new_graph, effects_buffer, modified)) => { g.graph.commit(new_graph); + if modified { + replicate_effects(ctx, key_name, effects_buffer, query); + } // Flush dirty cache entries to fjall if over budget. let value = g.graph.read().borrow().maybe_flush_caches(); if let Err(e) = value { @@ -360,7 +414,7 @@ pub fn process_write_queued_query(graph: &Arc>) { let ctx = Context::new(ctx); let res = graph.execute_query_write(&ctx, &query, compact, cached); match res { - Ok(g) => { + Ok((g, effects_buffer, modified)) => { // Signal the key as modified so WATCH gets triggered. unsafe { raw::RedisModule_ThreadSafeContextLock.unwrap()(ctx.ctx); @@ -371,6 +425,12 @@ pub fn process_write_queued_query(graph: &Arc>) { ); raw::RedisModule_SignalModifiedKey.unwrap()(ctx.ctx, rstr); raw::RedisModule_FreeString.unwrap()(ctx.ctx, rstr); + }; + // Send replication while GIL is held + if modified { + replicate_effects(&ctx, &key_name, effects_buffer, &query); + } + unsafe { raw::RedisModule_ThreadSafeContextUnlock.unwrap()(ctx.ctx); raw::RedisModule_FreeThreadSafeContext.unwrap()(ctx.ctx); }; @@ -395,6 +455,118 @@ pub fn process_write_queued_query(graph: &Arc>) { } } +/// Decide whether to use effects replication and get the pre-built buffer. +/// The buffer was built in CommitOp before pending was cleared. +/// Returns Some(buffer) if effects should be sent, None for verbatim replication. +fn should_use_effects( + is_non_deterministic: bool, + runtime: &Runtime, + exec_time_ms: f64, +) -> Option> { + let threshold = EFFECTS_THRESHOLD.load(Ordering::Relaxed); + + let buf = runtime.effects_buffer.borrow_mut().take(); + let buf = match buf { + Some(b) if b.len() > 1 => b, // > 1 because version byte alone means empty + _ => return None, + }; + + let n_effects = runtime.effects_count.get(); + + let use_effects = if is_non_deterministic || threshold == 0 { + true + } else if n_effects == 0 { + false + } else { + let avg_mod_time_us = (exec_time_ms / n_effects as f64) * 1000.0; + avg_mod_time_us > threshold as f64 + }; + + if use_effects { Some(buf) } else { None } +} + +/// Send replication: GRAPH.EFFECT with binary buffer, or verbatim query replay. +fn replicate_effects( + ctx: &Context, + key_name: &str, + effects_buffer: Option>, + query: &str, +) { + if let Some(buf) = effects_buffer { + let args: &[&[u8]] = &[key_name.as_bytes(), &buf]; + ctx.replicate("GRAPH.EFFECT", args); + } else { + let args: &[&[u8]] = &[key_name.as_bytes(), query.as_bytes()]; + ctx.replicate("GRAPH.QUERY", args); + } +} + +/// Encode IndexType as u8 tag for effects buffer. +const fn index_type_tag(it: &graph::index::IndexType) -> u8 { + use graph::index::IndexType; + match it { + IndexType::Range => 0, + IndexType::Fulltext => 1, + IndexType::Vector => 2, + } +} + +/// Encode EntityType as u8 tag for effects buffer. +const fn entity_type_tag(et: &graph::entity_type::EntityType) -> u8 { + use graph::entity_type::EntityType; + match et { + EntityType::Node => 0, + EntityType::Relationship => 1, + } +} + +/// Scan the plan for CreateIndex / DropIndex IR nodes and append their +/// effects to the buffer. Returns the (possibly new) effects buffer. +fn build_index_effects( + runtime: &Runtime, + mut effects_buffer: Option>, +) -> Option> { + for node in runtime.plan.iter() { + match node { + IR::CreateIndex { + label, + attrs, + index_type, + entity_type, + .. + } => { + let buf = effects_buffer.get_or_insert_with(|| vec![EFFECTS_VERSION]); + buf.push(EFFECT_CREATE_INDEX); + buf.push(index_type_tag(index_type)); + buf.push(entity_type_tag(entity_type)); + write_string(buf, label); + write_u16(buf, attrs.len() as u16); + for attr in attrs { + write_string(buf, attr); + } + } + IR::DropIndex { + label, + attrs, + index_type, + entity_type, + } => { + let buf = effects_buffer.get_or_insert_with(|| vec![EFFECTS_VERSION]); + buf.push(EFFECT_DROP_INDEX); + buf.push(index_type_tag(index_type)); + buf.push(entity_type_tag(entity_type)); + write_string(buf, label); + write_u16(buf, attrs.len() as u16); + for attr in attrs { + write_string(buf, attr); + } + } + _ => {} + } + } + effects_buffer +} + #[unsafe(no_mangle)] pub unsafe extern "C" fn graph_free(value: *mut c_void) { unsafe {