From eb1ccc10c6dcd150a46f0c93d1d660d9c8800c79 Mon Sep 17 00:00:00 2001 From: Avi Avni Date: Tue, 14 Apr 2026 16:06:11 +0300 Subject: [PATCH] refactor: extract database singleton to AttributeStore, add name field to Graph - Move fjall DATABASE static from Graph to AttributeStore as get_database() - Remove database parameter from AttributeStore::new() - Add name: String field to Graph struct for persistence/identification - Add Graph::name() accessor Split from #359. --- graph/src/graph/attribute_store.rs | 39 +++++++++++++++++++----------- graph/src/graph/graph.rs | 31 ++++++++++-------------- 2 files changed, 38 insertions(+), 32 deletions(-) diff --git a/graph/src/graph/attribute_store.rs b/graph/src/graph/attribute_store.rs index f4d55388..ba4093e4 100644 --- a/graph/src/graph/attribute_store.rs +++ b/graph/src/graph/attribute_store.rs @@ -83,7 +83,7 @@ //! Each attribute is stored as a separate fjall entry: //! `entity_id (8 bytes big-endian) + attr_idx (2 bytes big-endian)` -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, process, sync::Arc}; use fjall::{ Database, Keyspace, KeyspaceCreateOptions, Readable, Snapshot, config::HashRatioPolicy, @@ -120,7 +120,6 @@ fn extract_attr_idx(key: &[u8]) -> Option { /// durable cold store. The fjall keyspace is created lazily on first access /// to avoid I/O overhead for graphs that fit entirely in cache. pub struct AttributeStore { - database: Database, snapshot: OnceCell, keyspace: OnceCell, keyspace_name: Arc, @@ -139,7 +138,6 @@ pub struct AttributeStore { impl Clone for AttributeStore { fn clone(&self) -> Self { Self { - database: self.database.clone(), snapshot: self.snapshot.clone(), keyspace: self.keyspace.clone(), keyspace_name: self.keyspace_name.clone(), @@ -155,10 +153,25 @@ impl Clone for AttributeStore { /// Default memory budget per attribute cache (2 GiB). const DEFAULT_ATTR_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024; +static DATABASE: OnceCell = OnceCell::new(); + +/// Get or initialize the shared fjall database for attribute stores. +fn get_database() -> Database { + DATABASE + .get_or_init(|| { + Database::builder(format!("./attrs/{}", process::id())) + .temporary(true) + .manual_journal_persist(true) + .cache_size(128 * 1_024 * 1_024) + .open() + .expect("failed to open fjall database") + }) + .clone() +} + impl AttributeStore { #[must_use] pub fn new( - database: Database, keyspace: &str, version: u64, ) -> Self { @@ -166,7 +179,6 @@ impl AttributeStore { snapshot: OnceCell::new(), keyspace: OnceCell::new(), keyspace_name: Arc::new(keyspace.to_owned()), - database, attrs_name: OrderSet::default(), cache: Arc::new(AttributeCache::new(DEFAULT_ATTR_CACHE_BYTES)), version, @@ -188,9 +200,9 @@ impl AttributeStore { /// the process cannot continue safely. fn keyspace(&self) -> &Keyspace { self.keyspace.get_or_init(|| { - let ks_exists = self.database.keyspace_exists(&self.keyspace_name); - let ks = self - .database + let db = get_database(); + let ks_exists = db.keyspace_exists(&self.keyspace_name); + let ks = db .keyspace(&self.keyspace_name, || { KeyspaceCreateOptions::default() .data_block_hash_ratio_policy(HashRatioPolicy::all(0.75)) @@ -216,7 +228,7 @@ impl AttributeStore { // taking a snapshot, so the new version never sees data from a // previously-deleted graph that reused the same keyspace name. let _ = self.keyspace(); - self.database.snapshot() + get_database().snapshot() }) } @@ -226,7 +238,6 @@ impl AttributeStore { version: u64, ) -> Self { Self { - database: self.database.clone(), snapshot: self.snapshot.clone(), keyspace: self.keyspace.clone(), keyspace_name: self.keyspace_name.clone(), @@ -518,7 +529,7 @@ impl AttributeStore { pub fn commit(&mut self) -> Result<(), String> { // Apply pending full entity deletions to fjall. if !self.pending_deletes.is_empty() { - let mut batch = self.database.batch(); + let mut batch = get_database().batch(); for key in &self.pending_deletes { let prefix = key.to_be_bytes(); for entry in self.keyspace().prefix(prefix) { @@ -530,7 +541,7 @@ impl AttributeStore { batch.durability(None).commit().map_err(|e| e.to_string())?; } let new_snapshot = OnceCell::new(); - let _ = new_snapshot.set(self.database.snapshot()); + let _ = new_snapshot.set(get_database().snapshot()); self.snapshot = new_snapshot; self.dirty_entities.clear(); self.pending_deletes.clear(); @@ -561,7 +572,7 @@ impl AttributeStore { return Ok(()); } - let mut batch = self.database.batch(); + let mut batch = get_database().batch(); for (entity_id, attrs) in &dirty_entries { // Delete all existing fjall keys for this entity first, so that // removed attributes don't reappear after cache eviction. @@ -611,7 +622,7 @@ impl AttributeStore { // Write dirty cached attributes to fjall before losing the cache entry. // Safe to flush: these are pre-existing dirty entries from prior // transactions, not from the active one. - let mut batch = self.database.batch(); + let mut batch = get_database().batch(); for &(attr_idx, ref value) in cached.iter() { let composite_key = make_key(entity_id, attr_idx); batch.insert(self.keyspace(), composite_key, value.to_bytes()); diff --git a/graph/src/graph/graph.rs b/graph/src/graph/graph.rs index a8adb1f9..93eadbfe 100644 --- a/graph/src/graph/graph.rs +++ b/graph/src/graph/graph.rs @@ -73,10 +73,10 @@ use std::{ }; use atomic_refcell::AtomicRefCell; -use fjall::Database; + use itertools::Itertools; use lru::LruCache; -use once_cell::sync::OnceCell; + use orx_tree::DynTree; use parking_lot::Mutex; use roaring::RoaringTreemap; @@ -219,6 +219,8 @@ pub struct MemoryUsageReport { /// The Graph is `Send + Sync` but not internally synchronized. Use [`MvccGraph`] /// for concurrent access with proper read/write isolation. pub struct Graph { + /// Graph name (Redis key name) + name: String, /// Maximum node capacity (for matrix sizing) node_cap: u64, /// Maximum relationship capacity (for matrix sizing) @@ -392,8 +394,6 @@ fn drop_index_bg( ); } -static DATABASE: OnceCell = OnceCell::new(); - impl Graph { #[must_use] pub fn new( @@ -403,15 +403,8 @@ impl Graph { version: u64, name: &str, ) -> Self { - let db = DATABASE.get_or_init(|| { - Database::builder(format!("./attrs/{}", std::process::id())) - .temporary(true) - .manual_journal_persist(true) - .cache_size(128 * 1_024 * 1_024) - .open() - .expect("failed to open fjall database") - }); Self { + name: name.to_string(), node_cap: n, relationship_cap: e, reserved_node_count: 0, @@ -427,12 +420,8 @@ impl Graph { all_nodes_matrix: VersionedMatrix::new(n, n), labels_matices: Vec::new(), relationship_matrices: Vec::new(), - node_attrs: AttributeStore::new(db.clone(), &format!("{name}/nodes"), version), - relationship_attrs: AttributeStore::new( - db.clone(), - &format!("{name}/relationships"), - version, - ), + node_attrs: AttributeStore::new(&format!("{name}/nodes"), version), + relationship_attrs: AttributeStore::new(&format!("{name}/relationships"), version), node_indexer: Indexer::default(), node_labels: Vec::new(), relationship_types: Vec::new(), @@ -450,6 +439,7 @@ impl Graph { let node_attrs = self.node_attrs.new_version(self.version + 1); let relationship_attrs = self.relationship_attrs.new_version(self.version + 1); Self { + name: self.name.clone(), node_cap: self.node_cap, relationship_cap: self.relationship_cap, reserved_node_count: 0, @@ -479,6 +469,11 @@ impl Graph { } } + #[must_use] + pub fn name(&self) -> &str { + &self.name + } + #[must_use] pub const fn node_count(&self) -> u64 { self.node_count