diff --git a/docs/ja/src/concepts/search/lexical_search.md b/docs/ja/src/concepts/search/lexical_search.md index f633c4cb..b7236703 100644 --- a/docs/ja/src/concepts/search/lexical_search.md +++ b/docs/ja/src/concepts/search/lexical_search.md @@ -372,6 +372,30 @@ let config = LexicalIndexConfig::builder() .build(); ``` +## Posting キャッシュ(Posting Cache) + +語の評価ではセグメントの `.post` ファイルから posting list を読み、デコードします +(varint doc-id、削除フィルタ、skip table)。キャッシュがないと同じ語のクエリごとに read + +デコードを繰り返し、クラウド/リモートストレージでは read が支配的になります。各セグメント +リーダーはデコード済み・削除フィルタ後の posting list を小さくキャッシュし、スナップショット +内の同一 `(field, term)` 参照を再利用します。 + +セグメントはスナップショット内で immutable なので、キャッシュ済みリストは常にその削除と整合 +します。commit すると空キャッシュの新しいセグメントリーダーが構築されます。キャッシュは +**byte-budget で上限制御**され(posting list はサイズ分散が大きい)、予算超過で +least-recently-used リストを退避し、予算全体より大きい単一リストはキャッシュしません。 +デフォルトで有効で `max_cache_memory` の予算を共有します。インデックス設定で制御できます。 + +```rust +use laurus::lexical::store::config::LexicalIndexConfig; +use laurus::lexical::index::config::InvertedIndexConfig; + +let mut inverted = InvertedIndexConfig::default(); +inverted.enable_posting_cache = false; // 完全に無効化 +inverted.max_cache_memory = 256 * 1024 * 1024; // またはキャッシュ予算(バイト)を変更 +let config = LexicalIndexConfig::Inverted(inverted); +``` + ## 次のステップ - 意味的類似性検索: [Vector 検索](vector_search.md) diff --git a/docs/src/concepts/search/lexical_search.md b/docs/src/concepts/search/lexical_search.md index 3e6cd0e3..65d02fc9 100644 --- a/docs/src/concepts/search/lexical_search.md +++ b/docs/src/concepts/search/lexical_search.md @@ -376,6 +376,30 @@ let config = LexicalIndexConfig::builder() .build(); ``` +## Posting Cache + +Evaluating a term reads its posting list from the segment's `.post` file and decodes it +(varint doc-ids, deletion filtering, skip table). Without caching, every query for the same +term repeats that read + decode — and on cloud/remote storage the read dominates. Each segment +reader keeps a small cache of decoded, deletion-filtered posting lists, so a repeated +`(field, term)` lookup within a snapshot reuses the decoded list. + +Because a segment is immutable for a reader snapshot, the cached list is always consistent with +its deletions; a commit builds new segment readers with empty caches. The cache is **byte-budget +bounded** (posting lists vary widely in size) — least-recently-used lists are evicted once the +budget is exceeded, and a single list larger than the whole budget is not cached. It is enabled +by default and shares the `max_cache_memory` budget; control it via the index config: + +```rust +use laurus::lexical::store::config::LexicalIndexConfig; +use laurus::lexical::index::config::InvertedIndexConfig; + +let mut inverted = InvertedIndexConfig::default(); +inverted.enable_posting_cache = false; // disable entirely +inverted.max_cache_memory = 256 * 1024 * 1024; // or resize the cache budget (bytes) +let config = LexicalIndexConfig::Inverted(inverted); +``` + ## Next Steps - Semantic similarity search: [Vector Search](vector_search.md) diff --git a/laurus/src/lexical/index/inverted.rs b/laurus/src/lexical/index/inverted.rs index 9c771e6b..cd3ba067 100644 --- a/laurus/src/lexical/index/inverted.rs +++ b/laurus/src/lexical/index/inverted.rs @@ -37,6 +37,7 @@ pub mod core; pub mod maintenance; pub mod parsed_query_cache; pub(crate) mod per_segment_view; +pub mod posting_cache; pub mod query_cache; pub mod reader; pub mod searcher; diff --git a/laurus/src/lexical/index/inverted/posting_cache.rs b/laurus/src/lexical/index/inverted/posting_cache.rs new file mode 100644 index 00000000..6aa76bb2 --- /dev/null +++ b/laurus/src/lexical/index/inverted/posting_cache.rs @@ -0,0 +1,262 @@ +//! Decoded posting-list cache (Issue +//! [#612](https://github.com/mosuka/laurus/issues/612)). +//! +//! Without it, every `SegmentReader::postings` re-opens the segment's `.post` +//! file, re-decodes the varint posting list, re-applies deletions, and rebuilds +//! the skip table — on every query. For cloud / remote storage the read alone +//! dominates. [`PostingCache`] memoises the decoded, deletion-filtered +//! [`DecodedPostingList`] per `(field, term)` so a repeated lookup reuses it. +//! +//! # Lifetime and key +//! +//! The cache lives on a [`SegmentReader`](crate::lexical::index::inverted::reader::SegmentReader), +//! which is immutable for a reader snapshot (a new commit builds new segment +//! readers), so caching the **post-deletion** list is sound — deletions are +//! fixed for the reader's life. The key is `"field\u{1}term"` (the segment is +//! implicit — the cache is per-segment). +//! +//! # Eviction +//! +//! Posting lists range from a few bytes to many megabytes, so the cache is +//! bounded by an **estimated byte budget**, not an entry count: on insert it +//! evicts the least-recently-used entries until back under budget. An entry +//! larger than the whole budget is not cached (it would evict everything else +//! for a single list). A budget of `0` disables the cache. + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use lru::LruCache; +use parking_lot::Mutex; + +use crate::lexical::index::inverted::core::posting::DecodedPostingList; + +/// Estimate the heap footprint of a decoded posting list, used for the byte +/// budget. Approximate — counts the parallel `u32` arrays, the skip table, the +/// optional positions sidecar, and the term string. +fn estimated_bytes(list: &DecodedPostingList) -> usize { + let mut bytes = std::mem::size_of::(); + bytes += list.term.len(); + bytes += list.doc_ids.len() * std::mem::size_of::(); + bytes += list.frequencies.len() * std::mem::size_of::(); + bytes += list.weights.len() * std::mem::size_of::(); + for level in &list.skip_levels { + bytes += level.len() * std::mem::size_of::(); + } + if let Some(positions) = &list.positions { + for p in positions.iter().flatten() { + bytes += p.len() * std::mem::size_of::(); + } + } + bytes +} + +/// Mutable cache state guarded by the [`PostingCache`] mutex. +#[derive(Debug)] +struct PostingCacheInner { + /// Unbounded LRU; eviction is driven by `cur_bytes` against `max_bytes`. + lru: LruCache>, + /// Sum of `estimated_bytes` over the cached entries. + cur_bytes: usize, + /// Soft byte budget. Eviction runs until `cur_bytes <= max_bytes`. + max_bytes: usize, +} + +/// A byte-budget LRU cache of decoded, deletion-filtered posting lists, scoped +/// to one [`SegmentReader`](crate::lexical::index::inverted::reader::SegmentReader). +/// +/// A [`Mutex`] guards the state because [`LruCache::get`] takes `&mut self` to +/// update recency and the byte accounting mutates on every insert. +#[derive(Debug)] +pub struct PostingCache { + /// `None` when disabled (budget 0). + inner: Option>, + hits: AtomicU64, + misses: AtomicU64, +} + +impl PostingCache { + /// Create a cache with the given byte budget. `0` disables it (every + /// [`get`](Self::get) misses and [`put`](Self::put) is a no-op). + /// + /// # Arguments + /// + /// * `max_bytes` - Soft heap budget for cached posting lists. + pub fn new(max_bytes: usize) -> Self { + let inner = (max_bytes > 0).then(|| { + Mutex::new(PostingCacheInner { + lru: LruCache::unbounded(), + cur_bytes: 0, + max_bytes, + }) + }); + PostingCache { + inner, + hits: AtomicU64::new(0), + misses: AtomicU64::new(0), + } + } + + /// Returns `true` if caching is enabled (budget was non-zero). + pub fn is_enabled(&self) -> bool { + self.inner.is_some() + } + + /// Look up the decoded posting list cached for `key`, bumping its recency + /// on a hit. Records a hit or miss in the statistics. + /// + /// # Arguments + /// + /// * `key` - The `"field\u{1}term"` cache key. + pub fn get(&self, key: &str) -> Option> { + let hit = self + .inner + .as_ref() + .and_then(|inner| inner.lock().lru.get(key).cloned()); + if hit.is_some() { + self.hits.fetch_add(1, Ordering::Relaxed); + } else { + self.misses.fetch_add(1, Ordering::Relaxed); + } + hit + } + + /// Insert a decoded posting list for `key`, evicting least-recently-used + /// entries until back under the byte budget. A no-op when the cache is + /// disabled or when this single entry exceeds the whole budget. + /// + /// # Arguments + /// + /// * `key` - The `"field\u{1}term"` cache key. + /// * `list` - The decoded, deletion-filtered posting list to share. + pub fn put(&self, key: String, list: Arc) { + let Some(inner) = self.inner.as_ref() else { + return; + }; + let size = estimated_bytes(&list); + let mut guard = inner.lock(); + // A single list bigger than the whole budget is not cached — caching it + // would evict everything else and still not fit. + if size > guard.max_bytes { + return; + } + if let Some(old) = guard.lru.put(key, list) { + guard.cur_bytes = guard.cur_bytes.saturating_sub(estimated_bytes(&old)); + } + guard.cur_bytes += size; + // Evict LRU entries until under budget. The just-inserted entry is the + // MRU, and `size <= max_bytes`, so it is never the one evicted. + while guard.cur_bytes > guard.max_bytes { + match guard.lru.pop_lru() { + Some((_, evicted)) => { + guard.cur_bytes = guard.cur_bytes.saturating_sub(estimated_bytes(&evicted)); + } + None => break, + } + } + } + + /// Number of cached entries (0 when disabled). For tests / observability. + pub fn len(&self) -> usize { + self.inner + .as_ref() + .map_or(0, |inner| inner.lock().lru.len()) + } + + /// Returns `true` if the cache holds no entries (or is disabled). + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Snapshot of the cache hit / miss counters. + pub fn stats(&self) -> PostingCacheStats { + PostingCacheStats { + hits: self.hits.load(Ordering::Relaxed), + misses: self.misses.load(Ordering::Relaxed), + } + } +} + +/// Hit / miss counters for a [`PostingCache`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct PostingCacheStats { + /// Number of lookups served from the cache. + pub hits: u64, + /// Number of lookups that had to decode from storage. + pub misses: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Build a decoded list of `n` doc ids (each entry ≈ `n * 8` bytes from the + /// doc_ids + frequencies arrays, plus fixed overhead). + fn list(term: &str, n: u32) -> Arc { + Arc::new(DecodedPostingList { + term: term.to_string(), + doc_ids: (0..n).collect(), + frequencies: vec![1; n as usize], + weights: Vec::new(), + positions: None, + skip_levels: Vec::new(), + total_frequency: n as u64, + doc_frequency: n as u64, + }) + } + + #[test] + fn put_then_get_returns_cached_list() { + let cache = PostingCache::new(1 << 20); + cache.put("body\u{1}rust".to_string(), list("rust", 4)); + let got = cache.get("body\u{1}rust").expect("present"); + assert_eq!(got.doc_ids, vec![0, 1, 2, 3]); + assert_eq!(cache.stats().hits, 1); + assert_eq!(cache.stats().misses, 0); + } + + #[test] + fn miss_increments_miss_counter() { + let cache = PostingCache::new(1 << 20); + assert!(cache.get("absent").is_none()); + assert_eq!(cache.stats().misses, 1); + } + + #[test] + fn capacity_zero_disables_cache() { + let cache = PostingCache::new(0); + assert!(!cache.is_enabled()); + cache.put("k".to_string(), list("k", 4)); + assert!(cache.get("k").is_none()); + assert!(cache.is_empty()); + } + + #[test] + fn byte_budget_evicts_least_recently_used() { + // Budget fits ~2 lists of 64 ids; a 3rd insert evicts the LRU victim. + let one = estimated_bytes(&list("x", 64)); + let cache = PostingCache::new(one * 2 + one / 2); + + cache.put("a".to_string(), list("a", 64)); + cache.put("b".to_string(), list("b", 64)); + // Touch "a" so "b" is the LRU victim. + assert!(cache.get("a").is_some()); + cache.put("c".to_string(), list("c", 64)); + + assert!(cache.get("a").is_some(), "recently used 'a' survives"); + assert!(cache.get("c").is_some(), "just-inserted 'c' survives"); + assert!(cache.get("b").is_none(), "LRU 'b' must be evicted"); + } + + #[test] + fn oversized_entry_is_not_cached() { + let big = list("big", 1024); + let cache = PostingCache::new(estimated_bytes(&big) / 2); + cache.put("big".to_string(), big); + assert!( + cache.get("big").is_none(), + "entry larger than budget is skipped" + ); + assert!(cache.is_empty()); + } +} diff --git a/laurus/src/lexical/index/inverted/reader.rs b/laurus/src/lexical/index/inverted/reader.rs index de100f14..a878fce6 100644 --- a/laurus/src/lexical/index/inverted/reader.rs +++ b/laurus/src/lexical/index/inverted/reader.rs @@ -23,6 +23,7 @@ use crate::lexical::index::inverted::core::posting::{DecodedPostingList, Posting use crate::lexical::index::inverted::core::terms::{ InvertedIndexTerms, MergedInvertedIndexTerms, TermDictionaryAccess, Terms, }; +use crate::lexical::index::inverted::posting_cache::PostingCache; use crate::lexical::index::inverted::query_cache::QueryFilterCache; use crate::lexical::index::inverted::segment::SegmentInfo; use crate::lexical::index::structures::bkd_tree::{BKDReader, BKDTree}; @@ -453,6 +454,12 @@ pub struct SegmentReader { /// Cached BKD trees: field -> tree bkd_trees: RwLock>>, + /// Decoded posting-list cache (Issue #612). Disabled by default + /// (`open` builds it with a zero budget); query readers enable it via + /// [`Self::with_posting_cache_bytes`]. Per-segment because a segment is + /// immutable for a reader snapshot. + posting_cache: PostingCache, + /// Whether the segment is loaded. loaded: AtomicBool, } @@ -478,12 +485,33 @@ impl SegmentReader { doc_values: RwLock::new(None), deletion_bitmap: RwLock::new(None), bkd_trees: RwLock::new(AHashMap::new()), + // Disabled by default; query readers enable it (Issue #612). + posting_cache: PostingCache::new(0), loaded: AtomicBool::new(false), }; Ok(reader) } + /// Enable (or resize) this segment's decoded posting-list cache with a byte + /// budget (Issue #612). `0` keeps it disabled. Returns `self` for chaining + /// after [`Self::open`]. + /// + /// # Arguments + /// + /// * `max_bytes` - Soft heap budget for cached posting lists in this segment. + pub fn with_posting_cache_bytes(mut self, max_bytes: usize) -> Self { + self.posting_cache = PostingCache::new(max_bytes); + self + } + + /// Snapshot of this segment's posting-cache hit / miss counters (Issue #612). + pub fn posting_cache_stats( + &self, + ) -> crate::lexical::index::inverted::posting_cache::PostingCacheStats { + self.posting_cache.stats() + } + /// Get all document IDs in this segment. pub fn doc_ids(&self) -> Result> { if !self.loaded.load(Ordering::Acquire) { @@ -1060,6 +1088,24 @@ impl SegmentReader { return self.scan_documents_for_term(field, term); } + // Posting cache (#612): a repeated `(field, term)` lookup within this + // reader snapshot reuses the decoded, deletion-filtered list instead of + // re-opening + re-decoding the `.post` file (the read dominates on + // remote storage). The key allocation, lookup, and the clone are + // skipped entirely when the cache is disabled (budget 0), so the + // uncached path — merge / test readers — is byte-for-byte unchanged. + let cache_key = self + .posting_cache + .is_enabled() + .then(|| format!("{field}\u{1}{term}")); + if let Some(key) = &cache_key + && let Some(cached) = self.posting_cache.get(key) + { + return Ok(Some(Box::new( + InvertedIndexPostingIterator::from_decoded_soa_with_blocks((*cached).clone(), 64), + ))); + } + if let Some(term_info) = self.term_info(field, term)? { let input = self.storage.open_input(&postings_file)?; let mut reader = StructReader::new(input)?; @@ -1090,8 +1136,21 @@ impl SegmentReader { let filtered = self.filter_deleted_soa(decoded)?; if filtered.is_empty() { + // Empty lists are not cached — `None` is cheap to recompute. Ok(None) + } else if let Some(key) = cache_key { + // Cache the shared decoded list and build the iterator from a + // clone (a `Vec` memcpy — cheaper than the decode it replaces). + let shared = Arc::new(filtered); + self.posting_cache.put(key, Arc::clone(&shared)); + Ok(Some(Box::new( + InvertedIndexPostingIterator::from_decoded_soa_with_blocks( + (*shared).clone(), + 64, + ), + ))) } else { + // Cache disabled — build directly from the owned list, no clone. Ok(Some(Box::new( InvertedIndexPostingIterator::from_decoded_soa_with_blocks(filtered, 64), ))) @@ -1414,11 +1473,6 @@ pub struct CacheManager { /// [`LruCache::get`] takes `&mut self` to update recency. term_cache: Mutex>>, - /// Posting list cache. - #[allow(dead_code)] - posting_cache: - RwLock>>>, - /// Maximum memory limit in bytes (informational; also derives the term /// cache's entry capacity). memory_limit: usize, @@ -1441,7 +1495,6 @@ impl CacheManager { .unwrap_or(NonZeroUsize::MIN); CacheManager { term_cache: Mutex::new(LruCache::new(capacity)), - posting_cache: RwLock::new(AHashMap::new()), memory_limit, cache_hits: AtomicUsize::new(0), cache_misses: AtomicUsize::new(0), @@ -1553,9 +1606,17 @@ impl InvertedIndexReader { let mut segment_readers = Vec::new(); let mut total_doc_count = 0; + // Enable the per-segment posting cache (Issue #612) for query readers, + // gated by `enable_posting_cache` and budgeted by `max_cache_memory`. + let posting_cache_bytes = if config.enable_posting_cache { + config.max_cache_memory + } else { + 0 + }; for segment_info in &segments { total_doc_count += segment_info.doc_count; - let mut reader = SegmentReader::open(segment_info.clone(), storage.clone())?; + let mut reader = SegmentReader::open(segment_info.clone(), storage.clone())? + .with_posting_cache_bytes(posting_cache_bytes); if config.preload_segments { reader.load()?; @@ -2541,6 +2602,72 @@ mod tests { ); } + /// A repeated `postings(field, term)` within a snapshot is served from the + /// per-segment posting cache (Issue #612), and a commit (new snapshot) does + /// not serve a stale, pre-deletion list. + #[test] + fn posting_cache_hit_and_snapshot_invalidation() { + use crate::Document; + use crate::lexical::store::LexicalStore; + use crate::lexical::store::config::LexicalIndexConfig; + use crate::storage::memory::{MemoryStorage, MemoryStorageConfig}; + + let storage = Arc::new(MemoryStorage::new(MemoryStorageConfig::default())); + let store = LexicalStore::new(storage, LexicalIndexConfig::default()).unwrap(); + for id in 0..5u64 { + store + .upsert_document( + id, + Document::builder().add_text("body", "shared term").build(), + ) + .unwrap(); + } + store.commit().unwrap(); + + let drain = |it: Option>| -> Vec { + let mut ids = Vec::new(); + if let Some(mut it) = it { + while it.next().unwrap() { + ids.push(it.doc_id()); + } + } + ids.sort_unstable(); + ids + }; + + // First snapshot: the second `postings` call is a cache hit. + { + let reader = store.reader_for_tests().unwrap(); + let inverted = reader + .as_any() + .downcast_ref::() + .unwrap(); + let seg = inverted.segment_readers()[0].read().unwrap(); + + let first = drain(seg.postings("body", "shared").unwrap()); + let second = drain(seg.postings("body", "shared").unwrap()); + assert_eq!(first, vec![0, 1, 2, 3, 4]); + assert_eq!(first, second, "cached postings must match the decoded list"); + + let stats = seg.posting_cache_stats(); + assert_eq!(stats.misses, 1, "the first decode is a cache miss"); + assert!(stats.hits >= 1, "the repeat lookup hits the cache"); + } + + // Delete a doc + commit: the fresh snapshot must exclude it (a new + // segment reader with an empty cache re-decodes against the new + // deletions — no stale cached list). + store.delete_document_by_internal_id(2).unwrap(); + store.commit().unwrap(); + let reader2 = store.reader_for_tests().unwrap(); + let after = drain(reader2.postings("body", "shared").unwrap()); + assert_eq!( + after, + vec![0, 1, 3, 4], + "deleted doc 2 must be excluded in the new snapshot" + ); + } + #[test] fn test_segment_info() { let info = SegmentInfo {