diff --git a/graph/src/runtime/value.rs b/graph/src/runtime/value.rs index 0226f397..3301faa1 100644 --- a/graph/src/runtime/value.rs +++ b/graph/src/runtime/value.rs @@ -39,7 +39,10 @@ use std::{ use thin_vec::{ThinVec, thin_vec}; use crate::{ - graph::graph::{LabelId, NodeId, RelationshipId}, + graph::{ + graph::{LabelId, NodeId, RelationshipId}, + graphblas::serialization::{Decode, Encode, Reader, Writer, si_type}, + }, runtime::{functions::Type, ordermap::OrderMap, runtime::Runtime}, }; @@ -1758,3 +1761,143 @@ impl Value { } } } + +impl Encode<19> for Value { + fn encode( + &self, + w: &mut dyn Writer, + ) { + match self { + Self::Bool(b) => { + w.write_unsigned(si_type::T_BOOL); + w.write_signed(i64::from(*b)); + } + Self::Int(i) => { + w.write_unsigned(si_type::T_INT64); + w.write_signed(*i); + } + Self::Float(f) => { + w.write_unsigned(si_type::T_DOUBLE); + w.write_double(*f); + } + Self::String(s) => { + w.write_unsigned(si_type::T_STRING); + let bytes: Vec = s + .as_bytes() + .iter() + .copied() + .chain(std::iter::once(0)) + .collect(); + w.write_buffer(&bytes); + } + Self::List(list) => { + w.write_unsigned(si_type::T_ARRAY); + w.write_unsigned(list.len() as u64); + for item in list.iter() { + crate::graph::graphblas::serialization::Encode::encode(item, w); + } + } + Self::Point(p) => { + w.write_unsigned(si_type::T_POINT); + w.write_double(f64::from(p.latitude)); + w.write_double(f64::from(p.longitude)); + } + Self::VecF32(v) => { + w.write_unsigned(si_type::T_VECTOR_F32); + let dim = v.len() as u32; + let mut buf = Vec::with_capacity(4 + v.len() * 4); + buf.extend_from_slice(&dim.to_le_bytes()); + for f in v.iter() { + buf.extend_from_slice(&f.to_le_bytes()); + } + w.write_buffer(&buf); + } + Self::Datetime(ts) => { + w.write_unsigned(si_type::T_DATETIME); + w.write_signed(*ts); + } + Self::Date(ts) => { + w.write_unsigned(si_type::T_DATE); + w.write_signed(*ts); + } + Self::Time(ts) => { + w.write_unsigned(si_type::T_TIME); + w.write_signed(*ts); + } + Self::Duration(ts) => { + w.write_unsigned(si_type::T_DURATION); + w.write_signed(*ts); + } + // Map, Node, Relationship, Path are not stored as properties + Self::Null => { + w.write_unsigned(si_type::T_NULL); + } + Self::Map(_) | Self::Node(_) | Self::Relationship(_) | Self::Path(_) => { + debug_assert!( + false, + "unsupported value type in property storage: graphs/nodes/relationships/paths cannot be persisted as attribute values" + ); + w.write_unsigned(si_type::T_NULL); + } + } + } +} + +impl Decode<19> for Value { + fn decode(r: &mut dyn Reader) -> Result { + let tag = r.read_unsigned()?; + match tag { + si_type::T_NULL => Ok(Self::Null), + si_type::T_BOOL => Ok(Self::Bool(r.read_signed()? != 0)), + si_type::T_INT64 => Ok(Self::Int(r.read_signed()?)), + si_type::T_DOUBLE => Ok(Self::Float(r.read_double()?)), + // T_STRING or T_INTERN_STRING (T_INTERN | T_STRING = (1<<19) | (1<<11) = 526336) + t if t == si_type::T_STRING || t == (si_type::T_INTERN | si_type::T_STRING) => { + let buf = r.read_buffer()?; + let s = if buf.last() == Some(&0) { + String::from_utf8_lossy(&buf[..buf.len() - 1]).to_string() + } else { + String::from_utf8_lossy(&buf).to_string() + }; + Ok(Self::String(Arc::new(s))) + } + si_type::T_ARRAY => { + let len = r.read_unsigned()?; + let mut items = ThinVec::with_capacity(len as usize); + for _ in 0..len { + items.push(Self::decode(r)?); + } + Ok(Self::List(Arc::new(items))) + } + si_type::T_POINT => { + let lat = r.read_double()?; + let lon = r.read_double()?; + Ok(Self::Point(Point { + latitude: lat as f32, + longitude: lon as f32, + })) + } + si_type::T_VECTOR_F32 => { + let bytes = r.read_buffer()?; + if bytes.len() < 4 { + return Err("vector buffer too short".into()); + } + let dim = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize; + let mut v = ThinVec::with_capacity(dim); + for i in 0..dim { + let off = 4 + i * 4; + if off + 4 > bytes.len() { + return Err("vector data truncated".into()); + } + v.push(f32::from_le_bytes(bytes[off..off + 4].try_into().unwrap())); + } + Ok(Self::VecF32(Arc::new(v))) + } + si_type::T_DATETIME => Ok(Self::Datetime(r.read_signed()?)), + si_type::T_DATE => Ok(Self::Date(r.read_signed()?)), + si_type::T_TIME => Ok(Self::Time(r.read_signed()?)), + si_type::T_DURATION => Ok(Self::Duration(r.read_signed()?)), + _ => Err(format!("unknown SIType tag: {tag}")), + } + } +}