diff --git a/crates/ark/src/lsp.rs b/crates/ark/src/lsp.rs index 352b60bee8..184012b79d 100644 --- a/crates/ark/src/lsp.rs +++ b/crates/ark/src/lsp.rs @@ -31,6 +31,8 @@ pub mod input_boundaries; pub mod inputs; pub mod main_loop; pub mod markdown; +pub mod package_sources; +pub mod progress; pub mod references; pub mod selection_range; diff --git a/crates/ark/src/lsp/backend.rs b/crates/ark/src/lsp/backend.rs index 9d4feb6417..81094814bc 100644 --- a/crates/ark/src/lsp/backend.rs +++ b/crates/ark/src/lsp/backend.rs @@ -15,6 +15,7 @@ use amalthea::comm::server_comm::ServerStartMessage; use amalthea::comm::server_comm::ServerStartedMessage; use anyhow::Context; use crossbeam::channel::Sender; +use oak_index::library::Library; use serde_json::Value; use stdext::result::ResultExt; use tokio::net::TcpListener; @@ -45,9 +46,11 @@ use crate::lsp::help_topic::HelpTopicResponse; use crate::lsp::input_boundaries; use crate::lsp::input_boundaries::InputBoundariesParams; use crate::lsp::input_boundaries::InputBoundariesResponse; +use crate::lsp::main_loop::AuxiliaryState; use crate::lsp::main_loop::Event; use crate::lsp::main_loop::GlobalState; use crate::lsp::main_loop::TokioUnboundedSender; +use crate::lsp::package_sources::PackageSourcesState; use crate::lsp::statement_range; use crate::lsp::statement_range::StatementRangeParams; use crate::lsp::statement_range::StatementRangeResponse; @@ -228,9 +231,9 @@ struct Backend { /// Channel for communication with the main loop. events_tx: TokioUnboundedSender, - /// Handle to main loop. Drop it to cancel the loop, all associated tasks, - /// and drop all owned state. - _main_loop: tokio::task::JoinSet<()>, + /// Handle to all tokio tasks (main loop, auxiliary loop, package sources loop). Drop + /// it to cancel the loops, all associated tasks, and drop all owned state. 
+ _tokio_tasks_handle: tokio::task::JoinSet<()>, } impl Backend { @@ -557,11 +560,55 @@ pub(crate) fn start_lsp( let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::channel::<()>(1); let init = |client: Client| { - let state = GlobalState::new(client, r_home, console_notification_tx); - let events_tx = state.events_tx(); + let r = harp::command::r_executable(&r_home); - // Start main loop and hold onto the handle that keeps it alive - let main_loop = state.start(); + // FIXME: We shouldn't call R code in the kernel to figure this out + let library_paths = r_task(|| -> anyhow::Result> { + Ok(harp::RFunction::new("base", ".libPaths") + .call()? + .try_into()?) + }); + let library_paths = match library_paths { + Ok(library_paths) => library_paths, + Err(err) => { + log::error!("Can't evaluate `libPaths()`: {err:?}"); + Vec::new() + }, + }; + let library_paths: Vec = + library_paths.into_iter().map(PathBuf::from).collect(); + + // If reader/writer generation fails, all features related to source references are disabled, + // but we don't bring the whole system down. We don't expect this to ever occur in practice. 
+ let (package_cache_reader, package_cache_writer) = + match oak_sources::new_cache_pair(r, library_paths.clone()).log_err() { + Some((reader, writer)) => (Some(reader), Some(writer)), + None => (None, None), + }; + + let package_sources = package_cache_reader + .map(|reader| Arc::new(reader) as Arc); + + // Package cache reader goes to `Library` + let library = Library::new(library_paths, package_sources); + + // Package cache writer goes to `PackageSourcesState` event loop + let (package_sources_state, package_sources_event_tx) = match package_cache_writer { + Some(writer) => { + let (state, event_tx) = PackageSourcesState::new(writer); + (Some(state), Some(event_tx)) + }, + None => (None, None), + }; + + let auxiliary_state = AuxiliaryState::new(client.clone()); + + let (state, events_tx) = GlobalState::new( + client, + library, + console_notification_tx, + package_sources_event_tx, + ); // Forward event channel along to `Console`. // This also updates an outdated channel after a reconnect. @@ -576,10 +623,26 @@ pub(crate) fn start_lsp( } }); + // The handle that keeps all tokio tasks alive! + // Drop it to cancel everything and shut down the service. + let mut tokio_tasks_handle = tokio::task::JoinSet::<()>::new(); + + // Spawn latency-sensitive auxiliary loop. Must be first to initialise + // global transmission channel. 
+ tokio_tasks_handle.spawn(async move { auxiliary_state.start().await }); + + // Spawn package sources event loop + if let Some(package_sources_state) = package_sources_state { + tokio_tasks_handle.spawn(async move { package_sources_state.start().await }); + } + + // Spawn main loop + tokio_tasks_handle.spawn(async move { state.start().await }); + Backend { shutdown_tx, events_tx, - _main_loop: main_loop, + _tokio_tasks_handle: tokio_tasks_handle, } }; diff --git a/crates/ark/src/lsp/capabilities.rs b/crates/ark/src/lsp/capabilities.rs index e501db14ec..f86f610315 100644 --- a/crates/ark/src/lsp/capabilities.rs +++ b/crates/ark/src/lsp/capabilities.rs @@ -17,6 +17,7 @@ pub(crate) struct Capabilities { dynamic_registration_for_did_change_configuration: bool, code_action_literal_support: bool, workspace_edit_document_changes: bool, + work_done_progress: bool, } impl Capabilities { @@ -46,10 +47,17 @@ impl Capabilities { .and_then(|workspace_edit| workspace_edit.document_changes) .is_some_and(|document_changes| document_changes); + let work_done_progress = client_capabilities + .window + .as_ref() + .and_then(|window| window.work_done_progress) + .unwrap_or(false); + Self { dynamic_registration_for_did_change_configuration, code_action_literal_support, workspace_edit_document_changes, + work_done_progress, } } @@ -85,6 +93,10 @@ impl Capabilities { self } + pub(crate) fn work_done_progress(&self) -> bool { + self.work_done_progress + } + pub(crate) fn code_action_provider_capability(&self) -> Option { if !self.code_action_literal_support() { return None; diff --git a/crates/ark/src/lsp/main_loop.rs b/crates/ark/src/lsp/main_loop.rs index c68f508b79..293d778c8a 100644 --- a/crates/ark/src/lsp/main_loop.rs +++ b/crates/ark/src/lsp/main_loop.rs @@ -8,11 +8,9 @@ use std::collections::HashMap; use std::future; use std::path::Path; -use std::path::PathBuf; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::sync::Arc; use 
std::sync::LazyLock; use std::sync::RwLock; @@ -20,8 +18,6 @@ use anyhow::anyhow; use futures::stream::FuturesUnordered; use futures::StreamExt; use oak_index::library::Library; -use oak_sources::PackageCache; -use stdext::result::ResultExt; use tokio::sync::mpsc; use tokio::sync::mpsc::unbounded_channel as tokio_unbounded_channel; use tokio::task; @@ -45,6 +41,8 @@ use crate::lsp::diagnostics::generate_diagnostics; use crate::lsp::document::Document; use crate::lsp::handlers; use crate::lsp::indexer; +use crate::lsp::package_sources::PackageSourcesEvent; +use crate::lsp::progress::ProgressSupport; use crate::lsp::state::WorldState; use crate::lsp::state_handlers; use crate::lsp::state_handlers::ConsoleInputs; @@ -120,6 +118,8 @@ pub(crate) enum AuxiliaryEvent { Log(lsp_types::MessageType, String), PublishDiagnostics(Url, Vec, Option), SpawnedTask(JoinHandle>>), + EnableProgress, + Progress(lsp::progress::Progress), Shutdown, } @@ -144,12 +144,15 @@ pub(crate) struct GlobalState { /// LSP client shared with tower-lsp and the log loop client: Client, - /// Event channels for the main loop. The tower-lsp methods forward + /// Event channel for receiving on the main loop. The tower-lsp methods forward /// notifications and requests here via `Event::Lsp`. We also receive - /// messages from the kernel via `Event::Kernel`, and from ourselves via - /// `Event::Task`. - events_tx: TokioUnboundedSender, + /// messages from the kernel via `Event::Kernel`. events_rx: TokioUnboundedReceiver, + + /// Event channel for sending populate requests to the package sources event loop + /// + /// FIXME: Use this to send populate requests. Blocked on Salsa integration. + _package_sources_event_tx: Option>, } /// Unlike `WorldState`, `ParserState` cannot be cloned and is only accessed by @@ -174,24 +177,21 @@ pub(crate) struct LspState { /// The auxiliary loop currently handles: /// - Log messages. /// - Joining of spawned blocking tasks to relay any errors or panics to the LSP log. 
-struct AuxiliaryState { +pub(crate) struct AuxiliaryState { client: Client, auxiliary_event_rx: TokioUnboundedReceiver, tasks: TaskList>, + progress_support: lsp::progress::ProgressSupport, } impl GlobalState { - /// Create a new global state - /// - /// # Arguments - /// - /// * `client`: The tower-lsp client shared with the tower-lsp backend - /// and auxiliary loop. + /// Create a new global state and its `events_tx` sender pub(crate) fn new( client: Client, - r_home: PathBuf, + library: Library, console_notification_tx: TokioUnboundedSender, - ) -> Self { + package_sources_event_tx: Option>, + ) -> (Self, TokioUnboundedSender) { // Transmission channel for the main loop events. Shared with the // tower-lsp backend and the Jupyter kernel. let (events_tx, events_rx) = tokio_unbounded_channel::(); @@ -202,67 +202,23 @@ impl GlobalState { console_notification_tx, }; - // FIXME: We shouldn't call R code in the kernel to figure this out - let library_paths = crate::r_task(|| -> anyhow::Result> { - Ok(harp::RFunction::new("base", ".libPaths") - .call()? - .try_into()?) 
- }); - - let library_paths = match library_paths { - Ok(library_paths) => library_paths, - Err(err) => { - log::error!("Can't evaluate `libPaths()`: {err:?}"); - Vec::new() + ( + Self { + world: WorldState::new(library), + lsp_state, + client, + events_rx, + _package_sources_event_tx: package_sources_event_tx, }, - }; - - let library_paths: Vec = library_paths.into_iter().map(PathBuf::from).collect(); - - let r = harp::command::r_executable(&r_home); - let package_sources = r - .and_then(|r| PackageCache::new(r, library_paths.clone()).log_err()) - .map(|cache| Arc::new(cache) as Arc); - - let library = Library::new(library_paths, package_sources); - - Self { - world: WorldState::new(library), - lsp_state, - client, events_tx, - events_rx, - } - } - - /// Get `Event` transmission channel - pub(crate) fn events_tx(&self) -> TokioUnboundedSender { - self.events_tx.clone() - } - - /// Start the main and auxiliary loops - /// - /// Returns a `JoinSet` that holds onto all tasks and state owned by the - /// event loop. Drop it to cancel everything and shut down the service. - pub(crate) fn start(self) -> tokio::task::JoinSet<()> { - let mut set = tokio::task::JoinSet::<()>::new(); - - // Spawn latency-sensitive auxiliary loop. Must be first to initialise - // global transmission channel. - let aux = AuxiliaryState::new(self.client.clone()); - set.spawn(async move { aux.start().await }); - - // Spawn main loop - set.spawn(async move { self.main_loop().await }); - - set + ) } /// Run main loop /// /// This takes ownership of all global state and handles one by one LSP /// requests, notifications, and other internal events. 
- async fn main_loop(mut self) { + pub(crate) async fn start(mut self) { loop { let event = self.next_event().await; if let Err(err) = self.handle_event(event).await { @@ -518,7 +474,7 @@ fn respond( unsafe impl Sync for AuxiliaryState {} impl AuxiliaryState { - fn new(client: Client) -> Self { + pub(crate) fn new(client: Client) -> Self { // Channels for communication with the auxiliary loop let (auxiliary_event_tx, auxiliary_event_rx) = tokio_unbounded_channel::(); @@ -546,10 +502,14 @@ impl AuxiliaryState { Box::pin(pending) as Pin> + Send>>; tasks.push(pending); + // Until told otherwise by the client capabilities + let progress_support = ProgressSupport::Disabled; + Self { client, auxiliary_event_rx, tasks, + progress_support, } } @@ -557,7 +517,7 @@ impl AuxiliaryState { /// /// Takes ownership of auxiliary state and start the low-latency auxiliary /// loop. - async fn start(mut self) { + pub(crate) async fn start(mut self) { loop { match self.next_event().await { AuxiliaryEvent::Log(level, message) => self.log(level, message).await, @@ -567,6 +527,8 @@ impl AuxiliaryState { .publish_diagnostics(uri, diagnostics, version) .await }, + AuxiliaryEvent::EnableProgress => self.handle_enable_progress(), + AuxiliaryEvent::Progress(progress) => self.handle_progress(progress).await, AuxiliaryEvent::Shutdown => break, } } @@ -606,6 +568,17 @@ impl AuxiliaryState { async fn log_error(&self, message: String) { self.client.log_message(MessageType::ERROR, message).await } + + pub(crate) fn client(&self) -> &Client { + &self.client + } + + pub(crate) fn progress_support(&self) -> ProgressSupport { + self.progress_support + } + pub(crate) fn set_progress_support(&mut self, progress_support: ProgressSupport) { + self.progress_support = progress_support; + } } fn with_auxiliary_tx(f: F) -> T @@ -689,6 +662,14 @@ pub(crate) fn publish_diagnostics(uri: Url, diagnostics: Vec, versio )); } +pub(crate) fn enable_progress() { + send_auxiliary(AuxiliaryEvent::EnableProgress); +} + 
+pub(crate) fn report_progress(progress: lsp::progress::Progress) { + send_auxiliary(AuxiliaryEvent::Progress(progress)); +} + impl KernelNotification { pub(crate) fn trace(&self) -> TraceKernelNotification<'_> { TraceKernelNotification { inner: self } diff --git a/crates/ark/src/lsp/package_sources.rs b/crates/ark/src/lsp/package_sources.rs new file mode 100644 index 0000000000..781f30ab63 --- /dev/null +++ b/crates/ark/src/lsp/package_sources.rs @@ -0,0 +1,79 @@ +use oak_sources::PackageCacheWriter; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::UnboundedSender; + +use crate::lsp::main_loop::report_progress; +use crate::lsp::main_loop::TokioUnboundedReceiver; +use crate::lsp::progress::Progress; +use crate::lsp::progress::ProgressEvent; +use crate::lsp::progress::ProgressEventBegin; + +#[derive(Debug)] +pub(crate) enum PackageSourcesEvent { + /// FIXME: Use this to send populate requests. Blocked on Salsa integration. + #[expect(dead_code)] + Populate(Populate), +} + +#[derive(Debug)] +pub(crate) struct Populate { + package: String, +} + +#[derive(Debug)] +pub(crate) struct PackageSourcesState { + writer: PackageCacheWriter, + + event_rx: TokioUnboundedReceiver, +} + +impl PackageSourcesState { + /// Construct a [PackageSourcesState] and its `event_tx` sender + pub(crate) fn new(writer: PackageCacheWriter) -> (Self, UnboundedSender) { + // Channels for communication with the package sources event loop + let (event_tx, event_rx) = unbounded_channel::(); + (Self { writer, event_rx }, event_tx) + } + + /// Start the event loop + pub(crate) async fn start(mut self) { + while let Some(event) = self.next_event().await { + self.handle_event(event); + } + } + + async fn next_event(&mut self) -> Option { + self.event_rx.recv().await + } + + fn handle_event(&mut self, event: PackageSourcesEvent) { + match event { + PackageSourcesEvent::Populate(populate) => self.handle_populate(populate), + } + } + + fn handle_populate(&mut self, populate: Populate) { + 
// We don't populate packages concurrently, so we don't need per package ids + let id = String::from("package-sources"); + + // TODO: This currently reports progress when it's already in the cache (i.e. dplyr + // was discovered, we got a request to ensure it is populated, but it's already + // populated), we should probably also consider passing the `reader` through here + // as well, so we can check `self.reader.get(package)` and if that returns + // `Some()` then we just return early without reporting any progress at all! We + // can hook that up correctly when we actually turn this feature on after getting + // Salsa integrated. + + report_progress(Progress::new( + id.clone(), + ProgressEvent::Begin(ProgressEventBegin::new(format!( + "Populating {package}", + package = &populate.package + ))), + )); + + self.writer.insert(&populate.package); + + report_progress(Progress::new(id, ProgressEvent::End)); + } +} diff --git a/crates/ark/src/lsp/progress.rs b/crates/ark/src/lsp/progress.rs new file mode 100644 index 0000000000..0c9f4258c6 --- /dev/null +++ b/crates/ark/src/lsp/progress.rs @@ -0,0 +1,96 @@ +use tower_lsp::lsp_types; + +use crate::lsp::main_loop::AuxiliaryState; + +#[derive(Debug)] +pub(crate) struct Progress { + /// Identifier for the kind of progress being reported + /// + /// If we understand correctly, this allows different identifiers to show up as + /// different spinners that all report progress concurrently + id: String, + + event: ProgressEvent, +} + +impl Progress { + pub(crate) fn new(id: String, event: ProgressEvent) -> Self { + Self { id, event } + } +} + +#[derive(Debug)] +pub(crate) enum ProgressEvent { + Begin(ProgressEventBegin), + End, +} + +#[derive(Debug)] +pub(crate) struct ProgressEventBegin { + title: String, +} + +impl ProgressEventBegin { + pub(crate) fn new(title: String) -> Self { + Self { title } + } +} + +#[derive(Debug, Clone, Copy)] +pub(crate) enum ProgressSupport { + Enabled, + Disabled, +} + +impl AuxiliaryState { + 
pub(crate) fn handle_enable_progress(&mut self) { + log::info!("Enabling work done progress support"); + self.set_progress_support(ProgressSupport::Enabled); + } + + pub(crate) async fn handle_progress(&self, progress: Progress) { + if matches!(self.progress_support(), ProgressSupport::Disabled) { + return; + } + + let token = lsp_types::ProgressToken::String(format!("ark/progress/{}", progress.id)); + + let work_done_progress = match progress.event { + ProgressEvent::Begin(begin) => { + tracing::trace!("handle_progress(begin): token {token:?}"); + + let result = self + .client() + .send_request::( + lsp_types::WorkDoneProgressCreateParams { + token: token.clone(), + }, + ) + .await; + + if let Err(error) = result { + log::warn!("Client rejected progress token: {error:?}"); + return; + }; + + lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin { + title: begin.title, + cancellable: None, + message: None, + percentage: None, + }) + }, + ProgressEvent::End => { + tracing::trace!("handle_progress(end): token {token:?}"); + lsp_types::WorkDoneProgress::End(lsp_types::WorkDoneProgressEnd { message: None }) + }, + }; + + self.client() + .send_notification::(lsp_types::ProgressParams { + token, + value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress), + }) + .await; + } +} diff --git a/crates/ark/src/lsp/state_handlers.rs b/crates/ark/src/lsp/state_handlers.rs index 4d85a322e6..ecdb409d26 100644 --- a/crates/ark/src/lsp/state_handlers.rs +++ b/crates/ark/src/lsp/state_handlers.rs @@ -53,6 +53,7 @@ use crate::lsp::config::DOCUMENT_SETTINGS; use crate::lsp::config::GLOBAL_SETTINGS; use crate::lsp::document::Document; use crate::lsp::inputs::source_root::SourceRoot; +use crate::lsp::main_loop::enable_progress; use crate::lsp::main_loop::DidCloseVirtualDocumentParams; use crate::lsp::main_loop::DidOpenVirtualDocumentParams; use crate::lsp::main_loop::LspState; @@ -87,6 +88,11 @@ pub(crate) fn initialize( ) -> LspResult { lsp_state.capabilities = 
Capabilities::new(params.capabilities); + // Let the auxiliary loop know it can emit progress reports + if lsp_state.capabilities.work_done_progress() { + enable_progress(); + } + // Initialize the workspace folders let mut folders: Vec = Vec::new(); if let Some(workspace_folders) = params.workspace_folders { diff --git a/crates/harp/src/command.rs b/crates/harp/src/command.rs index b420cd12a1..288cbb23bf 100644 --- a/crates/harp/src/command.rs +++ b/crates/harp/src/command.rs @@ -94,18 +94,19 @@ pub fn r_home_setup() -> anyhow::Result { /// /// - For unix, this look for `{R_HOME}/bin/R` /// - For Windows, this looks for `{R_HOME}/bin/R.exe` and `{R_HOME}/bin/R.bat` (for rig compatibility) -pub fn r_executable(r_home: &Path) -> Option { +pub fn r_executable(r_home: &Path) -> PathBuf { let r_bin = r_home.join("bin"); for command in COMMAND_R_NAMES { let candidate = r_bin.join(command); if candidate.exists() { - return Some(candidate); + return candidate; } } - None + // We fully expect to be able to find an executable, it is required for many tasks + panic!("Can't find R executable relative to {r_home:?}") } /// Execute a `Command` for R found on the `PATH` diff --git a/crates/oak_sources/src/cache.rs b/crates/oak_sources/src/cache.rs index b7cc4dc218..12f88dc3f8 100644 --- a/crates/oak_sources/src/cache.rs +++ b/crates/oak_sources/src/cache.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use std::path::Path; use std::path::PathBuf; -use std::sync::RwLock; +use std::sync::Arc; use chrono::DateTime; use chrono::TimeDelta; @@ -28,7 +28,7 @@ const METADATA_FILENAME: &str = ".metadata"; /// the DESCRIPTION `Build:` timestamp being different). 
const ONE_WEEK: TimeDelta = TimeDelta::weeks(1); -impl crate::PackageSources for PackageCache { +impl crate::PackageSources for PackageCacheReader { fn get(&self, package: &str) -> Option { self.get(package) } @@ -36,6 +36,17 @@ impl crate::PackageSources for PackageCache { /// A cache of extracted R package sources /// +/// # Reader / Writer split +/// +/// The cache is split into [PackageCacheReader] and [PackageCacheWriter]. +/// +/// - [PackageCacheReader::get] reads from the cache and returns instantly. If the +/// package isn't in the cache, it simply returns [None]. +/// +/// - [PackageCacheWriter::insert] writes into the cache if the package sources aren't +/// already in the cache. Insertion can be very expensive, and takes place on a +/// dedicated tokio task to avoid blocking the main loop. +/// /// # On disk layout /// /// The on disk cache at `/oak/sources/v1/` is the source of truth @@ -71,7 +82,7 @@ impl crate::PackageSources for PackageCache { /// /// The cache root `.lock` can be locked as shared or exclusive: /// -/// - **Shared root lock** is held for the lifetime of this `PackageCache`. The invariant +/// - **Shared root lock** is held for the lifetime of this cache. The invariant /// is that if you hold a shared root lock, you can read from or append new entries to /// the cache, but never delete from it. Multiple sessions can hold this simultaneously. 
/// The main purpose is to prevent cleanup from running so that any PathBuf handed out @@ -106,35 +117,44 @@ impl crate::PackageSources for PackageCache { /// - Deleting if the package it originated from no longer exists /// - Deleting if the DESCRIPTION it originated from has changed /// -/// [`get`]: PackageCache::get -/// [`clean`]: PackageCache::clean +/// [`get`]: PackageCacheReader::get +/// [`clean`]: clean +#[derive(Debug)] +pub struct PackageCacheReader { + shared: Arc, +} + #[derive(Debug)] -pub struct PackageCache { +pub struct PackageCacheWriter { + shared: Arc, + /// Path to an R executable r: PathBuf, + /// Set of packages which are installed, but we failed to populate their sources (from + /// CRAN or srcrefs). If we request sources for one of these packages a second time, + /// we don't attempt expensive source generation again. + source_unavailable: HashSet, +} + +/// Immutable data shared between reader and writer +/// +/// Both carry the shared cache root lock, because both look at the cache and hand out +/// paths into it. +#[derive(Debug)] +struct PackageCacheShared { /// Library paths to consider library_paths: Vec, /// On disk cache directory root - cache_root: file_lock::Filesystem, + fs: file_lock::Filesystem, /// Shared lock on the root `.lock`, held for the life of this cache. /// /// Blocks any other process from acquiring the root exclusive lock (the only thing /// that can delete entries). That way, any `PathBuf` we hand out remains valid for - /// the life of this cache (as long as `PackageCache` itself is not dropped!). - cache_root_lock: FileLock, - - /// Set of packages which are installed, but we failed to populate their sources (from - /// CRAN or srcrefs). If we request sources for one of these packages a second time, - /// we don't attempt expensive source generation again. 
- /// - /// Inside an [RwLock] so that [PackageCache::get()] avoids being `mut`, allowing a - /// caller to wrap a [PackageCache] in an [std::sync::Arc] and call - /// [PackageCache::get()] in the background on a thread, acting as a form of - /// "prefetching". - source_unavailable: RwLock>, + /// the life of this cache (as long as the cache itself is not dropped!). + lock: FileLock, } /// Completion sentinel for a cache entry, written last. Also used to determine if we can @@ -147,35 +167,47 @@ struct Metadata { generated_at: DateTime, } -impl PackageCache { - pub fn new(r: PathBuf, library_paths: Vec) -> anyhow::Result { - let cache_root = file_lock::Filesystem::new(crate::fs::sources_dir()?); - cache_root.create_dir()?; - - // Try to clean the cache. Only works if no other processes hold a shared root lock. - if let Some(cache_root_lock) = cache_root.try_open_rw_exclusive_create(LOCK_FILENAME)? { - if let Err(err) = Self::clean(&cache_root_lock) { - log::warn!("Failed to clean sources cache: {err:?}"); - } - drop(cache_root_lock); +pub fn new_cache_pair( + r: PathBuf, + library_paths: Vec, +) -> anyhow::Result<(PackageCacheReader, PackageCacheWriter)> { + let fs = file_lock::Filesystem::new(crate::fs::sources_dir()?); + fs.create_dir()?; + + // Try to clean the cache. Only works if no other processes hold a shared root lock. + if let Some(lock) = fs.try_open_rw_exclusive_create(LOCK_FILENAME)? 
{ + if let Err(err) = clean(&lock) { + log::warn!("Failed to clean sources cache: {err:?}"); } - - // Take shared lock for the lifetime of the cache so any paths we hand out stay valid - let cache_root_lock = cache_root.open_ro_shared_create(LOCK_FILENAME)?; - - Ok(Self { - r, - library_paths, - cache_root, - cache_root_lock, - source_unavailable: RwLock::new(HashSet::new()), - }) + drop(lock); } + // Take shared lock for the lifetime of the cache so any paths we hand out stay valid + let lock = fs.open_ro_shared_create(LOCK_FILENAME)?; + + let shared = PackageCacheShared { + library_paths, + fs, + lock, + }; + let shared = Arc::new(shared); + + let reader = PackageCacheReader { + shared: Arc::clone(&shared), + }; + let writer = PackageCacheWriter { + shared, + r, + source_unavailable: HashSet::new(), + }; + + Ok((reader, writer)) +} + +impl PackageCacheReader { /// Get a package's cached source folder /// - /// May spawn an R subprocess or download from CRAN (in a blocking manner) to - /// generate the sources, so keep that in mind when calling this. + /// Returns [None] if the cached source folder doesn't exist pub fn get(&self, package: &str) -> Option { match self.get_result(package) { Ok(Some(sources)) => Some(sources), @@ -188,33 +220,61 @@ impl PackageCache { } fn get_result(&self, package: &str) -> anyhow::Result> { - let Some(package) = InstalledPackage::find(package, &self.library_paths)? else { + let Some(package) = InstalledPackage::find(package, &self.shared.library_paths)? 
else { + // Not even installed + return Ok(None); + }; + + // If completion sentinel file is present, the cache folder exists + let destination = self.shared.lock.parent().join(package.key()); + + if !destination.join(METADATA_FILENAME).exists() { + // Not cached + return Ok(None); + } + + Ok(Some(destination)) + } +} + +impl PackageCacheWriter { + pub fn insert(&mut self, package: &str) -> Option { + match self.insert_result(package) { + Ok(Some(sources)) => Some(sources), + Ok(None) => None, + Err(err) => { + log::error!("Failed to get sources for {package}: {err:?}"); + None + }, + } + } + + fn insert_result(&mut self, package: &str) -> anyhow::Result> { + let Some(package) = InstalledPackage::find(package, &self.shared.library_paths)? else { // Not even installed return Ok(None); }; // Read path: completion sentinel present, already exists on disk - let destination = self.cache_root_lock.parent().join(package.key()); + let destination = self.shared.lock.parent().join(package.key()); + if destination.join(METADATA_FILENAME).exists() { + // Already cached return Ok(Some(destination)); } // Check if we've already tried to generate sources for this installed package but // failed. If so, refuse to attempt expensive source generation again. 
- if self - .source_unavailable - .read() - .is_ok_and(|set| set.contains(package.key())) - { + if self.source_unavailable.contains(package.key()) { return Ok(None); } // Write path let result = if matches!(package.description().priority, Some(Priority::Base)) { // R version to download is the same as the base package version - self.try_populate_base(&package.description().version, &self.library_paths) + self.try_populate_base(&package.description().version, &self.shared.library_paths) } else { - self.try_populate(&package, &self.r, &self.library_paths) + self.try_populate(&package, &self.r, &self.shared.library_paths) }; match result { @@ -222,10 +282,7 @@ impl PackageCache { Ok(false) => { // Unavailable for some reason, maybe package isn't on CRAN. // Never try and generate sources again this session. - self.source_unavailable - .write() - .ok() - .map(|mut set| set.insert(package.key().to_string())); + self.source_unavailable.insert(package.key().to_string()); Ok(None) }, Err(err) => { @@ -236,10 +293,7 @@ impl PackageCache { name = package.name(), version = package.version() ); - self.source_unavailable - .write() - .ok() - .map(|mut set| set.insert(package.key().to_string())); + self.source_unavailable.insert(package.key().to_string()); Ok(None) }, } @@ -276,7 +330,7 @@ impl PackageCache { bytes: &[u8], ) -> anyhow::Result<()> { // Take per-key exclusive lock - let destination = self.cache_root.join(package.key()); + let destination = self.shared.fs.join(package.key()); destination.create_dir()?; let destination_lock = destination.open_rw_exclusive_create(LOCK_FILENAME)?; @@ -328,7 +382,7 @@ impl PackageCache { library_paths: &[Q], ) -> anyhow::Result { // Take per-key exclusive lock - let destination = self.cache_root.join(package.key()); + let destination = self.shared.fs.join(package.key()); destination.create_dir()?; let destination_lock = destination.open_rw_exclusive_create(LOCK_FILENAME)?; @@ -456,83 +510,82 @@ impl PackageCache { 
std::fs::write(destination_lock.parent().join(METADATA_FILENAME), contents)?; Ok(()) } +} - /// Walks all cache entries and evicts ones that are provably stale. - /// - /// Caller must hold the root exclusive lock. - fn clean(cache_root_lock: &file_lock::FileLock) -> anyhow::Result<()> { - let cache_root = cache_root_lock.parent(); - let now = Utc::now(); +/// Walks all cache entries and evicts ones that are provably stale. +/// +/// Caller must hold the root exclusive lock. +fn clean(cache_root_lock: &file_lock::FileLock) -> anyhow::Result<()> { + let cache_root = cache_root_lock.parent(); + let now = Utc::now(); + + for entry in std::fs::read_dir(cache_root)? { + let entry = entry?; + let path = entry.path(); + + if !entry.file_type()?.is_dir() { + // i.e. `.lock` itself + continue; + } - for entry in std::fs::read_dir(cache_root)? { - let entry = entry?; - let path = entry.path(); + let metadata_path = path.join(METADATA_FILENAME); - if !entry.file_type()?.is_dir() { - // i.e. `.lock` itself - continue; - } - - let metadata_path = path.join(METADATA_FILENAME); + let Ok(metadata_contents) = std::fs::read_to_string(&metadata_path) else { + log::warn!( + "Cleaning {} due to missing or unreadable metadata", + path.display() + ); + crate::fs::remove_dir_all_or_warn(&path); + continue; + }; - let Ok(metadata_contents) = std::fs::read_to_string(&metadata_path) else { + let metadata: Metadata = match serde_json::from_str(&metadata_contents) { + Ok(m) => m, + Err(err) => { log::warn!( - "Cleaning {} due to missing or unreadable metadata", + "Cleaning {} due to unreadable metadata: {err:?}", path.display() ); crate::fs::remove_dir_all_or_warn(&path); continue; - }; - - let metadata: Metadata = match serde_json::from_str(&metadata_contents) { - Ok(m) => m, - Err(err) => { - log::warn!( - "Cleaning {} due to unreadable metadata: {err:?}", - path.display() - ); - crate::fs::remove_dir_all_or_warn(&path); - continue; - }, - }; + }, + }; - // Refuse to do anything if younger 
than 1 week. The user may be switching - // between CRAN and dev, and we want to keep the cache for the CRAN version - // around. - let age = now.signed_duration_since(metadata.generated_at); - if age < ONE_WEEK { - continue; - } + // Refuse to do anything if younger than 1 week. The user may be switching + // between CRAN and dev, and we want to keep the cache for the CRAN version + // around. + let age = now.signed_duration_since(metadata.generated_at); + if age < ONE_WEEK { + continue; + } - if !metadata.libpath.exists() { - log::trace!("Cleaning {} due to nonexistent libpath", path.display()); - crate::fs::remove_dir_all_or_warn(&path); - continue; - } + if !metadata.libpath.exists() { + log::trace!("Cleaning {} due to nonexistent libpath", path.display()); + crate::fs::remove_dir_all_or_warn(&path); + continue; + } - let package_path = metadata.libpath.join(&metadata.package); + let package_path = metadata.libpath.join(&metadata.package); - if !package_path.exists() { - log::trace!("Cleaning {} due to nonexistent package", path.display()); - crate::fs::remove_dir_all_or_warn(&path); - continue; - } + if !package_path.exists() { + log::trace!("Cleaning {} due to nonexistent package", path.display()); + crate::fs::remove_dir_all_or_warn(&path); + continue; + } - let Ok(description_contents) = - std::fs::read_to_string(package_path.join("DESCRIPTION")) - else { - log::trace!("Cleaning {} due to missing DESCRIPTION", path.display()); - crate::fs::remove_dir_all_or_warn(&path); - continue; - }; + let Ok(description_contents) = std::fs::read_to_string(package_path.join("DESCRIPTION")) + else { + log::trace!("Cleaning {} due to missing DESCRIPTION", path.display()); + crate::fs::remove_dir_all_or_warn(&path); + continue; + }; - if crate::hash::hash(&description_contents) != metadata.description_hash { - log::trace!("Cleaning {} due to changed DESCRIPTION", path.display()); - crate::fs::remove_dir_all_or_warn(&path); - continue; - } + if 
crate::hash::hash(&description_contents) != metadata.description_hash { + log::trace!("Cleaning {} due to changed DESCRIPTION", path.display()); + crate::fs::remove_dir_all_or_warn(&path); + continue; } - - Ok(()) } + + Ok(()) } diff --git a/crates/oak_sources/src/lib.rs b/crates/oak_sources/src/lib.rs index 4d13b4be2a..ed8a6aab59 100644 --- a/crates/oak_sources/src/lib.rs +++ b/crates/oak_sources/src/lib.rs @@ -11,11 +11,13 @@ pub mod test; use std::path::PathBuf; -pub use cache::PackageCache; +pub use cache::new_cache_pair; +pub use cache::PackageCacheReader; +pub use cache::PackageCacheWriter; /// Trait for an object that can retrieve package sources /// -/// Implemented by the main [crate::cache::PackageCache] itself, but also by +/// Implemented by the [crate::cache::PackageCacheReader] itself, but also by /// [crate::test::TestPackageCache] so that you can generate a test cache that doesn't /// need internet access or access to a live R session. pub trait PackageSources: std::fmt::Debug + Sync + Send {