diff --git a/Cargo.lock b/Cargo.lock index 6a95b0c6ae82..4b763b419325 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9959,6 +9959,7 @@ dependencies = [ "thread_local", "tokio", "tracing", + "triomphe 0.1.12", "turbo-bincode", "turbo-persistence", "turbo-rcstr", @@ -10148,6 +10149,7 @@ dependencies = [ "smallvec", "tokio", "turbo-tasks", + "turbo-tasks-backend", ] [[package]] diff --git a/crates/next-api/src/project.rs b/crates/next-api/src/project.rs index 611251d188c8..65d4bb172491 100644 --- a/crates/next-api/src/project.rs +++ b/crates/next-api/src/project.rs @@ -625,17 +625,21 @@ impl ProjectContainer { .start_watching_with_invalidation_reason(watch.poll_interval) .await?; } else { - project_fs.invalidate_with_reason(|path| invalidation::Initialize { - // this path is just used for display purposes - path: RcStr::from(path.to_string_lossy()), - }); + project_fs + .invalidate_with_reason(|path| invalidation::Initialize { + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), + }) + .await?; } let output_fs = output_fs_operation(project) .read_strongly_consistent() .await?; - output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path.to_string_lossy()), - }); + output_fs + .invalidate_with_reason(|path| invalidation::Initialize { + path: RcStr::from(path.to_string_lossy()), + }) + .await?; Ok(()) } .instrument(span_clone) @@ -770,16 +774,20 @@ impl ProjectContainer { .start_watching_with_invalidation_reason(watch.poll_interval) .await?; } else { - project_fs.invalidate_with_reason(|path| invalidation::Initialize { - // this path is just used for display purposes - path: RcStr::from(path.to_string_lossy()), - }); + project_fs + .invalidate_with_reason(|path| invalidation::Initialize { + // this path is just used for display purposes + path: RcStr::from(path.to_string_lossy()), + }) + .await?; } } if !ReadRef::ptr_eq(&prev_output_fs, &output_fs) { - prev_output_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path.to_string_lossy()), - }); + prev_output_fs + .invalidate_with_reason(|path| invalidation::Initialize { + path: RcStr::from(path.to_string_lossy()), + }) + .await?; } Ok(()) diff --git a/crates/next-napi-bindings/src/next_api/project.rs b/crates/next-napi-bindings/src/next_api/project.rs index c3d6075638d8..a53b1b9fd3e3 100644 --- a/crates/next-napi-bindings/src/next_api/project.rs +++ b/crates/next-napi-bindings/src/next_api/project.rs @@ -1247,15 +1247,19 @@ async fn invalidate_deferred_entry_source_dirs_after_callback( if paths_to_invalidate.is_empty() { // Fallback to full invalidation when app dir paths are unavailable. 
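            // `invalidate_with_reason` now returns a future carrying a `Result`,
            // which is why every call site in this patch gains `.await?`. Assumed
            // shape of the new signature (illustrative only, not taken from this diff):
            //   async fn invalidate_with_reason<R: InvalidationReason>(
            //       &self,
            //       reason: impl Fn(&Path) -> R,
            //   ) -> Result<()>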
- project_fs.invalidate_with_reason(|path| invalidation::Initialize { - path: RcStr::from(path.to_string_lossy()), - }); - } else { - project_fs.invalidate_path_and_children_with_reason(paths_to_invalidate, |path| { - invalidation::Initialize { + project_fs + .invalidate_with_reason(|path| invalidation::Initialize { path: RcStr::from(path.to_string_lossy()), - } - }); + }) + .await?; + } else { + project_fs + .invalidate_path_and_children_with_reason(paths_to_invalidate, |path| { + invalidation::Initialize { + path: RcStr::from(path.to_string_lossy()), + } + }) + .await?; } Ok(()) diff --git a/crates/next-napi-bindings/src/next_api/turbopack_ctx.rs b/crates/next-napi-bindings/src/next_api/turbopack_ctx.rs index 12381c65721b..293769ca71d0 100644 --- a/crates/next-napi-bindings/src/next_api/turbopack_ctx.rs +++ b/crates/next-napi-bindings/src/next_api/turbopack_ctx.rs @@ -249,6 +249,8 @@ pub fn create_turbo_tasks( }), dependency_tracking, num_workers: Some(tokio::runtime::Handle::current().metrics().num_workers()), + evict_after_snapshot: std::env::var("TURBO_ENGINE_EVICT_AFTER_SNAPSHOT") + .is_ok_and(|v| v == "1" || v == "true"), ..Default::default() }, Either::Left(backing_storage), diff --git a/test/e2e/filesystem-cache/evict-after-snapshot.test.ts b/test/e2e/filesystem-cache/evict-after-snapshot.test.ts new file mode 100644 index 000000000000..2a0c6289646c --- /dev/null +++ b/test/e2e/filesystem-cache/evict-after-snapshot.test.ts @@ -0,0 +1,91 @@ +import { nextTestSetup, isNextDev } from 'e2e-utils' +import { retry, waitFor } from 'next-test-utils' + +// Eviction requires the dev server (HMR) and persistent caching (Turbopack). +// Skip entirely in prod/start mode. +;(isNextDev ? describe : describe.skip)('evict-after-snapshot', () => { + const envVars = [ + 'ENABLE_CACHING=1', + 'TURBO_ENGINE_IGNORE_DIRTY=1', + 'TURBO_ENGINE_SNAPSHOT_IDLE_TIMEOUT_MILLIS=1000', + 'TURBO_ENGINE_EVICT_AFTER_SNAPSHOT=1', + ].join(' ') + + const { skipped, next } = nextTestSetup({ + files: __dirname, + skipDeployment: true, + packageJson: { + scripts: { + dev: `${envVars} next dev`, + }, + }, + installCommand: 'npm i', + startCommand: 'npm run dev', + }) + + if (skipped) { + return + } + + async function waitForSnapshotAndEviction() { + // The idle timeout is 1s, give extra time for snapshot + eviction to complete + await waitFor(5000) + } + + // Turbopack-only: eviction requires persistent caching + ;(process.env.IS_TURBOPACK_TEST ? it : it.skip)( + 'should serve correct content after eviction and HMR', + async () => { + const browser = await next.browser('/') + await retry(async () => { + expect(await browser.elementByCss('p').text()).toBe('hello world') + }) + + let currentContent = 'hello world' + for (let cycle = 1; cycle <= 3; cycle++) { + await waitForSnapshotAndEviction() + + const prevContent = currentContent + const nextContent = `cycle ${cycle}` + await next.patchFile('app/page.tsx', (content) => + content.replace(prevContent, nextContent) + ) + currentContent = nextContent + + const expected = currentContent + await retry(async () => { + expect(await browser.elementByCss('p').text()).toBe(expected) + }, 10000) + } + + await browser.close() + }, + 90000 + ) + ;(process.env.IS_TURBOPACK_TEST ? 
it : it.skip)(
    'should handle client component HMR after eviction',
    async () => {
      const browser = await next.browser('/client')
      await retry(async () => {
        expect(await browser.elementByCss('p').text()).toBe('hello world')
      })

      await waitForSnapshotAndEviction()

      await next.patchFile(
        'app/client/page.tsx',
        (content) => content.replace('hello world', 'hello eviction'),
        async () => {
          await retry(async () => {
            expect(await browser.elementByCss('p').text()).toBe(
              'hello eviction'
            )
          }, 10000)
        }
      )

      await browser.close()
    },
    90000
  )
})
diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index bd3c2854413d..0c4fdc1ecf35 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -619,8 +619,7 @@ impl TurboPersistence
     /// Clears all caches of the database.
     pub fn clear_cache(&self) {
-        self.key_block_cache.clear();
-        self.value_block_cache.clear();
+        self.clear_block_caches();
         for meta in self.inner.write().meta_files.iter_mut() {
             meta.clear_cache();
         }
diff --git a/turbopack/crates/turbo-tasks-auto-hash-map/src/set.rs b/turbopack/crates/turbo-tasks-auto-hash-map/src/set.rs
index 2231b50a395a..8552d792f4bc 100644
--- a/turbopack/crates/turbo-tasks-auto-hash-map/src/set.rs
+++ b/turbopack/crates/turbo-tasks-auto-hash-map/src/set.rs
@@ -97,6 +97,14 @@ impl AutoSet {
     pub fn contains(&self, key: &K) -> bool {
         self.map.contains_key(key)
     }
+
+    /// see [HashSet::retain](https://doc.rust-lang.org/std/collections/hash_set/struct.HashSet.html#method.retain)
+    pub fn retain<F>(&mut self, mut f: F)
+    where
+        F: FnMut(&K) -> bool,
+    {
+        self.map.retain(|k, _| f(k));
+    }
 }

 impl AutoSet {
diff --git a/turbopack/crates/turbo-tasks-backend/Cargo.toml b/turbopack/crates/turbo-tasks-backend/Cargo.toml
index 42ed56f33044..8f51a01957e2 100644
--- a/turbopack/crates/turbo-tasks-backend/Cargo.toml
+++ b/turbopack/crates/turbo-tasks-backend/Cargo.toml
@@ -60,6 +60,7 @@ turbo-persistence = { workspace = true }
 turbo-rcstr = { workspace = true }
 turbo-tasks = { workspace = true }
 turbo-tasks-hash = { workspace = true }
+turbo-tasks-malloc = { workspace = true, default-features = false }
 thread_local = { workspace = true }

 [dev-dependencies]
@@ -68,6 +69,7 @@ futures = { workspace = true }
 indoc = { workspace = true }
 regex = { workspace = true }
 tempfile = { workspace = true }
+triomphe = { workspace = true }
 turbo-tasks-malloc = { workspace = true }
 rstest = { workspace = true }
 turbo-tasks-testing = { workspace = true }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/cell_data.rs b/turbopack/crates/turbo-tasks-backend/src/backend/cell_data.rs
index 40811eb94696..451935810523 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/cell_data.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/cell_data.rs
@@ -45,6 +45,58 @@ impl CellData {
     pub fn new() -> Self {
         Self::default()
     }
+
+    /// Drop cells that can be cheaply reconstructed on next access, retain
+    /// those that cannot. Called by the macro-generated `TaskStorage::drop_partial`
+    /// on the data-eviction path.
+    ///
+    /// Dropped:
+    /// - `Persistable` — restored from disk.
+    /// - `SkipPersist { expensive: false, .. }` — cheap to re-derive by re-running the task.
+    ///
+    /// Retained:
+    /// - `SkipPersist { expensive: true, .. }` — expensive to re-derive.
+    /// - `SessionStateful` — would lose accumulated state if dropped.
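+    ///
+    /// Illustrative sketch of the contract (the real caller is macro-generated):
+    /// ```ignore
+    /// let still_populated = cell_data.drop_partial();
+    /// // `true` iff non-recoverable cells remain and the field must be kept
+    /// debug_assert_eq!(still_populated, !cell_data.is_empty());
+    /// ```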
+    ///
+    /// Returns `true` if entries remain, so the caller can drop the whole
+    /// `LazyField::CellData` variant when empty.
+    pub fn drop_partial(&mut self) -> bool {
+        self.0.retain(
+            |cell_id, _| match registry::get_value_type(cell_id.type_id).persistence {
+                ValueTypePersistence::Persistable(_, _)
+                | ValueTypePersistence::SkipPersist {
+                    expensive: false,
+                    hash_only: _,
+                } => {
+                    // These are either persisted or determined to not be worth persisting
+                    // because they are cheap to re-derive.
+                    false
+                }
+                ValueTypePersistence::SkipPersist {
+                    expensive: true,
+                    hash_only: _,
+                }
+                | ValueTypePersistence::SessionStateful => {
+                    // These are either impossible to re-derive or expensive, so we retain them.
+                    true
+                }
+            },
+        );
+        if self.0.is_empty() {
+            return false;
+        }
+        self.shrink_to_fit();
+        true
+    }
+}
+
+impl IntoIterator for CellData {
+    type Item = (CellId, SharedReference);
+    type IntoIter = as IntoIterator>::IntoIter;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
 }

 impl Deref for CellData {
@@ -129,3 +181,100 @@ impl TurboBincodeDecode for CellData {

 impl_encode_for_turbo_bincode_encode!(CellData);
 impl_decode_for_turbo_bincode_decode!(CellData);
+
+#[cfg(test)]
+mod tests {
+    //! `drop_partial` must partition cells by their `ValueTypePersistence` —
+    //! keep the non-recoverable ones, drop the rest. Tests below declare one
+    //! value type per persistence variant and exercise every partition.
+    use turbo_tasks::{self as turbo_tasks, VcValueType};
+
+    use super::*;
+
+    #[turbo_tasks::value]
+    struct PersistableV(#[allow(dead_code)] u32);
+
+    #[turbo_tasks::value(serialization = "skip")]
+    struct SkipCheapV(
+        #[turbo_tasks(trace_ignore)]
+        #[allow(dead_code)]
+        u32,
+    );
+
+    #[turbo_tasks::value(serialization = "skip", evict = "last")]
+    struct SkipExpensiveV(
+        #[turbo_tasks(trace_ignore)]
+        #[allow(dead_code)]
+        u32,
+    );
+
+    #[turbo_tasks::value(serialization = "skip", evict = "never", cell = "new", eq = "manual")]
+    struct SessionStatefulV;
+
+    #[turbo_tasks::value(serialization = "hash")]
+    struct HashOnlyV(#[allow(dead_code)] u32);
+
+    fn cell_of<V: VcValueType>(index: u32) -> CellId {
+        CellId {
+            type_id: V::get_value_type_id(),
+            index,
+        }
+    }
+
+    fn dummy_ref() -> SharedReference {
+        // The drop_partial logic only inspects the key's type_id, not the
+        // value, so any Any + Send + Sync works.
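+        // (The concrete payload type is arbitrary: the tests below only check
+        // key membership and never downcast the stored value.)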
+        SharedReference::new(triomphe::Arc::new(0u32))
+    }
+
+    #[test]
+    fn drop_partial_partitions_by_persistence() {
+        let mut data = CellData::new();
+        data.insert(cell_of::<PersistableV>(0), dummy_ref());
+        data.insert(cell_of::<SkipCheapV>(0), dummy_ref());
+        data.insert(cell_of::<SkipExpensiveV>(0), dummy_ref());
+        data.insert(cell_of::<SessionStatefulV>(0), dummy_ref());
+        data.insert(cell_of::<HashOnlyV>(0), dummy_ref());
+
+        let still_has_entries = data.drop_partial();
+
+        assert!(still_has_entries, "two non-recoverable entries remain");
+        assert_eq!(data.len(), 2);
+        assert!(data.contains_key(&cell_of::<SkipExpensiveV>(0)));
+        assert!(data.contains_key(&cell_of::<SessionStatefulV>(0)));
+        assert!(!data.contains_key(&cell_of::<PersistableV>(0)));
+        assert!(!data.contains_key(&cell_of::<SkipCheapV>(0)));
+        assert!(!data.contains_key(&cell_of::<HashOnlyV>(0)));
+    }
+
+    #[test]
+    fn drop_partial_fully_empties_when_all_recoverable() {
+        let mut data = CellData::new();
+        data.insert(cell_of::<PersistableV>(0), dummy_ref());
+        data.insert(cell_of::<SkipCheapV>(0), dummy_ref());
+        data.insert(cell_of::<HashOnlyV>(0), dummy_ref());
+
+        let still_has_entries = data.drop_partial();
+
+        assert!(!still_has_entries);
+        assert!(data.is_empty());
+    }
+
+    #[test]
+    fn drop_partial_keeps_everything_when_all_non_recoverable() {
+        let mut data = CellData::new();
+        data.insert(cell_of::<SkipExpensiveV>(0), dummy_ref());
+        data.insert(cell_of::<SessionStatefulV>(0), dummy_ref());
+
+        let still_has_entries = data.drop_partial();
+
+        assert!(still_has_entries);
+        assert_eq!(data.len(), 2);
+    }
+
+    #[test]
+    fn drop_partial_on_empty_returns_false() {
+        let mut data = CellData::new();
+        assert!(!data.drop_partial());
+    }
+}
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/counter_map.rs b/turbopack/crates/turbo-tasks-backend/src/backend/counter_map.rs
index 108825abbf77..7a5b8c09216f 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/counter_map.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/counter_map.rs
@@ -117,6 +117,34 @@ impl CounterMap {
     {
         self.0.remove(key)
     }
+
+    /// Retain only entries for which `f(key, value)` returns `true`.
+    pub fn retain<F>(&mut self, mut f: F)
+    where
+        F: FnMut(&K, &V) -> bool,
+        K: Eq + Hash,
+    {
+        self.0.retain(|k, v| f(k, v));
+    }
+
+    /// Extend this map with the entries from an iterator. Used by restore paths
+    /// to merge persistent entries loaded from disk into an existing map that
+    /// may hold transient residue.
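+    /// (Semantics follow `HashMap::extend`: entries from `iter` overwrite
+    /// existing entries with equal keys.)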
+    pub fn extend(&mut self, iter: impl IntoIterator<Item = (K, V)>)
+    where
+        K: Eq + Hash,
+    {
+        self.0.extend(iter);
+    }
+}
+
+impl IntoIterator for CounterMap {
+    type Item = (K, V);
+    type IntoIter = as IntoIterator>::IntoIter;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
 }

 impl CounterMap {
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
index b0f09a0cf00e..c49f8bbeecc0 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -13,7 +13,7 @@ use std::{
     pin::Pin,
     sync::{
         Arc, LazyLock,
-        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
+        atomic::{AtomicBool, AtomicUsize, Ordering},
     },
     time::SystemTime,
 };
@@ -28,10 +28,10 @@ use tokio::time::{Duration, Instant};
 use tracing::{Span, trace_span};
 use turbo_bincode::{TurboBincodeBuffer, new_turbo_bincode_decoder, new_turbo_bincode_encoder};
 use turbo_tasks::{
-    CellId, FxDashMap, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency,
-    ReadOutputOptions, ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT,
-    TaskExecutionReason, TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi,
-    TurboTasksPanic, ValueTypeId,
+    CellId, RawVc, ReadCellOptions, ReadCellTracking, ReadConsistency, ReadOutputOptions,
+    ReadTracking, SharedReference, StackDynTaskInputs, TRANSIENT_TASK_BIT, TaskExecutionReason,
+    TaskId, TaskPersistence, TaskPriority, TraitTypeId, TurboTasksBackendApi, TurboTasksPanic,
+    ValueTypeId,
     backend::{
         Backend, CachedTaskType, CellContent, CellHash, TaskExecutionSpec, TransientTaskType,
         TurboTaskContextError, TurboTaskLocalContextError, TurboTasksError,
@@ -50,7 +50,7 @@
 pub use self::{
     operation::AnyOperation,
-    storage::{SpecificTaskDataCategory, TaskDataCategory},
+    storage::{EvictionCounts, SpecificTaskDataCategory, TaskDataCategory},
 };
 #[cfg(feature = "trace_task_dirty")]
 use crate::backend::operation::TaskDirtyCause;
@@ -73,7 +73,6 @@ use crate::{
     },
     error::TaskError,
     utils::{
-        dash_map_drop_contents::drop_contents,
         dash_map_raw_entry::{RawEntry, get_shard, raw_entry_in_shard, raw_get_in_shard},
         ptr_eq_arc::PtrEqArc,
         shard_amount::compute_shard_amount,
     },
 };
@@ -145,6 +144,11 @@ pub struct BackendOptions {

     /// Avoid big preallocations for faster startup. Should only be used for testing purposes.
     pub small_preallocation: bool,
+
+    /// When enabled, evict all evictable tasks from in-memory storage after every snapshot.
+    /// This reclaims memory by clearing persisted data that can be re-loaded from disk on demand.
+    /// This is an EXPERIMENTAL FEATURE under development.
+    pub evict_after_snapshot: bool,
 }

 impl Default for BackendOptions {
@@ -155,13 +159,13 @@ impl Default for BackendOptions {
             storage_mode: Some(StorageMode::ReadWrite),
             num_workers: None,
             small_preallocation: false,
+            evict_after_snapshot: false,
         }
     }
 }

 pub enum TurboTasksBackendJob {
-    InitialSnapshot,
-    FollowUpSnapshot,
+    Snapshot,
 }

 pub struct TurboTasksBackend<B: BackingStorage>(Arc<TurboTasksBackendInner<B>>);

@@ -174,8 +178,6 @@ struct TurboTasksBackendInner<B: BackingStorage> {
     persisted_task_id_factory: IdFactoryWithReuse<TaskId>,
     transient_task_id_factory: IdFactoryWithReuse<TaskId>,

-    task_cache: FxDashMap<Arc<CachedTaskType>, TaskId>,
-
     storage: Storage,

     /// Number of executing operations + Highest bit is set when snapshot is requested.
     in_progress_operations: AtomicUsize,

     snapshot_request: Mutex<SnapshotRequest>,
     operations_suspended: Condvar,
     /// Condition Variable that is triggered when a snapshot is completed and
     /// operations can continue.
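    /// (Works together with `snapshot_request` and `operations_suspended`:
    /// operations suspend while a snapshot is being taken, and this condvar
    /// wakes them once it completes.)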
     snapshot_completed: Condvar,
-    /// The timestamp of the last started snapshot since [`Self::start_time`].
-    last_snapshot: AtomicU64,
-
     stopping: AtomicBool,
     stopping_event: Event,
     idle_start_event: Event,
@@ -222,6 +221,19 @@ impl<B: BackingStorage> TurboTasksBackend<B> {
     pub fn backing_storage(&self) -> &B {
         &self.0.backing_storage
     }
+
+    /// Perform a snapshot and then evict all evictable tasks from memory.
+    ///
+    /// This is exposed for integration tests that need to verify the
+    /// snapshot → evict → restore cycle works correctly.
+    ///
+    /// Returns `(snapshot_had_new_data, eviction_counts)`.
+    pub fn snapshot_and_evict_for_testing(
+        &self,
+        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
+    ) -> (bool, EvictionCounts) {
+        self.0.snapshot_and_evict_for_testing(turbo_tasks)
+    }
 }

 impl<B: BackingStorage> TurboTasksBackendInner<B> {
@@ -245,13 +257,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 TaskId::try_from(TRANSIENT_TASK_BIT).unwrap(),
                 TaskId::MAX,
             ),
-            task_cache: FxDashMap::default(),
             storage: Storage::new(shard_amount, small_preallocation),
             in_progress_operations: AtomicUsize::new(0),
             snapshot_request: Mutex::new(SnapshotRequest::new()),
             operations_suspended: Condvar::new(),
             snapshot_completed: Condvar::new(),
-            last_snapshot: AtomicU64::new(0),
             stopping: AtomicBool::new(false),
             stopping_event: Event::new(|| || "TurboTasksBackend::stopping_event".to_string()),
             idle_start_event: Event::new(|| || "TurboTasksBackend::idle_start_event".to_string()),
@@ -341,6 +351,39 @@
         )
     }

+    fn should_evict(&self) -> bool {
+        self.options.evict_after_snapshot && self.should_persist()
+    }
+
+    /// Perform a snapshot and then evict all evictable tasks from memory.
+    ///
+    /// This is exposed for integration tests that need to verify the
+    /// snapshot → evict → restore cycle works correctly.
+    ///
+    /// Returns `(snapshot_had_new_data, eviction_counts)`.
+    #[doc(hidden)]
+    pub fn snapshot_and_evict_for_testing(
+        &self,
+        turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
+    ) -> (bool, EvictionCounts) {
+        assert!(
+            self.should_persist(),
+            "snapshot_and_evict requires persistence"
+        );
+        let snapshot_result = self.snapshot_and_persist(None, "test", turbo_tasks);
+        let had_new_data = match snapshot_result {
+            Ok((_, new_data)) => new_data,
+            Err(_) => {
+                // Snapshot/persist failed — skip eviction since the data may not
+                // be on disk yet. Evicting now could lose in-memory state that
+                // can't be restored.
+                return (false, EvictionCounts::default());
+            }
+        };
+        let counts = self.storage.evict_after_snapshot(None);
+        (had_new_data, counts)
+    }
+
     fn should_restore(&self) -> bool {
         self.options.storage_mode.is_some()
     }
@@ -1444,7 +1487,7 @@
             // Schedule the snapshot job
             let _span = trace_span!("persisting background job").entered();
             let _span = tracing::info_span!("thread").entered();
-            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::InitialSnapshot);
+            turbo_tasks.schedule_backend_background_job(TurboTasksBackendJob::Snapshot);
         }
     }
@@ -1465,7 +1508,6 @@
             {
                 eprintln!("Persisting failed during shutdown: {err:?}");
             }
-            drop_contents(&self.task_cache);
             self.storage.drop_contents();
             if let Err(err) = self.backing_storage.shutdown() {
                 println!("Shutting down failed: {err}");
             }
@@ -1538,7 +1580,7 @@
         // Compute hash and shard index once from borrowed components (no heap allocation).
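        // (Two-phase lookup, per the numbered steps below: a read-only probe of
        // the pre-located shard first; only on a miss do we take the shard's
        // write lock via the raw-entry API and either adopt the existing TaskId
        // or insert a freshly allocated one.)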
let arg_ref = arg.as_ref(); let hash = CachedTaskType::hash_from_components( - self.task_cache.hasher(), + self.storage.task_cache.hasher(), native_fn, this, arg_ref, @@ -1546,7 +1588,7 @@ impl TurboTasksBackendInner { // Locate the shard once so that the read-only lookup and any // write-lock retry below share the same reference (saves a modulo + // memory lookup on the miss path). - let shard = get_shard(&self.task_cache, hash); + let shard = get_shard(&self.storage.task_cache, hash); // Step 1: Fast read-only cache lookup (read lock, no allocation). // Use a read lock rather than a write lock to avoid contention. connect_child @@ -1572,7 +1614,7 @@ impl TurboTasksBackendInner { self.track_cache_hit_by_fn(native_fn); // Step 3a: Insert into in-memory cache using the pre-located shard. // Use the existing Arc from storage to avoid a duplicate allocation. - match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| { + match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| { k.eq_components(native_fn, this, arg_ref) }) { RawEntry::Occupied(_) => {} @@ -1582,7 +1624,7 @@ impl TurboTasksBackendInner { }; task_id } else { - match raw_entry_in_shard(shard, self.task_cache.hasher(), hash, |k| { + match raw_entry_in_shard(shard, self.storage.task_cache.hasher(), hash, |k| { k.eq_components(native_fn, this, arg_ref) }) { RawEntry::Occupied(e) => { @@ -2757,24 +2799,26 @@ impl TurboTasksBackendInner { ) -> Pin + Send + 'a>> { Box::pin(async move { match job { - TurboTasksBackendJob::InitialSnapshot | TurboTasksBackendJob::FollowUpSnapshot => { + TurboTasksBackendJob::Snapshot => { debug_assert!(self.should_persist()); - let last_snapshot = self.last_snapshot.load(Ordering::Relaxed); - let mut last_snapshot = self.start_time + Duration::from_millis(last_snapshot); + let mut last_snapshot = self.start_time; let mut idle_start_listener = self.idle_start_event.listen(); let mut idle_end_listener = self.idle_end_event.listen(); + // Whether to immediately set an idle timeout if possible. + // Set to false if we don't persist anything in a cycle. let mut fresh_idle = true; - loop { + let mut evicted = false; + let mut is_first = true; + 'outer: loop { const FIRST_SNAPSHOT_WAIT: Duration = Duration::from_secs(300); const SNAPSHOT_INTERVAL: Duration = Duration::from_secs(120); let idle_timeout = *IDLE_TIMEOUT; - let (time, mut reason) = - if matches!(job, TurboTasksBackendJob::InitialSnapshot) { - (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout") - } else { - (SNAPSHOT_INTERVAL, "regular snapshot interval") - }; + let (time, mut reason) = if is_first { + (FIRST_SNAPSHOT_WAIT, "initial snapshot timeout") + } else { + (SNAPSHOT_INTERVAL, "regular snapshot interval") + }; let until = last_snapshot + time; if until > Instant::now() { @@ -2797,7 +2841,7 @@ impl TurboTasksBackendInner { idle_start_listener = self.idle_start_event.listen() }, _ = &mut idle_end_listener => { - idle_time = until + idle_timeout; + idle_time = far_future(); idle_end_listener = self.idle_end_event.listen() }, _ = tokio::time::sleep_until(until) => { @@ -2828,26 +2872,67 @@ impl TurboTasksBackendInner { return; } Ok((snapshot_start, new_data)) => { + if !new_data { + fresh_idle = false; + } + is_first = false; last_snapshot = snapshot_start; + // Polls the idle-end event without blocking. Returns + // `true` and refreshes the listener if idle has ended, + // `false` if we are still idle. + macro_rules! check_idle_ended { + () => {{ + tokio::select! 
{
+                                    biased;
+                                    _ = &mut idle_end_listener => {
+                                        idle_end_listener = self.idle_end_event.listen();
+                                        true
+                                    },
+                                    _ = std::future::ready(()) => false,
+                                }
+                            }};
+                        }
+
+                        // Evict persisted tasks from memory to reclaim space.
+                        // Like compaction, this runs after snapshot_and_persist
+                        // as a separate concern.
+                        //
+                        // TODO: improve eviction policy — current approach is a full sweep
+                        // after every snapshot. Better strategies to consider:
+                        // - Memory pressure signals: only evict when RSS exceeds a
+                        //   threshold rather than unconditionally.
+                        // - Recency data: track last-access time per task and evict
+                        //   least-recently-used entries first rather than all at once.
+                        // - Eviction intensity: partial sweeps (evict a fraction of
+                        //   eligible tasks per cycle) to reduce latency spikes.
+                        //
+                        // Evict when there is new data to persist (the common
+                        // case) or on the very first snapshot after startup
+                        // (data was already on disk from a prior run, so
+                        // new_data may be false but in-memory state can still
+                        // be evicted).
+                        let mut ran_eviction = false;
+                        if self.should_evict() && (new_data || !evicted) {
+                            if check_idle_ended!() {
+                                // Need to start all the way over so we catch the
+                                // next signal.
+                                continue 'outer;
+                            }
+                            evicted = true;
+                            ran_eviction = true;
+                            self.storage.evict_after_snapshot(background_span.id());
+                        }
+
                         // Compact while idle (up to limit), regardless of
                         // whether the snapshot had new data.
                         // `background_span` is not entered here because
                         // `EnteredSpan` is `!Send` and would prevent the
                         // future from being sent across threads when it
                         // suspends at the `select!` await below.
+                        let mut ran_compaction = false;
                         const MAX_IDLE_COMPACTION_PASSES: usize = 10;
                         for _ in 0..MAX_IDLE_COMPACTION_PASSES {
-                            let idle_ended = tokio::select! {
-                                biased;
-                                _ = &mut idle_end_listener => {
-                                    idle_end_listener = self.idle_end_event.listen();
-                                    true
-                                },
-                                _ = std::future::ready(()) => false,
-                            };
-                            if idle_ended {
-                                break;
+                            if check_idle_ended!() {
+                                continue 'outer;
                             }
                             // Enter the span only around the synchronous
                             // compact() call so we never hold an
                                 )
                                 .entered();
                             match self.backing_storage.compact() {
-                                Ok(true) => {}
+                                Ok(true) => {
+                                    ran_compaction = true;
+                                }
                                 Ok(false) => break,
                                 Err(err) => {
                                     eprintln!("Compaction failed: {err:?}");
                                     break;
                                 }
                             }
                         }
-
-                        if !new_data {
-                            fresh_idle = false;
-                            continue;
+                        if check_idle_ended!() {
+                            continue 'outer;
                         }
+                        // After running snapshotting/eviction/compaction we have
+                        // churned a _lot_ of memory. If we are still idle, tell
+                        // `mimalloc` that now would be a good time to release
+                        // memory back to the OS.
+                        if new_data || ran_compaction || ran_eviction {
+                            turbo_tasks_malloc::TurboMalloc::collect(true);
+                        }
-                        let last_snapshot = last_snapshot.duration_since(self.start_time);
-                        self.last_snapshot.store(
-                            last_snapshot.as_millis().try_into().unwrap(),
-                            Ordering::Relaxed,
-                        );
-
-                        turbo_tasks.schedule_backend_background_job(
-                            TurboTasksBackendJob::FollowUpSnapshot,
-                        );
-                        return;
                     }
                 }
             }
         }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
index 4882ac615dab..64f4a98c3005 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -23,7 +23,7 @@ use turbo_tasks::{
     macro_helpers::NativeFunction,
 };

-use self::aggregation_update::ComputeDirtyAndCleanUpdate;
+pub use
self::aggregation_update::ComputeDirtyAndCleanUpdate; use crate::{ backend::{ EventDescription, OperationGuard, TaskDataCategory, TurboTasksBackend, @@ -562,6 +562,7 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> { if let Some(task_type) = entry.task_type.clone() { // Insert into the task cache to avoid future lookups self.backend + .storage .task_cache .entry(task_type) .or_insert(entry.task_id); @@ -641,7 +642,7 @@ fn apply_restore_result( task.flags.set_restoring(task_category, false); return Ok(()); } - task.restore_from(storage, task_category); + task.restore_from(storage, category); task.flags.set_restored(task_category); task.flags.set_restoring(task_category, false); Ok(()) @@ -1053,6 +1054,7 @@ impl Display for TaskTypeRef<'_> { } } +#[derive(Debug)] pub enum TaskType { Cached(Arc), Transient(Arc), diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs index c15fbb1e1c99..d550d253d0b4 100644 --- a/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs +++ b/turbopack/crates/turbo-tasks-backend/src/backend/operation/update_cell.rs @@ -65,7 +65,13 @@ impl UpdateCellOperation { // cleared.) debug_assert!( content_hash.is_none() - || matches!(value_type.persistence, ValueTypePersistence::HashOnly), + || matches!( + value_type.persistence, + ValueTypePersistence::SkipPersist { + expensive: _, + hash_only: true + } + ), "content_hash must only be supplied for HashOnly cells" ); @@ -112,18 +118,23 @@ impl UpdateCellOperation { // For HashOnly cells without available content, use hash-based comparison to // detect whether the value actually changed—avoiding unnecessary invalidation. - let skip_invalidation = - matches!(value_type.persistence, ValueTypePersistence::HashOnly) && { - let has_old_content = task.cell_data_contains(&cell); - if !has_old_content { - match (content_hash, task.get_cell_data_hash(&cell)) { - (Some(new_hash), Some(old_hash)) => new_hash == *old_hash, - _ => false, - } - } else { - false + let skip_invalidation = matches!( + value_type.persistence, + ValueTypePersistence::SkipPersist { + expensive: _, + hash_only: true + } + ) && { + let has_old_content = task.cell_data_contains(&cell); + if !has_old_content { + match (content_hash, task.get_cell_data_hash(&cell)) { + (Some(new_hash), Some(old_hash)) => new_hash == *old_hash, + _ => false, } - }; + } else { + false + } + }; #[cfg(feature = "trace_task_dirty")] let has_updated_key_hashes = updated_key_hashes.is_some(); @@ -170,7 +181,13 @@ impl UpdateCellOperation { let old_content = task.remove_cell_data(&cell); // Update cell_data_hash before dropping the task lock - if matches!(value_type.persistence, ValueTypePersistence::HashOnly) { + if matches!( + value_type.persistence, + ValueTypePersistence::SkipPersist { + expensive: _, + hash_only: true + } + ) { update_cell_data_hash(&mut task, &cell, content_hash); } @@ -210,7 +227,13 @@ impl UpdateCellOperation { }; // Update cell_data_hash for non-hashonly cells. 
-        if matches!(value_type.persistence, ValueTypePersistence::HashOnly) {
+        if matches!(
+            value_type.persistence,
+            ValueTypePersistence::SkipPersist {
+                expensive: _,
+                hash_only: true
+            }
+        ) {
             update_cell_data_hash(&mut task, &cell, content_hash);
         }

diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
index 76270608f059..9651e5be3716 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -1,6 +1,7 @@
 use std::{
     cell::Cell,
-    hash::Hash,
+    fmt::{Display, Formatter},
+    hash::{BuildHasher, Hash},
     ops::{Deref, DerefMut},
     sync::{
         Arc,
@@ -9,16 +10,18 @@
 };

 use thread_local::ThreadLocal;
+use tracing::span::Id;
 use turbo_bincode::TurboBincodeBuffer;
 use turbo_tasks::{FxDashMap, TaskId, backend::CachedTaskType, event::Event, parallel};

 use crate::{
-    backend::storage_schema::TaskStorage,
+    backend::storage_schema::{KeyEvictability, TaskStorage, UnevictableReason, ValueEvictability},
     backing_storage::SnapshotItem,
     database::key_value_database::KeySpace,
     utils::{
         dash_map_drop_contents::drop_contents,
         dash_map_multi::{RefMut, get_multiple_mut},
+        dash_map_raw_entry::{TryLockAndRemove, try_lock_and_remove},
     },
 };

@@ -29,6 +32,60 @@ pub enum TaskDataCategory {
     All,
 }

+/// Counts of tasks evicted at each level.
+#[derive(Debug, Default)]
+pub struct EvictionCounts {
+    pub key_evictions: usize,
+    pub full: usize,
+    pub data_and_meta: usize,
+    pub data_only: usize,
+    pub meta_only: usize,
+    /// Per-reason counts of tasks we considered but could not evict, indexed by
+    /// `UnevictableReason::index()`.
+    pub unevictable_reasons: [usize; UnevictableReason::COUNT],
+}
+
+impl std::ops::AddAssign for EvictionCounts {
+    fn add_assign(&mut self, rhs: Self) {
+        self.key_evictions += rhs.key_evictions;
+        self.full += rhs.full;
+        self.data_and_meta += rhs.data_and_meta;
+        self.data_only += rhs.data_only;
+        self.meta_only += rhs.meta_only;
+        for i in 0..UnevictableReason::COUNT {
+            self.unevictable_reasons[i] += rhs.unevictable_reasons[i];
+        }
+    }
+}
+
+impl Display for EvictionCounts {
+    /// Compact `field=value,...` form used as a single tracing span field so that
+    /// adding a new counter or `UnevictableReason` variant doesn't require updating
+    /// the span field list.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let skipped: usize = self.unevictable_reasons.iter().sum();
+        write!(
+            f,
+            "task_cache_evictions={},full={},data_and_meta={},data_only={},meta_only={},skipped={}",
+            self.key_evictions,
+            self.full,
+            self.data_and_meta,
+            self.data_only,
+            self.meta_only,
+            skipped,
+        )?;
+        for reason in UnevictableReason::ALL {
+            write!(
+                f,
+                ",{}={}",
+                reason.span_name(),
+                self.unevictable_reasons[reason.index()],
+            )?;
+        }
+        Ok(())
+    }
+}
+
 impl TaskDataCategory {
     pub fn into_specific(self) -> SpecificTaskDataCategory {
         match self {
@@ -91,12 +148,23 @@ pub struct Storage {
     /// - `None`: Task was first modified during snapshot mode (not part of current snapshot). Will
     ///   be marked as modified at the beginning of the next snapshot cycle.
     snapshots: FxDashMap<TaskId, Option<Box<TaskStorage>>>,
+    /// The main storage map
+    ///
+    /// Lock Ordering: Task creation acquires a `task_cache` lock and then inserts into this map.
+ /// Because both datastructures are sharded on different keys, the locks are not 'strictly' + /// ordered but we should treat them as such + /// Acquiring locks in the opposite order should be defensive map: FxDashMap>, /// A shared event notified whenever any task finishes restoring (successfully or not). /// /// Threads waiting for another thread's in-progress restore subscribe to this event, /// then re-check the specific task's `restoring`/`restored` bits after waking. pub(crate) restored: Event, + /// Maps `CachedTaskType` → `TaskId` for deduplication of persistent task creation. + /// This is backed by the TaskCache table in the database. + /// + /// LockOrdering: See the comments on [map]., + pub task_cache: FxDashMap, TaskId>, } impl Storage { @@ -112,8 +180,7 @@ impl Storage { Default::default(), shard_amount, ); - let num_shards = map.shards().len(); - let shard_modified_counts = (0..num_shards) + let shard_modified_counts = (0..shard_amount) .map(|_| AtomicU64::new(0)) .collect::>() .into_boxed_slice(); @@ -130,6 +197,7 @@ impl Storage { ), map, restored: Event::new(|| || "Storage::restored".to_string()), + task_cache: FxDashMap::default(), } } @@ -213,7 +281,6 @@ impl Storage { if modified_count == 0 { return None; } - let mut direct_snapshots: Vec<(TaskId, Box)> = Vec::new(); let mut modified = Vec::with_capacity(modified_count as usize); { let shard_guard = shard.read(); @@ -229,44 +296,26 @@ impl Storage { // accompanied by modified flags (set_persistent_task_type calls // track_modification), so any_modified() is sufficient. if flags.any_modified() { - debug_assert!( - !key.is_transient(), - "found a modified transient task: {:?}", - shared_value.get().get_persistent_task_type() - ); - - if flags.any_modified_during_snapshot() { - // Task was modified during snapshot mode, so a snapshot - // copy must exist in the snapshots map (created by the - // (true, true) case in track_modification_internal). - // Remove the entry entirely so end_snapshot doesn't - // double-process this task. When iterating in `next` we will - // re-synchronize the task flags. - let (_, snapshot) = self.snapshots.remove(key).expect( - "task with modified_during_snapshot must have a snapshots entry", + if key.is_transient() { + debug_assert!( + false, + "found a modified transient task: {:?}", + shared_value.get().get_persistent_task_type() ); - let snapshot = snapshot.expect( - "snapshot entry for modified_during_snapshot task must contain a \ - value", - ); - direct_snapshots.push((*key, snapshot)); - } else { - modified.push(*key); + continue; } + + modified.push(*key); } } // Safety: shard_guard must outlive the iterator. drop(shard_guard); } - // Early return for shards with no entries at all - if direct_snapshots.is_empty() && modified.is_empty() { - return None; - } + debug_assert!(!modified.is_empty()); Some(SnapshotShard { shard_idx, - direct_snapshots, modified, storage: self, process, @@ -380,6 +429,130 @@ impl Storage { pub fn drop_contents(&self) { drop_contents(&self.map); drop_contents(&self.snapshots); + drop_contents(&self.task_cache); + } + + /// Evict tasks from in-memory storage after a successful snapshot. 
+    ///
+    /// Iterates all tasks and applies the evictability computed by
+    /// `TaskStorage::evictability()`, counting the result per level:
+    /// - full: the task became empty and was removed from the map entirely
+    /// - data and meta: both categories dropped, task kept in the map
+    /// - data only / meta only: a single category dropped
+    /// - unevictable tasks are skipped and counted per `UnevictableReason`
+    ///
+    /// Must be called when NOT in snapshot mode (i.e., after `end_snapshot()`).
+    pub fn evict_after_snapshot(&self, parent_span: Option<Id>) -> EvictionCounts {
+        let span = tracing::trace_span!(
+            parent: parent_span,
+            "evict_after_snapshot",
+            total_task_cache_keys = self.task_cache.len(),
+            total_map_keys = self.map.len(),
+            counts = tracing::field::Empty,
+        )
+        .entered();
+        debug_assert!(
+            !self.snapshot_mode(),
+            "evict_after_snapshot must not be called during snapshot mode"
+        );

+        let counts: Vec<EvictionCounts> = parallel::map_collect(self.map.shards(), |shard| {
+            let mut shard = shard.write();
+            let mut evicted = EvictionCounts::default();
+            // task_cache removals that we couldn't perform inline because the target shard
+            // was contended. We defer them until after the map shard lock is released to
+            // avoid a lock cycle with get_or_create_persistent_task, which takes task_cache
+            // before map. Allocated lazily on first conflict.
+            let mut deferred_task_cache_removals: Vec<Arc<CachedTaskType>> = Vec::new();
+            // SAFETY: We hold the write lock for the duration of iteration.
+            for bucket in unsafe { shard.iter() } {
+                // SAFETY: The write lock guard outlives the bucket reference.
+                let (task_id, task) = unsafe { bucket.as_mut() };
+                if task_id.is_transient() {
+                    evicted.unevictable_reasons[UnevictableReason::Transient.index()] += 1;
+                    continue;
+                }
+                let (key_evictability, value_evictability) = task.get().evictability();
+                match key_evictability {
+                    KeyEvictability::Evictable => {
+                        // The task type is persisted to backing storage (new_task = false),
+                        // so task_cache is a pure perf cache. Remove it now; it will be
+                        // re-populated by task_by_type() on the next cache miss.
+                        let task_type = task.get().get_persistent_task_type().unwrap();
+                        // Only try to acquire the lock; if that fails, remove at the end.
+                        // Because `get_or_create_task` acquires `task_cache` and then
+                        // `storage.map`, and we hold the locks in the opposite order, we
+                        // must be defensive here. Trying first is just an optimization to
+                        // avoid pushing into `deferred_task_cache_removals`.
+                        match try_lock_and_remove(&self.task_cache, task_type.as_ref()) {
+                            TryLockAndRemove::Removed => {
+                                evicted.key_evictions += 1;
+                            }
+                            TryLockAndRemove::NotFound => {
+                                // Generally this should be rare; it more or less implies
+                                // something else is concurrently holding the Arc.
+                            }
+                            TryLockAndRemove::WouldBlock => {
+                                // Contended; to avoid a deadlock, just defer the removal.
+                                deferred_task_cache_removals.push(task_type.clone());
+                            }
+                        }
+                    }
+                    KeyEvictability::AlreadyEvicted | KeyEvictability::Unevictable => {}
+                }
+                match value_evictability {
+                    ValueEvictability::Evictable { meta, data } => {
+                        task.get_mut().drop_partial(data, meta);
+                        if task.get().is_empty() {
+                            unsafe {
+                                shard.erase(bucket);
+                            }
+                            evicted.full += 1;
+                        } else if data && meta {
+                            evicted.data_and_meta += 1;
+                        } else if data {
+                            evicted.data_only += 1;
+                        } else {
+                            debug_assert!(meta);
+                            evicted.meta_only += 1;
+                        }
+                    }
+                    ValueEvictability::Unevictable(reason) => {
+                        evicted.unevictable_reasons[reason.index()] += 1;
+                    }
+                }
+            }
+            // Shrink the shard if it's less than half full, to reclaim slack capacity
+            // after bulk evictions.
We already hold the write lock, so this is free + // from a locking perspective. TaskId hashing is cheap (it's just an integer). + let len = shard.len(); + if shard.capacity() > len * 2 { + shard.shrink_to(len, |(k, _v)| self.map.hasher().hash_one(k)); + } + // Release the map shard lock before draining deferred removals so that a thread + // holding a task_cache shard lock and waiting on this map shard can make progress. + drop(shard); + for task_type in deferred_task_cache_removals { + if self.task_cache.remove(task_type.as_ref()).is_some() { + evicted.key_evictions += 1; + } + } + evicted + }); + + let mut totals = EvictionCounts::default(); + for evicted in counts { + totals += evicted; + } + // Shrink task_cache only when we evicted more entries than remain — i.e. the map + // is less than half full. shrink_to_fit() acquires each shard write lock in turn + // and rehashes surviving CachedTaskType entries, so we gate it on meaningful slack. + if totals.key_evictions > self.task_cache.len() { + self.task_cache.shrink_to_fit(); + } + span.record("counts", tracing::field::display(&totals)); + + totals } } @@ -568,7 +741,6 @@ impl Drop for SnapshotGuard<'_> { pub struct SnapshotShard<'l, P> { shard_idx: usize, - direct_snapshots: Vec<(TaskId, Box)>, modified: Vec, storage: &'l Storage, process: &'l P, @@ -606,16 +778,27 @@ where type Item = SnapshotItem; fn next(&mut self) -> Option { - // direct_snapshots: these tasks had a snapshot copy created by - // track_modification. We encode from the owned snapshot copy, - // clear the stale modified flags, and promote any _during_snapshot - // flags so the task stays dirty for the next cycle. - if let Some((task_id, snapshot)) = self.shard.direct_snapshots.pop() { - let item = (self.shard.process)(task_id, &snapshot, &mut self.buffer); - // Clear pre-snapshot flags. Since we removed this task's entry from the - // snapshots map in take_snapshot, end_snapshot won't see it, so we must - // promote here. + if let Some(task_id) = self.shard.modified.pop() { let mut inner = self.shard.storage.map.get_mut(&task_id).unwrap(); + // If the task was re-modified during snapshot, the snapshots map may + // hold a pre-modification copy we must serialize instead of the live + // data. Remove the entry so end_snapshot doesn't double-promote it; + // we promote manually below. + let item = if inner.flags.any_modified_during_snapshot() { + match self.shard.storage.snapshots.remove(&task_id) { + Some((_, Some(snapshot))) => { + (self.shard.process)(task_id, &snapshot, &mut self.buffer) + } + Some((_, None)) | None => { + (self.shard.process)(task_id, &inner, &mut self.buffer) + } + } + } else { + (self.shard.process)(task_id, &inner, &mut self.buffer) + }; + // Clear the modified flags that were captured into the snapshot copy, + // then promote modified_during_snapshot → modified so the task stays + // dirty for the next snapshot cycle. inner.flags.set_data_modified(false); inner.flags.set_meta_modified(false); inner.flags.set_new_task(false); @@ -624,45 +807,6 @@ where .promote_during_snapshot_flags(&mut inner, self.shard.shard_idx); return Some(item); } - // modified tasks: acquire a write lock to encode and clear flags in one pass. 
- if let Some(task_id) = self.shard.modified.pop() { - let mut inner = self.shard.storage.map.get_mut(&task_id).unwrap(); - if !inner.flags.any_modified_during_snapshot() { - let item = (self.shard.process)(task_id, &inner, &mut self.buffer); - inner.flags.set_data_modified(false); - inner.flags.set_meta_modified(false); - inner.flags.set_new_task(false); - return Some(item); - } else { - // Task was modified again during snapshot mode. A snapshot copy was - // created in track_modification_internal. Remove it and encode it. - // end_snapshot must not also process it, so we take it out of the map. - // snapshots is a separate DashMap from map, so holding `inner` across - // the remove and encode is safe — no lock ordering issue. - let snapshot = self - .shard - .storage - .snapshots - .remove(&task_id) - .expect("The snapshot bit was set, so it must be in Snapshot state") - .1 - .expect( - "snapshot entry for modified_during_snapshot task must contain a value", - ); - - let item = (self.shard.process)(task_id, &snapshot, &mut self.buffer); - // Clear the modified flags that were captured into the snapshot copy, - // then promote modified_during_snapshot → modified so the task stays - // dirty for the next snapshot cycle. - inner.flags.set_data_modified(false); - inner.flags.set_meta_modified(false); - inner.flags.set_new_task(false); - self.shard - .storage - .promote_during_snapshot_flags(&mut inner, self.shard.shard_idx); - return Some(item); - } - } None } } @@ -704,20 +848,22 @@ mod tests { } /// Regression test: a task modified before a snapshot and then modified *again* during - /// snapshot iteration must not trigger `debug_assert!(!inner.flags.any_modified())` in - /// `SnapshotShardIter::next`. + /// snapshot iteration must serialize the pre-snapshot state and carry the during-snapshot + /// modification forward to the next cycle. /// /// Sequence of events: /// 1. Task is modified (data_modified = true) → added to shard_modified_counts. /// 2. `start_snapshot` puts us in snapshot mode. - /// 3. `take_snapshot` scans the shard: task has `any_modified()=true` and - /// `any_modified_during_snapshot()=false` → task goes into the `modified` list. - /// 4. **Between scan and iteration**: `track_modification` is called on the task again. This is - /// the `(true, true)` branch: already modified AND in snapshot mode. A snapshot copy of the - /// pre-snapshot state is created (carrying the modified bits) and stored in `snapshots`. - /// 5. `SnapshotShardIter::next` processes the task from the `modified` list, finds - /// `any_modified_during_snapshot()=true`, clears the live modified flags (which were - /// captured into the snapshot), then asserts `!any_modified()` before promoting. + /// 3. `take_snapshot` scans the shard: task has `any_modified()=true` → goes into the + /// `modified` list. + /// 4. **Between scan and iteration**: `track_modification` is called on the same category. This + /// is the `(true, true)` branch: already modified AND in snapshot mode. A snapshot copy of + /// the pre-second-modification state is stored in `snapshots` as `Some(copy)`, and + /// `data_modified_during_snapshot` is set. + /// 5. `SnapshotShardIter::next` processes the task from the `modified` list, detects + /// `any_modified_during_snapshot()=true`, finds the `Some(copy)` in `snapshots`, encodes the + /// pre-snapshot copy, clears the live modified flags, removes the snapshots entry, and + /// promotes `data_modified_during_snapshot → data_modified` for the next cycle. 
// `end_snapshot` uses `parallel::for_each` which calls `block_in_place` internally, // requiring a multi-threaded Tokio runtime. #[tokio::test(flavor = "multi_thread")] @@ -751,8 +897,8 @@ mod tests { assert!(guard.flags.data_modified_during_snapshot()) } - // Step 5: consume the iterator. The iterator clears the live modified flags - // before the assert, encodes the snapshot copy, and promotes + // Step 5: consume the iterator. The iterator encodes from the pre-snapshot copy, + // clears the live modified flags, removes the snapshots entry, and promotes // `data_modified_during_snapshot → data_modified` for the next cycle. let items: Vec<_> = shards .into_iter() @@ -765,7 +911,7 @@ mod tests { { let guard = storage.access_mut(task_id); - // Ending the snapshot should have promoted modified_during_snapshot → modified. + // The iterator should have promoted modified_during_snapshot → modified. assert!(guard.flags.data_modified()); } @@ -777,4 +923,73 @@ mod tests { "shard_modified_counts must be non-zero after promoting modified_during_snapshot" ); } + + /// Regression test for the `(true, false)` during-snapshot case: a task modified in one + /// category before a snapshot, then modified in a *different* category during snapshot + /// iteration, must not panic and must carry both modifications forward correctly. + /// + /// Sequence of events: + /// 1. Task meta is modified (meta_modified = true). + /// 2. `start_snapshot` puts us in snapshot mode. + /// 3. `take_snapshot` scans the shard: task goes into the `modified` list. + /// 4. Task data is modified during snapshot → `(true, false)` branch: data was not previously + /// modified, so `snapshots` gets a `None` entry and `data_modified_during_snapshot` is set. + /// 5. `SnapshotShardIter::next` processes the task: finds `any_modified_during_snapshot()`, + /// sees `None` in snapshots, encodes from live data (correct — live data for the + /// unmodified-before-snapshot category is still the pre-snapshot state), clears pre-snapshot + /// flags, and promotes `data_modified_during_snapshot → data_modified`. + #[tokio::test(flavor = "multi_thread")] + async fn modify_different_category_during_snapshot() { + let storage = Storage::new(2, true); + let task_id = non_transient_task(1); + + // Step 1: modify meta only, outside snapshot mode. + { + let mut guard = storage.access_mut(task_id); + guard.track_modification(SpecificTaskDataCategory::Meta, "test"); + assert!(guard.flags.meta_modified()); + assert!(!guard.flags.data_modified()); + } + + // Step 2: enter snapshot mode. + let (snapshot_guard, has_modifications) = storage.start_snapshot(); + assert!(has_modifications); + + // Step 3: take_snapshot — task goes into modified list (meta_modified = true). + let shards = storage.take_snapshot(snapshot_guard, &dummy_process); + + // Step 4: modify data during snapshot. The `(true, false)` branch fires: + // data was not previously modified, so snapshots gets a None entry. + { + let mut guard = storage.access_mut(task_id); + guard.track_modification(SpecificTaskDataCategory::Data, "test"); + assert!(guard.flags.data_modified_during_snapshot()); + assert!(!guard.flags.meta_modified_during_snapshot()); + } + + // Step 5: consume the iterator — must not panic. 
+        let items: Vec<_> = shards
+            .into_iter()
+            .flat_map(|shard| shard.into_iter())
+            .collect();
+
+        assert_eq!(items.len(), 1);
+        assert_eq!(items[0].task_id, task_id);
+
+        {
+            let guard = storage.access_mut(task_id);
+            // meta_modified was cleared by the iterator (it was the pre-snapshot flag).
+            assert!(!guard.flags.meta_modified());
+            // data_modified_during_snapshot was promoted to data_modified.
+            assert!(guard.flags.data_modified());
+            assert!(!guard.flags.data_modified_during_snapshot());
+        }
+
+        // Next snapshot cycle must pick up the promoted data_modified.
+        let (_guard2, has_modifications) = storage.start_snapshot();
+        assert!(
+            has_modifications,
+            "shard_modified_counts must be non-zero after promoting data_modified_during_snapshot"
+        );
+    }
 }
diff --git a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
index 8dc1a8fe47ba..f5daa26c8dc6 100644
--- a/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/backend/storage_schema.rs
@@ -17,7 +17,7 @@
 //! - `data` - Frequently changed bulk data (dependencies, cell data)
 //! - `meta` - Rarely changed metadata (output, aggregation, flags)
 //! - `transient` - Not serialized, only exists in memory
-use std::sync::Arc;
+use std::{hash::Hash, sync::Arc};

 use parking_lot::Mutex;
 use turbo_tasks::{
@@ -298,6 +298,7 @@ struct TaskStorageSchema {
         storage = "auto_map",
         category = "data",
         shrink_on_completion,
+        custom_drop_partial,
         as_type = "AutoMap"
     )]
     cell_data: CellData,
@@ -442,6 +443,170 @@ impl TaskFlags {
     }
 }

+// =============================================================================
+// Eviction
+// =============================================================================
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[repr(u8)]
+pub enum UnevictableReason {
+    /// Either in progress or soon to be in progress.
+    InProgress,
+    /// Modified flags are set, or data/meta has not been restored yet.
+    Modified,
+    /// The task is transient.
+    Transient,
+    // Keep `NothingToEvict` last: `COUNT` is derived from its discriminant.
+    NothingToEvict,
+}
+
+impl UnevictableReason {
+    /// All variants in discriminant order. Keep this in sync when adding variants —
+    /// iteration and indexing rely on it covering every case.
+    pub const ALL: [UnevictableReason; Self::COUNT] = [
+        UnevictableReason::InProgress,
+        UnevictableReason::Modified,
+        UnevictableReason::Transient,
+        UnevictableReason::NothingToEvict,
+    ];
+
+    /// Number of variants. Derived from the last variant's discriminant, so adding a
+    /// new variant before `NothingToEvict` stays correct automatically.
+    pub const COUNT: usize = (UnevictableReason::NothingToEvict as usize) + 1;
+
+    #[inline]
+    pub const fn index(self) -> usize {
+        self as usize
+    }
+
+    /// Stable name used as a tracing span field. Matches the snake_case convention
+    /// of the other span fields in `evict_after_snapshot`.
+    pub const fn span_name(self) -> &'static str {
+        match self {
+            UnevictableReason::InProgress => "skipped_in_progress",
+            UnevictableReason::Modified => "skipped_modified",
+            UnevictableReason::Transient => "skipped_transient",
+            UnevictableReason::NothingToEvict => "skipped_nothing_to_evict",
+        }
+    }
+}
+
+/// Eviction level for a task after a snapshot.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ValueEvictability {
+    /// Task cannot be evicted.
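+    /// The reason is tallied into `EvictionCounts::unevictable_reasons`.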
+    Unevictable(UnevictableReason),
+    Evictable {
+        meta: bool,
+        data: bool,
+    },
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum KeyEvictability {
+    Evictable,
+    /// The task was already removed from `task_cache` in a prior eviction cycle.
+    /// No hash lookup is needed; the caller can skip the remove entirely.
+    AlreadyEvicted,
+    /// The task is new (or has no persistent task type), so we cannot evict its key.
+    Unevictable,
+}
+
+impl TaskStorage {
+    /// Determine the evictability of this task based on its flags.
+    ///
+    /// This checks only the flags on the TaskStorage itself. The caller
+    /// must additionally check that the task is not transient (via TaskId).
+    ///
+    /// Data and meta evictability are computed independently:
+    /// - `Evictable { data: true, meta: true }` if both categories can be dropped.
+    /// - `Evictable` with exactly one flag set if only one category can be dropped.
+    /// - `Unevictable(reason)` if neither category can be evicted.
+    pub fn evictability(&self) -> (KeyEvictability, ValueEvictability) {
+        let flags = &self.flags;
+
+        let key_evictability = if flags.new_task() {
+            KeyEvictability::Unevictable
+        } else {
+            match &self.persistent_task_type {
+                None => KeyEvictability::Unevictable,
+                // strong_count == 1: only TaskStorage holds this Arc (and we are holding a lock
+                // on that), so no task_cache entry references it — already evicted in
+                // a prior cycle. This covers tasks that are key-evictable but not
+                // data-evictable (data stays in the shard, persistent_task_type is
+                // never dropped).
+                Some(arc) if std::sync::Arc::strong_count(arc) == 1 => {
+                    KeyEvictability::AlreadyEvicted
+                }
+                Some(_) => KeyEvictability::Evictable,
+            }
+        };
+        // === Absolute blockers ===
+        if flags.new_task() {
+            return (
+                key_evictability,
+                ValueEvictability::Unevictable(UnevictableReason::Modified),
+            );
+        }
+        // All these flags imply that the task is currently being used in some way,
+        // either literally executing or about to.
+        if self.get_in_progress().is_some()
+            || self.get_activeness().is_some()
+            // Without these checks we could corrupt racing reads: if a task restores
+            // `All` while data is already restored, it sets `meta_restoring`, and
+            // clearing `data_restored` in the middle of that would break the restore's
+            // assumptions. We could fix that with a retry loop on the restoring
+            // threads, but it is much simpler to back off in this case.
+            || flags.meta_restoring()
+            || flags.data_restoring()
+        {
+            return (
+                key_evictability,
+                ValueEvictability::Unevictable(UnevictableReason::InProgress),
+            );
+        }
+        debug_assert!(
+            self.get_transient_task_type().is_none(),
+            "only transient tasks can have transient_task_types so it cannot be set here"
+        );
+
+        // This is common: after a round of eviction we end up with tasks holding only
+        // transient state. There is no need to search for it; we can just assume any
+        // task in this state is being preserved for some reason. NOTE: new tasks have
+        // the restored flags set as part of construction, so the only way for a task
+        // to end up in this situation is through eviction.
+        if !flags.data_restored() && !flags.meta_restored() {
+            return (
+                key_evictability,
+                ValueEvictability::Unevictable(UnevictableReason::NothingToEvict),
+            );
+        }
+
+        // === Data evictability (independent) ===
+        // Data can be dropped if it's been restored from disk and hasn't been
+        // modified.
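+        // (Rule of thumb, per the flag checks below: restored && !modified &&
+        // !modified_during_snapshot ⇒ the on-disk copy is current, so the
+        // in-memory copy is redundant and safe to drop.)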
+ let data_evictable = flags.data_restored()
+ && !flags.data_modified()
+ && !flags.data_modified_during_snapshot();
+
+ // === Meta evictability (independent) ===
+ // Same semantics as data: flag checks only.
+ let meta_evictable = flags.meta_restored()
+ && !flags.meta_modified()
+ && !flags.meta_modified_during_snapshot();
+
+ // === Combined decision ===
+ (
+ key_evictability,
+ if !data_evictable && !meta_evictable {
+ ValueEvictability::Unevictable(UnevictableReason::Modified)
+ } else {
+ ValueEvictability::Evictable {
+ meta: meta_evictable,
+ data: data_evictable,
+ }
+ },
+ )
+ }
+}
+
// =============================================================================
// TaskStorage helper methods
// =============================================================================
@@ -669,6 +834,27 @@ trait IsTransient {
fn is_transient(&self) -> bool;
}

+impl IsTransient for TaskId {
+ fn is_transient(&self) -> bool {
+ TaskId::is_transient(self)
+ }
+}
+
+impl IsTransient for CollectibleRef {
+ fn is_transient(&self) -> bool {
+ CollectibleRef::is_transient(self)
+ }
+}
+impl IsTransient for CollectiblesRef {
+ fn is_transient(&self) -> bool {
+ CollectiblesRef::is_transient(self)
+ }
+}
+impl IsTransient for OutputValue {
+ fn is_transient(&self) -> bool {
+ OutputValue::is_transient(self)
+ }
+}
impl IsTransient for (TraitTypeId, TaskId) {
fn is_transient(&self) -> bool {
self.1.is_transient()
@@ -685,6 +871,56 @@ impl IsTransient for (CellRef, Option) {
}
}

+/// Helper trait for the generated `drop_partial` implementation.
+trait DropPartial {
+ /// Returns true if the value is now empty.
+ fn drop_partial(&mut self) -> bool;
+}
+
+impl<T: IsTransient> DropPartial for Option<T> {
+ fn drop_partial(&mut self) -> bool {
+ self.take_if(|v| !v.is_transient());
+ self.is_none()
+ }
+}
+
+impl<T: IsTransient + Eq + Hash, H: std::hash::BuildHasher, const I: usize> DropPartial
+ for AutoSet<T, H, I>
+{
+ fn drop_partial(&mut self) -> bool {
+ self.retain(|t| t.is_transient());
+ let end_len = self.len();
+ if end_len == 0 {
+ true
+ } else {
+ self.shrink_to_fit();
+ false
+ }
+ }
+}
+
+impl<K: IsTransient + Eq + Hash> DropPartial for CounterMap<K> {
+ fn drop_partial(&mut self) -> bool {
+ self.retain(|k, _v| k.is_transient());
+ let end_len = self.len();
+ if end_len == 0 {
+ true
+ } else {
+ self.shrink_to_fit();
+ false
+ }
+ }
+}
+impl<K: IsTransient + Eq + Hash, V: IsTransient, H: std::hash::BuildHasher, const I: usize>
+ DropPartial for AutoMap<K, V, H, I>
+{
+ fn drop_partial(&mut self) -> bool {
+ self.retain(|k, v| k.is_transient() || v.is_transient());
+ let end_len = self.len();
+ if end_len == 0 {
+ true
+ } else {
+ self.shrink_to_fit();
+ false
+ }
+ }
+}
#[cfg(test)]
mod tests {
use std::mem::size_of;
@@ -1097,6 +1333,304 @@ mod tests {
assert_eq!(decoded.output_dependencies(), None);
}

+ // ==========================================================================
+ // drop_partial + restore_*_from round-trip with transient residue
+ // ==========================================================================
+
+ fn persistent_task(id: u32) -> TaskId {
+ assert!(id & turbo_tasks::TRANSIENT_TASK_BIT == 0);
+ TaskId::new(id).unwrap()
+ }
+
+ fn transient_task(id: u32) -> TaskId {
+ TaskId::new(id | turbo_tasks::TRANSIENT_TASK_BIT).unwrap()
+ }
+
+ /// After `drop_partial(data=true)`, persistent entries in `filter_transient`
+ /// data fields are cleared but transient residue must remain so transient
+ /// dependents aren't silently lost. `restore_data_from` must then merge the
+ /// persistent portion back in without clobbering the residue.
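+ /// Flow under test (sketch): populate mixed entries, `drop_partial(data)`,
+ /// only transient residue remains, `restore_data_from` a disk-shaped copy,
+ /// end with the union of residue and persistent entries.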
+ #[test] + fn drop_partial_retains_transient_residue_data() { + let mut storage = TaskStorage::new(); + + // Mix persistent and transient references in a filter_transient data field. + storage.output_dependent_mut().insert(persistent_task(1)); + storage.output_dependent_mut().insert(persistent_task(2)); + storage.output_dependent_mut().insert(transient_task(3)); + + // Lazy filter_transient data field. + storage.cell_dependencies_mut().insert(( + CellRef { + task: persistent_task(10), + cell: CellId { + type_id: unsafe { turbo_tasks::ValueTypeId::new_unchecked(1) }, + index: 0, + }, + }, + None, + )); + + // Mark as restored so the task is eligible for dropping. + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(true, false); + + // Persistent entries gone; transient residue preserved. + assert!(!storage.output_dependent().contains(&persistent_task(1))); + assert!(!storage.output_dependent().contains(&persistent_task(2))); + assert!(storage.output_dependent().contains(&transient_task(3))); + assert_eq!(storage.output_dependent().len(), 1); + // Lazy non-filter-transient residue: cell_dependencies had only persistent + // entries and should be dropped entirely. + assert!(storage.cell_dependencies().is_none()); + // data_restored cleared; meta_restored untouched. + assert!(!storage.flags.data_restored()); + assert!(storage.flags.meta_restored()); + + // Simulate a restore from disk: source has the persistent entries only + // (transient ones would have been filtered during encode). + let mut source = TaskStorage::new(); + source.output_dependent_mut().insert(persistent_task(1)); + source.output_dependent_mut().insert(persistent_task(2)); + + storage.restore_data_from(source); + + // After restore: persistent + transient should both be present. + assert!(storage.output_dependent().contains(&persistent_task(1))); + assert!(storage.output_dependent().contains(&persistent_task(2))); + assert!(storage.output_dependent().contains(&transient_task(3))); + assert_eq!(storage.output_dependent().len(), 3); + } + + /// Same idea for meta: transient `upper` keys (a `CounterMap` residue) must + /// survive the drop and merge cleanly with the persistent upper set on + /// restore. + #[test] + fn drop_partial_retains_transient_residue_meta() { + let mut storage = TaskStorage::new(); + + storage.upper_mut().insert(persistent_task(1), 1); + storage.upper_mut().insert(transient_task(2), 1); + + // Also populate a lazy filter_transient meta field. + storage.children_mut().insert(persistent_task(100)); + storage.children_mut().insert(transient_task(200)); + + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(false, true); + + // Inline upper: transient residue remains. + assert_eq!(storage.upper().len(), 1); + assert_eq!(storage.upper().get(&transient_task(2)), Some(&1)); + // Lazy children: transient residue remains. + assert_eq!(storage.children().unwrap().len(), 1); + assert!(storage.children().unwrap().contains(&transient_task(200))); + assert!(!storage.flags.meta_restored()); + assert!(storage.flags.data_restored()); + + // Restore persistent meta fields. + let mut source = TaskStorage::new(); + source.upper_mut().insert(persistent_task(1), 1); + source.children_mut().insert(persistent_task(100)); + + storage.restore_meta_from(source); + + // After restore: residue + persistent are both present. 
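+ // Expected merged state after the restore (illustrative):
+ //   upper    = { persistent_task(1): 1, transient_task(2): 1 }
+ //   children = { persistent_task(100), transient_task(200) }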
+ assert_eq!(storage.upper().len(), 2); + assert_eq!(storage.upper().get(&persistent_task(1)), Some(&1)); + assert_eq!(storage.upper().get(&transient_task(2)), Some(&1)); + assert_eq!(storage.children().unwrap().len(), 2); + assert!(storage.children().unwrap().contains(&persistent_task(100))); + assert!(storage.children().unwrap().contains(&transient_task(200))); + } + + /// `drop_partial` on a field with no transient entries must fully reset the + /// field to default — this is the hot path we optimized for. + #[test] + fn drop_partial_resets_fields_without_transients() { + let mut storage = TaskStorage::new(); + + storage.output_dependent_mut().insert(persistent_task(1)); + storage.output_dependent_mut().insert(persistent_task(2)); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(true, false); + + assert!(storage.output_dependent().is_empty()); + } + + /// Regression: `drop_partial(true, true)` must clear persisted flag bits + /// so a fully-evicted task reports `is_empty()`. Before this, tasks with + /// persistent data flags (e.g. `invalidator`, `immutable`) would get stuck + /// as `NothingToEvict` because `self.flags.0 != 0` even though all data + /// had been dropped. + #[test] + fn drop_partial_clears_persisted_flags_so_is_empty() { + let mut storage = TaskStorage::new(); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + storage.flags.set_invalidator(true); + storage.flags.set_immutable(true); + + storage.drop_partial(true, true); + + assert!(!storage.flags.invalidator()); + assert!(!storage.flags.immutable()); + assert!(!storage.flags.data_restored()); + assert!(!storage.flags.meta_restored()); + assert!( + storage.is_empty(), + "fully evicted storage should be is_empty() so it can be removed from the shard" + ); + } + + /// Filter-transient `output`: when `output` is `Some(transient)` it must + /// survive `drop_partial(meta=true)` so restore can merge the disk value + /// back in (normally disk value would be `None` if current output was + /// transient at encode time). + #[test] + fn drop_partial_retains_transient_output() { + let mut storage = TaskStorage::new(); + storage.set_output(OutputValue::Output(transient_task(1))); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(false, true); + + // Transient output retained. + assert_eq!( + storage.get_output(), + Some(&OutputValue::Output(transient_task(1))) + ); + } + + // ========================================================================== + // cell_data custom_drop_partial dispatch + // ========================================================================== + + mod cell_data_drop_partial { + //! End-to-end: verify `TaskStorage::drop_partial` dispatches to + //! `CellData::drop_partial`, and that `restore_data_from` merges the + //! retained residue with incoming persistent entries instead of + //! clobbering it. The per-variant partitioning is covered in + //! `cell_data.rs` — here we only need one non-recoverable entry as + //! residue and one recoverable entry to be dropped. 
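+ //!
+ //! `Keepable` below uses the default (persistable) settings and is dropped;
+ //! `KeepMe` opts out via `serialization = "skip"`, so its cell data cannot
+ //! be re-read from disk and must survive as residue.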
+ use turbo_tasks::{self as turbo_tasks, VcValueType}; + + use super::*; + + #[turbo_tasks::value] + struct Keepable(#[allow(dead_code)] u32); + + #[turbo_tasks::value(serialization = "skip", evict = "last")] + struct KeepMe( + #[turbo_tasks(trace_ignore)] + #[allow(dead_code)] + u32, + ); + + fn dummy_ref() -> SharedReference { + SharedReference::new(triomphe::Arc::new(0u32)) + } + + fn keepable_cell(index: u32) -> CellId { + CellId { + type_id: Keepable::get_value_type_id(), + index, + } + } + + fn keep_me_cell(index: u32) -> CellId { + CellId { + type_id: KeepMe::get_value_type_id(), + index, + } + } + + #[test] + fn drop_partial_retains_non_recoverable_entries() { + let mut storage = TaskStorage::new(); + storage + .cell_data_mut() + .insert(keepable_cell(0), dummy_ref()); + storage.cell_data_mut().insert(keep_me_cell(1), dummy_ref()); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(true, false); + + let cells = storage.cell_data().expect("residue keeps the variant"); + assert_eq!(cells.len(), 1); + assert!(cells.contains_key(&keep_me_cell(1))); + assert!(!cells.contains_key(&keepable_cell(0))); + } + + #[test] + fn drop_partial_removes_variant_when_all_recoverable() { + let mut storage = TaskStorage::new(); + storage + .cell_data_mut() + .insert(keepable_cell(0), dummy_ref()); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(true, false); + + assert!( + storage.cell_data().is_none(), + "variant is dropped when drop_partial empties it" + ); + } + + #[test] + fn restore_merges_residue_with_incoming() { + let mut storage = TaskStorage::new(); + storage + .cell_data_mut() + .insert(keepable_cell(0), dummy_ref()); + storage.cell_data_mut().insert(keep_me_cell(1), dummy_ref()); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(true, false); + // Only KeepMe entry survives. + assert_eq!(storage.cell_data().unwrap().len(), 1); + + // Simulate a restore: disk had only the persistable entry. + let mut source = TaskStorage::new(); + source.cell_data_mut().insert(keepable_cell(0), dummy_ref()); + + storage.restore_data_from(source); + + let cells = storage + .cell_data() + .expect("residue + incoming both present"); + assert_eq!(cells.len(), 2); + assert!(cells.contains_key(&keepable_cell(0))); + assert!(cells.contains_key(&keep_me_cell(1))); + } + + #[test] + fn drop_partial_meta_does_not_touch_cell_data() { + let mut storage = TaskStorage::new(); + storage + .cell_data_mut() + .insert(keepable_cell(0), dummy_ref()); + storage.flags.set_data_restored(true); + storage.flags.set_meta_restored(true); + + storage.drop_partial(false, true); + + // cell_data is category=data; meta-only drop leaves it alone. + assert_eq!(storage.cell_data().unwrap().len(), 1); + } + } + // ========================================================================== // Schema Size Tests // ========================================================================== diff --git a/turbopack/crates/turbo-tasks-backend/src/utils/dash_map_raw_entry.rs b/turbopack/crates/turbo-tasks-backend/src/utils/dash_map_raw_entry.rs index 0aaf2c49742a..57714379824a 100644 --- a/turbopack/crates/turbo-tasks-backend/src/utils/dash_map_raw_entry.rs +++ b/turbopack/crates/turbo-tasks-backend/src/utils/dash_map_raw_entry.rs @@ -64,6 +64,47 @@ pub fn raw_entry_in_shard<'l, K: Eq + Hash, V, S: BuildHasher + Clone>( } } +/// Outcome of [`try_lock_and_remove`]. 
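+///
+/// Typical caller shape (sketch; the deferral strategy is the caller's choice):
+/// ```ignore
+/// match try_lock_and_remove(&map, &key) {
+///     TryLockAndRemove::Removed | TryLockAndRemove::NotFound => {}
+///     // Retry after releasing other locks to avoid a cyclic wait.
+///     TryLockAndRemove::WouldBlock => deferred.push(key),
+/// }
+/// ```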
+pub enum TryLockAndRemove {
+ /// The shard lock was acquired and a matching entry was removed.
+ Removed,
+ /// The shard lock was acquired but no matching entry was present.
+ NotFound,
+ /// The shard lock was contended; the caller should retry later after releasing
+ /// any other locks they are holding.
+ WouldBlock,
+}
+
+/// Remove `key` from `map` without blocking on shard contention.
+///
+/// Intended for call sites that already hold another lock and want to avoid a
+/// cyclic wait. On contention (`WouldBlock`), the caller is expected to defer the
+/// removal and retry after dropping the other lock.
+pub fn try_lock_and_remove<
+ K: Eq + Hash + AsRef<Q>,
+ V,
+ Q: Eq + Hash + ?Sized,
+ S: BuildHasher + Clone,
+>(
+ map: &DashMap<K, V, S>,
+ key: &Q,
+) -> TryLockAndRemove {
+ let hasher = map.hasher();
+ let hash = hasher.hash_one(key);
+ let shard_idx = map.determine_shard(hash as usize);
+ let Some(mut shard) = map.shards()[shard_idx].try_write() else {
+ return TryLockAndRemove::WouldBlock;
+ };
+ // SAFETY: we hold the write lock for the duration of the find/erase.
+ match shard.find(hash, |(k, _v)| k.as_ref() == key) {
+ Some(bucket) => {
+ unsafe { shard.erase(bucket) };
+ TryLockAndRemove::Removed
+ }
+ None => TryLockAndRemove::NotFound,
+ }
+}
+
pub enum RawEntry<'l, K, V> {
Occupied(OccupiedEntry<'l, K, V>),
Vacant(VacantEntry<'l, K, V>),
diff --git a/turbopack/crates/turbo-tasks-backend/tests/eviction.rs b/turbopack/crates/turbo-tasks-backend/tests/eviction.rs
new file mode 100644
index 000000000000..21b4abd44784
--- /dev/null
+++ b/turbopack/crates/turbo-tasks-backend/tests/eviction.rs
@@ -0,0 +1,597 @@
+#![feature(arbitrary_self_types)]
+#![feature(arbitrary_self_types_pointers)]
+#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
+
+use std::sync::{
+ Arc,
+ atomic::{AtomicBool, AtomicU64, Ordering},
+};
+
+use anyhow::Result;
+use turbo_tasks::{
+ ResolvedVc, State, TurboTasks, Vc, unmark_top_level_task_may_leak_eventually_consistent_state,
+};
+use turbo_tasks_backend::{BackendOptions, GitVersionInfo, TurboBackingStorage, TurboTasksBackend};
+
+fn create_tt_with_workers(
+ name: &str,
+ num_workers: usize,
+) -> Arc<TurboTasks<TurboTasksBackend<TurboBackingStorage>>> {
+ use std::hash::BuildHasher;
+ let path = std::path::PathBuf::from(format!(
+ "{}/.cache/{}",
+ env!("CARGO_TARGET_TMPDIR"),
+ rustc_hash::FxBuildHasher.hash_one(name)
+ ));
+ let _ = std::fs::remove_dir_all(&path);
+ std::fs::create_dir_all(&path).unwrap();
+ TurboTasks::new(TurboTasksBackend::new(
+ BackendOptions {
+ num_workers: Some(num_workers),
+ small_preallocation: true,
+ storage_mode: Some(turbo_tasks_backend::StorageMode::ReadWrite),
+ evict_after_snapshot: true,
+ ..Default::default()
+ },
+ turbo_tasks_backend::turbo_backing_storage(
+ path.as_path(),
+ &GitVersionInfo {
+ describe: "test-unversioned",
+ dirty: false,
+ },
+ false,
+ true,
+ true,
+ )
+ .unwrap()
+ .0,
+ ))
+}
+
+fn create_tt(name: &str) -> Arc<TurboTasks<TurboTasksBackend<TurboBackingStorage>>> {
+ use std::hash::BuildHasher;
+ let path = std::path::PathBuf::from(format!(
+ "{}/.cache/{}",
+ env!("CARGO_TARGET_TMPDIR"),
+ rustc_hash::FxBuildHasher.hash_one(name)
+ ));
+ let _ = std::fs::remove_dir_all(&path);
+ std::fs::create_dir_all(&path).unwrap();
+ TurboTasks::new(TurboTasksBackend::new(
+ BackendOptions {
+ num_workers: Some(2),
+ small_preallocation: true,
+ storage_mode: Some(turbo_tasks_backend::StorageMode::ReadWrite),
+ evict_after_snapshot: true,
+ ..Default::default()
+ },
+ turbo_tasks_backend::turbo_backing_storage(
+ path.as_path(),
+ &GitVersionInfo {
+ describe:
"test-unversioned", + dirty: false, + }, + false, + true, + true, + ) + .unwrap() + .0, + )) +} + +/// Verify that after eviction, task re-execution produces correct results. +/// This tests the snapshot → evict → invalidate → restore → re-execute cycle. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn eviction_recompute() { + let tt = create_tt("eviction_recompute"); + let tt2 = tt.clone(); + + let result = turbo_tasks::run_once(tt.clone(), async move { + unmark_top_level_task_may_leak_eventually_consistent_state(); + + // Create state via operation (persistent task) + let state_op = create_state(1); + let state_vc = state_op.resolve().strongly_consistent().await?; + let state = state_op.read_strongly_consistent().await?; + + // Create compute task (persistent, depends on state) + let output = compute(state_vc); + let read = output.read_strongly_consistent().await?; + assert_eq!(read.value, 1); + let initial_random = read.random; + + // Trigger snapshot + eviction + let (had_data, counts) = tt2.backend().snapshot_and_evict_for_testing(&*tt2); + println!("snapshot had_data={had_data}, evicted: {counts:?}"); + assert!(had_data, "snapshot should have persisted data"); + + // Invalidate via state change — this requires restoring evicted tasks + state.set(2); + + // Read again — tasks must be restored from disk before re-executing + let read = output.read_strongly_consistent().await?; + assert_eq!(read.value, 2); + assert_ne!(read.random, initial_random); + + anyhow::Ok(()) + }) + .await; + tt.stop_and_wait().await; + result.unwrap(); +} + +/// Verify that eviction works with a deep (4-level) dependency chain. +/// Multiple intermediate tasks should be evicted and restored correctly. +/// Chain: create_state → add_one → times_three → plus_ten → deep_chain +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn eviction_deep_chain() { + let tt = create_tt("eviction_deep_chain"); + let tt2 = tt.clone(); + + let result = turbo_tasks::run_once(tt.clone(), async move { + unmark_top_level_task_may_leak_eventually_consistent_state(); + + let state_op = create_state(10); + let state_vc = state_op.resolve().strongly_consistent().await?; + let state = state_op.read_strongly_consistent().await?; + + let output = deep_chain(state_vc); + let read = output.read_strongly_consistent().await?; + // (10+1)*3+10 = 43 + assert_eq!(read.value, 43); + let initial_random = read.random; + + // Snapshot + evict — expect multiple intermediate tasks evicted + let (had_data, counts) = tt2.backend().snapshot_and_evict_for_testing(&*tt2); + println!("deep_chain: snapshot had_data={had_data}, evicted: {counts:?}"); + assert!(had_data, "snapshot should have persisted data"); + assert!( + counts.full + counts.data_and_meta + counts.data_only + counts.meta_only > 0, + "expected some tasks to be evicted" + ); + + // Change the deepest input — must propagate through all restored tasks + state.set(20); + + let read = output.read_strongly_consistent().await?; + // (20+1)*3+10 = 73 + assert_eq!(read.value, 73); + assert_ne!(read.random, initial_random); + let random_after_first = read.random; + + // Evict again and change again + let (had_data2, counts2) = tt2.backend().snapshot_and_evict_for_testing(&*tt2); + println!("deep_chain (2nd): snapshot had_data={had_data2}, evicted: {counts2:?}"); + + state.set(0); + + let read = output.read_strongly_consistent().await?; + // (0+1)*3+10 = 13 + assert_eq!(read.value, 13); + assert_ne!(read.random, random_after_first); + + anyhow::Ok(()) + }) + .await; 
+ tt.stop_and_wait().await;
+ result.unwrap();
+}
+
+/// Verify that eviction + restore preserves dependency edges correctly.
+/// After eviction, changing a deep dependency should still propagate
+/// through the entire chain.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn eviction_dependency_chain() {
+ let tt = create_tt("eviction_dependency_chain");
+ let tt2 = tt.clone();
+
+ let result = turbo_tasks::run_once(tt.clone(), async move {
+ unmark_top_level_task_may_leak_eventually_consistent_state();
+
+ let state_op = create_state(10);
+ let state_vc = state_op.resolve().strongly_consistent().await?;
+ let state = state_op.read_strongly_consistent().await?;
+
+ let output = compute_chain(state_vc);
+ let read = output.read_strongly_consistent().await?;
+ assert_eq!(read.value, 20); // 10 * 2
+ let initial_random = read.random;
+
+ // Snapshot + evict
+ let (had_data, counts) = tt2.backend().snapshot_and_evict_for_testing(&*tt2);
+ println!("snapshot had_data={had_data}, evicted: {counts:?}");
+ assert!(had_data, "snapshot should have persisted data");
+ assert!(
+ counts.full + counts.data_and_meta + counts.data_only + counts.meta_only > 0,
+ "expected some tasks to be evicted"
+ );
+
+ // Change the deepest input
+ state.set(5);
+
+ let read = output.read_strongly_consistent().await?;
+ assert_eq!(read.value, 10); // 5 * 2
+ assert_ne!(read.random, initial_random);
+ let random_after_first = read.random;
+
+ // Evict again
+ let (had_data2, counts2) = tt2.backend().snapshot_and_evict_for_testing(&*tt2);
+ println!("snapshot (2nd) had_data={had_data2}, evicted: {counts2:?}");
+
+ // Change again
+ state.set(100);
+
+ let read = output.read_strongly_consistent().await?;
+ assert_eq!(read.value, 200); // 100 * 2
+ assert_ne!(read.random, random_after_first);
+
+ anyhow::Ok(())
+ })
+ .await;
+ tt.stop_and_wait().await;
+ result.unwrap();
+}
+
+#[turbo_tasks::value(transparent)]
+struct Step(State<u32>);
+
+#[turbo_tasks::function(operation)]
+fn create_state(initial: u32) -> Vc<Step> {
+ Step(State::new(initial)).cell()
+}
+
+#[turbo_tasks::value]
+struct Output {
+ value: u32,
+ random: u32,
+}
+
+#[turbo_tasks::function(operation)]
+async fn compute(input: ResolvedVc<Step>) -> Result<Vc<Output>> {
+ let value = *input.await?.get();
+ Ok(Output {
+ value,
+ random: rand::random(),
+ }
+ .cell())
+}
+
+/// Inner function in the dependency chain
+#[turbo_tasks::function(operation)]
+async fn double(input: ResolvedVc<Step>) -> Result<Vc<u32>> {
+ let value = *input.await?.get();
+ Ok(Vc::cell(value * 2))
+}
+
+/// Outer function that depends on `double`
+#[turbo_tasks::function(operation)]
+async fn compute_chain(input: ResolvedVc<Step>) -> Result<Vc<Output>> {
+ let doubled = double(input);
+ let value = *doubled.connect().await?;
+ Ok(Output {
+ value,
+ random: rand::random(),
+ }
+ .cell())
+}
+
+// =========================================================================
+// Deep chain helpers — each layer reads the previous layer's output
+// =========================================================================
+
+#[turbo_tasks::function(operation)]
+async fn add_one(input: ResolvedVc<Step>) -> Result<Vc<u32>> {
+ let value = *input.await?.get();
+ Ok(Vc::cell(value + 1))
+}
+
+#[turbo_tasks::function(operation)]
+async fn times_three(input: ResolvedVc<u32>) -> Result<Vc<u32>> {
+ let value = *input.await?;
+ Ok(Vc::cell(value * 3))
+}
+
+#[turbo_tasks::function(operation)]
+async fn plus_ten(input: ResolvedVc<u32>) -> Result<Vc<u32>> {
+ let value = *input.await?;
+ Ok(Vc::cell(value + 10))
+}
+
+#[turbo_tasks::function(operation)]
+async fn
deep_chain(input: ResolvedVc<Step>) -> Result<Vc<Output>> {
+ // input → add_one → times_three → plus_ten → Output
+ // For input=10: (10+1)*3+10 = 43
+ let a = add_one(input).resolve().strongly_consistent().await?;
+ let b = times_three(a).resolve().strongly_consistent().await?;
+ let c = plus_ten(b).resolve().strongly_consistent().await?;
+ let value = *c.await?;
+ Ok(Output {
+ value,
+ random: rand::random(),
+ }
+ .cell())
+}
+
+// =========================================================================
+// Session-stateful value — accumulates interior state that should not be
+// evicted mid-session.
+// =========================================================================
+
+/// A value marked `serialization = "skip"` — tasks that write cells of this type cannot be
+/// persisted, so their data lives in `transient_cell_data` and is protected from eviction by
+/// the existing transient-cell check in the storage schema.
+#[turbo_tasks::value(serialization = "skip", evict = "never", cell = "new", eq = "manual")]
+struct SessionCounter {
+ count: u32,
+}
+
+/// Intermediate operation task that writes a session-stateful cell.
+/// Because this task is only resolved (not directly read) by the top-level
+/// transient task, it has no transient dependents and is eligible for eviction
+/// consideration — but should be blocked by the session-stateful flag.
+#[turbo_tasks::function(operation)]
+fn create_session_counter(initial: u32) -> Vc<SessionCounter> {
+ SessionCounter { count: initial }.cell()
+}
+
+/// Resolves the session counter internally so the transient run_once task
+/// doesn't need to resolve it directly (which would add a transient dependent
+/// edge to create_session_counter, preventing us from testing the
+/// session-stateful eviction gate).
+#[turbo_tasks::function(operation)]
+async fn read_session_counter(initial: u32) -> Result<Vc<Output>> {
+ let counter = create_session_counter(initial)
+ .resolve()
+ .strongly_consistent()
+ .await?;
+ let c = counter.await?;
+ Ok(Output {
+ value: c.count,
+ random: rand::random(),
+ }
+ .cell())
+}
+
+/// Verify that tasks with session-stateful cells are NOT evicted, while
+/// normal persistent tasks without transient dependents ARE evicted.
+///
+/// Uses a two-layer chain so that create_session_counter (the task that writes
+/// the session-stateful cell) has no transient dependents — only
+/// read_session_counter reads it, and it is itself a persistent task.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn eviction_session_stateful_survives() {
+ let tt = create_tt("eviction_session_stateful_survives");
+ let tt2 = tt.clone();
+
+ let result = turbo_tasks::run_once(tt.clone(), async move {
+ unmark_top_level_task_may_leak_eventually_consistent_state();
+
+ // read_session_counter internally creates+resolves create_session_counter(42).
+ // The transient run_once only reads read_session_counter, so
+ // create_session_counter has no transient dependents and is eligible for
+ // eviction consideration — but should be blocked by SessionStateful.
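+ // Task graph under test (sketch):
+ //   run_once (transient) → read_session_counter (persistent)
+ //     → create_session_counter (persistent, session-stateful cell)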
+ let reader = read_session_counter(42); + let read = reader.read_strongly_consistent().await?; + assert_eq!(read.value, 42); + + // Also build a normal (evictable) chain for comparison + let state_op = create_state(10); + let state_vc = state_op.resolve().strongly_consistent().await?; + let normal = deep_chain(state_vc); + let normal_read = normal.read_strongly_consistent().await?; + // (10+1)*3+10 = 43 + assert_eq!(normal_read.value, 43); + + // Snapshot + evict + let (had_data, counts) = tt2.backend().snapshot_and_evict_for_testing(&*tt2); + println!("session_stateful: snapshot had_data={had_data}, evicted: {counts:?}"); + assert!(had_data, "snapshot should have persisted data"); + // The normal intermediate tasks (add_one, times_three, plus_ten) should be + // evicted. The session-stateful create_session_counter should NOT be fully + // evicted (its data is blocked by has_session_stateful_cells). + assert!( + counts.full + counts.data_and_meta + counts.data_only + counts.meta_only > 0, + "normal intermediate tasks should be evicted" + ); + + // After eviction, reading through the session-stateful chain should still work + let read2 = reader.read_strongly_consistent().await?; + assert_eq!(read2.value, 42); + + anyhow::Ok(()) + }) + .await; + tt.stop_and_wait().await; + result.unwrap(); +} + +/// Verify that transient tasks reading persistent tasks still get invalidated +/// after the persistent tasks are evicted and restored. +/// +/// The `run_once` closure is itself a transient task. We create persistent +/// operation tasks, evict them, then mutate state and confirm the transient +/// reader sees the updated value. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn eviction_transient_reader_invalidated() { + let tt = create_tt("eviction_transient_reader_invalidated"); + let tt2 = tt.clone(); + + let result = turbo_tasks::run_once(tt.clone(), async move { + unmark_top_level_task_may_leak_eventually_consistent_state(); + + // Create persistent state + compute tasks + let state_op = create_state(50); + let state_vc = state_op.resolve().strongly_consistent().await?; + let state = state_op.read_strongly_consistent().await?; + + let output = compute(state_vc); + let read = output.read_strongly_consistent().await?; + assert_eq!(read.value, 50); + let initial_random = read.random; + + // Snapshot + evict. The persistent `compute` task has a transient dependent + // (this run_once closure), so it may be blocked from full eviction. But we + // still exercise the evict path — some tasks (like create_state) may be + // data-only evicted. + let (had_data, counts) = tt2.backend().snapshot_and_evict_for_testing(&*tt2); + println!("transient_reader: snapshot had_data={had_data}, evicted: {counts:?}"); + assert!(had_data, "snapshot should have persisted data"); + + // Mutate state — this invalidates the persistent task, which must propagate + // to the transient reader (this closure) even after eviction. 
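+ // Expected propagation (sketch): the state write invalidates `compute`,
+ // which is restored from disk if evicted, re-executes with a fresh
+ // `random`, and is observed by the read below.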
+ state.set(99);
+
+ let read = output.read_strongly_consistent().await?;
+ assert_eq!(read.value, 99);
+ assert_ne!(
+ read.random, initial_random,
+ "task should have been re-executed after invalidation"
+ );
+
+ // Second eviction cycle
+ let (_, counts2) = tt2.backend().snapshot_and_evict_for_testing(&*tt2);
+ println!("transient_reader (2nd): evicted: {counts2:?}");
+
+ state.set(0);
+
+ let read = output.read_strongly_consistent().await?;
+ assert_eq!(read.value, 0);
+
+ anyhow::Ok(())
+ })
+ .await;
+ tt.stop_and_wait().await;
+ result.unwrap();
+}
+
+// =========================================================================
+// Stress test — concurrent eviction + restore
+// =========================================================================
+
+/// Adds an offset to a value — the offset parameter makes each call a unique
+/// memoized task, creating truly independent intermediate tasks for fan-out.
+#[turbo_tasks::function(operation)]
+async fn add_offset(input: ResolvedVc<Step>, offset: u32) -> Result<Vc<u32>> {
+ let value = *input.await?.get();
+ Ok(Vc::cell(value.wrapping_add(offset)))
+}
+
+/// Multiplies by a factor — unique per factor argument.
+#[turbo_tasks::function(operation)]
+async fn multiply(input: ResolvedVc<u32>, factor: u32) -> Result<Vc<u32>> {
+ let value = *input.await?;
+ Ok(Vc::cell(value.wrapping_mul(factor)))
+}
+
+/// Wide fan-out helper: creates `width` independent compute chains from a
+/// single state. Each chain uses unique arguments (offset/factor) so they
+/// produce distinct memoized tasks — `width * 2` intermediate persistent tasks
+/// that are candidates for eviction.
+#[turbo_tasks::function(operation)]
+async fn fan_out(input: ResolvedVc<Step>, width: u32) -> Result<Vc<u32>> {
+ let mut total = 0u32;
+ for i in 0..width {
+ let a = add_offset(input, i).resolve().strongly_consistent().await?;
+ let b = multiply(a, i.wrapping_add(2))
+ .resolve()
+ .strongly_consistent()
+ .await?;
+ total = total.wrapping_add(*b.await?);
+ }
+ Ok(Vc::cell(total))
+}
+
+/// Stress test: a background thread continuously evicts while the main task
+/// invalidates and reads through a wide fan-out of tasks. This creates true
+/// concurrency between eviction (which clears data under shard write locks)
+/// and restore (which releases the lock during disk I/O then re-acquires).
+///
+/// Before the restoring-bit fix, this would panic with "Cell no longer exists"
+/// because eviction could clear data on a task mid-restore.
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn eviction_stress_concurrent() {
+ let tt = create_tt_with_workers("eviction_stress_concurrent", 4);
+ let tt_evict = tt.clone();
+
+ let stop = Arc::new(AtomicBool::new(false));
+ let stop_clone = stop.clone();
+ let eviction_cycles = Arc::new(AtomicU64::new(0));
+ let eviction_cycles_clone = eviction_cycles.clone();
+
+ // Background thread: snapshot+evict with a short sleep to avoid starving
+ // worker threads, but fast enough to race with restores.
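+ // spawn_blocking keeps this loop (blocking snapshot work plus
+ // std::thread::sleep) off the async worker threads.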
+ let eviction_handle = tokio::task::spawn_blocking(move || { + while !stop_clone.load(Ordering::Relaxed) { + tt_evict + .backend() + .snapshot_and_evict_for_testing(&*tt_evict); + eviction_cycles_clone.fetch_add(1, Ordering::Relaxed); + std::thread::sleep(std::time::Duration::from_millis(1)); + } + }); + + let result = turbo_tasks::run_once(tt.clone(), async move { + unmark_top_level_task_may_leak_eventually_consistent_state(); + + let state_op = create_state(1); + let state_vc = state_op.resolve().strongly_consistent().await?; + let state = state_op.read_strongly_consistent().await?; + + // fan_out creates width * 2 intermediate tasks per call + let width = 20u32; + let output = fan_out(state_vc, width); + + // Helper: compute the expected fan_out result for a given state value. + // fan_out sums (state + i) * (i + 2) for i in 0..width. + let expected_for = |state_val: u32| -> u32 { + (0..width) + .map(|i| state_val.wrapping_add(i).wrapping_mul(i.wrapping_add(2))) + .fold(0u32, |acc, x| acc.wrapping_add(x)) + }; + + // Initial read to populate all tasks in memory, then wait for the + // background eviction thread to snapshot + evict at least once so data + // is on disk and eligible for eviction on subsequent cycles. + let read = *output.read_strongly_consistent().await?; + assert_eq!(read, expected_for(1)); + // Give the background eviction thread time to run a snapshot+evict cycle. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Run invalidation cycles while the background eviction thread is active. + // The sleep between eviction cycles gives worker threads time to start + // restoring, then eviction runs and races with in-flight restores. + for i in 1u32..=50 { + state.set(i); + let read = tokio::time::timeout( + std::time::Duration::from_secs(5), + output.read_strongly_consistent(), + ) + .await + .unwrap_or_else(|_| { + panic!( + "cycle {i}: timed out waiting for read — likely a restore/eviction race \ + corrupted the task graph" + ) + })?; + let read = *read; + assert_eq!( + read, + expected_for(i), + "cycle {i}: expected {}, got {read}", + expected_for(i) + ); + } + + anyhow::Ok(()) + }) + .await; + + stop.store(true, Ordering::Relaxed); + eviction_handle.await.unwrap(); + let cycles = eviction_cycles.load(Ordering::Relaxed); + println!("stress test completed with {cycles} eviction cycles"); + + tt.stop_and_wait().await; + result.unwrap(); +} diff --git a/turbopack/crates/turbo-tasks-backend/tests/test_config.trs b/turbopack/crates/turbo-tasks-backend/tests/test_config.trs index 31b5d2f92018..66973293108c 100644 --- a/turbopack/crates/turbo-tasks-backend/tests/test_config.trs +++ b/turbopack/crates/turbo-tasks-backend/tests/test_config.trs @@ -11,6 +11,7 @@ turbo_tasks_backend::BackendOptions { num_workers: Some(2), small_preallocation: true, + evict_after_snapshot: true, ..Default::default() }, turbo_tasks_backend::turbo_backing_storage( diff --git a/turbopack/crates/turbo-tasks-fs/src/lib.rs b/turbopack/crates/turbo-tasks-fs/src/lib.rs index f422275d6f4d..b693be610cc6 100644 --- a/turbopack/crates/turbo-tasks-fs/src/lib.rs +++ b/turbopack/crates/turbo-tasks-fs/src/lib.rs @@ -63,9 +63,8 @@ use tracing::Instrument; use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ Completion, Effect, EffectStateStorage, InvalidationReason, NonLocalValue, ReadRef, ResolvedVc, - TaskInput, TurboTasksApi, ValueToString, ValueToStringRef, Vc, debug::ValueDebugFormat, - emit_effect, mark_session_dependent, parallel, trace::TraceRawVcs, turbo_tasks_weak, turbobail, - 
turbofmt, + TaskInput, TurboTasksApi, ValueToString, ValueToStringRef, Vc, emit_effect, + mark_session_dependent, parallel, trace::TraceRawVcs, turbo_tasks_weak, turbobail, turbofmt, }; use turbo_tasks_hash::{ DeterministicHash, DeterministicHasher, HashAlgorithm, deterministic_hash, hash_xxh3_hash64, @@ -240,77 +239,57 @@ pub trait FileSystem: ValueToString { fn metadata(self: Vc, fs_path: FileSystemPath) -> Vc; } -#[derive(TraceRawVcs, ValueDebugFormat, NonLocalValue, Encode, Decode)] -struct DiskFileSystemInner { +pub(crate) struct DiskFileSystemSessionInner { pub name: RcStr, pub root: RcStr, - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip)] - mutex_map: MutexMap, - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip)] - invalidator_map: InvalidatorMap, - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip)] - dir_invalidator_map: InvalidatorMap, + pub(crate) mutex_map: MutexMap, + pub(crate) invalidator_map: InvalidatorMap, + pub(crate) dir_invalidator_map: InvalidatorMap, /// Lock that makes invalidation atomic. It will keep a write lock during /// watcher invalidation and a read lock during other operations. - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip)] - invalidation_lock: RwLock<()>, + pub(crate) invalidation_lock: RwLock<()>, /// Semaphore to limit the maximum number of concurrent file operations. - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip, default = "create_read_semaphore")] - read_semaphore: tokio::sync::Semaphore, + pub(crate) read_semaphore: tokio::sync::Semaphore, /// Semaphore to limit the maximum number of concurrent file operations. - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip, default = "create_write_semaphore")] - write_semaphore: tokio::sync::Semaphore, + pub(crate) write_semaphore: tokio::sync::Semaphore, - #[turbo_tasks(debug_ignore, trace_ignore)] - watcher: DiskWatcher, - /// Root paths that we do not allow access to from this filesystem. - /// Useful for things like output directories to prevent accidental ouroboros situations. - denied_paths: Vec, + pub(crate) watcher: DiskWatcher, /// Used by invalidators when called from a non-turbo-tasks thread, specifically in the fs /// watcher. - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip, default = "turbo_tasks_weak")] - turbo_tasks: Weak, + pub(crate) turbo_tasks: Weak, /// Used by invalidators when called from a non-tokio thread, specifically in the fs watcher. + pub(crate) tokio_handle: Handle, + pub(crate) effect_state_storage: EffectStateStorage, +} + +impl TraceRawVcs for DiskFileSystemSessionInner { + fn trace_raw_vcs(&self, _trace_context: &mut turbo_tasks::trace::TraceRawVcsContext) { + // All fields are transient session-local state that cannot reference Vcs. + } +} + +// SAFETY: `DiskFileSystemSessionInner` contains only process-local state (file watchers, +// semaphores, invalidator maps, effect storage, and scalar strings). No Vc is stored, so this +// type is safe to use across task boundaries without participating in turbo-tasks tracking. +unsafe impl NonLocalValue for DiskFileSystemSessionInner {} + +/// Transient, session-scoped state backing a [`DiskFileSystem`]. Holds the watcher, invalidator +/// maps, semaphores, and other runtime state that cannot be serialized to disk. Marked +/// `serialization = "none"` so any task that reads it becomes transient and therefore +/// non-evictable (see eviction logic in the turbo-tasks backend). 
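+///
+/// A task reaches this state through `DiskFileSystem::session()` (sketch of
+/// the call shape defined below):
+/// ```ignore
+/// let session = self.session().await?; // reading the session Vc marks the task transient
+/// session.inner.invalidate();
+/// ```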
+#[turbo_tasks::value(serialization = "skip", evict = "never", cell = "new", eq = "manual")] +pub struct DiskFileSystemSession { #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip, default = "Handle::current")] - tokio_handle: Handle, - #[turbo_tasks(debug_ignore, trace_ignore)] - #[bincode(skip)] - effect_state_storage: EffectStateStorage, + inner: Arc, } -impl DiskFileSystemInner { +impl DiskFileSystemSessionInner { /// Returns the root as Path fn root_path(&self) -> &Path { // just in case there's a windows unc path prefix we remove it with `dunce` simplified(Path::new(&*self.root)) } - /// Checks if a path is within the denied path - /// Returns true if the path should be treated as non-existent - /// - /// Since denied_paths are guaranteed to be: - /// - normalized (no ../ traversals) - /// - using unix separators (/) - /// - relative to the fs root - /// - /// We can efficiently check using string operations - fn is_path_denied(&self, path: &FileSystemPath) -> bool { - let path = &path.path; - self.denied_paths.iter().any(|denied_path| { - path.starts_with(denied_path.as_str()) - && (path.len() == denied_path.len() - || path.as_bytes().get(denied_path.len()) == Some(&b'/')) - }) - } - /// registers the path as an invalidator for the current task, /// has to be called within a turbo-tasks function async fn register_read_invalidator(&self, path: &Path) -> Result<()> { @@ -502,43 +481,78 @@ impl DiskFileSystemInner { } #[derive(Clone, ValueToString)] -#[value_to_string(self.inner.name)] +#[value_to_string(self.name)] #[turbo_tasks::value(cell = "new", eq = "manual")] pub struct DiskFileSystem { - inner: Arc, + pub name: RcStr, + pub root: RcStr, + /// Root paths that we do not allow access to from this filesystem. + /// Useful for things like output directories to prevent accidental ouroboros situations. + denied_paths: Vec, + /// Transient session state (watcher, invalidator maps, semaphores, ...). + /// `DiskFileSystemSession` is `serialization = "none"`, so any task that resolves this Vc + /// becomes transient and therefore non-evictable. + session: ResolvedVc, } impl DiskFileSystem { pub fn name(&self) -> &RcStr { - &self.inner.name + &self.name } pub fn root(&self) -> &RcStr { - &self.inner.root + &self.root } - pub fn invalidate(&self) { - self.inner.invalidate(); + /// Returns the root as Path + fn root_path(&self) -> &Path { + // just in case there's a windows unc path prefix we remove it with `dunce` + simplified(Path::new(&*self.root)) + } + + /// Checks if a path is within the denied path + fn is_path_denied(&self, path: &FileSystemPath) -> bool { + let path = &path.path; + self.denied_paths.iter().any(|denied_path| { + path.starts_with(denied_path.as_str()) + && (path.len() == denied_path.len() + || path.as_bytes().get(denied_path.len()) == Some(&b'/')) + }) } - pub fn invalidate_with_reason( + async fn session(&self) -> Result> { + (*self.session).await + } + + pub async fn invalidate(&self) -> Result<()> { + self.session().await?.inner.invalidate(); + Ok(()) + } + + pub async fn invalidate_with_reason( &self, reason: impl Fn(&Path) -> R + Sync, - ) { - self.inner.invalidate_with_reason(reason); + ) -> Result<()> { + self.session().await?.inner.invalidate_with_reason(reason); + Ok(()) } - pub fn invalidate_path_and_children_with_reason( + pub async fn invalidate_path_and_children_with_reason( &self, paths: impl IntoIterator, reason: impl Fn(&Path) -> R + Sync, - ) { - self.inner + ) -> Result<()> { + self.session() + .await? 
+ .inner .invalidate_path_and_children_with_reason(paths, reason); + Ok(()) } pub async fn start_watching(&self, poll_interval: Option) -> Result<()> { - self.inner + self.session() + .await? + .inner .start_watching_internal(false, poll_interval) .await } @@ -547,13 +561,16 @@ impl DiskFileSystem { &self, poll_interval: Option, ) -> Result<()> { - self.inner + self.session() + .await? + .inner .start_watching_internal(true, poll_interval) .await } - pub async fn stop_watching(&self) { - self.inner.watcher.stop_watching().await; + pub async fn stop_watching(&self) -> Result<()> { + self.session().await?.inner.watcher.stop_watching().await; + Ok(()) } /// Try to convert [`Path`] to [`FileSystemPath`]. Return `None` if the file path leaves the @@ -582,7 +599,7 @@ impl DiskFileSystem { // DiskFileSystem root let normalized_sys_path = sys_path.normalize_lexically().ok()?; normalized_sys_path - .strip_prefix(self.inner.root_path()) + .strip_prefix(self.root_path()) .ok()? .to_owned() } else if let Some(relative_to) = relative_to { @@ -604,7 +621,7 @@ impl DiskFileSystem { } pub fn to_sys_path(&self, fs_path: &FileSystemPath) -> PathBuf { - let path = self.inner.root_path(); + let path = self.root_path(); if fs_path.path.is_empty() { path.to_path_buf() } else { @@ -677,10 +694,10 @@ impl DiskFileSystem { denied_paths: Vec, ) -> Result> { let root = root.owned().await?; - let instance = DiskFileSystem { - inner: Arc::new(DiskFileSystemInner { - name, - root, + let session = DiskFileSystemSession { + inner: Arc::new(DiskFileSystemSessionInner { + name: name.clone(), + root: root.clone(), mutex_map: Default::default(), invalidation_lock: Default::default(), invalidator_map: InvalidatorMap::new(), @@ -688,20 +705,27 @@ impl DiskFileSystem { read_semaphore: create_read_semaphore(), write_semaphore: create_write_semaphore(), watcher: DiskWatcher::new(), - denied_paths, turbo_tasks: turbo_tasks_weak(), tokio_handle: Handle::current(), effect_state_storage: EffectStateStorage::default(), }), - }; + } + .cell() + .to_resolved() + .await?; - Ok(Self::cell(instance)) + Ok(Self::cell(DiskFileSystem { + name, + root, + denied_paths, + session, + })) } } impl Debug for DiskFileSystem { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - write!(f, "name: {}, root: {}", self.inner.name, self.inner.root) + write!(f, "name: {}, root: {}", self.name, self.root) } } @@ -712,17 +736,19 @@ impl FileSystem for DiskFileSystem { mark_session_dependent(); // Check if path is denied - if so, treat as NotFound - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { return Ok(FileContent::NotFound.cell()); } let full_path = self.to_sys_path(&fs_path); - self.inner.register_read_invalidator(&full_path).await?; + let session = self.session().await?; + let inner = &session.inner; + inner.register_read_invalidator(&full_path).await?; - let _lock = self.inner.lock_path(&full_path).await; + let _lock = inner.lock_path(&full_path).await; let content = match retry_blocking(|| File::from_path(&full_path)) .instrument(tracing::info_span!("read file", name = ?full_path)) - .concurrency_limited(&self.inner.read_semaphore) + .concurrency_limited(&inner.read_semaphore) .await { Ok(file) => FileContent::new(file), @@ -740,17 +766,19 @@ impl FileSystem for DiskFileSystem { mark_session_dependent(); // Check if directory itself is denied - if so, treat as NotFound - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { return Ok(RawDirectoryContent::not_found()); } let full_path = 
self.to_sys_path(&fs_path); - self.inner.register_dir_invalidator(&full_path).await?; + let session = self.session().await?; + let inner = &session.inner; + inner.register_dir_invalidator(&full_path).await?; // we use the sync std function here as it's a lot faster (600%) in node-file-trace let read_dir = match retry_blocking(|| std::fs::read_dir(&full_path)) .instrument(tracing::info_span!("read directory", name = ?full_path)) - .concurrency_limited(&self.inner.read_semaphore) + .concurrency_limited(&inner.read_semaphore) .await { Ok(dir) => dir, @@ -768,7 +796,6 @@ impl FileSystem for DiskFileSystem { }; let dir_path = fs_path.path.as_str(); let denied_entries: FxHashSet<&str> = self - .inner .denied_paths .iter() .filter_map(|denied_path| { @@ -830,17 +857,19 @@ impl FileSystem for DiskFileSystem { mark_session_dependent(); // Check if path is denied - if so, treat as NotFound - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { return Ok(LinkContent::NotFound.cell()); } let full_path = self.to_sys_path(&fs_path); - self.inner.register_read_invalidator(&full_path).await?; + let session = self.session().await?; + let inner = &session.inner; + inner.register_read_invalidator(&full_path).await?; - let _lock = self.inner.lock_path(&full_path).await; + let _lock = inner.lock_path(&full_path).await; let link_path = match retry_blocking(|| std::fs::read_link(&full_path)) .instrument(tracing::info_span!("read symlink", name = ?full_path)) - .concurrency_limited(&self.inner.read_semaphore) + .concurrency_limited(&inner.read_semaphore) .await { Ok(res) => res, @@ -874,7 +903,7 @@ impl FileSystem for DiskFileSystem { // // we use `dunce::simplify` to strip a potential UNC prefix on windows, on any // other OS this gets compiled away - let result = simplified(&file).strip_prefix(simplified(Path::new(&self.inner.root))); + let result = simplified(&file).strip_prefix(simplified(Path::new(&self.root))); let relative_to_root_path = match result { Ok(file) => PathBuf::from(sys_to_unix(&file.to_string_lossy()).as_ref()), @@ -921,7 +950,7 @@ impl FileSystem for DiskFileSystem { // session. All side effects are reexecuted in general. // Check if path is denied - if so, return an error - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { turbobail!("Cannot write to denied path: {fs_path}"); } let full_path = self.to_sys_path(&fs_path); @@ -933,12 +962,12 @@ impl FileSystem for DiskFileSystem { // recomputation. let content = content.persist().await?; - let inner = self.inner.clone(); + let inner = self.session().await?.inner.clone(); #[derive(TraceRawVcs, NonLocalValue)] struct WriteEffect { full_path: PathBuf, - inner: Arc, + inner: Arc, content: ReadRef, content_hash: u128, } @@ -1102,19 +1131,19 @@ impl FileSystem for DiskFileSystem { // re-executed in general. 
// Check if path is denied - if so, return an error - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { turbobail!("Cannot write link to denied path: {fs_path}"); } let content = target.await?; let full_path = self.to_sys_path(&fs_path); - let inner = self.inner.clone(); + let inner = self.session().await?.inner.clone(); #[derive(TraceRawVcs, NonLocalValue)] struct WriteLinkEffect { full_path: PathBuf, - inner: Arc, + inner: Arc, content: ReadRef, content_hash: u128, } @@ -1382,16 +1411,18 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(&fs_path); // Check if path is denied - if so, return an error (metadata shouldn't be readable) - if self.inner.is_path_denied(&fs_path) { + if self.is_path_denied(&fs_path) { turbobail!("Cannot read metadata from denied path: {fs_path}"); } - self.inner.register_read_invalidator(&full_path).await?; + let session = self.session().await?; + let inner = &session.inner; + inner.register_read_invalidator(&full_path).await?; - let _lock = self.inner.lock_path(&full_path).await; + let _lock = inner.lock_path(&full_path).await; let meta = retry_blocking(|| std::fs::metadata(&full_path)) .instrument(tracing::info_span!("read metadata", name = ?full_path)) - .concurrency_limited(&self.inner.read_semaphore) + .concurrency_limited(&inner.read_semaphore) .await .with_context(|| format!("reading metadata for {:?}", full_path))?; diff --git a/turbopack/crates/turbo-tasks-fs/src/watcher.rs b/turbopack/crates/turbo-tasks-fs/src/watcher.rs index 173fea7d5a39..d9ba5c8a2b0c 100644 --- a/turbopack/crates/turbo-tasks-fs/src/watcher.rs +++ b/turbopack/crates/turbo-tasks-fs/src/watcher.rs @@ -12,7 +12,6 @@ use std::{ }; use anyhow::{Context, Result}; -use bincode::{Decode, Encode}; use notify::{ Config, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, event::{MetadataKind, ModifyKind, RenameMode}, @@ -27,7 +26,7 @@ use turbo_tasks::{ }; use crate::{ - DiskFileSystemInner, format_absolute_fs_path, + DiskFileSystemSessionInner, format_absolute_fs_path, invalidation::{WatchChange, WatchStart}, invalidator_map::LockedInvalidatorMap, path_map::OrderedPathMapExt, @@ -61,9 +60,7 @@ static WATCH_RECURSIVE_MODE: LazyLock = LazyLock::new(|| { } }); -#[derive(Encode, Decode)] pub(crate) struct DiskWatcher { - #[bincode(skip)] state: State, } @@ -360,7 +357,7 @@ impl DiskWatcher { /// - Doesn't emit Modify events after a Create event pub async fn start_watching( &self, - fs_inner: Arc, + fs_inner: Arc, report_invalidation_reason: bool, poll_interval: Option, ) -> Result<()> { @@ -478,7 +475,7 @@ impl DiskWatcher { fn watch_thread( &self, rx: Receiver>, - fs_inner: Arc, + fs_inner: Arc, report_invalidation_reason: bool, ) { let mut batched_invalidate_path = FxHashSet::default(); @@ -771,7 +768,7 @@ impl DiskWatcher { fields(name = %path.display()) )] fn invalidate( - inner: &DiskFileSystemInner, + inner: &DiskFileSystemSessionInner, turbo_tasks: &dyn TurboTasksApi, report_invalidation_reason: bool, path: &Path, @@ -787,7 +784,7 @@ fn invalidate( } fn invalidate_path( - inner: &DiskFileSystemInner, + inner: &DiskFileSystemSessionInner, turbo_tasks: &dyn TurboTasksApi, report_invalidation_reason: bool, invalidator_map: &mut LockedInvalidatorMap, @@ -803,7 +800,7 @@ fn invalidate_path( } fn invalidate_path_and_children_execute( - inner: &DiskFileSystemInner, + inner: &DiskFileSystemSessionInner, turbo_tasks: &dyn TurboTasksApi, report_invalidation_reason: bool, invalidator_map: &mut LockedInvalidatorMap, diff --git 
a/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs b/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs index efbbae9d01d6..37039d7cd70b 100644 --- a/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/derive/task_storage_macro.rs @@ -69,6 +69,17 @@ struct FieldInfo { /// If true, drop this field entirely after execution completes if the task is immutable. /// Immutable tasks don't re-execute, so dependency tracking fields are not needed. drop_on_completion_if_immutable: bool, + /// If true, the macro dispatches to `FieldType::drop_partial(&mut v) -> bool` + /// in the generated `TaskStorage::drop_partial` lazy retain_mut arm instead + /// of the default wholesale reset. The method's `bool` return signals whether + /// residue remains (`true` keeps the variant). On restore, the incoming + /// persistent entries are merged into the residue via `extend`, so the field + /// type must support `extend(IntoIterator)` through the usual + /// newtype `DerefMut`. See `CellData::drop_partial` for the canonical example. + /// + /// Cannot be combined with `filter_transient` (both produce residue) or + /// `inline` (the current consumer is a lazy field; keep the surface small). + custom_drop_partial: bool, /// Optional override for the underlying map type, used when the field is a /// newtype wrapping `AutoMap` (or similar) so callers can inject /// custom bincode / accessor behavior while the macro still generates map @@ -373,6 +384,7 @@ fn parse_field_storage_attributes(field: &syn::Field) -> FieldInfo { let mut use_default = false; let mut shrink_on_completion = false; let mut drop_on_completion_if_immutable = false; + let mut custom_drop_partial = false; let mut as_type: Option = None; // Find and parse the field attribute @@ -494,13 +506,15 @@ fn parse_field_storage_attributes(field: &syn::Field) -> FieldInfo { shrink_on_completion = true; } else if ident == "drop_on_completion_if_immutable" { drop_on_completion_if_immutable = true; + } else if ident == "custom_drop_partial" { + custom_drop_partial = true; } else { meta.span() .unwrap() .error(format!( "unknown modifier `{ident}`, expected `inline`, \ - `filter_transient`, `default`, `shrink_on_completion`, or \ - `drop_on_completion_if_immutable`" + `filter_transient`, `default`, `shrink_on_completion`, \ + `drop_on_completion_if_immutable`, or `custom_drop_partial`" )) .emit(); } @@ -597,6 +611,39 @@ fn parse_field_storage_attributes(field: &syn::Field) -> FieldInfo { } } + if custom_drop_partial { + if inline { + field_name + .span() + .unwrap() + .error(format!( + "`custom_drop_partial` on inline field `{field_name}` is not supported; move \ + the field to lazy storage or extend the macro to handle inline custom drops" + )) + .emit(); + } + if filter_transient { + field_name + .span() + .unwrap() + .error(format!( + "`custom_drop_partial` cannot be combined with `filter_transient` on \ + `{field_name}`: both paths produce residue and the semantics would conflict" + )) + .emit(); + } + if !is_collection { + field_name + .span() + .unwrap() + .error(format!( + "`custom_drop_partial` on field `{field_name}` requires a collection storage \ + type (auto_set, auto_map, counter_map)" + )) + .emit(); + } + } + FieldInfo { is_pub, field_name, @@ -609,6 +656,7 @@ fn parse_field_storage_attributes(field: &syn::Field) -> FieldInfo { use_default, shrink_on_completion, drop_on_completion_if_immutable, + custom_drop_partial, as_type, } } @@ -674,8 +722,35 @@ 
impl GroupedFields { } /// Returns an iterator over all lazy fields (both data and meta categories). + /// + /// The order is **sorted by category** — transient variants first, then + /// meta, then data — with schema declaration order preserved within each + /// category. This grouping is load-bearing for codegen: contiguous + /// categories in the generated `LazyField` enum let LLVM lower + /// `is_persistent()` / `is_meta()` to a single integer range check on the + /// discriminant tag instead of a per-variant jump table. + /// + /// Every downstream generator (enum declaration, `index_and_category`, + /// restore merge arms, `build_lazy_index`) iterates `all_lazy()` and uses + /// `enumerate()` positions as the variant index, so they all pick up the + /// same sorted order consistently. `persistent_lazy(category)` does not + /// need to match this order because its consumers (bincode encode/decode, + /// clone arms) use per-category enumeration and Rust match arms are + /// order-independent. fn all_lazy(&self) -> impl Iterator<Item = &FieldInfo> { - self.fields.iter().filter(|f| !f.is_flag() && f.lazy) + let mut lazy: Vec<&FieldInfo> = self + .fields + .iter() + .filter(|f| !f.is_flag() && f.lazy) + .collect(); + // Stable sort by category rank; within a category, preserve schema + // declaration order. + lazy.sort_by_key(|f| match f.category { + Category::Transient => 0u8, + Category::Meta => 1, + Category::Data => 2, + }); + lazy.into_iter() } /// Returns true if there are any lazy fields. @@ -723,16 +798,47 @@ fn gen_clone_inline_fields<'a>(fields: impl Iterator<Item = &'a FieldInfo>) -> Vec<TokenStream> { .collect() } -/// Generate inline field restore assignments: `self.field = source.field;` -fn gen_restore_inline_fields<'a>(fields: impl Iterator<Item = &'a FieldInfo>) -> Vec<TokenStream> { - fields - .map(|field| { - let field_name = &field.field_name; +fn gen_restore_inline_field(field: &FieldInfo) -> TokenStream { + let field_name = &field.field_name; + if !field.filter_transient { + return quote! { + self.#field_name = source.#field_name; + }; + } + match field.storage_type { + StorageType::Direct => { + // Inline `Option<T>` where `T` provides `is_transient()`. Residue in `self` + // means a transient value is live and newer than the disk value — + // prefer the residue; otherwise take the source. quote! { - self.#field_name = source.#field_name; + if self.#field_name.is_none() { + self.#field_name = source.#field_name; + } } - }) - .collect() + } + StorageType::AutoSet => { + quote! { + if self.#field_name.is_empty() { + self.#field_name = source.#field_name; + } else { + self.#field_name.extend(source.#field_name); + } + } + } + StorageType::CounterMap | StorageType::AutoMap => { + // CounterMap / AutoMap: transient residue (if any) is keyed by + // transient task ids; source entries are keyed by persistent ids. + // These key spaces are disjoint, so `extend` merges cleanly. + quote! { + if self.#field_name.is_empty() { + self.#field_name = source.#field_name; + } else { + self.#field_name.extend(source.#field_name); + } + } + } + StorageType::Flag => unreachable!(), + } } /// Generate lazy field match arms with a custom body that also receives the index. @@ -780,6 +886,9 @@ fn generate_task_storage_impl(_ident: &Ident, grouped_fields: &GroupedFields) -> // Generate snapshot clone and restore methods let snapshot_restore_methods = generate_snapshot_restore_methods(grouped_fields); + // Generate eviction methods + let eviction_methods = generate_drop_method(grouped_fields); + quote!
{ // Import ShrinkToFit trait for the derive macro generated code use turbo_tasks::ShrinkToFit as _; @@ -802,6 +911,9 @@ fn generate_task_storage_impl(_ident: &Ident, grouped_fields: &GroupedFields) -> // Generated snapshot clone and restore methods #snapshot_restore_methods + // Generated eviction methods + #eviction_methods + // Generated TaskStorageAccessors trait #accessors_trait } @@ -949,6 +1061,22 @@ fn generate_task_flags_bitfield(grouped_fields: &GroupedFields) -> TokenStream { self.0 = (self.0 & !Self::PERSISTED_MASK) | (bits & Self::PERSISTED_MASK); } + #[doc = "Clear all persisted meta flag bits, preserving transient flags."] + #[doc = ""] + #[doc = "Called by `drop_partial` when evicting the meta category so the"] + #[doc = "bitfield reflects \"no persisted meta state present\" — required"] + #[doc = "for `is_empty()` to accept fully-evicted tasks for removal."] + pub fn clear_persisted_meta_bits(&mut self) { + self.0 &= !Self::META_MASK; + } + + #[doc = "Clear all persisted data flag bits, preserving transient flags."] + #[doc = ""] + #[doc = "Counterpart of `clear_persisted_meta_bits` for the data category."] + pub fn clear_persisted_data_bits(&mut self) { + self.0 &= !Self::DATA_MASK; + } + #[doc = "Create from raw bits (for deserialization)"] pub fn from_bits(bits: u16) -> Self { Self(bits) @@ -1002,29 +1130,74 @@ fn generate_lazy_field_enum(grouped_fields: &GroupedFields) -> TokenStream { }) .collect(); - // Generate is_persistent (transient check) method arms - let is_persistent_arms: Vec<_> = all_lazy_fields + // Or-pattern lists for `is_persistent` / `is_meta`. Because `all_lazy()` + // returns variants grouped by category (transient, then meta, then data), + // these lists cover contiguous runs of the enum, giving LLVM the clearest + // shape to lower each predicate to a single integer range check on the + // discriminant tag. + let persistent_patterns: Vec<_> = all_lazy_fields .iter() - .map(|field| { - let variant_name = &field.variant_name; - let is_persistent = !field.is_transient(); - quote! { - LazyField::#variant_name(_) => #is_persistent - } + .filter(|f| !f.is_transient()) + .map(|f| { + let variant_name = &f.variant_name; + quote! { LazyField::#variant_name(_) } + }) + .collect(); + let meta_patterns: Vec<_> = all_lazy_fields + .iter() + .filter(|f| f.category == Category::Meta) + .map(|f| { + let variant_name = &f.variant_name; + quote! { LazyField::#variant_name(_) } + }) + .collect(); + let data_patterns: Vec<_> = all_lazy_fields + .iter() + .filter(|f| f.category == Category::Data) + .map(|f| { + let variant_name = &f.variant_name; + quote! { LazyField::#variant_name(_) } }) .collect(); - // Generate is_meta/is_data method arms - let is_meta_arms: Vec<_> = all_lazy_fields + // `matches!(self, ... | ... )` requires at least one pattern. Fall back to + // `false` if the schema has no variants in a given category. + let is_persistent_body = if persistent_patterns.is_empty() { + quote! { false } + } else { + quote! { matches!(self, #(#persistent_patterns)|*) } + }; + let is_meta_body = if meta_patterns.is_empty() { + quote! { false } + } else { + quote! { matches!(self, #(#meta_patterns)|*) } + }; + let is_data_body = if data_patterns.is_empty() { + quote! { false } + } else { + quote! { matches!(self, #(#data_patterns)|*) } + }; + + // (discriminant, is_meta, is_data) arms for the restore prescan — each + // variant maps to its position in the enum definition (used as a + // fixed-size array offset) paired with its category bits. 
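// A standalone sketch of the lowering the surrounding comments rely on; the
// `Field` enum here is illustrative, not generated code. With each category's
// variants adjacent, the or-pattern `matches!` is equivalent to a range
// compare on the discriminant tag.
enum Field {
    TransientA(u32), // transient prefix: tag 0
    MetaA(u32),      // meta run: tags 1..=2
    MetaB(u32),
    DataA(u32),      // data suffix: tag 3
}

impl Field {
    fn is_meta(&self) -> bool {
        // Contiguous variants: reducible to `1 <= tag && tag <= 2`.
        matches!(self, Field::MetaA(_) | Field::MetaB(_))
    }

    fn is_persistent(&self) -> bool {
        // Meta + data form the contiguous tail: reducible to `tag >= 1`.
        matches!(self, Field::MetaA(_) | Field::MetaB(_) | Field::DataA(_))
    }
}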
Transient + // variants have both category bits false; persistent variants set + // exactly one. `is_meta || is_data` therefore doubles as `is_persistent`. + let index_and_category_arms: Vec<_> = all_lazy_fields .iter() - .map(|field| { + .enumerate() + .map(|(idx, field)| { let variant_name = &field.variant_name; + let idx = idx as u8; let is_meta = field.category == Category::Meta; + let is_data = field.category == Category::Data; quote! { - LazyField::#variant_name(_) => #is_meta + LazyField::#variant_name(_) => (#idx, #is_meta, #is_data) } }) .collect(); + let num_variants = all_lazy_fields.len(); + let num_variants_tok = quote::quote! { #num_variants }; quote! { #[doc = "All lazily-allocated fields stored in a single Vec."] @@ -1038,6 +1211,9 @@ fn generate_lazy_field_enum(grouped_fields: &GroupedFields) -> TokenStream { #[automatically_derived] impl LazyField { + #[doc = "Total number of LazyField variants."] + pub const NUM_VARIANTS: usize = #num_variants_tok; + #[doc = "Returns true if this field is empty (can be removed from the Vec)"] pub fn is_empty(&self) -> bool { match self { @@ -1045,23 +1221,48 @@ fn generate_lazy_field_enum(grouped_fields: &GroupedFields) -> TokenStream { } } - #[doc = "Returns true if this field should be persisted (not transient)"] + #[doc = "Returns true if this field should be persisted (not transient)."] + #[doc = ""] + #[doc = "Variants are sorted so persistent variants form a contiguous"] + #[doc = "range; LLVM can lower this `matches!` to a single integer"] + #[doc = "compare on the discriminant tag."] + #[inline] pub fn is_persistent(&self) -> bool { - match self { - #(#is_persistent_arms),* - } + #is_persistent_body } - #[doc = "Returns true if this field belongs to the meta category"] + #[doc = "Returns true if this field belongs to the meta category."] + #[doc = ""] + #[doc = "Meta variants form a contiguous range between the transient"] + #[doc = "prefix and the data suffix; expect a range-check lowering."] + #[inline] pub fn is_meta(&self) -> bool { - match self { - #(#is_meta_arms),* - } + #is_meta_body } - #[doc = "Returns true if this field belongs to the data category"] + #[doc = "Returns true if this field belongs to the data category."] + #[doc = ""] + #[doc = "Data variants form the trailing contiguous range of the"] + #[doc = "enum; expect a range-check lowering."] + #[inline] pub fn is_data(&self) -> bool { - !self.is_meta() + #is_data_body + } + + #[doc = "Variant index paired with its category bits."] + #[doc = ""] + #[doc = "Index is the variant's position in the LazyField enum"] + #[doc = "definition, usable as an array offset of size `NUM_VARIANTS`."] + #[doc = "The two bools report the variant's category: transient"] + #[doc = "variants have both false, persistent variants set exactly"] + #[doc = "one (so `is_meta || is_data` is equivalent to `is_persistent`)."] + #[doc = "Used by the restore prescan to answer both \"where does this"] + #[doc = "variant live?\" and \"which category's residue does it count"] + #[doc = "toward?\" in a single match."] + const fn index_and_category(&self) -> (u8, bool, bool) { + match self { + #(#index_and_category_arms),* + } } } } @@ -2631,6 +2832,175 @@ fn generate_cleanup_after_execution(grouped_fields: &GroupedFields) -> TokenStre } } +/// Generate the `drop_data()`, `drop_meta()`, and `drop_data_and_meta()` methods +/// for TaskStorage. +/// +/// These methods clear persistent category fields for eviction. 
They must be +/// generated by the macro because they need to know which specific inline fields +/// belong to each category. +/// +/// For `filter_transient` fields the generator emits a check-then-drop-or-retain +/// pattern: the overwhelmingly common case is that no transient entries are present, +/// and we want that to be a single linear `any()` scan followed by a cheap +/// `Default::default()` reset. If transient entries do exist, we `retain` them and +/// leave the field as residue — eviction proceeds (the persistent portion is +/// recoverable from disk) and `restore_*_from` will merge the persistent portion +/// back in later. +fn generate_drop_method(grouped_fields: &GroupedFields) -> TokenStream { + let drop_data_inline: Vec<_> = grouped_fields + .persistent_inline(Category::Data) + .map(gen_drop_inline_field) + .collect(); + let drop_meta_inline: Vec<_> = grouped_fields + .persistent_inline(Category::Meta) + .map(gen_drop_inline_field) + .collect(); + + let drop_lazy_arms: Vec<_> = grouped_fields + .all_lazy() + .filter(|f| !f.is_transient() && (f.filter_transient || f.custom_drop_partial)) + .map(gen_drop_lazy_match_arm) + .collect(); + + let mut empty_checks_inline = Vec::new(); + empty_checks_inline.push(quote! { + self.flags.0 == 0 + }); + empty_checks_inline.push(quote! { + self.lazy.iter().all(|f| f.is_empty()) + }); + for field in grouped_fields.all_inline() { + let field_name = &field.field_name; + empty_checks_inline.push(match field.storage_type { + StorageType::AutoMap | StorageType::AutoSet | StorageType::CounterMap => { + quote! { + self.#field_name.is_empty() + } + } + StorageType::Direct => { + quote! { + self.#field_name == Default::default() + } + } + StorageType::Flag => { + unreachable!(); + } + }); + } + + quote! { + #[automatically_derived] + impl TaskStorage { + + pub fn is_empty(&self) -> bool { + #( #empty_checks_inline )&&* + } + + /// Drop persistent fields so the task can be evicted. + /// + /// For each `filter_transient` field, transient entries are retained as + /// residue (they cannot be reconstructed from disk); for all other + /// persistent fields the field is reset to its default. Transient fields + /// (non-persistent) are never touched. After this returns, the caller can + /// check `is_empty()` to decide whether the whole task entry can be + /// removed or whether transient residue forced a partial eviction. + /// + /// `data_restored` / `meta_restored` flags are cleared for the dropped + /// categories so the next access triggers a restore. `prefetched` is + /// cleared unconditionally. + pub fn drop_partial(&mut self, data: bool, meta: bool) { + debug_assert!(data || meta, "at least one of data and meta must be true"); + if data { + #(#drop_data_inline)* + // Clear persisted data flag bits so they don't keep an + // otherwise-evicted task looking non-empty. They come back + // via `set_persisted_data_bits` on restore. + self.flags.clear_persisted_data_bits(); + self.flags.set_data_restored(false); + } + if meta { + #(#drop_meta_inline)* + self.flags.clear_persisted_meta_bits(); + self.flags.set_meta_restored(false); + } + self.flags.set_prefetched(false); + // Walk lazy variants: non-persistent are preserved; persistent ones + // are either fully removed (non-filter_transient) or scanned for + // transient residue (filter_transient), dropping the variant only if + // it becomes empty. + self.lazy.retain_mut(|f| { + if !f.is_persistent() { + // Transient variants normally stay put, but drop + // empty ones. 
They accumulate as zombies when cells + // get consumed without the task re-running (so + // `shrink_on_completion` never fires), and the empty + // `LazyField` variant blocks `is_empty()` from + // accepting the task for full eviction. + return !f.is_empty(); + } + let drop_this_category = if f.is_data() { data } else { meta }; + if !drop_this_category { + return true; + } + match f { + #(#drop_lazy_arms)* + _ => false, + } + }); + self.lazy.shrink_to_fit(); + } + + } + } +} + +/// Generate the drop statement for a single persistent inline field. +/// +/// For `filter_transient` fields: check for transient entries first and `retain` +/// them only if any exist, otherwise reset to default (hot path — single linear +/// scan, no per-element work on the happy path). +/// For non-filtered fields: unconditional `Default::default()` reset. +fn gen_drop_inline_field(field: &FieldInfo) -> TokenStream { + let field_name = &field.field_name; + if !field.filter_transient { + return quote! { + self.#field_name = Default::default(); + }; + } + let target = quote! { self.#field_name }; + if let StorageType::Direct = field.storage_type { + // The `drop_partial` implementation resets to `None`, so no assignment is needed. + quote! { + (#target).drop_partial(); + } + } else { + quote! { + if !(#target).drop_partial() { + #target = Default::default(); + } + } + } +} + +/// Generate the match arm for a persistent lazy variant in `drop_partial`'s +/// `retain_mut` closure. The closure returns `true` to keep the variant and +/// `false` to remove it. +/// +/// Only emitted for `filter_transient` and `custom_drop_partial` fields: the +/// arm delegates to the inner type's `drop_partial`, which retains any +/// transient residue in place and returns `true` iff the variant must be kept. +/// All other persistent variants are removed by the caller's `_ => false` +/// fallback arm. +fn gen_drop_lazy_match_arm(field: &FieldInfo) -> TokenStream { + let variant_name = &field.variant_name; + assert!(field.filter_transient || field.custom_drop_partial); + + quote! { + LazyField::#variant_name(v) => { + v.drop_partial() + } + } +} + /// Extract the inner type from `Option<T>`, or return the type as-is if not an `Option` fn extract_option_inner_type(ty: &Type) -> TokenStream { // Try to parse as `Option<T>` and extract `T` @@ -3142,7 +3512,10 @@ fn gen_restore_inline_for_category( grouped_fields: &GroupedFields, category: Category, ) -> Vec<TokenStream> { - gen_restore_inline_fields(grouped_fields.persistent_inline(category)) + grouped_fields + .persistent_inline(category) + .map(gen_restore_inline_field) + .collect() } /// Generate snapshot clone and restore methods for TaskStorage. @@ -3168,6 +3541,21 @@ fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStream let restore_meta_inline = gen_restore_inline_for_category(grouped_fields, Category::Meta); let restore_data_inline = gen_restore_inline_for_category(grouped_fields, Category::Data); + // Merge arms for `restore_lazy_field`. + // + // filter_transient variants get their own arm with a variant-specific merge + // body (retain/extend on the inner collection). Non-filter_transient + // variants all share the same "push it" behavior — we collapse them into a + // single or-pattern arm below. The `enumerate()` index matches the variant + // position reported by `LazyField::index_and_category()` (both walk + // `all_lazy()` in declaration order), so we emit it as a literal and skip + // the method call.
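// A standalone sketch of the sentinel-indexed merge that the arms above expand
// to. `Lazy`, `build_index`, and `merge_deps` are illustrative names; the
// generated code works on `LazyField` and emits one such arm per
// filter_transient / custom_drop_partial variant.
const NUM_VARIANTS: usize = 3;

enum Lazy {
    Deps(Vec<u32>),
    Cells(Vec<u32>),
    Other(Vec<u32>),
}

/// Discriminant -> position in `lazy`; `u8::MAX` marks "variant not present".
fn build_index(lazy: &[Lazy]) -> [u8; NUM_VARIANTS] {
    let mut index = [u8::MAX; NUM_VARIANTS];
    for (i, f) in lazy.iter().enumerate() {
        let d = match f {
            Lazy::Deps(_) => 0,
            Lazy::Cells(_) => 1,
            Lazy::Other(_) => 2,
        };
        index[d] = i as u8;
    }
    index
}

/// The expanded shape of one merge arm: `0` is the discriminant literal the
/// macro would emit for `Deps`, so no runtime discriminant lookup is needed.
fn merge_deps(lazy: &mut Vec<Lazy>, incoming: Vec<u32>, index: &[u8; NUM_VARIANTS]) {
    let slot = index[0];
    if slot != u8::MAX {
        match &mut lazy[slot as usize] {
            Lazy::Deps(residue) => residue.extend(incoming),
            _ => unreachable!(),
        }
    } else {
        lazy.push(Lazy::Deps(incoming));
    }
}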
+ let merge_lazy_arms: Vec<_> = grouped_fields + .all_lazy() + .enumerate() + .filter(|(_, f)| !f.is_transient() && (f.filter_transient || f.custom_drop_partial)) + .map(|(idx, f)| gen_restore_lazy_merge_arm(f, idx as u8)) + .collect(); + let clone_all_flags = if has_any_flags { quote! { // Clone all persisted flags @@ -3196,15 +3584,6 @@ fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStr quote! {} }; - let restore_all_flags = if has_any_flags { - quote! { - // Restore all persisted flags (preserve transient flags) - self.flags.set_persisted_bits(source.flags.persisted_bits()); - } - } else { - quote! {} - }; - quote! { #[automatically_derived] impl TaskStorage { @@ -3252,86 +3631,144 @@ fn generate_snapshot_restore_methods(grouped_fields: &GroupedFields) -> TokenStr /// The `category` parameter specifies which category of data to restore: /// - `Meta`: Restore meta fields (aggregation_number, output, upper, dirty, etc.) /// - `Data`: Restore data fields (output_dependent, dependencies, cell_data, etc.) - /// - `All`: Restore both meta and data fields pub fn restore_from( &mut self, source: TaskStorage, - category: crate::backend::TaskDataCategory, + category: crate::backend::SpecificTaskDataCategory, ) { match category { - crate::backend::TaskDataCategory::Meta => self.restore_meta_from(source), - crate::backend::TaskDataCategory::Data => self.restore_data_from(source), - crate::backend::TaskDataCategory::All => self.restore_all_from(source), + crate::backend::SpecificTaskDataCategory::Meta => self.restore_meta_from(source), + crate::backend::SpecificTaskDataCategory::Data => self.restore_data_from(source), } } /// Restore meta category fields from source. /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. + /// `self` may contain transient residue left behind by `drop_partial`; + /// `filter_transient` fields are merged rather than overwritten. fn restore_meta_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have persistent meta lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent() && f.is_meta()), - "restore_meta_from called on storage that already has persistent meta lazy fields" - ); - - // Inline meta fields - direct assignment + // Inline meta fields #(#restore_meta_inline)* #restore_meta_flags - // Extend lazy vec with persistent meta fields from source - self.lazy.extend( - source.lazy.into_iter().filter(|f| f.is_persistent() && f.is_meta()) - ); + // `source.lazy` contains only persistent meta variants. If + // `self.lazy` has no persistent meta residue we can bulk-extend + // regardless of transient or data residue — those can't collide + // with the incoming meta variants. Otherwise build the index + // and merge each source variant in O(1). + let (any_meta, _any_data, index) = Self::build_lazy_index(&self.lazy); + if !any_meta { + self.lazy.extend(source.lazy); + } else { + for field in source.lazy { + debug_assert!(field.is_persistent() && field.is_meta()); + self.restore_lazy_field(field, &index); + } + } } /// Restore data category fields from source. /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. + /// `self` may contain transient residue left behind by `drop_partial`; + /// `filter_transient` fields are merged rather than overwritten. 
fn restore_data_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have persistent data lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent() && f.is_data()), - "restore_data_from called on storage that already has persistent data lazy fields" - ); - - // Inline data fields - direct assignment + // Inline data fields #(#restore_data_inline)* #restore_data_flags - // Extend lazy vec with persistent data fields from source - self.lazy.extend( - source.lazy.into_iter().filter(|f| f.is_persistent() && f.is_data()) - ); + // Mirror image of `restore_meta_from`: `source.lazy` contains + // only persistent data variants, so meta or transient residue + // in `self.lazy` is never a collision risk. + let (_any_meta, any_data, index) = Self::build_lazy_index(&self.lazy); + if !any_data { + self.lazy.extend(source.lazy); + } else { + for field in source.lazy { + debug_assert!(field.is_persistent() && field.is_data()); + self.restore_lazy_field(field, &index); + } + } } - /// Restore all fields from source (both meta and data). - /// - /// Debug assertions verify that the target doesn't already have the lazy fields - /// being restored. - fn restore_all_from(&mut self, source: TaskStorage) { - // Debug assertion: verify target doesn't already have any persistent lazy fields - debug_assert!( - !self.lazy.iter().any(|f| f.is_persistent()), - "restore_all_from called on storage that already has persistent lazy fields" - ); - - // Inline meta fields - direct assignment - #(#restore_meta_inline)* - // Inline data fields - direct assignment - #(#restore_data_inline)* + /// Build a discriminant → position lookup table over `lazy`, plus + /// per-category "any persistent residue?" bits. + /// + /// The bits let each restore entry point skip per-field dispatch + /// when its category has no residue to collide with — e.g. + /// `restore_meta_from` only cares about meta residue, since the + /// incoming source is all meta. A cold restore after a + /// `restore_meta_from` + `drop_partial(data)` can still have data + /// residue present but not collide with the incoming meta source, + /// so the data bit staying false lets meta restore stay on the + /// bulk-extend fast path. + /// + /// `u8::MAX` marks "variant not present" in the index. Relies on + /// `lazy.len() < 255`, which is trivially true (at most + /// `LazyField::NUM_VARIANTS` entries, well under 255). + fn build_lazy_index( + lazy: &[LazyField], + ) -> (bool, bool, [u8; LazyField::NUM_VARIANTS]) { + debug_assert!(lazy.len() < u8::MAX as usize); + let mut index = [u8::MAX; LazyField::NUM_VARIANTS]; + let mut any_meta = false; + let mut any_data = false; + for (i, f) in lazy.iter().enumerate() { + let (d, is_meta, is_data) = f.index_and_category(); + index[d as usize] = i as u8; + any_meta |= is_meta; + any_data |= is_data; + } + (any_meta, any_data, index) + } - #restore_all_flags + /// Merge a single persistent `LazyField` from a decoded snapshot into + /// `self.lazy`. Uses the precomputed `index` for O(1) residue lookup + /// on `filter_transient` variants; non-filter_transient variants are + /// pushed unconditionally. `source.lazy` never contains duplicate + /// variants (encode emits each exactly once), so `index` is + /// read-only here. 
+ fn restore_lazy_field( + &mut self, + incoming: LazyField, + index: &[u8; LazyField::NUM_VARIANTS], + ) { + match incoming { + #(#merge_lazy_arms)* + _ => { + self.lazy.push(incoming); + } + } + } + } +} + +/// Generate a match arm for `restore_lazy_field` that merges an incoming +/// persistent `filter_transient` variant into `self.lazy` using the precomputed +/// discriminant → position `index`. `discriminant` must equal the variant's +/// position in `all_lazy()` (and in the `LazyField` enum definition). +/// +/// On residue hit: merge the incoming collection into the existing one. +/// On miss: push the variant. `source.lazy` never contains duplicate variants, +/// so the pushed variant is never looked up again within this call — no need +/// to update `index`. +fn gen_restore_lazy_merge_arm(field: &FieldInfo, discriminant: u8) -> TokenStream { + debug_assert!(field.filter_transient || field.custom_drop_partial); + let variant_name = &field.variant_name; + quote! { + LazyField::#variant_name(incoming) => { + let slot = index[#discriminant as usize]; + if slot != u8::MAX { + let residue = match &mut self.lazy[slot as usize] { + LazyField::#variant_name(v) => v, + _ => unreachable!(), + }; + residue.extend(incoming); + } else { + self.lazy.push(LazyField::#variant_name(incoming)); } } } diff --git a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs index ef0892efad09..c826469540e3 100644 --- a/turbopack/crates/turbo-tasks-macros/src/value_macro.rs +++ b/turbopack/crates/turbo-tasks-macros/src/value_macro.rs @@ -587,6 +587,7 @@ pub fn value_type_and_register( fn has_serialization() -> bool { #has_serialization } + } } } diff --git a/turbopack/crates/turbo-tasks-malloc/src/lib.rs b/turbopack/crates/turbo-tasks-malloc/src/lib.rs index edf251ab604d..45fe9762db2a 100644 --- a/turbopack/crates/turbo-tasks-malloc/src/lib.rs +++ b/turbopack/crates/turbo-tasks-malloc/src/lib.rs @@ -94,9 +94,22 @@ impl TurboMalloc { } pub fn thread_park() { + Self::collect(false); + } + + /// When using mimalloc, triggers allocator cleanup. + /// `force=false`: process thread-local free lists and other thread-local deferred work. + /// This only operates on thread-local data and should be fast. + /// `force=true`: do all the work of `force=false`, then process global shared structures and + /// return memory to the OS if possible. This is much slower and should only be done rarely.
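// A usage sketch for the `collect` API defined just below. Both call-site
// framings are hypothetical; only `TurboMalloc::collect` itself comes from
// this change.
fn on_thread_park() {
    // Fast path: thread-local deferred work only, cheap enough to run often.
    turbo_tasks_malloc::TurboMalloc::collect(false);
}

fn after_large_eviction() {
    // Slow path: also processes global shared structures and returns memory
    // to the OS where possible; reserve for rare moments, e.g. right after a
    // snapshot-and-evict pass has freed a large amount of memory.
    turbo_tasks_malloc::TurboMalloc::collect(true);
}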
+ pub fn collect(force: bool) { #[cfg(all(feature = "custom_allocator", not(target_family = "wasm")))] unsafe { - libmimalloc_sys::mi_collect(false); + libmimalloc_sys::mi_collect(force); + } + #[cfg(not(all(feature = "custom_allocator", not(target_family = "wasm"))))] + { + let _ = force; } } diff --git a/turbopack/crates/turbo-tasks-testing/Cargo.toml b/turbopack/crates/turbo-tasks-testing/Cargo.toml index ca841f422bb8..7571896f4576 100644 --- a/turbopack/crates/turbo-tasks-testing/Cargo.toml +++ b/turbopack/crates/turbo-tasks-testing/Cargo.toml @@ -20,3 +20,4 @@ rustc-hash = { workspace = true } smallvec = { workspace = true } tokio = { workspace = true } turbo-tasks = { workspace = true, features = ["non_operation_vc_strongly_consistent"] } +turbo-tasks-backend = { workspace = true } diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 5409ccf5a072..a91b359e3b65 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -28,7 +28,8 @@ use turbo_tasks::{ }; pub use crate::run::{ - Registration, run, run_once, run_once_without_cache_check, run_with_tt, run_without_cache_check, + Registration, TestInstance, run, run_once, run_once_without_cache_check, run_with_tt, + run_without_cache_check, test_instance, }; enum Task { diff --git a/turbopack/crates/turbo-tasks-testing/src/run.rs b/turbopack/crates/turbo-tasks-testing/src/run.rs index dfed8cd4d10d..b51facbcafb5 100644 --- a/turbopack/crates/turbo-tasks-testing/src/run.rs +++ b/turbopack/crates/turbo-tasks-testing/src/run.rs @@ -1,35 +1,68 @@ use std::{env, fmt::Debug, future::Future, sync::Arc}; use anyhow::Result; -use turbo_tasks::{TurboTasksApi, trace::TraceRawVcs}; +use turbo_tasks::{TurboTasks, TurboTasksApi, trace::TraceRawVcs}; +use turbo_tasks_backend::{BackingStorage, TurboTasksBackend}; +/// A freshly created test instance: the `TurboTasks` handle (type-erased to +/// `Arc<dyn TurboTasksApi>`) and a closure that, when called, takes a +/// snapshot and evicts all evictable tasks on that instance. +/// +/// The eviction closure captures the concrete backend type internally so +/// harness code holding an erased `TurboTasksApi` can still reach the +/// `snapshot_and_evict` API. +pub struct TestInstance { + pub tt: Arc<dyn TurboTasksApi>, + pub snapshot_and_evict: Box<dyn Fn() + Send + Sync>, +} + +/// Type-erased factory returned by the `register!` macro. Stays non-generic so +/// call sites can write `static REGISTRATION: Registration = register!();` +/// without naming the backing storage type. pub struct Registration { - create_turbo_tasks: fn(&str, bool) -> Arc<dyn TurboTasksApi>, + create_turbo_tasks: fn(&str, bool) -> TestInstance, } impl Registration { #[doc(hidden)] - pub const fn new(create_turbo_tasks: fn(&str, bool) -> Arc<dyn TurboTasksApi>) -> Self { + pub const fn new(create_turbo_tasks: fn(&str, bool) -> TestInstance) -> Self { Registration { create_turbo_tasks } } - pub fn create_turbo_tasks(&self, name: &str, initial: bool) -> Arc<dyn TurboTasksApi> { + pub fn create_turbo_tasks(&self, name: &str, initial: bool) -> TestInstance { (self.create_turbo_tasks)(name, initial) } } +/// Wrap a concrete `Arc<TurboTasks<TurboTasksBackend<B>>>` into a +/// [`TestInstance`]. Called from the `register!` macro — the `.trs` closure +/// returns a concrete backend-parameterized `TurboTasks`, and this function +/// erases the type while retaining eviction access via a capturing closure.
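// A generic sketch of the erase-but-retain shape that `test_instance` (below)
// implements for TurboTasks. All names here (`Api`, `Concrete`, `Instance`,
// `erase`) are illustrative.
use std::sync::Arc;

trait Api {
    fn run(&self);
}

struct Concrete;

impl Api for Concrete {
    fn run(&self) {}
}

impl Concrete {
    /// Stands in for a concrete-only method such as
    /// `snapshot_and_evict_for_testing` on the parameterized backend.
    fn concrete_only(&self) {}
}

struct Instance {
    api: Arc<dyn Api>,
    concrete_hook: Box<dyn Fn() + Send + Sync>,
}

fn erase(c: Arc<Concrete>) -> Instance {
    // Clone *before* erasing so the hook keeps the concrete type.
    let for_hook = c.clone();
    Instance {
        concrete_hook: Box::new(move || for_hook.concrete_only()),
        // Unsized coercion: Arc<Concrete> -> Arc<dyn Api>.
        api: c,
    }
}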
+pub fn test_instance<B>(tt: Arc<TurboTasks<TurboTasksBackend<B>>>) -> TestInstance +where + B: BackingStorage + 'static, +{ + let tt_for_evict = tt.clone(); + let snapshot_and_evict = Box::new(move || { + let _ = tt_for_evict + .backend() + .snapshot_and_evict_for_testing(&*tt_for_evict); + }); + TestInstance { + tt: tt as Arc<dyn TurboTasksApi>, + snapshot_and_evict, + } +} + #[macro_export] macro_rules! register { () => {{ - use std::sync::Arc; - - use turbo_tasks::TurboTasksApi; - fn create_turbo_tasks(name: &str, initial: bool) -> Arc<dyn TurboTasksApi> { + fn create_turbo_tasks(name: &str, initial: bool) -> turbo_tasks_testing::TestInstance { let inner = include!(concat!( env!("CARGO_MANIFEST_DIR"), "/tests/test_config.trs" )); - (inner)(name, initial) + turbo_tasks_testing::test_instance((inner)(name, initial)) } turbo_tasks_testing::Registration::new(create_turbo_tasks) }}; @@ -43,8 +76,8 @@ where T: TraceRawVcs + Send + 'static, { let name = closure_to_name(&fut); - let tt = registration.create_turbo_tasks(&name, true); - turbo_tasks::run_once(tt, async move { Ok(fut.await) }) + let instance = registration.create_turbo_tasks(&name, true); + turbo_tasks::run_once(instance.tt, async move { Ok(fut.await) }) .await .unwrap() } @@ -57,8 +90,8 @@ where T: TraceRawVcs + Send + 'static, { let name = closure_to_name(&fut); - let tt = registration.create_turbo_tasks(&name, true); - turbo_tasks::run(tt, async move { Ok(fut.await) }) + let instance = registration.create_turbo_tasks(&name, true); + turbo_tasks::run(instance.tt, async move { Ok(fut.await) }) .await .unwrap() } @@ -104,36 +137,41 @@ where let name = closure_to_name(&fut); let mut i = 1; loop { - let tt = registration.create_turbo_tasks(&name, true); + let instance = registration.create_turbo_tasks(&name, true); println!("Run #{i} (without cache)"); let start = std::time::Instant::now(); - let first = fut(tt.clone()).await?; + let first = fut(instance.tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); i += 1; if !single_run { let max_run = if infinite_memory_runs { usize::MAX } else { 10 }; for _ in 0..max_run { - println!("Run #{i} (with memory cache, same TurboTasks instance)"); + // Snapshot + evict between runs. Forces every subsequent read to + // go through the restore path instead of the warm in-memory cache, + // so tests exercise persistence on every iteration — not just the + // initial cold run and the post-`stop_and_wait` fs-cache runs.
+ (instance.snapshot_and_evict)(); + println!("Run #{i} (with memory cache, same TurboTasks instance, post-evict)"); let start = std::time::Instant::now(); - let second = fut(tt.clone()).await?; + let second = fut(instance.tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); i += 1; assert_eq!(first, second); } } let start = std::time::Instant::now(); - tt.stop_and_wait().await; + instance.tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); if !single_run { for _ in 10..20 { - let tt = registration.create_turbo_tasks(&name, false); + let instance = registration.create_turbo_tasks(&name, false); println!("Run #{i} (with filesystem cache if available, new TurboTasks instance)"); let start = std::time::Instant::now(); - let third = fut(tt.clone()).await?; + let third = fut(instance.tt.clone()).await?; println!("Run #{i} took {:?}", start.elapsed()); i += 1; let start = std::time::Instant::now(); - tt.stop_and_wait().await; + instance.tt.stop_and_wait().await; println!("Stopping TurboTasks took {:?}", start.elapsed()); assert_eq!(first, third); } diff --git a/turbopack/crates/turbo-tasks/src/value_type.rs b/turbopack/crates/turbo-tasks/src/value_type.rs index 911ebb54ca76..17d2e010cede 100644 --- a/turbopack/crates/turbo-tasks/src/value_type.rs +++ b/turbopack/crates/turbo-tasks/src/value_type.rs @@ -48,12 +48,13 @@ pub enum ValueTypePersistence { /// evicting cheap cells first. True iff declared with /// `serialization = "skip", evict = "last"`. expensive: bool, + + /// The value type is not persisted, but the macro emitted a + /// `DeterministicHash` derive and the write path stashes a `content_hash` + /// into `cell_data_hash` so post-eviction reads can detect unchanged + /// content and skip invalidation. Maps to `serialization = "hash"`. + hash_only: bool, }, - /// The value type is not persisted, but the macro emitted a - /// `DeterministicHash` derive and the write path stashes a `content_hash` - /// into `cell_data_hash` so post-eviction reads can detect unchanged - /// content and skip invalidation. Maps to `serialization = "hash"`. - HashOnly, /// Not persistable, not reconstructible — holds interior-mutable state /// that accumulates across the session (`State<...>` cells, `Arc<...>` /// dedup histories). Re-running the producing task would lose the @@ -129,7 +130,10 @@ impl ValueType { pub const fn skip_persist<T: VcValueType>(global_name: &'static str) -> Self { Self::new_inner::<T>( global_name, - ValueTypePersistence::SkipPersist { expensive: false }, + ValueTypePersistence::SkipPersist { + expensive: false, + hash_only: false, + }, ) } @@ -142,7 +146,10 @@ impl ValueType { pub const fn skip_persist_expensive<T: VcValueType>(global_name: &'static str) -> Self { Self::new_inner::<T>( global_name, - ValueTypePersistence::SkipPersist { expensive: true }, + ValueTypePersistence::SkipPersist { + expensive: true, + hash_only: false, + }, ) } @@ -153,7 +160,13 @@ /// This is internally used by [`#[turbo_tasks::value]`][crate::value] for /// `serialization = "hash"`.
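// A consumer-side sketch of the flattened shape: with `HashOnly` folded into
// `SkipPersist { hash_only }`, "not persisted" is one variant and the hash
// behavior is one flag. The local `Persistence` enum and both helpers are
// illustrative copies, not the real `ValueTypePersistence` API.
enum Persistence {
    Persist,
    SkipPersist { expensive: bool, hash_only: bool },
}

fn keeps_content_hash(p: &Persistence) -> bool {
    // Previously a dedicated `HashOnly` arm; now a field pattern.
    matches!(p, Persistence::SkipPersist { hash_only: true, .. })
}

fn evict_last(p: &Persistence) -> bool {
    matches!(p, Persistence::SkipPersist { expensive: true, .. })
}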
pub const fn hash_only<T: VcValueType>(global_name: &'static str) -> Self { - Self::new_inner::<T>(global_name, ValueTypePersistence::HashOnly) + Self::new_inner::<T>( + global_name, + ValueTypePersistence::SkipPersist { + expensive: false, + hash_only: true, + }, + ) } /// Construct a `ValueType` whose cells cannot be reconstructed by @@ -467,7 +480,10 @@ mod tests { assert!( matches!( vt.persistence, - ValueTypePersistence::SkipPersist { expensive: false }, + ValueTypePersistence::SkipPersist { + expensive: false, + hash_only: false + }, ), "`serialization = \"skip\"` must map to SkipPersist {{ expensive: false }}" ); @@ -478,7 +494,13 @@ fn hash_maps_to_hash_only() { let vt = registry::get_value_type(HashValue::get_value_type_id()); assert!( - matches!(vt.persistence, ValueTypePersistence::HashOnly), + matches!( + vt.persistence, + ValueTypePersistence::SkipPersist { + expensive: false, + hash_only: true + }, + ), "`serialization = \"hash\"` must map to SkipPersist {{ hash_only: true }}" ); assert!(!HashValue::has_serialization()); @@ -490,7 +512,10 @@ assert!( matches!( vt.persistence, - ValueTypePersistence::SkipPersist { expensive: true }, + ValueTypePersistence::SkipPersist { + expensive: true, + hash_only: false + }, ), "`serialization = \"skip\", evict = \"last\"` must map to SkipPersist {{ expensive: \ true }}"
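// The `hash_only` flow the enum docs describe, reconstructed as a sketch with
// invented names (`CellSlot`, `write_cell`); the real write path lives in the
// backend and is not shown in this diff.
struct CellSlot {
    cell_data_hash: Option<u64>,
}

/// Returns true if dependents must be invalidated.
fn write_cell(slot: &mut CellSlot, new_content_hash: u64) -> bool {
    if slot.cell_data_hash == Some(new_content_hash) {
        // Same deterministic hash as the previous content: the cell body may
        // have been evicted, but readers can keep their cached view.
        return false;
    }
    slot.cell_data_hash = Some(new_content_hash);
    true
}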