Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions graph/src/graph/attribute_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
//! The default budget is 2 GiB per attribute store (nodes and relationships
//! each get their own cache).

use std::sync::Arc;

use quick_cache::sync::Cache;
use quick_cache::{DefaultHashBuilder, Lifecycle, Weighter};

Expand All @@ -69,7 +71,9 @@ use crate::runtime::value::Value;
#[derive(Clone)]
struct CachedEntity {
/// Sorted by `attr_idx` for O(log n) binary-search lookups.
attrs: Vec<(u16, Value)>,
/// Wrapped in Arc so `quick_cache::get()` clone is O(1) refcount bump
/// instead of a full Vec heap allocation.
attrs: Arc<Vec<(u16, Value)>>,
/// Graph version when this entry was written/populated.
version: u64,
/// `true` when the entry has not yet been flushed to fjall.
Expand All @@ -89,7 +93,12 @@ impl Weighter<u64, CachedEntity> for EntityWeighter {
let base = val.attrs.len() * (std::mem::size_of::<u16>() + std::mem::size_of::<Value>());
let heap: usize = val.attrs.iter().map(|(_, v)| v.heap_size()).sum();
// Minimum weight of 1 to satisfy quick_cache invariant.
(base + heap + std::mem::size_of::<CachedEntity>()).max(1) as u64
// Include Arc overhead.
(base
+ heap
+ std::mem::size_of::<CachedEntity>()
+ std::mem::size_of::<Arc<Vec<(u16, Value)>>>())
.max(1) as u64
}
}

Expand Down Expand Up @@ -182,12 +191,13 @@ impl AttributeCache {
/// Return all cached attributes for an entity.
///
/// Returns `None` on cache miss or version mismatch.
/// The returned `Arc` avoids cloning the underlying Vec.
#[must_use]
pub fn get_entity(
&self,
entity_id: u64,
version: u64,
) -> Option<Vec<(u16, Value)>> {
) -> Option<Arc<Vec<(u16, Value)>>> {
let entry = self.entries.get(&entity_id)?;
if entry.version > version {
return None;
Expand All @@ -203,7 +213,7 @@ impl AttributeCache {
&self,
entity_id: u64,
version: u64,
) -> Option<(Vec<(u16, Value)>, bool)> {
) -> Option<(Arc<Vec<(u16, Value)>>, bool)> {
let entry = self.entries.get(&entity_id)?;
if entry.version > version {
return None;
Expand Down Expand Up @@ -240,7 +250,7 @@ impl AttributeCache {
// Ensure attrs are sorted by attr_idx to support binary searches.
attrs.sort_by_key(|item| item.0);
let entry = CachedEntity {
attrs,
attrs: Arc::new(attrs),
version,
dirty,
};
Expand Down Expand Up @@ -340,7 +350,9 @@ impl AttributeCache {
if let Some(mut entry) = self.entries.get(&entity_id)
&& let Ok(pos) = entry.attrs.binary_search_by_key(&attr_idx, |(idx, _)| *idx)
{
entry.attrs.remove(pos);
let mut new_attrs = (*entry.attrs).clone();
new_attrs.remove(pos);
entry.attrs = Arc::new(new_attrs);
entry.dirty = true;
// Update the cache with the modified entry
self.entries.insert(entity_id, entry);
Expand All @@ -363,7 +375,7 @@ impl AttributeCache {
pub fn collect_dirty_lru(
&self,
n: usize,
) -> Vec<(u64, Vec<(u16, Value)>)> {
) -> Vec<(u64, Arc<Vec<(u16, Value)>>)> {
let mut result = Vec::with_capacity(n);
// Iterate and collect dirty entries.
for (entity_id, entry) in self.entries.iter() {
Expand Down
62 changes: 35 additions & 27 deletions graph/src/graph/attribute_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,10 @@ impl AttributeStore {
fn populate_cache_from_fjall(
&self,
entity_id: u64,
) -> Vec<(u16, Value)> {
) -> Arc<Vec<(u16, Value)>> {
// If this entity is pending full deletion, return empty regardless of fjall state.
if self.pending_deletes.contains(entity_id) {
return Vec::new();
return Arc::new(Vec::new());
}
let prefix = entity_id.to_be_bytes();
let attrs: Vec<(u16, Value)> = self
Expand All @@ -271,7 +271,7 @@ impl AttributeStore {
let _ = self
.cache
.insert_entity_if_older(entity_id, attrs.clone(), self.version);
attrs
Arc::new(attrs)
}

// ---- read path (cache → fjall) --------------------------------------
Expand Down Expand Up @@ -340,14 +340,18 @@ impl AttributeStore {
// Try cache first.
let cached = self.cache.get_entity(key, self.version);
let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key));
attrs.into_iter().filter_map(move |(idx, _)| {
let i = idx as usize;
if i < self.attrs_name.len() {
Some(self.attrs_name[i].clone())
} else {
None
}
})
attrs
.iter()
.filter_map(move |(idx, _)| {
let i = *idx as usize;
if i < self.attrs_name.len() {
Some(self.attrs_name[i].clone())
} else {
None
}
})
.collect::<Vec<_>>()
.into_iter()
}

pub fn get_all_attrs(
Expand All @@ -356,23 +360,27 @@ impl AttributeStore {
) -> impl Iterator<Item = (Arc<String>, Value)> + '_ {
let cached = self.cache.get_entity(key, self.version);
let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key));
attrs.into_iter().filter_map(move |(idx, value)| {
let i = idx as usize;
if i < self.attrs_name.len() {
Some((self.attrs_name[i].clone(), value))
} else {
None
}
})
attrs
.iter()
.filter_map(move |(idx, value)| {
let i = *idx as usize;
if i < self.attrs_name.len() {
Some((self.attrs_name[i].clone(), value.clone()))
} else {
None
}
})
.collect::<Vec<_>>()
.into_iter()
}

pub fn get_all_attrs_by_id(
&self,
key: u64,
) -> impl Iterator<Item = (u16, Value)> + '_ {
let cached = self.cache.get_entity(key, self.version);
let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key));
attrs.into_iter()
) -> Arc<Vec<(u16, Value)>> {
self.cache
.get_entity(key, self.version)
.unwrap_or_else(|| self.populate_cache_from_fjall(key))
}

// ---- write path (cache only) ----------------------------------------
Expand Down Expand Up @@ -473,7 +481,7 @@ impl AttributeStore {
}

// Merge: start from current, apply overwrites, remove nulls.
let mut merged: Vec<(u16, Value)> = current;
let mut merged: Vec<(u16, Value)> = (*current).clone();
for (idx, value) in new_entries {
match merged.binary_search_by_key(&idx, |(i, _)| *i) {
Ok(pos) => merged[pos].1 = value,
Expand Down Expand Up @@ -564,7 +572,7 @@ impl AttributeStore {
}
}
// Then insert the current attribute set.
for &(attr_idx, ref value) in attrs {
for &(attr_idx, ref value) in attrs.iter() {
let composite_key = make_key(*entity_id, attr_idx);
batch.insert(self.keyspace(), composite_key, value.to_bytes());
}
Expand All @@ -573,7 +581,7 @@ impl AttributeStore {
// Re-insert entries to prevent data loss on commit failure.
for (entity_id, attrs) in dirty_entries {
self.cache
.insert_entity(entity_id, attrs, self.version, true);
.insert_entity(entity_id, (*attrs).clone(), self.version, true);
}
e.to_string()
})?;
Expand Down Expand Up @@ -604,7 +612,7 @@ impl AttributeStore {
// Safe to flush: these are pre-existing dirty entries from prior
// transactions, not from the active one.
let mut batch = self.database.batch();
for &(attr_idx, ref value) in &cached {
for &(attr_idx, ref value) in cached.iter() {
let composite_key = make_key(entity_id, attr_idx);
batch.insert(self.keyspace(), composite_key, value.to_bytes());
}
Expand Down
6 changes: 3 additions & 3 deletions graph/src/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ impl Graph {
pub fn get_node_all_attrs_by_id(
&self,
id: NodeId,
) -> impl Iterator<Item = (u16, Value)> + '_ {
) -> Arc<Vec<(u16, Value)>> {
self.node_attrs.get_all_attrs_by_id(id.0)
}

Expand All @@ -1478,7 +1478,7 @@ impl Graph {
pub fn get_relationship_all_attrs_by_id(
&self,
id: RelationshipId,
) -> impl Iterator<Item = (u16, Value)> + '_ {
) -> Arc<Vec<(u16, Value)>> {
self.relationship_attrs.get_all_attrs_by_id(id.0)
}

Expand Down Expand Up @@ -1872,7 +1872,7 @@ impl Graph {
entity_id: u64,
) -> usize {
let mut sz: usize = 0;
for (_, val) in store.get_all_attrs_by_id(entity_id) {
for (_, val) in store.get_all_attrs_by_id(entity_id).iter() {
sz += std::mem::size_of::<u16>() + std::mem::size_of::<Value>() + val.heap_size();
}
sz
Expand Down
69 changes: 35 additions & 34 deletions graph/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ unsafe fn rs_array_new<T>(data: &[T]) -> *mut T {

let n = data.len();
let elem_sz = std::mem::size_of::<T>();
let total = std::mem::size_of::<ArrayHdr>() + n * elem_sz;
let total = std::mem::size_of::<ArrayHdr>() + std::mem::size_of_val(data);

unsafe {
// Must use RedisModule_Alloc because RediSearch's array_free uses
Expand All @@ -107,7 +107,7 @@ unsafe fn rs_array_new<T>(data: &[T]) -> *mut T {
std::ptr::copy_nonoverlapping(
data.as_ptr().cast::<u8>(),
arr_ptr.cast::<u8>(),
n * elem_sz,
std::mem::size_of_val(data),
);

arr_ptr
Expand Down Expand Up @@ -503,40 +503,40 @@ impl Document {
_ => {} // Skip non-indexable types
}
}
if !numerics.is_empty() {
if let Some(name) = field.numeric_arr_name() {
let mut c_arr = rs_array_new(&numerics);
if !c_arr.is_null() {
RediSearch_DocumentAddFieldNumericArray(
self.rs_doc,
name.as_ptr(),
&raw mut c_arr,
RSFLDTYPE_NUMERIC,
);
}
if !numerics.is_empty()
&& let Some(name) = field.numeric_arr_name()
{
let mut c_arr = rs_array_new(&numerics);
if !c_arr.is_null() {
RediSearch_DocumentAddFieldNumericArray(
self.rs_doc,
name.as_ptr(),
&raw mut c_arr,
RSFLDTYPE_NUMERIC,
);
}
}
if !string_cstrs.is_empty() {
if let Some(name) = field.string_arr_name() {
let ptrs: Vec<*mut c_char> = string_cstrs
.iter()
.map(|cs| cs.as_ptr() as *mut c_char)
.collect();
let mut c_arr = rs_array_new(&ptrs);
if !c_arr.is_null() {
RediSearch_DocumentAddFieldStringArray(
self.rs_doc,
name.as_ptr(),
&raw mut c_arr,
ptrs.len(),
RSFLDTYPE_TAG,
);
}
// Keep string content CStrings alive — the pointer
// array in RediSearch references them. They'll be
// properly freed when the Document is dropped.
self._string_arr_values.extend(string_cstrs);
if !string_cstrs.is_empty()
&& let Some(name) = field.string_arr_name()
{
let ptrs: Vec<*mut c_char> = string_cstrs
.iter()
.map(|cs| cs.as_ptr().cast_mut())
.collect();
let mut c_arr = rs_array_new(&ptrs);
if !c_arr.is_null() {
RediSearch_DocumentAddFieldStringArray(
self.rs_doc,
name.as_ptr(),
&raw mut c_arr,
ptrs.len(),
RSFLDTYPE_TAG,
);
}
// Keep string content CStrings alive — the pointer
// array in RediSearch references them. They'll be
// properly freed when the Document is dropped.
self._string_arr_values.extend(string_cstrs);
}
}
Value::VecF32(_) => {} // Only for vector fields
Expand Down Expand Up @@ -773,7 +773,8 @@ impl Index {
/// Uses the same bitmask as C FalkorDB's RediSearch INT64 workaround,
/// applied to the value's magnitude so negative integers are handled
/// correctly.
pub fn int_loses_f64_precision(i: i64) -> bool {
#[must_use]
pub const fn int_loses_f64_precision(i: i64) -> bool {
i.unsigned_abs() & 0x7FF0_0000_0000_0000 != 0
}

Expand Down
8 changes: 4 additions & 4 deletions graph/src/planner/optimizer/utilize_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ fn subtree_has_property_of(
if let ExprIR::Property(_) = node.data() {
// Check if the Variable child of this Property matches the alias
for child in node.children() {
if let ExprIR::Variable(v) = child.data() {
if v == alias {
return true;
}
if let ExprIR::Variable(v) = child.data()
&& v == alias
{
return true;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions graph/src/runtime/ops/node_by_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl<'a> NodeByIndexScanOp<'a> {
fn can_utilize_index(q: &IndexQuery<Value>) -> bool {
use crate::index::Index;

fn is_indexable(v: &Value) -> bool {
const fn is_indexable(v: &Value) -> bool {
match v {
Value::Int(i) => !Index::int_loses_f64_precision(*i),
Value::Float(_)
Expand All @@ -235,7 +235,7 @@ impl<'a> NodeByIndexScanOp<'a> {
match q {
IndexQuery::Equal { value, .. } => is_indexable(value),
IndexQuery::Range { min, max, .. } => {
min.as_ref().map_or(true, is_indexable) && max.as_ref().map_or(true, is_indexable)
min.as_ref().is_none_or(is_indexable) && max.as_ref().map_or(true, is_indexable)
}
IndexQuery::And(children) | IndexQuery::Or(children) => {
children.iter().all(Self::can_utilize_index)
Expand Down
Loading
Loading