Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions graph/src/graph/attribute_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
//! Each attribute is stored as a separate fjall entry:
//! `entity_id (8 bytes big-endian) + attr_idx (2 bytes big-endian)`

use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, process, sync::Arc};

use fjall::{
Database, Keyspace, KeyspaceCreateOptions, Readable, Snapshot, config::HashRatioPolicy,
Expand Down Expand Up @@ -120,7 +120,6 @@ fn extract_attr_idx(key: &[u8]) -> Option<u16> {
/// durable cold store. The fjall keyspace is created lazily on first access
/// to avoid I/O overhead for graphs that fit entirely in cache.
pub struct AttributeStore {
database: Database,
snapshot: OnceCell<Snapshot>,
keyspace: OnceCell<Keyspace>,
keyspace_name: Arc<String>,
Expand All @@ -139,7 +138,6 @@ pub struct AttributeStore {
impl Clone for AttributeStore {
fn clone(&self) -> Self {
Self {
database: self.database.clone(),
snapshot: self.snapshot.clone(),
keyspace: self.keyspace.clone(),
keyspace_name: self.keyspace_name.clone(),
Expand All @@ -155,18 +153,32 @@ impl Clone for AttributeStore {
/// Default memory budget per attribute cache (2 GiB).
const DEFAULT_ATTR_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;

static DATABASE: OnceCell<Database> = OnceCell::new();

/// Get or initialize the shared fjall database for attribute stores.
fn get_database() -> Database {
DATABASE
.get_or_init(|| {
Database::builder(format!("./attrs/{}", process::id()))
.temporary(true)
.manual_journal_persist(true)
.cache_size(128 * 1_024 * 1_024)
.open()
.expect("failed to open fjall database")
})
.clone()
}

impl AttributeStore {
#[must_use]
pub fn new(
database: Database,
keyspace: &str,
version: u64,
) -> Self {
Self {
snapshot: OnceCell::new(),
keyspace: OnceCell::new(),
keyspace_name: Arc::new(keyspace.to_owned()),
database,
attrs_name: OrderSet::default(),
cache: Arc::new(AttributeCache::new(DEFAULT_ATTR_CACHE_BYTES)),
version,
Expand All @@ -188,9 +200,9 @@ impl AttributeStore {
/// the process cannot continue safely.
fn keyspace(&self) -> &Keyspace {
self.keyspace.get_or_init(|| {
let ks_exists = self.database.keyspace_exists(&self.keyspace_name);
let ks = self
.database
let db = get_database();
let ks_exists = db.keyspace_exists(&self.keyspace_name);
let ks = db
.keyspace(&self.keyspace_name, || {
KeyspaceCreateOptions::default()
.data_block_hash_ratio_policy(HashRatioPolicy::all(0.75))
Expand All @@ -216,7 +228,7 @@ impl AttributeStore {
// taking a snapshot, so the new version never sees data from a
// previously-deleted graph that reused the same keyspace name.
let _ = self.keyspace();
self.database.snapshot()
get_database().snapshot()
})
}

Expand All @@ -226,7 +238,6 @@ impl AttributeStore {
version: u64,
) -> Self {
Self {
database: self.database.clone(),
snapshot: self.snapshot.clone(),
keyspace: self.keyspace.clone(),
keyspace_name: self.keyspace_name.clone(),
Expand Down Expand Up @@ -518,7 +529,7 @@ impl AttributeStore {
pub fn commit(&mut self) -> Result<(), String> {
// Apply pending full entity deletions to fjall.
if !self.pending_deletes.is_empty() {
let mut batch = self.database.batch();
let mut batch = get_database().batch();
for key in &self.pending_deletes {
let prefix = key.to_be_bytes();
for entry in self.keyspace().prefix(prefix) {
Expand All @@ -530,7 +541,7 @@ impl AttributeStore {
batch.durability(None).commit().map_err(|e| e.to_string())?;
}
let new_snapshot = OnceCell::new();
let _ = new_snapshot.set(self.database.snapshot());
let _ = new_snapshot.set(get_database().snapshot());
self.snapshot = new_snapshot;
self.dirty_entities.clear();
self.pending_deletes.clear();
Expand Down Expand Up @@ -561,7 +572,7 @@ impl AttributeStore {
return Ok(());
}

let mut batch = self.database.batch();
let mut batch = get_database().batch();
for (entity_id, attrs) in &dirty_entries {
// Delete all existing fjall keys for this entity first, so that
// removed attributes don't reappear after cache eviction.
Expand Down Expand Up @@ -611,7 +622,7 @@ impl AttributeStore {
// Write dirty cached attributes to fjall before losing the cache entry.
// Safe to flush: these are pre-existing dirty entries from prior
// transactions, not from the active one.
let mut batch = self.database.batch();
let mut batch = get_database().batch();
for &(attr_idx, ref value) in cached.iter() {
let composite_key = make_key(entity_id, attr_idx);
batch.insert(self.keyspace(), composite_key, value.to_bytes());
Expand Down
31 changes: 13 additions & 18 deletions graph/src/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ use std::{
};

use atomic_refcell::AtomicRefCell;
use fjall::Database;

use itertools::Itertools;
use lru::LruCache;
use once_cell::sync::OnceCell;

use orx_tree::DynTree;
use parking_lot::Mutex;
use roaring::RoaringTreemap;
Expand Down Expand Up @@ -219,6 +219,8 @@ pub struct MemoryUsageReport {
/// The Graph is `Send + Sync` but not internally synchronized. Use [`MvccGraph`]
/// for concurrent access with proper read/write isolation.
pub struct Graph {
/// Graph name (Redis key name)
name: String,
/// Maximum node capacity (for matrix sizing)
node_cap: u64,
/// Maximum relationship capacity (for matrix sizing)
Expand Down Expand Up @@ -392,8 +394,6 @@ fn drop_index_bg(
);
}

static DATABASE: OnceCell<Database> = OnceCell::new();

impl Graph {
#[must_use]
pub fn new(
Expand All @@ -403,15 +403,8 @@ impl Graph {
version: u64,
name: &str,
) -> Self {
let db = DATABASE.get_or_init(|| {
Database::builder(format!("./attrs/{}", std::process::id()))
.temporary(true)
.manual_journal_persist(true)
.cache_size(128 * 1_024 * 1_024)
.open()
.expect("failed to open fjall database")
});
Self {
name: name.to_string(),
node_cap: n,
relationship_cap: e,
reserved_node_count: 0,
Expand All @@ -427,12 +420,8 @@ impl Graph {
all_nodes_matrix: VersionedMatrix::new(n, n),
labels_matices: Vec::new(),
relationship_matrices: Vec::new(),
node_attrs: AttributeStore::new(db.clone(), &format!("{name}/nodes"), version),
relationship_attrs: AttributeStore::new(
db.clone(),
&format!("{name}/relationships"),
version,
),
node_attrs: AttributeStore::new(&format!("{name}/nodes"), version),
relationship_attrs: AttributeStore::new(&format!("{name}/relationships"), version),
node_indexer: Indexer::default(),
node_labels: Vec::new(),
relationship_types: Vec::new(),
Expand All @@ -450,6 +439,7 @@ impl Graph {
let node_attrs = self.node_attrs.new_version(self.version + 1);
let relationship_attrs = self.relationship_attrs.new_version(self.version + 1);
Self {
name: self.name.clone(),
node_cap: self.node_cap,
relationship_cap: self.relationship_cap,
reserved_node_count: 0,
Expand Down Expand Up @@ -479,6 +469,11 @@ impl Graph {
}
}

#[must_use]
pub fn name(&self) -> &str {
&self.name
}

#[must_use]
pub const fn node_count(&self) -> u64 {
self.node_count
Expand Down
Loading