diff --git a/src/serializers/buffered_io.rs b/src/serializers/buffered_io.rs new file mode 100644 index 00000000..eca35f67 --- /dev/null +++ b/src/serializers/buffered_io.rs @@ -0,0 +1,310 @@ +//! Buffered IO layer for RDB serialization (v19 format). +//! +//! Wraps `*mut RedisModuleIO` with a 256KB buffer and prefixes every +//! value with a 1-byte type tag, matching the C FalkorDB `SerializerIOv2`. +//! +//! Type tags: +//! - 0 (BYTES): `[tag:u8][len:u64][data:len bytes]` +//! - 1 (FLOAT): `[tag:u8][value:4 bytes]` +//! - 2 (DOUBLE): `[tag:u8][value:8 bytes]` +//! - 3 (SIGNED): `[tag:u8][value:8 bytes]` +//! - 4 (UNSIGNED):`[tag:u8][value:8 bytes]` +//! - 5 (LONG_DOUBLE): not used in Rust +//! - 6 (BLOB): sentinel, next Redis chunk is standalone blob data + +use graph::graph::graphblas::serialization::Reader; +use graph::graph::graphblas::serialization::Writer; +use redis_module::RedisModuleIO; +use redis_module::raw; + +const BUFFER_SIZE: usize = 256_000; + +const TYPE_BYTES: u8 = 0; +const TYPE_FLOAT: u8 = 1; +const TYPE_DOUBLE: u8 = 2; +const TYPE_SIGNED: u8 = 3; +const TYPE_UNSIGNED: u8 = 4; +#[allow(dead_code)] +const TYPE_LONG_DOUBLE: u8 = 5; +const TYPE_BLOB: u8 = 6; + +// --------------------------------------------------------------------------- +// Writer +// --------------------------------------------------------------------------- + +/// Buffered writer that accumulates type-tagged values and flushes +/// as 256KB chunks to Redis via `RedisModule_SaveStringBuffer`. +pub struct BufferedWriter { + rdb: *mut RedisModuleIO, + buf: Vec, +} + +impl BufferedWriter { + pub fn new(rdb: *mut RedisModuleIO) -> Self { + Self { + rdb, + buf: Vec::with_capacity(BUFFER_SIZE), + } + } + + /// Flush the current buffer to Redis and reset. + fn flush(&mut self) { + if !self.buf.is_empty() { + raw::save_slice(self.rdb, &self.buf); + self.buf.clear(); + } + } + + /// Ensure there is room for `needed` bytes, flushing if necessary. 
+ fn accommodate( + &mut self, + needed: usize, + ) { + if self.buf.len() + needed > BUFFER_SIZE { + self.flush(); + } + } + + pub fn write_unsigned( + &mut self, + val: u64, + ) { + self.accommodate(1 + 8); + self.buf.push(TYPE_UNSIGNED); + self.buf.extend_from_slice(&val.to_le_bytes()); + } + + pub fn write_signed( + &mut self, + val: i64, + ) { + self.accommodate(1 + 8); + self.buf.push(TYPE_SIGNED); + self.buf.extend_from_slice(&val.to_le_bytes()); + } + + pub fn write_double( + &mut self, + val: f64, + ) { + self.accommodate(1 + 8); + self.buf.push(TYPE_DOUBLE); + self.buf.extend_from_slice(&val.to_le_bytes()); + } + + #[allow(dead_code)] + pub fn write_float( + &mut self, + val: f32, + ) { + self.accommodate(1 + 4); + self.buf.push(TYPE_FLOAT); + self.buf.extend_from_slice(&val.to_le_bytes()); + } + + /// Write a byte buffer. Small buffers are inlined; large ones use + /// the blob sentinel and are written as standalone Redis chunks. + pub fn write_buffer( + &mut self, + data: &[u8], + ) { + let inline_size = 1 + 8 + data.len(); // tag + u64 len + data + if inline_size <= BUFFER_SIZE { + // Inline: fits in a single buffer + self.accommodate(inline_size); + self.buf.push(TYPE_BYTES); + self.buf + .extend_from_slice(&(data.len() as u64).to_le_bytes()); + self.buf.extend_from_slice(data); + } else { + // Blob: write sentinel, flush, then write standalone + self.accommodate(1); + self.buf.push(TYPE_BLOB); + self.flush(); + raw::save_slice(self.rdb, data); + } + } + + /// Flush any remaining data. Must be called when encoding is complete. 
+ pub fn finish(mut self) { + self.flush(); + } +} + +impl Writer for BufferedWriter { + fn write_unsigned( + &mut self, + val: u64, + ) { + self.write_unsigned(val); + } + + fn write_signed( + &mut self, + val: i64, + ) { + self.write_signed(val); + } + + fn write_double( + &mut self, + val: f64, + ) { + self.write_double(val); + } + + fn write_buffer( + &mut self, + data: &[u8], + ) { + self.write_buffer(data); + } +} + +// --------------------------------------------------------------------------- +// Reader +// --------------------------------------------------------------------------- + +/// Buffered reader that loads 256KB chunks from Redis and consumes +/// type-tagged values from them. +pub struct BufferedReader { + rdb: *mut RedisModuleIO, + buf: Vec, + pos: usize, +} + +impl Reader for BufferedReader { + fn read_unsigned(&mut self) -> Result { + self.read_unsigned() + } + + fn read_signed(&mut self) -> Result { + self.read_signed() + } + + fn read_double(&mut self) -> Result { + self.read_double() + } + + fn read_buffer(&mut self) -> Result, String> { + self.read_buffer() + } +} + +impl BufferedReader { + pub const fn new(rdb: *mut RedisModuleIO) -> Self { + Self { + rdb, + buf: Vec::new(), + pos: 0, + } + } + + /// Load the next chunk from Redis. + fn load_chunk(&mut self) -> Result<(), String> { + let chunk = raw::load_string_buffer(self.rdb) + .map_err(|e| format!("BufferedReader: load chunk: {e}"))?; + self.buf = chunk.as_ref().to_vec(); + self.pos = 0; + Ok(()) + } + + /// Ensure at least 1 byte is available, loading a new chunk if needed. + fn ensure_available(&mut self) -> Result<(), String> { + if self.pos >= self.buf.len() { + self.load_chunk()?; + } + Ok(()) + } + + /// Read and validate a type tag byte. 
+ fn read_tag( + &mut self, + expected: u8, + ) -> Result<(), String> { + self.ensure_available()?; + let tag = self.buf[self.pos]; + self.pos += 1; + if tag != expected { + return Err(format!( + "BufferedReader: expected type tag {expected}, got {tag} at pos {}", + self.pos - 1 + )); + } + Ok(()) + } + + /// Read N bytes from the buffer. + fn read_bytes( + &mut self, + n: usize, + ) -> Result<&[u8], String> { + if self.pos + n > self.buf.len() { + return Err(format!( + "BufferedReader: need {n} bytes at pos {}, but buffer len is {}", + self.pos, + self.buf.len() + )); + } + let slice = &self.buf[self.pos..self.pos + n]; + self.pos += n; + Ok(slice) + } + + pub fn read_unsigned(&mut self) -> Result { + self.read_tag(TYPE_UNSIGNED)?; + let bytes = self.read_bytes(8)?; + Ok(u64::from_le_bytes(bytes.try_into().unwrap())) + } + + pub fn read_signed(&mut self) -> Result { + self.read_tag(TYPE_SIGNED)?; + let bytes = self.read_bytes(8)?; + Ok(i64::from_le_bytes(bytes.try_into().unwrap())) + } + + pub fn read_double(&mut self) -> Result { + self.read_tag(TYPE_DOUBLE)?; + let bytes = self.read_bytes(8)?; + Ok(f64::from_le_bytes(bytes.try_into().unwrap())) + } + + #[allow(dead_code)] + pub fn read_float(&mut self) -> Result { + self.read_tag(TYPE_FLOAT)?; + let bytes = self.read_bytes(4)?; + Ok(f32::from_le_bytes(bytes.try_into().unwrap())) + } + + /// Read a byte buffer. Handles both inline (TYPE_BYTES) and blob (TYPE_BLOB). + pub fn read_buffer(&mut self) -> Result, String> { + self.ensure_available()?; + let tag = self.buf[self.pos]; + self.pos += 1; + + match tag { + TYPE_BYTES => { + // Inline: length then data + let len_bytes = self.read_bytes(8)?; + let len = u64::from_le_bytes(len_bytes.try_into().unwrap()) as usize; + let data = self.read_bytes(len)?; + Ok(data.to_vec()) + } + TYPE_BLOB => { + // The current buffer should now be fully consumed + // (the blob sentinel was the last byte before flush). + // Load the standalone blob chunk. 
+ let chunk = raw::load_string_buffer(self.rdb) + .map_err(|e| format!("BufferedReader: load blob: {e}"))?; + let data = chunk.as_ref().to_vec(); + // Reset internal state - next read will trigger load_chunk + self.buf.clear(); + self.pos = 0; + Ok(data) + } + _ => Err(format!( + "BufferedReader: expected BYTES(0) or BLOB(6) tag, got {tag}" + )), + } + } +} diff --git a/src/serializers/decoder/mod.rs b/src/serializers/decoder/mod.rs new file mode 100644 index 00000000..b5896f8f --- /dev/null +++ b/src/serializers/decoder/mod.rs @@ -0,0 +1,330 @@ +use std::sync::Arc; + +use graph::entity_type::EntityType; +use graph::graph::attribute_store::AttributeStore; +use graph::graph::graph::Graph; +use graph::graph::graphblas::matrix::New; +use graph::graph::graphblas::serialization::Decode; +use graph::graph::graphblas::tensor::Tensor; +use graph::graph::graphblas::versioned_matrix::VersionedMatrix; +use graph::index::IndexInfo; +use redis_module::RedisModuleIO; +use roaring::RoaringTreemap; + +use super::EncodeState; +use super::Header; +use super::Schema; +use super::buffered_io::BufferedReader; +use super::{DECODE_STATE, PendingGraph}; + +/// Decode a graph key from the RDB stream (v19 format). +/// +/// Returns `Ok(Some(graph))` for single-key graphs (key_count == 1), +/// or `Ok(None)` when the key data has been accumulated into +/// `DECODE_STATE` for multi-key graphs (key_count > 1). 
+#[allow(clippy::too_many_lines)] +pub fn rdb_load_graph( + rdb: *mut RedisModuleIO, + cache_size: usize, +) -> Result, String> { + let mut r = BufferedReader::new(rdb); + + // --- Header --- + let hdr = Header::decode(&mut r)?; + + // --- Schema --- + let schema = Schema::decode(&mut r)?; + + // --- Key Schema (payload directory) --- + let payload_count = r.read_unsigned()?; + let mut payloads = Vec::with_capacity(payload_count as usize); + for _ in 0..payload_count { + let state = r.read_unsigned()?; + let count = r.read_unsigned()?; + let state = + EncodeState::from_u64(state).ok_or_else(|| format!("unknown encode state: {state}"))?; + payloads.push((state, count)); + } + + // For multi-key graphs, check if we already have a pending graph in DECODE_STATE. + if hdr.key_count > 1 { + let mut decode_state = DECODE_STATE.lock(); + let is_first_key = !decode_state.pending.contains_key(&hdr.graph_name); + + if is_first_key { + // First key: initialize the pending graph. + let node_attrs = AttributeStore::new(&format!("{}/nodes", hdr.graph_name), 0); + let mut rel_attrs = + AttributeStore::new(&format!("{}/relationships", hdr.graph_name), 0); + + // Set attribute names on the stores now -- they are the same across all keys. 
+ let mut node_attrs_init = node_attrs; + for name in &schema.attribute_names { + node_attrs_init.attrs_name.insert(name.clone()); + rel_attrs.attrs_name.insert(name.clone()); + } + + let pg = PendingGraph { + keys_remaining: hdr.key_count - 1, // this key + remaining + cache_size, + header: Header { + graph_name: hdr.graph_name.clone(), + node_count: hdr.node_count, + edge_count: hdr.edge_count, + deleted_node_count: hdr.deleted_node_count, + deleted_edge_count: hdr.deleted_edge_count, + label_count: hdr.label_count, + relationship_count: hdr.relationship_count, + multi_edge: hdr.multi_edge.clone(), + key_count: hdr.key_count, + }, + schema: Schema { + attribute_names: schema.attribute_names.clone(), + node_labels: schema.node_labels.clone(), + relationship_types: schema.relationship_types.clone(), + indexes: schema.indexes, + }, + node_attrs: node_attrs_init, + rel_attrs, + deleted_nodes: RoaringTreemap::new(), + deleted_rels: RoaringTreemap::new(), + label_matrices: Vec::new(), + relationship_tensors: Vec::new(), + adj_matrix: VersionedMatrix::new(0, 0), + lbls_matrix: VersionedMatrix::new(0, 0), + }; + decode_state.pending.insert(hdr.graph_name.clone(), pg); + } else { + // Subsequent key: just decrement keys_remaining. + if let Some(pg) = decode_state.pending.get_mut(&hdr.graph_name) { + pg.keys_remaining -= 1; + } + } + + // Decode this key's payloads into the pending graph. + { + let pg = decode_state.pending.get_mut(&hdr.graph_name).unwrap(); + decode_payloads_into_pending(&mut r, &payloads, pg, &hdr)?; + } + + // If all keys have been loaded, finalize immediately. + // This avoids depending on aux_load ordering between module types. 
+ let should_finalize = { + let pg = decode_state.pending.get(&hdr.graph_name).unwrap(); + pg.keys_remaining == 0 + }; + if should_finalize { + let graph_name = hdr.graph_name.clone(); + let pg = decode_state.pending.remove(&graph_name).unwrap(); + let graph = finalize_pending_graph(pg)?; + // Store the finalized graph in DECODE_STATE for the caller to retrieve. + decode_state.finalized.insert(graph_name, graph); + } + + return Ok(None); + } + + // Single-key path (key_count == 1): decode everything in one go. + let mut node_attrs = AttributeStore::new(&format!("{}/nodes", hdr.graph_name), 0); + let mut rel_attrs = AttributeStore::new(&format!("{}/relationships", hdr.graph_name), 0); + + for name in &schema.attribute_names { + node_attrs.attrs_name.insert(name.clone()); + rel_attrs.attrs_name.insert(name.clone()); + } + + let mut deleted_nodes = RoaringTreemap::new(); + let mut deleted_rels = RoaringTreemap::new(); + let mut label_matrices: Vec = Vec::new(); + let mut relationship_tensors: Vec = Vec::new(); + let mut adj_matrix = VersionedMatrix::new(0, 0); + let mut lbls_matrix = VersionedMatrix::new(0, 0); + + for (state, count) in &payloads { + match *state { + EncodeState::Nodes => { + node_attrs.decode_with_count(&mut r, *count)?; + } + EncodeState::DeletedNodes => { + deleted_nodes.decode_with_count(&mut r, *count)?; + } + EncodeState::Edges => { + rel_attrs.decode_with_count(&mut r, *count)?; + } + EncodeState::DeletedEdges => { + deleted_rels.decode_with_count(&mut r, *count)?; + } + EncodeState::LabelsMatrices => { + let count = r.read_unsigned()?; + for _ in 0..count { + let _label_id = r.read_unsigned()?; + label_matrices.push(VersionedMatrix::decode(&mut r)?); + } + } + EncodeState::RelationMatrices => { + for _ in 0..hdr.relationship_count { + let _relation_id = r.read_unsigned()?; + relationship_tensors.push(Tensor::decode(&mut r)?); + } + } + EncodeState::AdjMatrix => { + adj_matrix = VersionedMatrix::decode(&mut r)?; + } + EncodeState::LblsMatrix => 
{ + lbls_matrix = VersionedMatrix::decode(&mut r)?; + } + _ => {} + } + } + + node_attrs + .commit() + .map_err(|e| format!("commit node attrs: {e}"))?; + rel_attrs + .commit() + .map_err(|e| format!("commit rel attrs: {e}"))?; + + let mut graph = Graph::restore( + &hdr.graph_name, + cache_size, + hdr.node_count, + hdr.edge_count, + deleted_nodes, + deleted_rels, + adj_matrix, + lbls_matrix, + VersionedMatrix::new(0, 0), + VersionedMatrix::new(0, 0), + label_matrices, + relationship_tensors, + schema.node_labels, + schema.relationship_types, + node_attrs, + rel_attrs, + ); + + graph.rebuild_derived_matrices(); + rebuild_indexes(&mut graph, &schema.indexes); + graph.populate_indexes_sync(); + + Ok(Some(graph)) +} + +/// Decode payload data from the RDB stream into a pending multi-key graph. +fn decode_payloads_into_pending( + r: &mut BufferedReader, + payloads: &[(EncodeState, u64)], + pg: &mut PendingGraph, + hdr: &Header, +) -> Result<(), String> { + for (state, count) in payloads { + match *state { + EncodeState::Nodes => { + pg.node_attrs.decode_with_count(r, *count)?; + } + EncodeState::DeletedNodes => { + pg.deleted_nodes.decode_with_count(r, *count)?; + } + EncodeState::Edges => { + pg.rel_attrs.decode_with_count(r, *count)?; + } + EncodeState::DeletedEdges => { + pg.deleted_rels.decode_with_count(r, *count)?; + } + EncodeState::LabelsMatrices => { + let count = r.read_unsigned()?; + for _ in 0..count { + let _label_id = r.read_unsigned()?; + pg.label_matrices.push(VersionedMatrix::decode(r)?); + } + } + EncodeState::RelationMatrices => { + for _ in 0..hdr.relationship_count { + let _relation_id = r.read_unsigned()?; + pg.relationship_tensors.push(Tensor::decode(r)?); + } + } + EncodeState::AdjMatrix => { + pg.adj_matrix = VersionedMatrix::decode(r)?; + } + EncodeState::LblsMatrix => { + pg.lbls_matrix = VersionedMatrix::decode(r)?; + } + _ => {} + } + } + Ok(()) +} + +/// Finalize a pending multi-key graph: commit attrs, build Graph, rebuild derived 
matrices. +pub fn finalize_pending_graph(pg: PendingGraph) -> Result { + let mut node_attrs = pg.node_attrs; + let mut rel_attrs = pg.rel_attrs; + + node_attrs + .commit() + .map_err(|e| format!("commit node attrs: {e}"))?; + rel_attrs + .commit() + .map_err(|e| format!("commit rel attrs: {e}"))?; + + let mut graph = Graph::restore( + &pg.header.graph_name, + pg.cache_size, + pg.header.node_count, + pg.header.edge_count, + pg.deleted_nodes, + pg.deleted_rels, + pg.adj_matrix, + pg.lbls_matrix, + VersionedMatrix::new(0, 0), + VersionedMatrix::new(0, 0), + pg.label_matrices, + pg.relationship_tensors, + pg.schema.node_labels, + pg.schema.relationship_types, + node_attrs, + rel_attrs, + ); + + graph.rebuild_derived_matrices(); + rebuild_indexes(&mut graph, &pg.schema.indexes); + graph.populate_indexes_sync(); + + Ok(graph) +} + +/// Rebuild indexes from the decoded schema information. +fn rebuild_indexes( + graph: &mut Graph, + indexes: &[IndexInfo], +) { + for info in indexes { + for (attr_name, fields) in &info.fields { + for field in fields { + // The Field.name includes the type prefix (e.g. "range:val"). + // The attr_name key in the HashMap is the raw attribute name. 
+ let attr = Arc::new(attr_name.to_string()); + + let options = field.vector_options().map_or_else( + || { + field + .options() + .map(|topts| graph::index::indexer::IndexOptions::Text(topts.clone())) + }, + |vopts| Some(graph::index::indexer::IndexOptions::Vector(vopts.clone())), + ); + + if let Err(e) = graph.create_index_sync( + &field.ty, + &EntityType::Node, + &info.label, + &vec![attr], + options, + ) { + eprintln!("FalkorDB: failed to rebuild index on {:?}: {e}", info.label); + } + } + } + } +} diff --git a/src/serializers/encoder/mod.rs b/src/serializers/encoder/mod.rs new file mode 100644 index 00000000..5886b3f7 --- /dev/null +++ b/src/serializers/encoder/mod.rs @@ -0,0 +1,217 @@ +use graph::graph::graph::Graph; +use graph::graph::graphblas::serialization::{Encode, EncodeState, PayloadEntry}; +use redis_module::RedisModuleIO; + +use super::buffered_io::BufferedWriter; +use super::{Header, Schema}; + +/// Encode a full graph into a single RDB key (v19 format, single-key mode). +/// +/// This is the backward-compatible entry point used when `key_count == 1`. +pub fn rdb_save_graph( + rdb: *mut RedisModuleIO, + graph: &Graph, +) { + let payloads = build_payloads(graph); + rdb_save_graph_key(rdb, graph, &payloads, 1); +} + +/// Encode a single key's portion of the graph (used for both primary and virtual keys). +pub fn rdb_save_graph_key( + rdb: *mut RedisModuleIO, + graph: &Graph, + payloads: &[PayloadEntry], + key_count: u64, +) { + let mut w = BufferedWriter::new(rdb); + + // Compute global attrs once — reused by Schema and all payload encodes. 
+ let global_attrs = graph.build_global_attrs(); + + // --- Header --- + Header::from_graph(graph, key_count).encode(&mut w); + + // --- Schema (inline in header) --- + Schema::from_graph(graph, global_attrs.clone()).encode(&mut w); + + // --- Key Schema (payload directory) --- + w.write_unsigned(payloads.len() as u64); + for p in payloads { + w.write_unsigned(p.state as u64); + w.write_unsigned(p.count); + } + + // --- Payload data --- + for p in payloads { + graph.encode_payload(&mut w, p, &global_attrs); + } + + w.finish(); +} + +/// Build per-key payload distributions for multi-key encoding. +/// +/// Returns a Vec of per-key payload lists. Key 0 always gets matrices. +/// Entity payloads (nodes, edges, deleted nodes, deleted edges) are distributed +/// across keys such that each key gets at most `vkey_max` entities. +pub fn build_multi_key_payloads( + graph: &Graph, + vkey_max: u64, +) -> Vec> { + let nc = graph.node_count(); + let ec = graph.relationship_count(); + let dnc = graph.deleted_nodes_count(); + let dec = graph.deleted_relationships_count(); + + let total_entities = nc + ec + dnc + dec; + let key_count = if total_entities == 0 || vkey_max == 0 { + 1u64 + } else { + total_entities.div_ceil(vkey_max) + }; + + // Entity types in encoding order with their total counts. + let entity_types: Vec<(EncodeState, u64)> = [ + (EncodeState::Nodes, nc), + (EncodeState::DeletedNodes, dnc), + (EncodeState::Edges, ec), + (EncodeState::DeletedEdges, dec), + ] + .into_iter() + .filter(|(_, count)| *count > 0) + .collect(); + + // Distribute entities across keys, vkey_max per key. + let mut keys: Vec> = Vec::with_capacity(key_count as usize); + + // Track global offset per entity type. 
+ let mut type_offsets: Vec = vec![0; entity_types.len()]; + let mut type_idx = 0usize; // current entity type index + + for key_idx in 0..key_count { + let mut key_payloads = Vec::new(); + // When vkey_max == 0, store everything in first key (unlimited capacity) + let mut remaining_capacity = if vkey_max == 0 { u64::MAX } else { vkey_max }; + + // Fill this key with entities from the current position + while remaining_capacity > 0 && type_idx < entity_types.len() { + let (state, total) = entity_types[type_idx]; + let available = total - type_offsets[type_idx]; + let take = remaining_capacity.min(available); + + if take > 0 { + key_payloads.push(PayloadEntry { + state, + count: take, + offset: type_offsets[type_idx], + }); + type_offsets[type_idx] += take; + remaining_capacity -= take; + } + + if type_offsets[type_idx] >= total { + type_idx += 1; + } + } + + // Key 0 always gets matrices + if key_idx == 0 { + let lmc = graph.label_matrices().len() as u64; + if lmc > 0 { + key_payloads.push(PayloadEntry { + state: EncodeState::LabelsMatrices, + count: lmc, + offset: 0, + }); + } + let rmc = graph.relationship_tensors().len() as u64; + if rmc > 0 { + key_payloads.push(PayloadEntry { + state: EncodeState::RelationMatrices, + count: rmc, + offset: 0, + }); + } + key_payloads.push(PayloadEntry { + state: EncodeState::AdjMatrix, + count: 1, + offset: 0, + }); + key_payloads.push(PayloadEntry { + state: EncodeState::LblsMatrix, + count: 1, + offset: 0, + }); + } + + keys.push(key_payloads); + } + + keys +} + +/// Build the list of (state, entity_count) payloads for a single-key encode. 
+fn build_payloads(graph: &Graph) -> Vec { + let mut payloads = Vec::new(); + + let nc = graph.node_count(); + if nc > 0 { + payloads.push(PayloadEntry { + state: EncodeState::Nodes, + count: nc, + offset: 0, + }); + } + let dnc = graph.deleted_nodes_count(); + if dnc > 0 { + payloads.push(PayloadEntry { + state: EncodeState::DeletedNodes, + count: dnc, + offset: 0, + }); + } + let ec = graph.relationship_count(); + if ec > 0 { + payloads.push(PayloadEntry { + state: EncodeState::Edges, + count: ec, + offset: 0, + }); + } + let dec = graph.deleted_relationships_count(); + if dec > 0 { + payloads.push(PayloadEntry { + state: EncodeState::DeletedEdges, + count: dec, + offset: 0, + }); + } + let lmc = graph.label_matrices().len(); + if lmc > 0 { + payloads.push(PayloadEntry { + state: EncodeState::LabelsMatrices, + count: lmc as u64, + offset: 0, + }); + } + let rmc = graph.relationship_tensors().len(); + if rmc > 0 { + payloads.push(PayloadEntry { + state: EncodeState::RelationMatrices, + count: rmc as u64, + offset: 0, + }); + } + payloads.push(PayloadEntry { + state: EncodeState::AdjMatrix, + count: 1, + offset: 0, + }); + payloads.push(PayloadEntry { + state: EncodeState::LblsMatrix, + count: 1, + offset: 0, + }); + + payloads +} diff --git a/src/serializers/mod.rs b/src/serializers/mod.rs new file mode 100644 index 00000000..f5ced828 --- /dev/null +++ b/src/serializers/mod.rs @@ -0,0 +1,554 @@ +pub mod buffered_io; +pub mod decoder; +pub mod encoder; + +use std::collections::HashMap; +use std::ffi::CString; +use std::sync::{Arc, LazyLock}; + +use graph::graph::attribute_store::AttributeStore; +use graph::graph::graph::Graph; +use graph::graph::graphblas::serialization::{Decode, Encode, Reader, Writer, index_field_type}; +use graph::graph::graphblas::tensor::Tensor; +use graph::graph::graphblas::versioned_matrix::VersionedMatrix; +use graph::index::{Field, IndexInfo, IndexType, TextIndexOptions, VectorIndexOptions}; +use parking_lot::{Mutex, RwLock}; +use 
roaring::RoaringTreemap; + +use crate::graph_core::ThreadedGraph; + +/// RDB encoding version. Matches C FalkorDB v19 format (buffered IO with type tags). +#[allow(dead_code)] +pub const ENCODING_VERSION: u64 = 19; + +/// Global state for virtual key management during RDB save. +pub static VKEY_STATE: std::sync::LazyLock> = + std::sync::LazyLock::new(|| Mutex::new(VirtualKeyState::new())); + +pub struct VirtualKeyState { + /// vkey_name → (graph_name, key_index, payloads for that key) + pub vkey_map: HashMap)>, + /// graph_name → list of virtual key names + pub graph_vkeys: HashMap>, + /// Graph references keyed by graph_name for use by virtual key rdb_save. + graph_refs: HashMap>>, +} + +impl VirtualKeyState { + pub fn new() -> Self { + Self { + vkey_map: HashMap::new(), + graph_vkeys: HashMap::new(), + graph_refs: HashMap::new(), + } + } + + pub fn clear(&mut self) { + self.vkey_map.clear(); + self.graph_vkeys.clear(); + self.graph_refs.clear(); + } + + pub fn get_vkey_payloads( + &self, + vkey_name: &str, + ) -> Option<(&str, &[PayloadEntry])> { + self.vkey_map + .get(vkey_name) + .map(|(graph_name, _key_idx, payloads)| (graph_name.as_str(), payloads.as_slice())) + } + + /// Store a graph reference for use during RDB save. + pub fn store_graph_ref( + &mut self, + graph_name: &str, + graph: Arc>, + ) { + self.graph_refs.insert(graph_name.to_string(), graph); + } + + /// Retrieve the stored graph reference for RDB save. + pub fn get_graph_ref( + &self, + graph_name: &str, + ) -> Option<&Arc>> { + self.graph_refs.get(graph_name) + } +} + +/// Global state for multi-key graph decoding. +pub static DECODE_STATE: LazyLock> = + LazyLock::new(|| Mutex::new(DecodeState::new())); + +/// Tracks pending multi-key graph loads. +pub struct DecodeState { + pub pending: HashMap, + /// Placeholder `Arc>` values returned by `graph_rdb_load` + /// for multi-key graphs. Used to replace the placeholder's inner graph + /// with the finalized graph once all keys are loaded. 
+ pub placeholders: HashMap>>, + /// Finalized graphs ready to be picked up by graph_rdb_load or + /// the finalize_pending_graphs callback. + pub finalized: HashMap, +} + +pub struct PendingGraph { + pub keys_remaining: u64, + pub cache_size: usize, + pub header: Header, + pub schema: Schema, + pub node_attrs: AttributeStore, + pub rel_attrs: AttributeStore, + pub deleted_nodes: RoaringTreemap, + pub deleted_rels: RoaringTreemap, + pub label_matrices: Vec, + pub relationship_tensors: Vec, + pub adj_matrix: VersionedMatrix, + pub lbls_matrix: VersionedMatrix, +} + +impl DecodeState { + pub fn new() -> Self { + Self { + pending: HashMap::new(), + placeholders: HashMap::new(), + finalized: HashMap::new(), + } + } + + #[allow(dead_code)] + pub fn clear(&mut self) { + self.pending.clear(); + self.placeholders.clear(); + self.finalized.clear(); + } +} + +pub use graph::graph::graphblas::serialization::{EncodeState, PayloadEntry}; + +/// Graph header — shared between RDB encode and decode. +#[allow(dead_code)] +pub struct Header { + pub graph_name: String, + pub node_count: u64, + pub edge_count: u64, + pub deleted_node_count: u64, + pub deleted_edge_count: u64, + pub label_count: u64, + pub relationship_count: u64, + pub multi_edge: Vec, + pub key_count: u64, +} + +impl Encode<19> for Header { + fn encode( + &self, + w: &mut dyn Writer, + ) { + fn null_terminated(s: &str) -> Vec { + s.as_bytes() + .iter() + .copied() + .chain(std::iter::once(0)) + .collect() + } + + w.write_buffer(&null_terminated(&self.graph_name)); + w.write_unsigned(self.node_count); + w.write_unsigned(self.edge_count); + w.write_unsigned(self.deleted_node_count); + w.write_unsigned(self.deleted_edge_count); + w.write_unsigned(self.label_count); + w.write_unsigned(self.relationship_count); + + for &me in &self.multi_edge { + w.write_unsigned(u64::from(me)); + } + + w.write_unsigned(self.key_count); + } +} + +impl Decode<19> for Header { + fn decode(r: &mut dyn Reader) -> Result { + let name_bytes = 
r.read_buffer()?; + let graph_name = if name_bytes.last() == Some(&0) { + String::from_utf8_lossy(&name_bytes[..name_bytes.len() - 1]).to_string() + } else { + String::from_utf8_lossy(&name_bytes).to_string() + }; + + let node_count = r.read_unsigned()?; + let edge_count = r.read_unsigned()?; + let deleted_node_count = r.read_unsigned()?; + let deleted_edge_count = r.read_unsigned()?; + let label_count = r.read_unsigned()?; + let relationship_count = r.read_unsigned()?; + + let mut multi_edge = Vec::with_capacity(relationship_count as usize); + for _ in 0..relationship_count { + let flag = r.read_unsigned()?; + multi_edge.push(flag != 0); + } + + let key_count = r.read_unsigned()?; + + Ok(Self { + graph_name, + node_count, + edge_count, + deleted_node_count, + deleted_edge_count, + label_count, + relationship_count, + multi_edge, + key_count, + }) + } +} + +impl Header { + pub fn from_graph( + graph: &Graph, + key_count: u64, + ) -> Self { + Self { + graph_name: graph.name().to_string(), + node_count: graph.node_count(), + edge_count: graph.relationship_count(), + deleted_node_count: graph.deleted_nodes().len(), + deleted_edge_count: graph.deleted_relationships().len(), + label_count: graph.label_matrices().len() as u64, + relationship_count: graph.relationship_tensors().len() as u64, + multi_edge: graph + .relationship_tensors() + .iter() + .map(Tensor::has_multi_edge) + .collect(), + key_count, + } + } +} + +/// Graph schema — shared between RDB encode and decode. 
pub struct Schema {
    // All attribute names shared by node and relationship stores.
    pub attribute_names: Vec<Arc<String>>,
    pub node_labels: Vec<Arc<String>>,
    pub relationship_types: Vec<Arc<String>>,
    // Only node indexes are serialized in v19; relation schemas carry none.
    pub indexes: Vec<IndexInfo>,
}

// Append a trailing NUL byte; strings are stored C-style on the wire.
fn null_terminated(s: &str) -> Vec<u8> {
    s.as_bytes()
        .iter()
        .copied()
        .chain(std::iter::once(0))
        .collect()
}

// Inverse of `null_terminated`: drop a trailing NUL if present and decode
// (lossily) as UTF-8.
fn strip_null_terminator(buf: &[u8]) -> String {
    if buf.last() == Some(&0) {
        String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string()
    } else {
        String::from_utf8_lossy(buf).to_string()
    }
}

impl Encode<19> for Schema {
    fn encode(
        &self,
        w: &mut dyn Writer,
    ) {
        // --- Attribute keys ---
        w.write_unsigned(self.attribute_names.len() as u64);
        for name in &self.attribute_names {
            w.write_buffer(&null_terminated(name));
        }

        // --- Node schemas ---
        w.write_unsigned(self.node_labels.len() as u64);
        for (i, label) in self.node_labels.iter().enumerate() {
            w.write_unsigned(i as u64);
            w.write_buffer(&null_terminated(label));

            // All index entries registered against this label.
            let label_indices: Vec<_> = self
                .indexes
                .iter()
                .filter(|info| info.label.as_str() == label.as_str())
                .collect();

            let has_index = !label_indices.is_empty();
            w.write_unsigned(u64::from(has_index));

            if has_index {
                // Language/stopwords are taken from the first index entry
                // only — assumes they agree across a label's indexes.
                let language = label_indices
                    .first()
                    .and_then(|info| info.language.as_ref())
                    .map_or("english", |l| l.as_str());
                w.write_buffer(&null_terminated(language));

                let stopwords: Vec<_> = label_indices
                    .first()
                    .and_then(|info| info.stopwords.as_ref())
                    .map(|sw| sw.iter().map(|s| s.as_str()).collect())
                    .unwrap_or_default();
                w.write_unsigned(stopwords.len() as u64);
                for sw in &stopwords {
                    w.write_buffer(&null_terminated(sw));
                }

                // Flatten every field of every index on this label.
                let all_fields: Vec<_> = label_indices
                    .iter()
                    .flat_map(|info| info.fields.values().flatten())
                    .collect();
                w.write_unsigned(all_fields.len() as u64);
                for f in &all_fields {
                    let name = f.name.to_str().unwrap_or("");
                    w.write_buffer(&null_terminated(name));

                    // Map the Rust IndexType to the C bitmask; Range covers
                    // numeric, string, and geo in one combined mask.
                    let field_type = match f.ty {
                        IndexType::Fulltext => index_field_type::INDEX_FLD_FULLTEXT,
                        IndexType::Range => {
                            index_field_type::INDEX_FLD_NUMERIC
                                | index_field_type::INDEX_FLD_STR
                                | index_field_type::INDEX_FLD_GEO
                        }
                        IndexType::Vector => index_field_type::INDEX_FLD_VECTOR,
                    };
                    w.write_unsigned(field_type);

                    let opts = f.options();
                    w.write_double(opts.and_then(|o| o.weight).unwrap_or(1.0));
                    w.write_unsigned(u64::from(opts.and_then(|o| o.nostem).unwrap_or(false)));
                    // Phonetic is stored as the matcher name ("dm:en") or
                    // an empty string when disabled.
                    let phonetic = opts.and_then(|o| o.phonetic).map_or(String::new(), |p| {
                        if p {
                            "dm:en".to_string()
                        } else {
                            String::new()
                        }
                    });
                    w.write_buffer(&null_terminated(&phonetic));

                    // Vector fields carry HNSW parameters after the common
                    // field data.
                    if field_type & index_field_type::INDEX_FLD_VECTOR != 0
                        && let Some(vopts) = f.vector_options()
                    {
                        w.write_unsigned(u64::from(vopts.dimension));
                        w.write_unsigned(vopts.m.unwrap_or(16) as u64);
                        w.write_unsigned(vopts.ef_construction.unwrap_or(200) as u64);
                        w.write_unsigned(vopts.ef_runtime.unwrap_or(10) as u64);
                        // similarity function: 0 = cosine (default)
                        let sim = match vopts.similarity_function.as_deref() {
                            Some("L2") => 1u64,
                            Some("IP") => 2u64,
                            _ => 0u64,
                        };
                        w.write_unsigned(sim);
                    }
                }
            }

            // Constraints (not implemented yet)
            w.write_unsigned(0);
        }

        // --- Relation schemas ---
        w.write_unsigned(self.relationship_types.len() as u64);
        for (i, type_name) in self.relationship_types.iter().enumerate() {
            w.write_unsigned(i as u64);
            w.write_buffer(&null_terminated(type_name));
            w.write_unsigned(0); // no indices
            w.write_unsigned(0); // no constraints
        }
    }
}

impl Decode<19> for Schema {
    fn decode(r: &mut dyn Reader) -> Result<Self, String> {
        // --- Attribute keys ---
        let attr_count = r.read_unsigned()?;
        let mut attribute_names = Vec::with_capacity(attr_count as usize);
        for _ in 0..attr_count {
            let buf = r.read_buffer()?;
            attribute_names.push(Arc::new(strip_null_terminator(&buf)));
        }

        // --- Node schemas ---
        let node_schema_count = r.read_unsigned()?;
        let mut node_labels = Vec::with_capacity(node_schema_count as usize);
        let mut indexes = Vec::new();
        for _ in 0..node_schema_count {
            let (label, info) = decode_schema_entry(r)?;
            let label = Arc::new(label);
            // decode_schema_entry cannot know the label Arc yet, so it is
            // patched in here before the info is kept.
            if let Some(mut info) = info {
                info.label = label.clone();
                indexes.push(info);
            }
            node_labels.push(label);
        }

        // --- Relation schemas ---
        // Relation entries never carry indexes in v19, so any decoded info
        // is discarded.
        let rel_schema_count = r.read_unsigned()?;
        let mut relationship_types = Vec::with_capacity(rel_schema_count as usize);
        for _ in 0..rel_schema_count {
            let (schema_name, _) = decode_schema_entry(r)?;
            relationship_types.push(Arc::new(schema_name));
        }

        Ok(Self {
            attribute_names,
            node_labels,
            relationship_types,
            indexes,
        })
    }
}

// Decode one schema entry (node label or relation type): its name plus an
// optional index description. The returned IndexInfo has a placeholder
// label — the caller fills it in.
fn decode_schema_entry(r: &mut dyn Reader) -> Result<(String, Option<IndexInfo>), String> {
    let _schema_id = r.read_unsigned()?;
    let name_buf = r.read_buffer()?;
    let schema_name = strip_null_terminator(&name_buf);

    let has_index = r.read_unsigned()?;

    let index = if has_index != 0 {
        let lang_buf = r.read_buffer()?;
        let language = strip_null_terminator(&lang_buf);

        let sw_count = r.read_unsigned()?;
        let mut stopwords = Vec::with_capacity(sw_count as usize);
        for _ in 0..sw_count {
            let sw_buf = r.read_buffer()?;
            stopwords.push(Arc::new(strip_null_terminator(&sw_buf)));
        }

        // Fields are grouped by raw attribute name, mirroring
        // IndexInfo.fields.
        let field_count = r.read_unsigned()?;
        let mut fields: HashMap<Arc<String>, Vec<Arc<Field>>> = HashMap::new();
        for _ in 0..field_count {
            let (attr_name, field) = decode_index_field(r)?;
            fields.entry(attr_name).or_default().push(Arc::new(field));
        }

        Some(IndexInfo {
            // Placeholder; the caller replaces it with the real label Arc.
            label: Arc::new(String::new()),
            pending: 0,
            progress: 0,
            total: 0,
            fields,
            language: Some(Arc::new(language)),
            stopwords: if stopwords.is_empty() {
                None
            } else {
                Some(stopwords)
            },
        })
    } else {
        None
    };

    // Constraints are decoded for wire compatibility but discarded (not
    // implemented yet).
    let constraint_count = r.read_unsigned()?;
    for _ in 0..constraint_count {
        let _constraint_type = r.read_unsigned()?;
        let fields_count = r.read_unsigned()?;
        for _ in 0..fields_count {
            let _attr_id = r.read_unsigned()?;
        }
    }

    Ok((schema_name, index))
}

fn
decode_index_field(r: &mut dyn Reader) -> Result<(Arc, Field), String> { + let name_buf = r.read_buffer()?; + let name = strip_null_terminator(&name_buf); + let field_type = r.read_unsigned()?; + let weight = r.read_double()?; + let nostem = r.read_unsigned()? != 0; + let phonetic_buf = r.read_buffer()?; + let phonetic = strip_null_terminator(&phonetic_buf); + + let is_vector = field_type & index_field_type::INDEX_FLD_VECTOR != 0; + let is_fulltext = field_type & index_field_type::INDEX_FLD_FULLTEXT != 0; + + let ty = if is_fulltext { + IndexType::Fulltext + } else if is_vector { + IndexType::Vector + } else { + IndexType::Range + }; + + // Strip the type prefix from the field name to get the raw attribute name. + let attr_name = match ty { + IndexType::Range => name.strip_prefix("range:").unwrap_or(&name).to_string(), + IndexType::Vector => name.strip_prefix("vector:").unwrap_or(&name).to_string(), + IndexType::Fulltext => name.clone(), + }; + + let vector_options = if is_vector { + let dimension = r.read_unsigned()? as u32; + let m = r.read_unsigned()? as usize; + let ef_construction = r.read_unsigned()? as usize; + let ef_runtime = r.read_unsigned()? 
as usize; + let sim_func = r.read_unsigned()?; + let similarity_function = match sim_func { + 1 => Some("L2".to_string()), + 2 => Some("IP".to_string()), + _ => None, // 0 = cosine (default) + }; + Some(VectorIndexOptions { + dimension, + similarity_function, + m: Some(m), + ef_construction: Some(ef_construction), + ef_runtime: Some(ef_runtime), + }) + } else { + None + }; + + let text_options = if is_fulltext { + Some(TextIndexOptions { + weight: Some(weight), + nostem: Some(nostem), + phonetic: Some(!phonetic.is_empty()), + language: None, + stopwords: None, + }) + } else { + None + }; + + let field = if let Some(vopts) = vector_options { + Field::new_with_vector_options( + CString::new(name.as_str()).map_err(|e| e.to_string())?, + ty, + vopts, + ) + } else { + Field::new( + CString::new(name.as_str()).map_err(|e| e.to_string())?, + ty, + text_options, + ) + }; + + Ok((Arc::new(attr_name), field)) +} + +impl Schema { + pub fn from_graph( + graph: &Graph, + attribute_names: Vec>, + ) -> Self { + let node_labels = graph.get_labels().to_vec(); + let relationship_types = graph.get_types().to_vec(); + let indexes = graph.index_info(); + + Self { + attribute_names, + node_labels, + relationship_types, + indexes, + } + } +}