Skip to content
Open
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 95 additions & 52 deletions crates/liquidity-sources/src/recent_block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
//! - Automatically updating the cache is decoupled from normal on-chain data
//! fetches.
//!
//! A result of this is that it is possible that the same uncached entry is
//! requested multiple times simultaneously and some work is wasted. This is
//! unlikely to happen in practice and the value is going to be cached the next
//! time it is needed.
//! A result of this design is that simultaneous requests for the same
//! uncached entries can still perform some duplicate work before the results
//! are cached. However, cache misses are fetched in batches to avoid excessive
//! per-entry fan-out and RPC overhead.
//!
//! When entries are requested we mark all those entries as recently used which
//! potentially evicts other entries from the lru cache. Cache misses are
Expand All @@ -26,13 +26,12 @@

use {
alloy::eips::BlockId,
anyhow::{Context, Result},
anyhow::Result,
cached::{Cached, SizedCache},
ethrpc::block_stream::CurrentBlockWatcher,
futures::{FutureExt, StreamExt},
futures::StreamExt,
itertools::Itertools,
prometheus::IntCounterVec,
request_sharing::BoxRequestSharing,
std::{
cmp,
collections::{BTreeMap, HashMap, HashSet, hash_map::Entry},
Expand Down Expand Up @@ -109,7 +108,6 @@ where
delay_between_retries: Duration,
metrics: &'static Metrics,
metrics_label: &'static str,
requests: BoxRequestSharing<(K, Block), Option<Vec<V>>>,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -179,7 +177,6 @@ where
delay_between_retries: config.delay_between_retries,
metrics: Metrics::instance(observe::metrics::get_storage_registry()).unwrap(),
metrics_label,
requests: BoxRequestSharing::labelled("liquidity_fetching".into()),
});

Self::spawn_gc_task(
Expand Down Expand Up @@ -246,41 +243,23 @@ where
}

async fn fetch_inner_many(&self, keys: HashSet<K>, block: Block) -> Result<Vec<V>> {
Comment thread
metalurgical marked this conversation as resolved.
let fetched =
futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block)))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the PR description it's not really clear to me what the desired outcome of this change is. Which metric would you expect to improve with this change, so that we can somehow verify that it works as desired?
I imagine that doing 1 bulk request with n items instead of n individual requests should cause a lot less contention on the mutex around the cache, which reduces latency. Is that the idea for this PR?

I wonder if this could backfire since we are now doing a TON of synchronous work before yielding the thread back to the runtime.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metric that will improve here is the call count and the related overhead per call, especially on a cold cache.

Your concern at the end applies equally to the old code and to this change, so it is not a regression here.

.await;
let fetched: Vec<_> = fetched
.into_iter()
.filter_map(|res| res.ok())
.flatten()
.collect();
Ok(fetched)
}

// Sometimes node requests fail when we try to get state from what we think is
// the current block because the request was load balanced to a node that
// hasn't seen the block yet. As a workaround we repeat the request up to N
// times while sleeping in between.
async fn fetch_inner(&self, key: K, block: Block) -> Result<Vec<V>> {
let retries = self.maximum_retries;
let delay = self.delay_between_retries;
let fetcher = self.fetcher.clone();
let shared = self.requests.shared_or_else((key, block), |entry| {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you're removing the request sharing. It's also not covered in the description.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously it was doing individual fetches for each key, each going through request sharing. Now it does a single fetch for the key set. Request sharing no longer appears necessary here. Do you agree or do you see another reason to keep it?

let (key, block) = entry.clone();
async move {
for _ in 0..=retries {
let keys = [key.clone()].into();
match fetcher.fetch_values(keys, block).await {
Ok(values) => return Some(values),
Err(err) => tracing::warn!("retrying fetch because error: {:?}", err),
}
tokio::time::sleep(delay).await;
if keys.is_empty() {
return Ok(Vec::new());
}
let mut last_err = None;
for attempt in 0..=self.maximum_retries {
Comment thread
metalurgical marked this conversation as resolved.
Outdated
match self.fetcher.fetch_values(keys.clone(), block).await {
Ok(values) => return Ok(values),
Err(err) => {
tracing::warn!("retrying fetch because error: {:?}", err);
last_err = Some(err);
}
None
}
.boxed()
});
shared.await.context("could not fetch liquidity")
if attempt < self.maximum_retries {
tokio::time::sleep(self.delay_between_retries).await;
}
}
Err(last_err.unwrap().context("could not fetch liquidity"))
}

async fn fetch(&self, keys: impl IntoIterator<Item = K>, block: Block) -> Result<Vec<V>> {
Expand Down Expand Up @@ -386,7 +365,7 @@ where
}

fn get(&mut self, key: K, block: Option<u64>) -> Option<&[V]> {
let allow_background_udpates = block.is_some();
let allow_background_updates = block.is_some();
let block = block.or_else(|| {
self.cached_most_recently_at_block
.get(&key)
Expand All @@ -396,7 +375,7 @@ where
})
})?;
let result = self.entries.get(&(block, key.clone())).map(Vec::as_slice);
if allow_background_udpates && result.is_some_and(|values| !values.is_empty()) {
if allow_background_updates && result.is_some_and(|values| !values.is_empty()) {
self.recently_used.cache_set(key, ());
}
result
Expand Down Expand Up @@ -433,21 +412,24 @@ where
fn remove_cached_blocks_older_than(&mut self, oldest_to_keep: u64) {
tracing::debug!("dropping blocks older than {} from cache", oldest_to_keep);
self.entries = self.entries.split_off(&(oldest_to_keep, K::first_ord()));

// Iterate from newest block to oldest block and only keep the most recent
// liquidity around to reduce memory consumption.
// Iterate from the newest block to the oldest block and only keep the most
// recent liquidity around to reduce memory consumption. Empty entries
// are valid negative cache entries and must be kept for the most recent
// block.
let mut cached_keys = HashSet::new();
let mut entries_to_remove = Vec::new();
let mut items = 0;
for ((_block, key), values) in self.entries.iter_mut().rev() {
if !cached_keys.insert(key) {
*values = vec![];
for ((block, key), values) in self.entries.iter().rev() {
if !cached_keys.insert((*key).clone()) {
entries_to_remove.push((*block, (*key).clone()));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for this change? I'm not seeing how the other fan-out change makes this necessary. Are those 2 unrelated changes bundled together?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, two different changes in the same file.

} else {
items += values.len();
}
}
// Afterwards drop all entries that are now empty.
self.entries.retain(|_, values| !values.is_empty());

for entry in entries_to_remove {
self.entries.remove(&entry);
}
self.cached_most_recently_at_block
.retain(|_, block| *block >= oldest_to_keep);
tracing::debug!(
Expand Down Expand Up @@ -870,4 +852,65 @@ mod tests {
assert!(cache.mutexed.lock().unwrap().get(key, Some(8)).is_some());
assert!(cache.mutexed.lock().unwrap().get(key, None).is_some());
}

#[tokio::test]
async fn negative_cache_entries_survive_gc() {
    // The fake fetcher only knows a value for key 0; key 1 is expected to
    // end up as an empty (negative) cache entry.
    let fetcher = FakeCacheFetcher::new(vec![TestValue::new(0, "a")]);
    let block_at = |number| BlockInfo {
        number,
        ..Default::default()
    };
    let (block_sender, block_stream) = tokio::sync::watch::channel(block_at(10));
    let config = CacheConfig {
        number_of_blocks_to_cache: NonZeroU64::new(2).unwrap(),
        number_of_entries_to_auto_update: NonZeroUsize::new(2).unwrap(),
        ..Default::default()
    };
    let cache = RecentBlockCache::new(config, fetcher, block_stream, "")
        .unwrap()
        .inner;

    // Warm the cache at block 10 with both keys: one real value for key 0
    // and one negative entry for key 1.
    cache
        .fetch(test_keys(0..2), Block::Number(10))
        .await
        .unwrap();
    let cached_entries = cache.mutexed.lock().unwrap().entries.len();
    assert_eq!(cached_entries, 2);

    // `get()` only marks keys with a non-empty result as recently used, so
    // key 1 would never be picked up for background updates on its own.
    // Register it by hand so that every update cycle re-fetches it and
    // re-inserts the negative entry, which gives GC a chance to drop it.
    {
        let mut guard = cache.mutexed.lock().unwrap();
        guard.recently_used.cache_set(TestKey(1), ());
    }

    // Move the chain forward by two blocks, running an update (and thereby
    // GC) for each of them. Key 1 still has no data, so its negative entry
    // is written again on every cycle.
    block_sender.send(block_at(11)).unwrap();
    cache.update_cache_at_block(11).await.unwrap();
    block_sender.send(block_at(12)).unwrap();
    cache.update_cache_at_block(12).await.unwrap();

    // After GC, key 0 must still resolve and key 1 must resolve to an empty
    // slice (`Some(&[])`) instead of `None`.
    let mut state = cache.mutexed.lock().unwrap();
    let key_zero_cached = state.get(TestKey(0), None).is_some();
    assert!(key_zero_cached, "key 0 should still be cached");
    let key_one_negative = matches!(state.get(TestKey(1), None), Some(&[]));
    assert!(
        key_one_negative,
        "key 1 negative entry must survive GC; got None (treated as cache miss)"
    );
}
}
Loading