Progress notification infrastructure #1190

Open
DavisVaughan wants to merge 9 commits into main from oak/package-cache-progress

@DavisVaughan DavisVaughan commented May 1, 2026

Not completely hooked up yet: no one is sending events to the package sources event loop, because we are waiting on Salsa support before doing that.

But I feel like the two core changes are nice:

  • New package sources event loop for managing requests to populate sources without blocking the main loop
  • New progress reporting support added to the auxiliary event loop, as long as the client has indicated that it supports progress updates

You can treat these two features as orthogonal; the package sources event loop is simply one user of the progress reporter.
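To make the relationship concrete, here is a minimal sketch of a worker loop that takes populate requests off a channel and reports progress only when the client has opted in. It is not the PR's code: `Progress`, `ProgressReporter`, and `spawn_sources_loop` are hypothetical names, and std threads and `mpsc` channels stand in for the tokio tasks the PR actually uses.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical progress events; in an LSP server these would map onto
/// work-done progress begin/end notifications.
#[derive(Debug, Clone, PartialEq)]
pub enum Progress {
    Begin(String),
    End(String),
}

/// Hypothetical reporter: forwards events only if the client opted in.
#[derive(Clone)]
pub struct ProgressReporter {
    supported: bool,
    tx: mpsc::Sender<Progress>,
}

impl ProgressReporter {
    pub fn new(supported: bool, tx: mpsc::Sender<Progress>) -> Self {
        Self { supported, tx }
    }

    pub fn report(&self, event: Progress) {
        if self.supported {
            // Ignore send errors: the receiving loop may already be gone.
            let _ = self.tx.send(event);
        }
    }
}

/// Hypothetical package sources loop. It runs off the main loop and uses
/// the reporter the same way any other caller would.
pub fn spawn_sources_loop(
    requests: mpsc::Receiver<String>,
    reporter: ProgressReporter,
) -> thread::JoinHandle<Vec<String>> {
    thread::spawn(move || {
        let mut populated = Vec::new();
        // The loop ends once every request sender has been dropped.
        for package in requests {
            reporter.report(Progress::Begin(package.clone()));
            // Expensive source population would happen here.
            populated.push(package.clone());
            reporter.report(Progress::End(package));
        }
        populated
    })
}
```

The reporter knows nothing about packages, and the loop knows nothing about how progress reaches the client, which is the sense in which the two pieces are orthogonal.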

@DavisVaughan DavisVaughan force-pushed the oak/package-cache-in-library branch from 5b9bacd to 1934906 Compare May 5, 2026 18:11
Base automatically changed from oak/package-cache-in-library to main May 5, 2026 18:50
@DavisVaughan DavisVaughan force-pushed the oak/package-cache-progress branch from e2dea39 to c10ffff Compare May 6, 2026 16:27
Comment on lines -220 to -227
let library_paths: Vec<PathBuf> = library_paths.into_iter().map(PathBuf::from).collect();

let r = harp::command::r_executable(&r_home);
let package_sources = r
.and_then(|r| PackageCache::new(r, library_paths.clone()).log_err())
.map(|cache| Arc::new(cache) as Arc<dyn oak_sources::PackageSources>);

let library = Library::new(library_paths, package_sources);
Contributor Author

I have lifted all of this up out of the constructor because:

  • It is needed in the new package sources event loop
  • We eventually want to get library_paths construction out of this constructor anyways (see the FIXME in here)

Comment on lines -247 to -256
pub(crate) fn start(self) -> tokio::task::JoinSet<()> {
let mut set = tokio::task::JoinSet::<()>::new();

// Spawn latency-sensitive auxiliary loop. Must be first to initialise
// global transmission channel.
let aux = AuxiliaryState::new(self.client.clone());
set.spawn(async move { aux.start().await });

// Spawn main loop
set.spawn(async move { self.main_loop().await });
Contributor Author

It always felt weird to me that the GlobalState controlled the spawning of everything

Really we have 3 event loops:

  • GlobalState::start()
  • AuxiliaryState::start()
  • PackageSourceState::start()

start_lsp() now manages all 3 more directly

(It was also kind of required to do it this way due to the fact that PackageSourceState::new() needs a few things that aren't available in GlobalState, so you can't start it from GlobalState)
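As a rough illustration of that shape (not the actual implementation), the sketch below has a top-level `start_lsp()` construct and spawn all three loops itself. The struct bodies are placeholders, the `r_executable` field is an assumed example of something only the entry point can supply, and std threads stand in for the tokio `JoinSet` used in the real code.

```rust
use std::thread::{self, JoinHandle};

/// Placeholder stand-ins for the three loops named above.
pub struct GlobalState;
pub struct AuxiliaryState;
pub struct PackageSourceState {
    /// Assumed example of state that `GlobalState` does not have access to.
    pub r_executable: String,
}

impl GlobalState {
    pub fn start(self) -> &'static str {
        "main"
    }
}

impl AuxiliaryState {
    pub fn start(self) -> &'static str {
        "auxiliary"
    }
}

impl PackageSourceState {
    pub fn new(r_executable: String) -> Self {
        Self { r_executable }
    }

    pub fn start(self) -> &'static str {
        "package-sources"
    }
}

/// The entry point owns construction and spawning of every loop, so no
/// loop has to know how to start its siblings.
pub fn start_lsp(r_executable: String) -> Vec<JoinHandle<&'static str>> {
    let aux = AuxiliaryState;
    let global = GlobalState;
    let sources = PackageSourceState::new(r_executable);
    vec![
        // The auxiliary loop is spawned first, as in the original code.
        thread::spawn(move || aux.start()),
        thread::spawn(move || global.start()),
        thread::spawn(move || sources.start()),
    ]
}
```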

Contributor Author

All AuxiliaryState support for arbitrary progress reporting lives here

Contributor Author

Package sources event loop lives here


None
// We fully expect to be able to find an executable, it is required for many tasks
panic!("Can't find R executable relative to {r_home:?}")
Contributor Author

It was getting annoying for this to be optional, and it seems like critical info

Comment on lines +39 to +48
/// # Reader / Writer split
///
/// The cache is split into [PackageCacheReader] and [PackageCacheWriter].
///
/// - [PackageCacheReader::get] reads from the cache and returns instantly. If the
/// package isn't in the cache, it simply returns [None].
///
/// - [PackageCacheWriter::insert] writes into the cache if the package sources aren't
/// already in the cache. Insertion can be very expensive, and takes place on a
/// dedicated tokio task to avoid blocking the main loop.
Contributor Author

Most important change

Contributor

That seems much cleaner and easier to reason about / work with.
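For readers skimming the thread, a reader/writer split of this kind can be sketched roughly as follows. This is an assumption-laden illustration, not the PR's types: `Reader`, `Writer`, `Shared`, and `cache_pair` are hypothetical, and an in-memory map stands in for the on-disk cache.

```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

/// State shared by one reader/writer pair (hypothetical).
#[derive(Debug, Default)]
struct Shared {
    entries: RwLock<HashMap<String, PathBuf>>,
}

#[derive(Debug, Clone)]
pub struct Reader {
    shared: Arc<Shared>,
}

#[derive(Debug)]
pub struct Writer {
    shared: Arc<Shared>,
}

/// Builds a pair that looks at the same underlying cache.
pub fn cache_pair() -> (Reader, Writer) {
    let shared = Arc::new(Shared::default());
    (
        Reader { shared: Arc::clone(&shared) },
        Writer { shared },
    )
}

impl Reader {
    /// Cheap lookup that never triggers population.
    pub fn get(&self, package: &str) -> Option<PathBuf> {
        self.shared.entries.read().unwrap().get(package).cloned()
    }
}

impl Writer {
    /// Inserts only if the package is absent; returns whether work was done.
    pub fn insert(&self, package: &str, populate: impl FnOnce() -> PathBuf) -> bool {
        if self.shared.entries.read().unwrap().contains_key(package) {
            return false;
        }
        // Run the expensive step without holding the lock, so readers
        // are never blocked while sources are being generated.
        let path = populate();
        self.shared
            .entries
            .write()
            .unwrap()
            .insert(package.to_string(), path);
        true
    }
}
```

With a single writer, as in this sketch, the check-then-insert sequence is safe; multiple concurrent writers would need to re-check under the write lock.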

/// Both carry the shared cache root lock, because both look at the cache and hand out
/// paths into it.
#[derive(Debug)]
struct PackageCacheShared {
Contributor Author

This might be useful in the future as well

If the Reader needs to check if there are packages in the queue (i.e. if you just wait a moment they will be ready) then maybe we can store some shared state about them in an RwLock here, which is data shared between a Reader/Writer pair. That was my eventual plan, at least.
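One way that eventual plan might look, purely as a sketch: the shared struct grows an `RwLock`-guarded set of queued packages that the writer side updates and the reader side can query. All names here (`QueueShared`, `QueueReader`, `QueueWriter`, `queue_pair`) are hypothetical and nothing like this exists in the PR yet.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};

/// Hypothetical shared state for a reader/writer pair.
#[derive(Debug, Default)]
struct QueueShared {
    queued: RwLock<HashSet<String>>,
}

#[derive(Debug, Clone)]
pub struct QueueReader {
    shared: Arc<QueueShared>,
}

#[derive(Debug)]
pub struct QueueWriter {
    shared: Arc<QueueShared>,
}

pub fn queue_pair() -> (QueueReader, QueueWriter) {
    let shared = Arc::new(QueueShared::default());
    (
        QueueReader { shared: Arc::clone(&shared) },
        QueueWriter { shared },
    )
}

impl QueueWriter {
    /// Record that a populate request for `package` is waiting.
    pub fn enqueue(&self, package: &str) {
        self.shared.queued.write().unwrap().insert(package.to_string());
    }

    /// Record that population finished, whether or not it succeeded.
    pub fn finish(&self, package: &str) {
        self.shared.queued.write().unwrap().remove(package);
    }
}

impl QueueReader {
    /// True if sources are not ready yet but a request is in flight.
    pub fn is_queued(&self, package: &str) -> bool {
        self.shared.queued.read().unwrap().contains(package)
    }
}
```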

@DavisVaughan DavisVaughan marked this pull request as ready for review May 7, 2026 15:02
@DavisVaughan DavisVaughan changed the title POC of progress notifications Progress notification infrastructure May 7, 2026
@DavisVaughan DavisVaughan requested a review from lionel- May 7, 2026 15:05
@lionel- lionel- left a comment

Looks good!

I've gathered some thoughts about how to integrate it with the Salsa database at #1234

client: Client,

/// Event channels for the main loop. The tower-lsp methods forward
/// Event channel for recieving on the main loop. The tower-lsp methods forward
Contributor

Suggested change
/// Event channel for recieving on the main loop. The tower-lsp methods forward
/// Event channel for receiving on the main loop. The tower-lsp methods forward

.map(|reader| Arc::new(reader) as Arc<dyn oak_sources::PackageSources>);

// Package cache reader goes to `Library`
let library = Library::new(library_paths, package_sources);
Contributor

With #1221 Library becomes a Salsa input that you can set with db.set_library_paths().

Comment on lines +134 to +137
/// Set of packages which are installed, but we failed to populate their sources (from
/// CRAN or srcrefs). If we request sources for one of these packages a second time,
/// we don't attempt expensive source generation again.
source_unavailable: HashSet<String>,
Contributor

It now seems to me this sort of defensive measure should be handled one level higher. I.e. what if the caller/owner decides it wants to retry?

Now that we no longer try to do too much in oak_sources (e.g. populate sources from a get() call), the crate feels lower level than before.
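A sketch of what moving this one level higher could mean, under the assumption that the low-level crate simply reports failure and the caller owns the bookkeeping. `SourcesDriver` and its `request` signature are hypothetical and only illustrate the idea.

```rust
use std::collections::HashSet;

/// Hypothetical caller-side policy. The low-level populate step only
/// reports success or failure; this type decides whether to try again.
#[derive(Debug, Default)]
pub struct SourcesDriver {
    unavailable: HashSet<String>,
}

impl SourcesDriver {
    /// Returns true if sources were populated by this call.
    pub fn request(
        &mut self,
        package: &str,
        retry: bool,
        populate: impl FnOnce(&str) -> Result<(), String>,
    ) -> bool {
        // Skip known failures unless the caller explicitly asks to retry.
        if !retry && self.unavailable.contains(package) {
            return false;
        }
        match populate(package) {
            Ok(()) => {
                self.unavailable.remove(package);
                true
            }
            Err(_) => {
                self.unavailable.insert(package.to_string());
                false
            }
        }
    }
}
```

The `retry` flag is the point: the owner, not the cache, decides when a previous failure is worth another expensive attempt.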

// We don't populate packages concurrently, so we don't need per package ids
let id = String::from("package-sources");

// TODO: This currently reports progress when its already in the cache (i.e. dplyr
Contributor

I think we'll be able to set up the driver to avoid requesting sources when they already exist. It should maintain a set of packages that either already had sources or sent a populate request.
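The set described here could be as small as the following sketch. `RequestTracker` and its method names are hypothetical, and how the driver learns that sources already exist is left out.

```rust
use std::collections::HashSet;

/// Hypothetical driver-side bookkeeping: packages that either already
/// had sources or for which a populate request has been sent.
#[derive(Debug, Default)]
pub struct RequestTracker {
    seen: HashSet<String>,
}

impl RequestTracker {
    /// Record a package whose sources were found in the cache.
    pub fn mark_has_sources(&mut self, package: &str) {
        self.seen.insert(package.to_string());
    }

    /// Returns true only the first time a populate request should be
    /// sent for `package`; later calls return false.
    pub fn should_request(&mut self, package: &str) -> bool {
        // `HashSet::insert` returns true when the value was newly added.
        self.seen.insert(package.to_string())
    }
}
```

Because no request is sent for a package already in the set, no progress would be reported for sources that are already cached.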
