diff --git a/src/rust/fs/src/lib.rs b/src/rust/fs/src/lib.rs
index 1b7f78fdda5..3dbd9f2ac85 100644
--- a/src/rust/fs/src/lib.rs
+++ b/src/rust/fs/src/lib.rs
@@ -426,11 +426,9 @@ impl PosixFS {
     pub async fn scandir(&self, dir_relative_to_root: Dir) -> Result {
         let vfs = self.clone();
         self.executor
-            .spawn_blocking(
-                move || vfs.scandir_sync(&dir_relative_to_root),
-                |e| Err(io::Error::other(format!("Synchronous scandir failed: {e}"))),
-            )
-            .await
+            .spawn_blocking(move || vfs.scandir_sync(&dir_relative_to_root))
+            .await?
+            .map_err(|e| io::Error::other(format!("Synchronous scandir failed: {e}")))
     }

     fn scandir_sync(&self, dir_relative_to_root: &Dir) -> Result {
diff --git a/src/rust/fs/store/src/local.rs b/src/rust/fs/store/src/local.rs
index 7d1bfe39870..cf608ba9455 100644
--- a/src/rust/fs/store/src/local.rs
+++ b/src/rust/fs/store/src/local.rs
@@ -203,21 +203,20 @@ impl ShardedFSDB {
             .map_err(|e| format!("Failed to create directory: {e}"))?;
         let (src_file, dst_dir) = fsdb
             .executor
-            .spawn_blocking(
-                move || {
-                    let src_file = Builder::new()
-                        .suffix(".hardlink_canary")
-                        .tempfile_in(&fsdb.root)
-                        .map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
-                    let dst_dir = Builder::new()
-                        .suffix(".hardlink_canary")
-                        .tempdir_in(dst_parent_dir)
-                        .map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
-                    Ok((src_file, dst_dir))
-                },
-                |e| Err(format!("hardlink canary temp files task failed: {e}")),
-            )
-            .await?;
+            .spawn_blocking(move || {
+                let src_file = Builder::new()
+                    .suffix(".hardlink_canary")
+                    .tempfile_in(&fsdb.root)
+                    .map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
+                let dst_dir = Builder::new()
+                    .suffix(".hardlink_canary")
+                    .tempdir_in(dst_parent_dir)
+                    .map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
+                Ok((src_file, dst_dir))
+            })
+            .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e: String| format!("hardlink canary temp files task failed: {e}"))?;
         let dst_file = dst_dir.path().join("hard_link");
         let is_hardlinkable = hard_link(src_file, dst_file).await.is_ok();
         log::debug!("{src_display} -> {dst_display} hardlinkable: {is_hardlinkable}");
@@ -289,16 +288,15 @@ impl ShardedFSDB {
         // have to worry about parent dirs.
         let named_temp_file = self
             .executor
-            .spawn_blocking(
-                move || {
-                    Builder::new()
-                        .suffix(".tmp")
-                        .tempfile_in(dest_path2.parent().unwrap())
-                        .map_err(|e| format!("Failed to create temp file: {e}"))
-                },
-                |e| Err(format!("temp file creation task failed: {e}")),
-            )
-            .await?;
+            .spawn_blocking(move || {
+                Builder::new()
+                    .suffix(".tmp")
+                    .tempfile_in(dest_path2.parent().unwrap())
+                    .map_err(|e| format!("Failed to create temp file: {e}"))
+            })
+            .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+ .map_err(|e| format!("temp file creation task failed: {e}"))?; let (std_file, tmp_path) = named_temp_file .keep() .map_err(|e| format!("Failed to keep temp file: {e}"))?; @@ -366,14 +364,13 @@ impl UnderlyingByteStore for ShardedFSDB { async fn lease(&self, fingerprint: Fingerprint) -> Result<(), String> { let path = self.get_path(fingerprint); self.executor - .spawn_blocking( - move || { - fs_set_times::set_mtime(&path, fs_set_times::SystemTimeSpec::SymbolicNow) - .map_err(|e| format!("Failed to extend mtime of {path:?}: {e}")) - }, - |e| Err(format!("`lease` task failed: {e}")), - ) + .spawn_blocking(move || { + fs_set_times::set_mtime(&path, fs_set_times::SystemTimeSpec::SymbolicNow) + .map_err(|e| format!("Failed to extend mtime of {path:?}: {e}")) + }) .await + .map_err(|e| format!("Failed to join Tokio task: {e}"))? + .map_err(|e| format!("`lease` task failed: {e}")) } async fn remove(&self, fingerprint: Fingerprint) -> Result { @@ -470,61 +467,57 @@ impl UnderlyingByteStore for ShardedFSDB { let root = self.root.clone(); let expiration_time = SystemTime::now() - self.lease_time; self.executor - .spawn_blocking( - move || { - let maybe_shards = std::fs::read_dir(&root); - let mut fingerprints = vec![]; - if let Ok(shards) = maybe_shards { - for entry in shards { - let shard = - entry.map_err(|e| format!("Error iterating dir {root:?}: {e}."))?; - let large_files = std::fs::read_dir(shard.path()) - .map_err(|e| format!("Failed to read shard directory: {e}."))?; - for entry in large_files { - let large_file = entry.map_err(|e| { - format!( - "Error iterating dir {:?}: {e}", - shard.path().file_name() - ) - })?; - let path = large_file.path(); - if path.extension().is_some() { - continue; // NB: This is a tempfile - } - - let hash = path.file_name().unwrap().to_str().unwrap(); - let (length, mtime) = large_file - .metadata() - .and_then(|metadata| { - let length = metadata.len(); - let mtime = metadata.modified()?; - Ok((length, mtime)) - }) - .map_err(|e| { - format!("Could not access metadata for {path:?}: {e}") - })?; - - let expired_seconds_ago = expiration_time - .duration_since(mtime) - .map(|t| t.as_secs()) - // 0 indicates unexpired. - .unwrap_or(0); - - fingerprints.push(AgedFingerprint { - expired_seconds_ago, - fingerprint: Fingerprint::from_hex_string(hash).map_err( - |e| format!("Invalid file store entry at {path:?}: {e}"), - )?, - size_bytes: length as usize, - }); + .spawn_blocking(move || { + let maybe_shards = std::fs::read_dir(&root); + let mut fingerprints = vec![]; + if let Ok(shards) = maybe_shards { + for entry in shards { + let shard = + entry.map_err(|e| format!("Error iterating dir {root:?}: {e}."))?; + let large_files = std::fs::read_dir(shard.path()) + .map_err(|e| format!("Failed to read shard directory: {e}."))?; + for entry in large_files { + let large_file = entry.map_err(|e| { + format!("Error iterating dir {:?}: {e}", shard.path().file_name()) + })?; + let path = large_file.path(); + if path.extension().is_some() { + continue; // NB: This is a tempfile } + + let hash = path.file_name().unwrap().to_str().unwrap(); + let (length, mtime) = large_file + .metadata() + .and_then(|metadata| { + let length = metadata.len(); + let mtime = metadata.modified()?; + Ok((length, mtime)) + }) + .map_err(|e| { + format!("Could not access metadata for {path:?}: {e}") + })?; + + let expired_seconds_ago = expiration_time + .duration_since(mtime) + .map(|t| t.as_secs()) + // 0 indicates unexpired. 
+ .unwrap_or(0); + + fingerprints.push(AgedFingerprint { + expired_seconds_ago, + fingerprint: Fingerprint::from_hex_string(hash).map_err(|e| { + format!("Invalid file store entry at {path:?}: {e}") + })?, + size_bytes: length as usize, + }); } } - Ok(fingerprints) - }, - |e| Err(format!("`aged_fingerprints` task failed: {e}")), - ) + } + Ok(fingerprints) + }) .await + .map_err(|e| format!("Failed to join Tokio task: {e}"))? + .map_err(|e: String| format!("`aged_fingerprints` task failed: {e}")) } } diff --git a/src/rust/nailgun/src/server.rs b/src/rust/nailgun/src/server.rs index e598bc4b9b1..48c94b59424 100644 --- a/src/rust/nailgun/src/server.rs +++ b/src/rust/nailgun/src/server.rs @@ -240,26 +240,28 @@ impl Nail for RawFdNail { // Spawn the underlying function as a blocking task, and capture its exit code to append to the // output stream. + let executor = self.executor.clone(); let nail = self.clone(); - let exit_code = self - .executor - .spawn_blocking( - move || { - // NB: This closure captures the stdio handles, and will drop/close them when it completes. - (nail.runner)(RawFdExecution { - cmd, - cancelled, - stdin_fd: stdin_handle.as_raw_fd(), - stdout_fd: stdout_handle.as_raw_fd(), - stderr_fd: stderr_handle.as_raw_fd(), - }) - }, - |e| { + let exit_code_join = executor.spawn_blocking(move || { + // NB: This closure captures the stdio handles, and will drop/close them when it completes. + (nail.runner)(RawFdExecution { + cmd, + cancelled, + stdin_fd: stdin_handle.as_raw_fd(), + stdout_fd: stdout_handle.as_raw_fd(), + stderr_fd: stderr_handle.as_raw_fd(), + }) + }); + let exit_code = async move { + match exit_code_join.await { + Ok(code) => code, + Err(e) => { log::warn!("Server exited uncleanly: {e}"); ExitCode(1) - }, - ) - .boxed(); + } + } + } + .boxed(); // Select a single stdout/stderr stream. let stdout_stream = stdout_stream.map_ok(ChildOutput::Stdout); diff --git a/src/rust/process_execution/docker/src/docker.rs b/src/rust/process_execution/docker/src/docker.rs index bb01f006896..52f9ad2ba8a 100644 --- a/src/rust/process_execution/docker/src/docker.rs +++ b/src/rust/process_execution/docker/src/docker.rs @@ -279,50 +279,45 @@ async fn credentials_for_image( let server = server.to_owned(); executor - .spawn_blocking( - move || { - // Resolve the server as a DNS name to confirm that it is actually a registry. - let Ok(_) = (server.as_ref(), 80) - .to_socket_addrs() - .or_else(|_| server.to_socket_addrs()) - else { + .spawn_blocking(move || { + // Resolve the server as a DNS name to confirm that it is actually a registry. + let Ok(_) = (server.as_ref(), 80) + .to_socket_addrs() + .or_else(|_| server.to_socket_addrs()) + else { + return Ok(None); + }; + + // TODO: https://github.com/keirlawson/docker_credential/issues/7 means that this will only + // work for credential helpers and credentials encoded directly in the docker config, + // rather than for general credStore implementations. + let credential = match docker_credential::get_credential(&server) { + Ok(credential) => credential, + Err(e) => { + log::warn!("Failed to retrieve Docker credentials for server `{server}`: {e}"); return Ok(None); - }; + } + }; - // TODO: https://github.com/keirlawson/docker_credential/issues/7 means that this will only - // work for credential helpers and credentials encoded directly in the docker config, - // rather than for general credStore implementations. 
-                let credential = match docker_credential::get_credential(&server) {
-                    Ok(credential) => credential,
-                    Err(e) => {
-                        log::warn!(
-                            "Failed to retrieve Docker credentials for server `{server}`: {e}"
-                        );
-                        return Ok(None);
+            let bollard_credentials = match credential {
+                docker_credential::DockerCredential::IdentityToken(token) => DockerCredentials {
+                    identitytoken: Some(token),
+                    ..DockerCredentials::default()
+                },
+                docker_credential::DockerCredential::UsernamePassword(username, password) => {
+                    DockerCredentials {
+                        username: Some(username),
+                        password: Some(password),
+                        ..DockerCredentials::default()
                     }
-                };
-
-                let bollard_credentials = match credential {
-                    docker_credential::DockerCredential::IdentityToken(token) => {
-                        DockerCredentials {
-                            identitytoken: Some(token),
-                            ..DockerCredentials::default()
-                        }
-                    }
-                    docker_credential::DockerCredential::UsernamePassword(username, password) => {
-                        DockerCredentials {
-                            username: Some(username),
-                            password: Some(password),
-                            ..DockerCredentials::default()
-                        }
-                    }
-                };
+                }
+            };

-                Ok(Some(bollard_credentials))
-            },
-            |e| Err(format!("Credentials task failed: {e}")),
-        )
+            Ok(Some(bollard_credentials))
+        })
         .await
+        .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+        .map_err(|e: String| format!("Credentials task failed: {e}"))
 }

 /// Pull an image given its name and the image pull policy. This method is debounced by
diff --git a/src/rust/process_execution/pe_nailgun/src/nailgun_pool.rs b/src/rust/process_execution/pe_nailgun/src/nailgun_pool.rs
index 755a782a67d..a52eeee0e09 100644
--- a/src/rust/process_execution/pe_nailgun/src/nailgun_pool.rs
+++ b/src/rust/process_execution/pe_nailgun/src/nailgun_pool.rs
@@ -389,14 +389,13 @@ impl NailgunProcess {
         // Spawn the process and read its port from stdout.
         let (child, port) = executor
-            .spawn_blocking(
-                {
-                    let workdir = workdir.path().to_owned();
-                    move || spawn_and_read_port(startup_options, workdir)
-                },
-                |e| Err(format!("Nailgun spawn task failed: {e}")),
-            )
-            .await?;
+            .spawn_blocking({
+                let workdir = workdir.path().to_owned();
+                move || spawn_and_read_port(startup_options, workdir)
+            })
+            .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e| format!("Nailgun spawn task failed: {e}"))?;
         debug!(
             "Created nailgun server process with pid {} and port {}",
             child.id(),
@@ -549,7 +548,7 @@ async fn clear_workdir(
     future::try_join_all(moves).await?;

     // And drop it in the background.
-    let fut = executor.native_spawn_blocking(move || std::mem::drop(garbage_dir));
+    let fut = executor.spawn_blocking(move || std::mem::drop(garbage_dir));
     drop(fut);

     Ok(())
diff --git a/src/rust/process_execution/src/local.rs b/src/rust/process_execution/src/local.rs
index 31f66be4a3d..83793307030 100644
--- a/src/rust/process_execution/src/local.rs
+++ b/src/rust/process_execution/src/local.rs
@@ -817,7 +817,7 @@ impl AsyncDropSandbox {
 impl Drop for AsyncDropSandbox {
     fn drop(&mut self) {
         if let Some(sandbox) = self.2.take() {
-            let _background_cleanup = self.0.native_spawn_blocking(|| std::mem::drop(sandbox));
+            let _background_cleanup = self.0.spawn_blocking(|| std::mem::drop(sandbox));
         }
     }
 }
diff --git a/src/rust/sharded_lmdb/src/lib.rs b/src/rust/sharded_lmdb/src/lib.rs
index fb8f1901ce6..c1c1ea3f38b 100644
--- a/src/rust/sharded_lmdb/src/lib.rs
+++ b/src/rust/sharded_lmdb/src/lib.rs
@@ -271,34 +271,33 @@ impl ShardedLmdb {
     pub async fn remove(&self, fingerprint: Fingerprint) -> Result {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    let effective_key =
-                        VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
-                    let (env, db, lease_database) = store.get(&fingerprint);
-                    let del_res = env.begin_rw_txn().and_then(|mut txn| {
-                        txn.del(db, &effective_key, None)?;
-                        txn.del(lease_database, &effective_key, None)
-                            .or_else(|err| match err {
-                                lmdb::Error::NotFound => Ok(()),
-                                err => Err(err),
-                            })?;
-                        txn.commit()
-                    });
-
-                    match del_res {
-                        Ok(()) => Ok(true),
-                        Err(lmdb::Error::NotFound) => Ok(false),
-                        Err(err) => Err(format!(
-                            "Error removing versioned key {:?}: {}",
-                            effective_key.to_hex(),
-                            err
-                        )),
-                    }
-                },
-                |e| Err(format!("`remove` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                let effective_key =
+                    VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
+                let (env, db, lease_database) = store.get(&fingerprint);
+                let del_res = env.begin_rw_txn().and_then(|mut txn| {
+                    txn.del(db, &effective_key, None)?;
+                    txn.del(lease_database, &effective_key, None)
+                        .or_else(|err| match err {
+                            lmdb::Error::NotFound => Ok(()),
+                            err => Err(err),
+                        })?;
+                    txn.commit()
+                });
+
+                match del_res {
+                    Ok(()) => Ok(true),
+                    Err(lmdb::Error::NotFound) => Ok(false),
+                    Err(err) => Err(format!(
+                        "Error removing versioned key {:?}: {}",
+                        effective_key.to_hex(),
+                        err
+                    )),
+                }
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e| format!("`remove` task failed: {e}"))
     }

     ///
@@ -320,56 +319,55 @@ impl ShardedLmdb {
     ) -> Result, String> {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    // Group the items by the Environment that they will be applied to.
-                    let mut items_by_env = HashMap::new();
-                    let mut exists = HashSet::new();
-
-                    for fingerprint in &fingerprints {
-                        let effective_key =
-                            VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION);
-                        let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0);
-
-                        let (_, _, batch) = items_by_env
-                            .entry(*env_id)
-                            .or_insert_with(|| (env.clone(), *db, vec![]));
-                        batch.push(effective_key);
-                    }
-
-                    // Open and commit a Transaction per Environment. Since we never have more than one
-                    // Transaction open at a time, we don't have to worry about ordering.
-                    for (_, (env, db, batch)) in items_by_env {
-                        env.begin_ro_txn()
-                            .and_then(|txn| {
-                                for effective_key in &batch {
-                                    let get_res = txn.get(db, &effective_key);
-                                    match get_res {
-                                        Ok(_) => {
-                                            exists.insert(effective_key.get_fingerprint());
-                                        }
-                                        Err(lmdb::Error::NotFound) => (),
-                                        Err(err) => return Err(err),
-                                    };
-                                }
-                                txn.commit()
-                            })
-                            .map_err(|e| {
-                                format!(
-                                    "Error checking existence of fingerprints {:?}: {}",
-                                    batch
-                                        .iter()
-                                        .map(|key| key.get_fingerprint())
-                                        .collect::>(),
-                                    e
-                                )
-                            })?;
-                    }
-                    Ok(exists)
-                },
-                |e| Err(format!("`exists_batch` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                // Group the items by the Environment that they will be applied to.
+                let mut items_by_env = HashMap::new();
+                let mut exists = HashSet::new();
+
+                for fingerprint in &fingerprints {
+                    let effective_key =
+                        VersionedFingerprint::new(*fingerprint, ShardedLmdb::SCHEMA_VERSION);
+                    let (env_id, _, env, db, _) = store.get_raw(&fingerprint.0);
+
+                    let (_, _, batch) = items_by_env
+                        .entry(*env_id)
+                        .or_insert_with(|| (env.clone(), *db, vec![]));
+                    batch.push(effective_key);
+                }
+
+                // Open and commit a Transaction per Environment. Since we never have more than one
+                // Transaction open at a time, we don't have to worry about ordering.
+                for (_, (env, db, batch)) in items_by_env {
+                    env.begin_ro_txn()
+                        .and_then(|txn| {
+                            for effective_key in &batch {
+                                let get_res = txn.get(db, &effective_key);
+                                match get_res {
+                                    Ok(_) => {
+                                        exists.insert(effective_key.get_fingerprint());
+                                    }
+                                    Err(lmdb::Error::NotFound) => (),
+                                    Err(err) => return Err(err),
+                                };
+                            }
+                            txn.commit()
+                        })
+                        .map_err(|e| {
+                            format!(
+                                "Error checking existence of fingerprints {:?}: {}",
+                                batch
+                                    .iter()
+                                    .map(|key| key.get_fingerprint())
+                                    .collect::>(),
+                                e
+                            )
+                        })?;
+                }
+                Ok(exists)
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e: String| format!("`exists_batch` task failed: {e}"))
     }

     ///
@@ -378,62 +376,58 @@ impl ShardedLmdb {
     pub async fn all_fingerprints(&self) -> Result, String> {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    let mut fingerprints = Vec::new();
-                    for (env, database, lease_database) in &store.all_lmdbs() {
-                        let txn = env.begin_ro_txn().map_err(|err| {
-                            format!("Error beginning transaction to garbage collect: {err}")
-                        })?;
-                        let mut cursor = txn
-                            .open_ro_cursor(*database)
-                            .map_err(|err| format!("Failed to open lmdb read cursor: {err}"))?;
-                        for key_res in cursor.iter() {
-                            let (key, bytes) = key_res.map_err(|err| {
-                                format!("Failed to advance lmdb read cursor: {err}")
-                            })?;
-
-                            // Random access into the lease_database is slower than iterating, but hopefully garbage
-                            // collection is rare enough that we can get away with this, rather than do two passes
-                            // here (either to populate leases into pre-populated AgedFingerprints, or to read sizes
-                            // when we delete from lmdb to track how much we've freed).
-                            let lease_until_unix_timestamp = txn
-                                .get(*lease_database, &key)
-                                .map(|b| {
-                                    let mut array = [0_u8; 8];
-                                    array.copy_from_slice(b);
-                                    u64::from_le_bytes(array)
-                                })
-                                .unwrap_or_else(|e| match e {
-                                    lmdb::Error::NotFound => 0,
-                                    e => panic!(
-                                        "Error reading lease, probable lmdb corruption: {e:?}"
-                                    ),
-                                });
-
-                            let leased_until =
-                                time::UNIX_EPOCH + Duration::from_secs(lease_until_unix_timestamp);
-
-                            let expired_seconds_ago = time::SystemTime::now()
-                                .duration_since(leased_until)
-                                .map(|t| t.as_secs())
-                                // 0 indicates unexpired.
-                                .unwrap_or(0);
-
-                            let v = VersionedFingerprint::from_bytes_unsafe(key);
-                            let fingerprint = v.get_fingerprint();
-                            fingerprints.push(AgedFingerprint {
-                                expired_seconds_ago,
-                                fingerprint,
-                                size_bytes: bytes.len(),
-                            });
-                        }
-                    }
-                    Ok(fingerprints)
-                },
-                |e| Err(format!("`all_fingerprints` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                let mut fingerprints = Vec::new();
+                for (env, database, lease_database) in &store.all_lmdbs() {
+                    let txn = env.begin_ro_txn().map_err(|err| {
+                        format!("Error beginning transaction to garbage collect: {err}")
+                    })?;
+                    let mut cursor = txn
+                        .open_ro_cursor(*database)
+                        .map_err(|err| format!("Failed to open lmdb read cursor: {err}"))?;
+                    for key_res in cursor.iter() {
+                        let (key, bytes) = key_res
+                            .map_err(|err| format!("Failed to advance lmdb read cursor: {err}"))?;
+
+                        // Random access into the lease_database is slower than iterating, but hopefully garbage
+                        // collection is rare enough that we can get away with this, rather than do two passes
+                        // here (either to populate leases into pre-populated AgedFingerprints, or to read sizes
+                        // when we delete from lmdb to track how much we've freed).
+                        let lease_until_unix_timestamp = txn
+                            .get(*lease_database, &key)
+                            .map(|b| {
+                                let mut array = [0_u8; 8];
+                                array.copy_from_slice(b);
+                                u64::from_le_bytes(array)
+                            })
+                            .unwrap_or_else(|e| match e {
+                                lmdb::Error::NotFound => 0,
+                                e => panic!("Error reading lease, probable lmdb corruption: {e:?}"),
+                            });
+
+                        let leased_until =
+                            time::UNIX_EPOCH + Duration::from_secs(lease_until_unix_timestamp);
+
+                        let expired_seconds_ago = time::SystemTime::now()
+                            .duration_since(leased_until)
+                            .map(|t| t.as_secs())
+                            // 0 indicates unexpired.
+                            .unwrap_or(0);
+
+                        let v = VersionedFingerprint::from_bytes_unsafe(key);
+                        let fingerprint = v.get_fingerprint();
+                        fingerprints.push(AgedFingerprint {
+                            expired_seconds_ago,
+                            fingerprint,
+                            size_bytes: bytes.len(),
+                        });
+                    }
+                }
+                Ok(fingerprints)
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e: String| format!("`all_fingerprints` task failed: {e}"))
     }

     ///
@@ -464,68 +458,63 @@ impl ShardedLmdb {
     ) -> Result<(), String> {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    // Group the items by the Environment that they will be applied to.
-                    let mut items_by_env = HashMap::new();
-                    let mut fingerprints = Vec::new();
-                    for (fingerprint, bytes) in items {
-                        let effective_key =
-                            VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
-                        let (env_id, _, env, db, lease_database) = store.get_raw(&fingerprint.0);
-
-                        let (_, _, _, batch) = items_by_env
-                            .entry(*env_id)
-                            .or_insert_with(|| (env.clone(), *db, *lease_database, vec![]));
-                        batch.push((effective_key, bytes));
-                        fingerprints.push(fingerprint);
-                    }
-
-                    // Open and commit a Transaction per Environment. Since we never have more than one
-                    // Transaction open at a time, we don't have to worry about ordering.
-                    for (_, (env, db, lease_database, batch)) in items_by_env {
-                        env.begin_rw_txn()
-                            .and_then(|mut txn| {
-                                for (effective_key, bytes) in &batch {
-                                    let put_res = txn.put(
-                                        db,
-                                        &effective_key,
-                                        &bytes,
-                                        WriteFlags::NO_OVERWRITE,
-                                    );
-                                    match put_res {
-                                        Ok(()) => (),
-                                        Err(lmdb::Error::KeyExist) => continue,
-                                        Err(err) => return Err(err),
-                                    }
-                                    if initial_lease {
-                                        store.lease_inner(
-                                            lease_database,
-                                            effective_key,
-                                            store.lease_until_secs_since_epoch(),
-                                            &mut txn,
-                                        )?;
-                                    }
-                                }
-                                txn.commit()
-                            })
-                            .map_err(|e| {
-                                format!(
-                                    "Error storing fingerprints {:?}: {}",
-                                    batch
-                                        .iter()
-                                        .map(|(key, _)| key.to_hex())
-                                        .collect::>(),
-                                    e
-                                )
-                            })?;
-                    }
-
-                    Ok(())
-                },
-                |e| Err(format!("`store_bytes_batch` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                // Group the items by the Environment that they will be applied to.
+                let mut items_by_env = HashMap::new();
+                let mut fingerprints = Vec::new();
+                for (fingerprint, bytes) in items {
+                    let effective_key =
+                        VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION);
+                    let (env_id, _, env, db, lease_database) = store.get_raw(&fingerprint.0);
+
+                    let (_, _, _, batch) = items_by_env
+                        .entry(*env_id)
+                        .or_insert_with(|| (env.clone(), *db, *lease_database, vec![]));
+                    batch.push((effective_key, bytes));
+                    fingerprints.push(fingerprint);
+                }
+
+                // Open and commit a Transaction per Environment. Since we never have more than one
+                // Transaction open at a time, we don't have to worry about ordering.
+                for (_, (env, db, lease_database, batch)) in items_by_env {
+                    env.begin_rw_txn()
+                        .and_then(|mut txn| {
+                            for (effective_key, bytes) in &batch {
+                                let put_res =
+                                    txn.put(db, &effective_key, &bytes, WriteFlags::NO_OVERWRITE);
+                                match put_res {
+                                    Ok(()) => (),
+                                    Err(lmdb::Error::KeyExist) => continue,
+                                    Err(err) => return Err(err),
+                                }
+                                if initial_lease {
+                                    store.lease_inner(
+                                        lease_database,
+                                        effective_key,
+                                        store.lease_until_secs_since_epoch(),
+                                        &mut txn,
+                                    )?;
+                                }
+                            }
+                            txn.commit()
+                        })
+                        .map_err(|e| {
+                            format!(
+                                "Error storing fingerprints {:?}: {}",
+                                batch
+                                    .iter()
+                                    .map(|(key, _)| key.to_hex())
+                                    .collect::>(),
+                                e
+                            )
+                        })?;
+                }

+                Ok(())
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e: String| format!("`store_bytes_batch` task failed: {e}"))
     }

     ///
@@ -548,104 +537,103 @@ impl ShardedLmdb {
     {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    let mut attempts = 0;
-                    loop {
-                        let effective_key = VersionedFingerprint::new(
-                            expected_digest.hash,
-                            ShardedLmdb::SCHEMA_VERSION,
-                        );
-                        let (env, db, lease_database) = store.get(&expected_digest.hash);
-                        let put_res: Result<(), StoreError> =
-                            env.begin_rw_txn()
-                                .map_err(StoreError::Lmdb)
-                                .and_then(|mut txn| {
-                                    // Second pass: copy into the reserved memory.
-                                    let mut writer = txn
-                                        .reserve(
-                                            db,
-                                            &effective_key,
-                                            expected_digest.size_bytes,
-                                            WriteFlags::NO_OVERWRITE,
-                                        )?
-                                        .writer();
-                                    let mut read = data_provider()
-                                        .map_err(|e| format!("Failed to read: {e}"))?;
-                                    let should_retry =
-                                        !sync_verified_copy(expected_digest, data_is_immutable, &mut read, &mut writer)
-                                            .map_err(|e| {
-                                                format!("Failed to copy from {read:?} or store in {env:?}: {e:?}")
-                                            })?;
-
-                                    if should_retry {
-                                        let msg = format!("Input {read:?} changed while reading.");
-                                        log::debug!("{msg}");
-                                        return Err(StoreError::Retry(msg));
-                                    }
-
-                                    if initial_lease {
-                                        store.lease_inner(
-                                            lease_database,
-                                            &effective_key,
-                                            store.lease_until_secs_since_epoch(),
-                                            &mut txn,
-                                        )?;
-                                    }
-                                    txn.commit()?;
-                                    Ok(())
-                                });
-
-                        match put_res {
-                            Ok(()) => return Ok(()),
-                            Err(StoreError::Retry(msg)) => {
-                                // Input changed during reading: maybe retry.
-                                if attempts > 10 {
-                                    return Err(msg);
-                                } else {
-                                    attempts += 1;
-                                }
-                            }
-                            Err(StoreError::Lmdb(lmdb::Error::KeyExist)) => return Ok(()),
-                            Err(StoreError::Lmdb(err)) => {
-                                return Err(format!("Error storing {expected_digest:?}: {err}"));
-                            }
-                            Err(StoreError::Io(err)) => {
-                                return Err(format!("Error storing {expected_digest:?}: {err}"));
-                            }
-                        };
-                    }
-                },
-                |e| Err(format!("`store` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                let mut attempts = 0;
+                loop {
+                    let effective_key = VersionedFingerprint::new(
+                        expected_digest.hash,
+                        ShardedLmdb::SCHEMA_VERSION,
+                    );
+                    let (env, db, lease_database) = store.get(&expected_digest.hash);
+                    let put_res: Result<(), StoreError> = env
+                        .begin_rw_txn()
+                        .map_err(StoreError::Lmdb)
+                        .and_then(|mut txn| {
+                            // Second pass: copy into the reserved memory.
+                            let mut writer = txn
+                                .reserve(
+                                    db,
+                                    &effective_key,
+                                    expected_digest.size_bytes,
+                                    WriteFlags::NO_OVERWRITE,
+                                )?
+                                .writer();
+                            let mut read =
+                                data_provider().map_err(|e| format!("Failed to read: {e}"))?;
+                            let should_retry = !sync_verified_copy(
+                                expected_digest,
+                                data_is_immutable,
+                                &mut read,
+                                &mut writer,
+                            )
+                            .map_err(|e| {
+                                format!("Failed to copy from {read:?} or store in {env:?}: {e:?}")
+                            })?;
+
+                            if should_retry {
+                                let msg = format!("Input {read:?} changed while reading.");
+                                log::debug!("{msg}");
+                                return Err(StoreError::Retry(msg));
+                            }
+
+                            if initial_lease {
+                                store.lease_inner(
+                                    lease_database,
+                                    &effective_key,
+                                    store.lease_until_secs_since_epoch(),
+                                    &mut txn,
+                                )?;
+                            }
+                            txn.commit()?;
+                            Ok(())
+                        });
+
+                    match put_res {
+                        Ok(()) => return Ok(()),
+                        Err(StoreError::Retry(msg)) => {
+                            // Input changed during reading: maybe retry.
+                            if attempts > 10 {
+                                return Err(msg);
+                            } else {
+                                attempts += 1;
+                            }
+                        }
+                        Err(StoreError::Lmdb(lmdb::Error::KeyExist)) => return Ok(()),
+                        Err(StoreError::Lmdb(err)) => {
+                            return Err(format!("Error storing {expected_digest:?}: {err}"));
+                        }
+                        Err(StoreError::Io(err)) => {
+                            return Err(format!("Error storing {expected_digest:?}: {err}"));
+                        }
+                    };
+                }
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+            .map_err(|e| format!("`store` task failed: {e}"))
     }

     pub async fn lease(&self, fingerprint: Fingerprint) -> Result<(), String> {
         let store = self.clone();
         self.executor
-            .spawn_blocking(
-                move || {
-                    let until_secs_since_epoch: u64 = store.lease_until_secs_since_epoch();
-                    let (env, _, lease_database) = store.get(&fingerprint);
-                    env.begin_rw_txn()
-                        .and_then(|mut txn| {
-                            store.lease_inner(
-                                lease_database,
-                                &VersionedFingerprint::new(
-                                    fingerprint,
-                                    ShardedLmdb::SCHEMA_VERSION,
-                                ),
-                                until_secs_since_epoch,
-                                &mut txn,
-                            )?;
-                            txn.commit()
-                        })
-                        .map_err(|e| format!("Error leasing {fingerprint:?}: {e}"))
-                },
-                |e| Err(format!("`lease` task failed: {e}")),
-            )
+            .spawn_blocking(move || {
+                let until_secs_since_epoch: u64 = store.lease_until_secs_since_epoch();
+                let (env, _, lease_database) = store.get(&fingerprint);
+                env.begin_rw_txn()
+                    .and_then(|mut txn| {
+                        store.lease_inner(
+                            lease_database,
+                            &VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION),
+                            until_secs_since_epoch,
+                            &mut txn,
+                        )?;
+                        txn.commit()
+                    })
+                    .map_err(|e| format!("Error leasing {fingerprint:?}: {e}"))
+            })
             .await
+            .map_err(|e| format!("Failed to join Tokio task: {e}"))?
+ .map_err(|e| format!("`lease` task failed: {e}")) } fn lease_inner( @@ -681,25 +669,24 @@ impl ShardedLmdb { let store = self.clone(); let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::SCHEMA_VERSION); self.executor - .spawn_blocking( - move || { - let (env, db, _) = store.get(&fingerprint); - let ro_txn = env - .begin_ro_txn() - .map_err(|err| format!("Failed to begin read transaction: {err}"))?; - match ro_txn.get(db, &effective_key) { - Ok(bytes) => f(bytes).map(Some), - Err(lmdb::Error::NotFound) => Ok(None), - Err(err) => Err(format!( - "Error loading versioned key {:?}: {}", - effective_key.to_hex(), - err, - )), - } - }, - |e| Err(format!("`load_bytes_with` task failed: {e}")), - ) + .spawn_blocking(move || { + let (env, db, _) = store.get(&fingerprint); + let ro_txn = env + .begin_ro_txn() + .map_err(|err| format!("Failed to begin read transaction: {err}"))?; + match ro_txn.get(db, &effective_key) { + Ok(bytes) => f(bytes).map(Some), + Err(lmdb::Error::NotFound) => Ok(None), + Err(err) => Err(format!( + "Error loading versioned key {:?}: {}", + effective_key.to_hex(), + err, + )), + } + }) .await + .map_err(|e| format!("Failed to join Tokio task: {e}"))? + .map_err(|e| format!("`load_bytes_with` task failed: {e}")) } #[allow(clippy::useless_conversion)] // False positive: https://github.com/rust-lang/rust-clippy/issues/3913 diff --git a/src/rust/task_executor/Cargo.toml b/src/rust/task_executor/Cargo.toml index 27b9850bafa..e214054c781 100644 --- a/src/rust/task_executor/Cargo.toml +++ b/src/rust/task_executor/Cargo.toml @@ -1,6 +1,6 @@ [package] version = "0.0.1" -edition = "2021" +edition = "2024" name = "task_executor" authors = ["Pants Build "] publish = false diff --git a/src/rust/task_executor/src/lib.rs b/src/rust/task_executor/src/lib.rs index 10b98102f15..e60e6c95edd 100644 --- a/src/rust/task_executor/src/lib.rs +++ b/src/rust/task_executor/src/lib.rs @@ -174,32 +174,11 @@ impl Executor { self.handle.block_on(future_with_correct_context(future)) } - /// - /// Spawn a Future on a threadpool specifically reserved for I/O tasks which are allowed to be - /// long-running. - /// - /// If the background Task exits abnormally, the given closure will be called to recover: usually - /// it should convert the resulting Error to a relevant error type. - /// - /// If the returned Future is dropped, the computation will still continue to completion: see - /// - /// - pub fn spawn_blocking R + Send + 'static, R: Send + 'static>( - &self, - f: F, - rescue_join_error: impl FnOnce(JoinError) -> R, - ) -> impl Future { - self.native_spawn_blocking(f).map(|res| match res { - Ok(o) => o, - Err(e) => rescue_join_error(e), - }) - } - /// /// Spawn a Future on threads specifically reserved for I/O tasks which are allowed to be /// long-running and return a JoinHandle /// - pub fn native_spawn_blocking R + Send + 'static, R: Send + 'static>( + pub fn spawn_blocking R + Send + 'static, R: Send + 'static>( &self, f: F, ) -> JoinHandle { diff --git a/src/rust/ui/src/instance/prodash.rs b/src/rust/ui/src/instance/prodash.rs index a82a17fd27c..1cf9ae6331b 100644 --- a/src/rust/ui/src/instance/prodash.rs +++ b/src/rust/ui/src/instance/prodash.rs @@ -77,7 +77,7 @@ impl ProdashInstance { // TODO: There is a shutdown race here, where if the UI is torn down before exclusive access is // dropped, we might drop stderr on the floor. 
That likely causes: // https://github.com/pantsbuild/pants/issues/13276 - let _stderr_task = executor.native_spawn_blocking({ + let _stderr_task = executor.spawn_blocking({ let mut tree = tree.clone(); move || { while let Ok(stderr) = stderr_receiver.recv() { @@ -99,13 +99,14 @@ impl ProdashInstance { // Drop all tasks to clear the Tree. The call to shutdown will render a final "Tick" with the // empty Tree, which will clear the screen. self.tasks_to_display.clear(); - self.executor - .clone() - .spawn_blocking( - move || self.handle.shutdown_and_wait(), - |e| fatal_log!("Failed to teardown UI: {e}"), - ) - .boxed() + let executor = self.executor.clone(); + let shutdown = executor.spawn_blocking(move || self.handle.shutdown_and_wait()); + async move { + if let Err(e) = shutdown.await { + fatal_log!("Failed to teardown UI: {e}"); + } + } + .boxed() } pub fn render(&mut self, heavy_hitters: &HashMap) { diff --git a/src/rust/watch/src/lib.rs b/src/rust/watch/src/lib.rs index f581b8402ff..2e0a0bca61a 100644 --- a/src/rust/watch/src/lib.rs +++ b/src/rust/watch/src/lib.rs @@ -290,17 +290,16 @@ impl InvalidationWatcher { let watcher = self.clone(); executor - .spawn_blocking( - move || { - let mut inner = watcher.0.lock(); - inner - .watcher - .watch(&path, RecursiveMode::NonRecursive) - .map_err(|e| maybe_enrich_notify_error(&path, e)) - }, - |e| Err(format!("Watch attempt failed: {e}")), - ) + .spawn_blocking(move || { + let mut inner = watcher.0.lock(); + inner + .watcher + .watch(&path, RecursiveMode::NonRecursive) + .map_err(|e| maybe_enrich_notify_error(&path, e)) + }) .await + .map_err(|e| format!("Failed to join Tokio task: {e}"))? + .map_err(|e| format!("Watch attempt failed: {e}")) } }
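
Note on the new calling convention: `Executor::spawn_blocking` now returns Tokio's `JoinHandle` directly, so each call site awaits the handle, maps the `JoinError` itself, and then unwraps the closure's own `Result`. A minimal sketch of the pattern used throughout this diff (the `read_file_blocking` helper, its path argument, and the error strings are illustrative, not part of the change):

use task_executor::Executor;

// Sketch only: demonstrates the two-step error handling after this refactor.
async fn read_file_blocking(
    executor: &Executor,
    path: std::path::PathBuf,
) -> Result<String, String> {
    executor
        // The closure runs on the blocking threadpool and returns Result<String, String>.
        .spawn_blocking(move || {
            std::fs::read_to_string(&path).map_err(|e| format!("Failed to read {path:?}: {e}"))
        })
        // Awaiting the JoinHandle yields Result<Result<String, String>, JoinError>:
        // the first map_err/? handles a panicked or cancelled task...
        .await
        .map_err(|e| format!("Failed to join Tokio task: {e}"))?
        // ...and the second surfaces the closure's own error.
        .map_err(|e| format!("read task failed: {e}"))
}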