diff --git a/Cargo.lock b/Cargo.lock index f6917f31..adf64324 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -394,7 +394,9 @@ dependencies = [ "parking_lot", "redis-module", "redis-module-macros", + "roaring", "ryu", + "thin-vec", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8a021999..3b88f2d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,9 @@ graph = { path = "graph", version = "0.1.0" } lazy_static = "1.5.0" parking_lot = "0.12.5" redis-module = { git = "https://github.com/AviAvni/redismodule-rs", branch = "master" } +roaring = "0.11.3" ryu = "1.0.23" +thin-vec = "0.2.14" orx-tree = "2.2.0" [build-dependencies] diff --git a/graph/src/graph/graphblas/matrix.rs b/graph/src/graph/graphblas/matrix.rs index 6096e713..a802f230 100644 --- a/graph/src/graph/graphblas/matrix.rs +++ b/graph/src/graph/graphblas/matrix.rs @@ -54,12 +54,25 @@ #![allow(clippy::doc_markdown)] -use std::{mem::MaybeUninit, os::raw::c_void, ptr::null_mut, sync::Arc}; +use std::{ + marker::PhantomData, + mem::{ManuallyDrop, MaybeUninit}, + os::raw::c_void, + ptr::null_mut, + sync::Arc, +}; use parking_lot::Mutex; -use crate::graph::graphblas::lagraph_bindings::{LAGraph_Finalize, LAGraph_Init}; +use crate::graph::graphblas::{ + lagraph_bindings::{LAGraph_Finalize, LAGraph_Init}, + serialization::{Decode, Encode, Reader, Writer}, +}; + +/// Size of the `GxB_Container_struct` in bytes. +const CONTAINER_STRUCT_SIZE: usize = std::mem::size_of::(); +use super::vector::Vector; use super::{ GrB_BOOL, GrB_DESC_C, GrB_DESC_CT0, GrB_DESC_CT0T1, GrB_DESC_CT1, GrB_DESC_R, GrB_DESC_RC, GrB_DESC_RCT0, GrB_DESC_RCT0T1, GrB_DESC_RCT1, GrB_DESC_RS, GrB_DESC_RSC, GrB_DESC_RSCT0, @@ -68,14 +81,16 @@ use super::{ GrB_DESC_SCT1, GrB_DESC_ST0, GrB_DESC_ST0T1, GrB_DESC_ST1, GrB_DESC_T0, GrB_DESC_T0T1, GrB_DESC_T1, GrB_Descriptor, GrB_GLOBAL, GrB_Global_set_INT32, GrB_Info, GrB_Matrix, GrB_Matrix_clear, GrB_Matrix_dup, GrB_Matrix_eWiseAdd_Semiring, GrB_Matrix_eWiseMult_Semiring, - GrB_Matrix_extractElement_BOOL, GrB_Matrix_free, GrB_Matrix_get_INT32, GrB_Matrix_ncols, - GrB_Matrix_new, GrB_Matrix_nrows, GrB_Matrix_nvals, GrB_Matrix_removeElement, - GrB_Matrix_resize, GrB_Matrix_setElement_BOOL, GrB_Matrix_wait, GrB_Mode, GrB_WaitMode, - GrB_finalize, GrB_mxm, GrB_transpose, GxB_ANY_BOOL, GxB_ANY_PAIR_BOOL, GxB_Iterator, - GxB_Iterator_free, GxB_Iterator_new, GxB_Matrix_fprint, GxB_Matrix_memoryUsage, - GxB_Option_Field, GxB_Print_Level, GxB_init, GxB_rowIterator_attach, - GxB_rowIterator_getColIndex, GxB_rowIterator_getRowIndex, GxB_rowIterator_nextCol, - GxB_rowIterator_nextRow, GxB_rowIterator_seekRow, + GrB_Matrix_extractElement_BOOL, GrB_Matrix_extractElement_UINT64, GrB_Matrix_free, + GrB_Matrix_get_INT32, GrB_Matrix_ncols, GrB_Matrix_new, GrB_Matrix_nrows, GrB_Matrix_nvals, + GrB_Matrix_removeElement, GrB_Matrix_resize, GrB_Matrix_setElement_BOOL, + GrB_Matrix_setElement_UINT64, GrB_Matrix_wait, GrB_Mode, GrB_UINT64, GrB_WaitMode, + GrB_finalize, GrB_mxm, GrB_transpose, GxB_ANY_BOOL, GxB_ANY_PAIR_BOOL, GxB_Container_free, + GxB_Container_new, GxB_Iterator, GxB_Iterator_free, GxB_Iterator_new, GxB_Matrix_fprint, + GxB_Matrix_memoryUsage, GxB_Matrix_type, GxB_Option_Field, GxB_Print_Level, GxB_init, + GxB_load_Matrix_from_Container, GxB_rowIterator_attach, GxB_rowIterator_getColIndex, + GxB_rowIterator_getRowIndex, GxB_rowIterator_nextCol, GxB_rowIterator_nextRow, + GxB_rowIterator_seekRow, GxB_unload_Matrix_into_Container, }; /// Initializes the GraphBLAS library in non-blocking mode. @@ -419,6 +434,104 @@ impl Drop for Matrix { } } +impl Decode<19> for Matrix { + fn decode(r: &mut dyn Reader) -> Result { + let container_bytes = r.read_buffer()?; + + // Validate container size before copying + if container_bytes.len() < CONTAINER_STRUCT_SIZE { + return Err(format!( + "container buffer too small: {} bytes < {} bytes required", + container_bytes.len(), + CONTAINER_STRUCT_SIZE + )); + } + + unsafe { + let mut container: MaybeUninit = MaybeUninit::uninit(); + let info = GxB_Container_new(container.as_mut_ptr()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + let container = container.assume_init(); + + // Copy struct data into the allocated container + std::ptr::copy_nonoverlapping( + container_bytes.as_ptr(), + container.cast::(), + CONTAINER_STRUCT_SIZE, + ); + + // Nullify vector/matrix pointers (will be populated below) + (*container).x = null_mut(); + (*container).h = null_mut(); + (*container).b = null_mut(); + (*container).i = null_mut(); + (*container).p = null_mut(); + (*container).Y = null_mut(); + + // Read and load 5 vectors: x, h, p, i, b + (*container).x = ManuallyDrop::new(Vector::::decode(r)?).ptr(); + (*container).h = ManuallyDrop::new(Vector::::decode(r)?).ptr(); + (*container).p = ManuallyDrop::new(Vector::::decode(r)?).ptr(); + (*container).i = ManuallyDrop::new(Vector::::decode(r)?).ptr(); + (*container).b = ManuallyDrop::new(Vector::::decode(r)?).ptr(); + + // Create matrix and load from container + let mut m: MaybeUninit = MaybeUninit::uninit(); + let info = GrB_Matrix_new(m.as_mut_ptr(), GrB_BOOL, 0, 0); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + let m = m.assume_init(); + + let info = GxB_load_Matrix_from_Container(m, container, null_mut()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + let mut c = container; + let info = GxB_Container_free(&raw mut c); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + Ok(Self { + m: Arc::new(m), + lock: Arc::new(Mutex::new(())), + }) + } + } +} + +impl Encode<19> for Matrix { + fn encode( + &self, + w: &mut dyn Writer, + ) { + unsafe { + let mut container: MaybeUninit = MaybeUninit::uninit(); + let info = GxB_Container_new(container.as_mut_ptr()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + let container = container.assume_init(); + + let info = GxB_unload_Matrix_into_Container(self.inner(), container, null_mut()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + // Write container struct bytes + let container_bytes = + std::slice::from_raw_parts(container.cast::(), CONTAINER_STRUCT_SIZE); + w.write_buffer(container_bytes); + + // Write 5 vectors: x, h, p, i, b + ManuallyDrop::new(Vector::::from((*container).x)).encode(w); + ManuallyDrop::new(Vector::::from((*container).h)).encode(w); + ManuallyDrop::new(Vector::::from((*container).p)).encode(w); + ManuallyDrop::new(Vector::::from((*container).i)).encode(w); + ManuallyDrop::new(Vector::::from((*container).b)).encode(w); + + let info = GxB_load_Matrix_from_Container(self.inner(), container, null_mut()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + let mut c = container; + let info = GxB_Container_free(&raw mut c); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } +} + impl Matrix { /// Returns the raw GrB_Matrix handle for FFI calls (e.g. LAGraph). /// The caller must NOT free the returned handle. @@ -427,6 +540,29 @@ impl Matrix { *self.m } + /// Iterate entries as `(row, col, value)` UINT64 triples. + /// + /// Used when loading C-produced relation matrices where single-edge + /// entries store the edge ID as a UINT64 value. + #[must_use] + pub fn uint64_iter(&self) -> Iter { + Iter::new(self, 0, u64::MAX) + } + + /// Returns true if this matrix has UINT64 element type. + /// + /// C-produced relation matrices store edge IDs as UINT64, while + /// Rust-produced ones use BOOL. + #[must_use] + pub fn is_uint64(&self) -> bool { + unsafe { + let mut t: MaybeUninit = MaybeUninit::uninit(); + let info = GxB_Matrix_type(t.as_mut_ptr(), *self.m); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + t.assume_init() == GrB_UINT64 + } + } + #[must_use] pub fn pending(&self) -> bool { unsafe { @@ -476,6 +612,17 @@ impl Matrix { debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); } } + + pub fn select( + &mut self, + mask: &Matrix, + a: &Matrix, + ) { + unsafe { + let info = GrB_transpose(*self.m, *mask.m, null_mut(), *a.m, GrB_DESC_RCT0); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } } impl Size for Matrix { @@ -561,6 +708,36 @@ impl Dup for Matrix { } impl Matrix { + /// Create a new UINT64 matrix (for C-compatible tensor encoding). + #[must_use] + pub fn new_uint64( + nrows: u64, + ncols: u64, + ) -> Self { + unsafe { + let mut m: MaybeUninit = MaybeUninit::uninit(); + let info = GrB_Matrix_new(m.as_mut_ptr(), GrB_UINT64, nrows, ncols); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + Self { + m: Arc::new(m.assume_init()), + lock: Arc::new(Mutex::new(())), + } + } + } + + /// Set a UINT64 value at (i, j). + pub fn set_uint64( + &mut self, + i: u64, + j: u64, + value: u64, + ) { + unsafe { + let info = GrB_Matrix_setElement_UINT64(*self.m, value, i, j); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } + #[must_use] #[allow(clippy::iter_without_into_iter)] pub fn iter( @@ -655,7 +832,57 @@ where } } -pub struct Iter { +/// Strategy for extracting values from a GraphBLAS row iterator position. +/// +/// # Safety +/// Implementations must only call valid GraphBLAS FFI functions on the provided matrix. +pub trait IterExtract { + type Item; + + /// Extract the item from the current iterator position. + /// + /// # Safety + /// `m` must be a valid `GrB_Matrix` and the iterator must be positioned on a valid entry. + unsafe fn extract( + m: GrB_Matrix, + row: u64, + col: u64, + ) -> Self::Item; +} + +/// Extracts `(row, col)` pairs from a boolean matrix. +pub struct BoolExtract; + +impl IterExtract for BoolExtract { + type Item = (u64, u64); + + unsafe fn extract( + _m: GrB_Matrix, + row: u64, + col: u64, + ) -> Self::Item { + (row, col) + } +} + +/// Extracts `(row, col, value)` triples from a UINT64 matrix. +pub struct Uint64Extract; + +impl IterExtract for Uint64Extract { + type Item = (u64, u64, u64); + + unsafe fn extract( + m: GrB_Matrix, + row: u64, + col: u64, + ) -> Self::Item { + let mut val: u64 = 0; + unsafe { GrB_Matrix_extractElement_UINT64(&raw mut val, m, row, col) }; + (row, col, val) + } +} + +pub struct Iter { m: Arc, /// The underlying GraphBLAS iterator. inner: GxB_Iterator, @@ -663,12 +890,13 @@ pub struct Iter { depleted: bool, /// The maximum row index for the iterator. max_row: u64, + _extract: PhantomData, } -unsafe impl Send for Iter {} -unsafe impl Sync for Iter {} +unsafe impl Send for Iter {} +unsafe impl Sync for Iter {} -impl Drop for Iter { +impl Drop for Iter { /// Frees the GraphBLAS iterator when the `Iter` is dropped. fn drop(&mut self) { unsafe { @@ -681,7 +909,7 @@ impl Drop for Iter { } } -impl Iter { +impl Iter { /// Creates a new iterator for traversing all elements in a matrix. /// /// # Parameters @@ -716,18 +944,19 @@ impl Iter { depleted: info != GrB_Info::GrB_SUCCESS || GxB_rowIterator_getRowIndex(iter) > max_row, max_row, + _extract: PhantomData, } } } } -impl Iterator for Iter { - type Item = (u64, u64); +impl Iterator for Iter { + type Item = E::Item; /// Advances the iterator and returns the next element in the matrix. /// /// # Returns - /// - `Some((u64, u64))`: The next element in the matrix. + /// - `Some(E::Item)`: The next element in the matrix. /// - `None`: The iterator is depleted. fn next(&mut self) -> Option { if self.depleted { @@ -736,6 +965,7 @@ impl Iterator for Iter { unsafe { let row = GxB_rowIterator_getRowIndex(self.inner); let col = GxB_rowIterator_getColIndex(self.inner); + let item = E::extract(*self.m, row, col); if GxB_rowIterator_nextCol(self.inner) != GrB_Info::GrB_SUCCESS { let mut info = GxB_rowIterator_nextRow(self.inner); debug_assert!( @@ -751,7 +981,7 @@ impl Iterator for Iter { self.depleted = info != GrB_Info::GrB_SUCCESS || GxB_rowIterator_getRowIndex(self.inner) > self.max_row; } - Some((row, col)) + Some(item) } } } diff --git a/graph/src/graph/graphblas/mod.rs b/graph/src/graph/graphblas/mod.rs index 50e88604..ff70bcca 100644 --- a/graph/src/graph/graphblas/mod.rs +++ b/graph/src/graph/graphblas/mod.rs @@ -54,6 +54,7 @@ pub mod lagraph_bindings; pub mod lagraphx_bindings; pub mod matrix; +pub mod serialization; pub mod tensor; pub mod vector; pub mod versioned_matrix; diff --git a/graph/src/graph/graphblas/serialization.rs b/graph/src/graph/graphblas/serialization.rs new file mode 100644 index 00000000..28470182 --- /dev/null +++ b/graph/src/graph/graphblas/serialization.rs @@ -0,0 +1,219 @@ +//! Serialization traits and type tags for RDB persistence. +//! +//! Provides `Writer`/`Reader` traits, `Encode`/`Decode` traits, and +//! type-tag modules used by the encoder/decoder in the `serializers` +//! module which handles the actual Redis Module IO. + +use roaring::RoaringTreemap; + +/// Abstraction over a serialization sink. +/// +/// The root crate implements this for `BufferedWriter` (v19 buffered IO). +/// The graph crate uses it via `Encode` impls without knowing about Redis. +pub trait Writer { + fn write_unsigned( + &mut self, + val: u64, + ); + fn write_signed( + &mut self, + val: i64, + ); + fn write_double( + &mut self, + val: f64, + ); + fn write_buffer( + &mut self, + data: &[u8], + ); +} + +/// Types that can serialize themselves into a [`Writer`]. +pub trait Encode { + fn encode( + &self, + w: &mut dyn Writer, + ); + + /// Encode a range of entities starting at `offset`, encoding `count` items. + fn encode_with_range( + &self, + w: &mut dyn Writer, + count: u64, + offset: u64, + ) { + let _ = (w, count, offset); + unimplemented!() + } +} + +/// Abstraction over a deserialization source. +/// +/// The root crate implements this for `BufferedReader` (v19 buffered IO). +/// The graph crate uses it via `Decode` impls without knowing about Redis. +pub trait Reader { + fn read_unsigned(&mut self) -> Result; + fn read_signed(&mut self) -> Result; + fn read_double(&mut self) -> Result; + fn read_buffer(&mut self) -> Result, String>; +} + +/// Types that can deserialize themselves from a [`Reader`]. +pub trait Decode: Sized { + fn decode(r: &mut dyn Reader) -> Result; + + /// Decode `count` entities from the reader into `self`. + fn decode_with_count( + &mut self, + r: &mut dyn Reader, + count: u64, + ) -> Result<(), String> { + let _ = (r, count); + unimplemented!() + } +} + +/// Index field type bitmask matching C FalkorDB index_field.h. +pub mod index_field_type { + pub const INDEX_FLD_FULLTEXT: u64 = 0x01; + pub const INDEX_FLD_NUMERIC: u64 = 0x02; + pub const INDEX_FLD_GEO: u64 = 0x04; + pub const INDEX_FLD_STR: u64 = 0x08; + pub const INDEX_FLD_VECTOR: u64 = 0x10; +} + +/// SIValue type tags for binary serialization (matching C FalkorDB format). +pub mod si_type { + pub const T_ARRAY: u64 = 1 << 3; + pub const T_DATETIME: u64 = 1 << 5; + pub const T_DATE: u64 = 1 << 7; + pub const T_TIME: u64 = 1 << 8; + pub const T_DURATION: u64 = 1 << 10; + pub const T_STRING: u64 = 1 << 11; + pub const T_BOOL: u64 = 1 << 12; + pub const T_INT64: u64 = 1 << 13; + pub const T_DOUBLE: u64 = 1 << 14; + pub const T_NULL: u64 = 1 << 15; + pub const T_POINT: u64 = 1 << 17; + pub const T_VECTOR_F32: u64 = 1 << 18; + pub const T_INTERN: u64 = 1 << 19; +} + +/// Identifies which payload section a key entry represents in the RDB format. +/// +/// Each virtual key stores a directory of `(EncodeState, count)` pairs describing +/// which payload sections it contains and how many entities per section. +#[repr(u64)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EncodeState { + Init = 0, + Nodes = 1, + DeletedNodes = 2, + Edges = 3, + DeletedEdges = 4, + GraphSchema = 5, + LabelsMatrices = 6, + RelationMatrices = 7, + AdjMatrix = 8, + LblsMatrix = 9, + Final = 10, +} + +impl EncodeState { + #[must_use] + pub const fn from_u64(v: u64) -> Option { + match v { + 0 => Some(Self::Init), + 1 => Some(Self::Nodes), + 2 => Some(Self::DeletedNodes), + 3 => Some(Self::Edges), + 4 => Some(Self::DeletedEdges), + 5 => Some(Self::GraphSchema), + 6 => Some(Self::LabelsMatrices), + 7 => Some(Self::RelationMatrices), + 8 => Some(Self::AdjMatrix), + 9 => Some(Self::LblsMatrix), + 10 => Some(Self::Final), + _ => None, + } + } +} + +/// A single payload entry with state, count, and offset into the entity stream. +#[derive(Debug, Clone, Copy)] +pub struct PayloadEntry { + pub state: EncodeState, + pub count: u64, + pub offset: u64, +} + +impl Encode<19> for RoaringTreemap { + fn encode( + &self, + w: &mut dyn Writer, + ) { + self.encode_with_range(w, self.len(), 0); + } + + fn encode_with_range( + &self, + w: &mut dyn Writer, + count: u64, + offset: u64, + ) { + let mut buf = Vec::with_capacity(count as usize * 8); + for id in self.iter().skip(offset as usize).take(count as usize) { + buf.extend_from_slice(&id.to_le_bytes()); + } + w.write_buffer(&buf); + } +} + +impl Decode<19> for RoaringTreemap { + fn decode(r: &mut dyn Reader) -> Result { + let bytes = r.read_buffer()?; + if bytes.len() % 8 != 0 { + return Err(format!( + "misaligned deleted entities buffer: {} bytes is not a multiple of 8", + bytes.len() + )); + } + let count = bytes.len() / 8; + let mut bitmap = Self::new(); + for i in 0..count { + let id = u64::from_le_bytes( + bytes[i * 8..(i + 1) * 8] + .try_into() + .map_err(|_| "invalid id bytes")?, + ); + bitmap.insert(id); + } + Ok(bitmap) + } + + fn decode_with_count( + &mut self, + r: &mut dyn Reader, + count: u64, + ) -> Result<(), String> { + let bytes = r.read_buffer()?; + let expected_len = count as usize * 8; + if bytes.len() != expected_len { + return Err(format!( + "deleted entities buffer length mismatch: got {} bytes, expected {} bytes", + bytes.len(), + expected_len + )); + } + for i in 0..count as usize { + let id = u64::from_le_bytes( + bytes[i * 8..(i + 1) * 8] + .try_into() + .map_err(|_| "invalid id bytes")?, + ); + self.insert(id); + } + Ok(()) + } +} diff --git a/graph/src/graph/graphblas/tensor.rs b/graph/src/graph/graphblas/tensor.rs index 8bd0298f..4b450c8a 100644 --- a/graph/src/graph/graphblas/tensor.rs +++ b/graph/src/graph/graphblas/tensor.rs @@ -56,7 +56,9 @@ //! with different amounts and dates. use super::{ - matrix::{Dup, New, Remove, Set, Size}, + matrix::{Dup, Matrix, New, Remove, Set, Size, Transpose}, + serialization::{Decode, Encode, Reader, Writer}, + vector::Vector, versioned_matrix::{self, VersionedMatrix}, }; @@ -115,7 +117,7 @@ impl Tensor { pub fn remove_all( &mut self, - rels: &Vec<(u64, u64, u64)>, + rels: &[(u64, u64, u64)], ) { for (id, src, dest) in rels { self.me.remove(src << 32 | dest, *id); @@ -142,6 +144,11 @@ impl Tensor { self.mt.resize(ncols, nrows); } + /// Rebuild the backward matrix as the transpose of the forward matrix. + pub fn rebuild_backward(&mut self) { + self.mt = self.m.transpose(); + } + #[must_use] pub fn dup(&self) -> Self { Self { @@ -172,6 +179,12 @@ impl Tensor { Iter::new(self, min_row, max_row, transpose) } + /// Whether this tensor has any (src, dst) pair with more than one edge. + #[must_use] + pub fn has_multi_edge(&self) -> bool { + self.m.nvals() != self.me.nvals() + } + pub fn wait(&mut self) { self.m.wait(); self.mt.wait(); @@ -184,6 +197,137 @@ impl Tensor { } } +/// MSB flag used by C FalkorDB to indicate multi-edge entries in the +/// UINT64 forward matrix. +const MSB_MASK: u64 = 1u64 << 63; + +impl Encode<19> for Tensor { + fn encode( + &self, + w: &mut dyn Writer, + ) { + // Build a UINT64 forward matrix for C compatibility. + // Single-edge (src,dst): cell = edge_id + // Multi-edge (src,dst): cell = edge_count | MSB_MASK + let (m, dp) = self.m.extract_m_dp(); + + let mut uint64_m = Matrix::new_uint64(m.nrows(), m.ncols()); + let mut uint64_dp = Matrix::new_uint64(dp.nrows(), dp.ncols()); + // Track multi-edge (src, dst) pairs per sub-matrix for tensor section + let mut multi_edge_m: Vec<(u64, u64)> = Vec::new(); + let mut multi_edge_dp: Vec<(u64, u64)> = Vec::new(); + + for (matrix, uint64_matrix, multi_edges) in [ + (&m, &mut uint64_m, &mut multi_edge_m), + (&dp, &mut uint64_dp, &mut multi_edge_dp), + ] { + for (src, dst) in matrix.iter(0, u64::MAX) { + let compound_key = (src << 32) | dst; + let mut edge_ids: Vec = self + .me + .iter(compound_key, compound_key) + .map(|(_, edge_id)| edge_id) + .collect(); + + if edge_ids.len() == 1 { + // Single edge: store edge ID directly + uint64_matrix.set_uint64(src, dst, edge_ids[0]); + } else { + // Multi-edge: store count with MSB set + uint64_matrix.set_uint64(src, dst, edge_ids.len() as u64 | MSB_MASK); + multi_edges.push((src, dst)); + } + } + } + + // Encode the UINT64 forward matrix (as a VersionedMatrix: m, dp, dm) + let dm = Matrix::new_uint64(m.nrows(), m.ncols()); // empty delta-minus + uint64_m.encode(w); + uint64_dp.encode(w); + dm.encode(w); + + let total = self.edge_count(); + w.write_unsigned(total); + + if total == 0 { + return; + } + + // Tensor section: only multi-edge pairs + let mut v = Vector::::new(GrB_INDEX_MAX); + for (multi_edges, _matrix) in [(&multi_edge_m, &m), (&multi_edge_dp, &dp)] { + w.write_unsigned(multi_edges.len() as u64); + for &(src, dst) in multi_edges { + let compound_key = (src << 32) | dst; + v.clear(); + + for (idx, edge_id) in self + .me + .iter(compound_key, compound_key) + .map(|(_, edge_id)| edge_id) + .enumerate() + { + v.set(idx as u64, edge_id); + } + + w.write_unsigned(src); + w.write_unsigned(dst); + v.encode(w); + } + } + } +} + +impl Decode<19> for Tensor { + fn decode(r: &mut dyn Reader) -> Result { + let forward = VersionedMatrix::decode(r)?; + let mut edges = VersionedMatrix::new(GrB_INDEX_MAX, GrB_INDEX_MAX); + + // C FalkorDB stores edge IDs as UINT64 values in the forward matrix. + // Single-edge entries (MSB not set) hold the edge ID directly. + // Multi-edge entries (MSB set) are stored in the tensor section below. + // Iterate entries, extract single-edge IDs, and rebuild as BOOL. + let forward = if forward.is_uint64() { + let mut bool_forward = VersionedMatrix::new(forward.nrows(), forward.ncols()); + for (src, dst, value) in forward.uint64_iter() { + bool_forward.set(src, dst, true); + if value & MSB_MASK == 0 { + // Single-edge: value is the edge ID + let compound_key = (src << 32) | dst; + edges.set(compound_key, value, true); + } + } + bool_forward + } else { + forward + }; + + let total_tensor_count = r.read_unsigned()?; + if total_tensor_count > 0 { + // TM tensors (base), then TDP tensors (delta-plus) + for _ in 0..2 { + let count = r.read_unsigned()?; + for _ in 0..count { + let src = r.read_unsigned()?; + let dst = r.read_unsigned()?; + let v = Vector::::decode(r)?; + let compound_key = (src << 32) | dst; + for (_, edge_id) in v.iter() { + edges.set(compound_key, edge_id, true); + } + } + } + } + + let backward = VersionedMatrix::new(0, 0); + Ok(Self { + m: forward, + mt: backward, + me: edges, + }) + } +} + pub struct Iter<'a> { t: &'a Tensor, mit: versioned_matrix::Iter, diff --git a/graph/src/graph/graphblas/vector.rs b/graph/src/graph/graphblas/vector.rs index f46ac467..ab26e142 100644 --- a/graph/src/graph/graphblas/vector.rs +++ b/graph/src/graph/graphblas/vector.rs @@ -36,14 +36,21 @@ use std::{ marker::PhantomData, mem::MaybeUninit, + os::raw::c_void, ptr::{addr_of_mut, null_mut}, }; +use crate::graph::graphblas::{GrB_UINT64, GrB_Vector_clear, GrB_Vector_setElement_UINT64}; + +use super::serialization::{Decode, Encode, Reader, Writer}; use super::{ - GrB_BOOL, GrB_Info, GrB_Vector, GrB_Vector_free, GrB_Vector_new, GrB_Vector_removeElement, - GrB_Vector_resize, GrB_Vector_setElement_BOOL, GrB_Vector_size, GrB_Vector_wait, GrB_WaitMode, - GxB_Iterator, GxB_Iterator_free, GxB_Iterator_new, GxB_Vector_Iterator_attach, - GxB_Vector_Iterator_getIndex, GxB_Vector_Iterator_next, GxB_Vector_Iterator_seek, + GrB_BOOL, GrB_Info, GrB_Type, GrB_Type_get_String, GrB_Vector, GrB_Vector_free, GrB_Vector_new, + GrB_Vector_removeElement, GrB_Vector_resize, GrB_Vector_setElement_BOOL, GrB_Vector_size, + GrB_Vector_wait, GrB_WaitMode, GxB_Iterator, GxB_Iterator_free, GxB_Iterator_get_UINT64, + GxB_Iterator_new, GxB_MAX_NAME_LEN, GxB_Option_Field, GxB_Type_from_name, + GxB_Vector_Iterator_attach, GxB_Vector_Iterator_getIndex, GxB_Vector_Iterator_next, + GxB_Vector_Iterator_seek, GxB_Vector_deserialize, GxB_Vector_load, GxB_Vector_serialize, + GxB_Vector_unload, }; /// A sparse vector backed by GraphBLAS. @@ -73,6 +80,24 @@ impl From for Vector { } } +impl From for Vector { + fn from(v: GrB_Vector) -> Self { + Self { + v, + phantom: PhantomData, + } + } +} + +impl Vector { + pub fn clear(&mut self) { + unsafe { + let info = GrB_Vector_clear(self.v); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } +} + impl Vector { pub fn new(nrows: u64) -> Self { unsafe { @@ -116,6 +141,186 @@ impl Vector { } } +impl Encode<19> for Vector { + fn encode( + &self, + w: &mut dyn Writer, + ) { + unsafe { + let mut blob: *mut c_void = null_mut(); + let mut blob_size: u64 = 0; + + let info = GxB_Vector_serialize(&raw mut blob, &raw mut blob_size, self.v, null_mut()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + let blob_slice = std::slice::from_raw_parts(blob.cast::(), blob_size as usize); + w.write_buffer(blob_slice); + + let layout = std::alloc::Layout::from_size_align(blob_size as usize, 8).unwrap(); + std::alloc::dealloc(blob.cast::(), layout); + } + } +} + +impl Decode<19> for Vector { + fn decode(r: &mut dyn Reader) -> Result { + let blob = r.read_buffer()?; + unsafe { + let mut v: MaybeUninit = MaybeUninit::uninit(); + let info = GxB_Vector_deserialize( + v.as_mut_ptr(), + null_mut(), + blob.as_ptr().cast(), + blob.len() as u64, + null_mut(), + ); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + Ok(Self::from(v.assume_init())) + } + } +} + +impl Encode<19> for Vector { + fn encode( + &self, + w: &mut dyn Writer, + ) { + unsafe { + let mut arr: *mut c_void = null_mut(); + let mut type_: MaybeUninit = MaybeUninit::uninit(); + let mut n_entries: u64 = 0; + let mut n_bytes: u64 = 0; + let mut handling: i32 = 0; + + let info = GxB_Vector_unload( + self.v, + &raw mut arr, + type_.as_mut_ptr(), + &raw mut n_entries, + &raw mut n_bytes, + &raw mut handling, + null_mut(), + ); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + let type_ = type_.assume_init(); + + let mut t_name = [0u8; GxB_MAX_NAME_LEN as usize]; + let info = GrB_Type_get_String( + type_, + t_name.as_mut_ptr().cast(), + GxB_Option_Field::GrB_NAME as _, + ); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + let t_name_len = t_name + .iter() + .position(|&b| b == 0) + .unwrap_or(GxB_MAX_NAME_LEN as usize) + + 1; + + let arr_slice = if n_bytes > 0 { + std::slice::from_raw_parts(arr.cast::(), n_bytes as usize) + } else { + &[] + }; + + w.write_buffer(arr_slice); + w.write_buffer(&t_name[..t_name_len]); + w.write_unsigned(n_entries); + w.write_unsigned(n_bytes); + w.write_signed(handling as i64); + + // Reload the vector so it remains usable + let info = GxB_Vector_load( + self.v, + &raw mut arr, + type_, + n_entries, + n_bytes, + handling, + null_mut(), + ); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } +} + +impl Decode<19> for Vector { + fn decode(r: &mut dyn Reader) -> Result { + let arr_data = r.read_buffer()?; + let type_name = r.read_buffer()?; + let n_entries = r.read_unsigned()?; + let n_bytes = r.read_unsigned()?; + let handling = r.read_signed()? as i32; + + unsafe { + let mut type_: MaybeUninit = MaybeUninit::uninit(); + let info = GxB_Type_from_name(type_.as_mut_ptr(), type_name.as_ptr().cast()); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + let type_ = type_.assume_init(); + + let mut v: MaybeUninit = MaybeUninit::uninit(); + let info = GrB_Vector_new(v.as_mut_ptr(), type_, 0); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + let v = v.assume_init(); + + let mut arr_ptr: *mut c_void = if n_bytes > 0 { + let layout = std::alloc::Layout::from_size_align(n_bytes as usize, 8).unwrap(); + let ptr = std::alloc::alloc(layout); + std::ptr::copy_nonoverlapping(arr_data.as_ptr(), ptr, n_bytes as usize); + ptr.cast() + } else { + null_mut() + }; + + let info = GxB_Vector_load( + v, + &raw mut arr_ptr, + type_, + n_entries, + n_bytes, + handling, + null_mut(), + ); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + + Ok(Self::from(v)) + } + } +} + +impl Vector { + pub fn new(nrows: u64) -> Self { + unsafe { + let mut v: MaybeUninit = MaybeUninit::uninit(); + let info = GrB_Vector_new(v.as_mut_ptr(), GrB_UINT64, nrows); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + Self { + v: v.assume_init(), + phantom: PhantomData, + } + } + } + + pub fn set( + &mut self, + i: u64, + value: u64, + ) { + unsafe { + let info = GrB_Vector_setElement_UINT64(self.v, value, i); + debug_assert_eq!(info, GrB_Info::GrB_SUCCESS); + } + } + + #[must_use] + #[allow(clippy::iter_without_into_iter)] + pub fn iter(&self) -> Iter { + Iter::new(self) + } +} + pub trait Size { fn size(&self) -> u64; fn resize( @@ -236,3 +441,19 @@ impl Iterator for Iter { } } } + +impl Iterator for Iter { + type Item = (u64, u64); + + fn next(&mut self) -> Option { + if self.depleted { + return None; + } + unsafe { + let idx = GxB_Vector_Iterator_getIndex(self.inner); + let val = GxB_Iterator_get_UINT64(self.inner); + self.depleted = GxB_Vector_Iterator_next(self.inner) == GrB_Info::GxB_EXHAUSTED; + Some((idx, val)) + } + } +} diff --git a/graph/src/graph/graphblas/versioned_matrix.rs b/graph/src/graph/graphblas/versioned_matrix.rs index 6875a3cd..0753ce06 100644 --- a/graph/src/graph/graphblas/versioned_matrix.rs +++ b/graph/src/graph/graphblas/versioned_matrix.rs @@ -61,6 +61,7 @@ use super::{ GxB_Print_Level, matrix::{self, Dup, Get, MaskedElementWiseAdd, Matrix, New, Remove, Set, Size, Transpose}, + serialization::{Decode, Encode, Reader, Writer}, }; use crate::graph::cow::Cow; @@ -73,7 +74,7 @@ pub struct VersionedMatrix { m: Cow, /// Delta-plus: edges added in current transaction dp: Cow, - /// Delta-minus: edges removed in current transaction + /// Delta-minus: edges removed in current transaction dm: Cow, } @@ -182,6 +183,35 @@ impl VersionedMatrix { self.dp.print(level); self.dm.print(level); } + + #[must_use] + pub fn extract_m_dp(&self) -> (Matrix, Matrix) { + let mut m = Matrix::new(self.m.nrows(), self.m.ncols()); + let mut dp = Matrix::new(self.dp.nrows(), self.dp.ncols()); + + m.select(&self.dm, &self.m); + dp.select(&self.dm, &self.dp); + + (m, dp) + } + + /// Returns true if the base matrix has UINT64 element type. + /// + /// C-produced relation matrices store edge IDs as UINT64, while + /// Rust-produced ones use BOOL. + #[must_use] + pub fn is_uint64(&self) -> bool { + self.m.is_uint64() + } + + /// Iterate UINT64 entries from the base M and delta-plus DP matrices. + /// + /// Used during RDB decode to read C-produced relation matrices where + /// single-edge entries store the edge ID as a UINT64 value. + /// Returns an empty iterator for Rust-produced BOOL matrices. + pub fn uint64_iter(&self) -> impl Iterator + '_ { + self.m.uint64_iter().chain(self.dp.uint64_iter()) + } } impl Remove for VersionedMatrix { @@ -199,22 +229,6 @@ impl Remove for VersionedMatrix { } } -// impl MxM for VersionedMatrix { -// fn lmxm( -// &mut self, -// b: &Self, -// ) { - -// } - -// fn rmxm( -// &mut self, -// b: &Self, -// ) { - -// } -// } - impl Get for VersionedMatrix { fn get( &self, @@ -270,6 +284,30 @@ where } } +impl Encode<19> for VersionedMatrix { + fn encode( + &self, + w: &mut dyn Writer, + ) { + self.m.encode(w); + self.dp.encode(w); + self.dm.encode(w); + } +} + +impl Decode<19> for VersionedMatrix { + fn decode(r: &mut dyn Reader) -> Result { + let m = Matrix::decode(r)?; + let dp = Matrix::decode(r)?; + let dm = Matrix::decode(r)?; + Ok(Self { + m: Cow::new(m), + dp: Cow::new(dp), + dm: Cow::new(dm), + }) + } +} + pub struct Iter { mit: matrix::Iter, dpit: matrix::Iter,