Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/rust/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,11 +426,9 @@ impl PosixFS {
pub async fn scandir(&self, dir_relative_to_root: Dir) -> Result<DirectoryListing, io::Error> {
let vfs = self.clone();
self.executor
.spawn_blocking(
move || vfs.scandir_sync(&dir_relative_to_root),
|e| Err(io::Error::other(format!("Synchronous scandir failed: {e}"))),
)
.await
.spawn_blocking(move || vfs.scandir_sync(&dir_relative_to_root))
.await?
.map_err(|e| io::Error::other(format!("Synchronous scandir failed: {e}")))
}

fn scandir_sync(&self, dir_relative_to_root: &Dir) -> Result<DirectoryListing, io::Error> {
Expand Down
159 changes: 76 additions & 83 deletions src/rust/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,21 +203,20 @@ impl ShardedFSDB {
.map_err(|e| format!("Failed to create directory: {e}"))?;
let (src_file, dst_dir) = fsdb
.executor
.spawn_blocking(
move || {
let src_file = Builder::new()
.suffix(".hardlink_canary")
.tempfile_in(&fsdb.root)
.map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
let dst_dir = Builder::new()
.suffix(".hardlink_canary")
.tempdir_in(dst_parent_dir)
.map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
Ok((src_file, dst_dir))
},
|e| Err(format!("hardlink canary temp files task failed: {e}")),
)
.await?;
.spawn_blocking(move || {
let src_file = Builder::new()
.suffix(".hardlink_canary")
.tempfile_in(&fsdb.root)
.map_err(|e| format!("Failed to create hardlink canary file: {e}"))?;
let dst_dir = Builder::new()
.suffix(".hardlink_canary")
.tempdir_in(dst_parent_dir)
.map_err(|e| format!("Failed to create hardlink canary dir: {e}"))?;
Ok((src_file, dst_dir))
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e: String| format!("hardlink canary temp files task failed: {e}"))?;
let dst_file = dst_dir.path().join("hard_link");
let is_hardlinkable = hard_link(src_file, dst_file).await.is_ok();
log::debug!("{src_display} -> {dst_display} hardlinkable: {is_hardlinkable}");
Expand Down Expand Up @@ -289,16 +288,15 @@ impl ShardedFSDB {
// have to worry about parent dirs.
let named_temp_file = self
.executor
.spawn_blocking(
move || {
Builder::new()
.suffix(".tmp")
.tempfile_in(dest_path2.parent().unwrap())
.map_err(|e| format!("Failed to create temp file: {e}"))
},
|e| Err(format!("temp file creation task failed: {e}")),
)
.await?;
.spawn_blocking(move || {
Builder::new()
.suffix(".tmp")
.tempfile_in(dest_path2.parent().unwrap())
.map_err(|e| format!("Failed to create temp file: {e}"))
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e| format!("temp file creation task failed: {e}"))?;
let (std_file, tmp_path) = named_temp_file
.keep()
.map_err(|e| format!("Failed to keep temp file: {e}"))?;
Expand Down Expand Up @@ -366,14 +364,13 @@ impl UnderlyingByteStore for ShardedFSDB {
async fn lease(&self, fingerprint: Fingerprint) -> Result<(), String> {
let path = self.get_path(fingerprint);
self.executor
.spawn_blocking(
move || {
fs_set_times::set_mtime(&path, fs_set_times::SystemTimeSpec::SymbolicNow)
.map_err(|e| format!("Failed to extend mtime of {path:?}: {e}"))
},
|e| Err(format!("`lease` task failed: {e}")),
)
.spawn_blocking(move || {
fs_set_times::set_mtime(&path, fs_set_times::SystemTimeSpec::SymbolicNow)
.map_err(|e| format!("Failed to extend mtime of {path:?}: {e}"))
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e| format!("`lease` task failed: {e}"))
}

async fn remove(&self, fingerprint: Fingerprint) -> Result<bool, String> {
Expand Down Expand Up @@ -470,61 +467,57 @@ impl UnderlyingByteStore for ShardedFSDB {
let root = self.root.clone();
let expiration_time = SystemTime::now() - self.lease_time;
self.executor
.spawn_blocking(
move || {
let maybe_shards = std::fs::read_dir(&root);
let mut fingerprints = vec![];
if let Ok(shards) = maybe_shards {
for entry in shards {
let shard =
entry.map_err(|e| format!("Error iterating dir {root:?}: {e}."))?;
let large_files = std::fs::read_dir(shard.path())
.map_err(|e| format!("Failed to read shard directory: {e}."))?;
for entry in large_files {
let large_file = entry.map_err(|e| {
format!(
"Error iterating dir {:?}: {e}",
shard.path().file_name()
)
})?;
let path = large_file.path();
if path.extension().is_some() {
continue; // NB: This is a tempfile
}

let hash = path.file_name().unwrap().to_str().unwrap();
let (length, mtime) = large_file
.metadata()
.and_then(|metadata| {
let length = metadata.len();
let mtime = metadata.modified()?;
Ok((length, mtime))
})
.map_err(|e| {
format!("Could not access metadata for {path:?}: {e}")
})?;

let expired_seconds_ago = expiration_time
.duration_since(mtime)
.map(|t| t.as_secs())
// 0 indicates unexpired.
.unwrap_or(0);

fingerprints.push(AgedFingerprint {
expired_seconds_ago,
fingerprint: Fingerprint::from_hex_string(hash).map_err(
|e| format!("Invalid file store entry at {path:?}: {e}"),
)?,
size_bytes: length as usize,
});
.spawn_blocking(move || {
let maybe_shards = std::fs::read_dir(&root);
let mut fingerprints = vec![];
if let Ok(shards) = maybe_shards {
for entry in shards {
let shard =
entry.map_err(|e| format!("Error iterating dir {root:?}: {e}."))?;
let large_files = std::fs::read_dir(shard.path())
.map_err(|e| format!("Failed to read shard directory: {e}."))?;
for entry in large_files {
let large_file = entry.map_err(|e| {
format!("Error iterating dir {:?}: {e}", shard.path().file_name())
})?;
let path = large_file.path();
if path.extension().is_some() {
continue; // NB: This is a tempfile
}

let hash = path.file_name().unwrap().to_str().unwrap();
let (length, mtime) = large_file
.metadata()
.and_then(|metadata| {
let length = metadata.len();
let mtime = metadata.modified()?;
Ok((length, mtime))
})
.map_err(|e| {
format!("Could not access metadata for {path:?}: {e}")
})?;

let expired_seconds_ago = expiration_time
.duration_since(mtime)
.map(|t| t.as_secs())
// 0 indicates unexpired.
.unwrap_or(0);

fingerprints.push(AgedFingerprint {
expired_seconds_ago,
fingerprint: Fingerprint::from_hex_string(hash).map_err(|e| {
format!("Invalid file store entry at {path:?}: {e}")
})?,
size_bytes: length as usize,
});
}
}
Ok(fingerprints)
},
|e| Err(format!("`aged_fingerprints` task failed: {e}")),
)
}
Ok(fingerprints)
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e: String| format!("`aged_fingerprints` task failed: {e}"))
}
}

Expand Down
36 changes: 19 additions & 17 deletions src/rust/nailgun/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,26 +240,28 @@ impl Nail for RawFdNail {

// Spawn the underlying function as a blocking task, and capture its exit code to append to the
// output stream.
let executor = self.executor.clone();
let nail = self.clone();
let exit_code = self
.executor
.spawn_blocking(
move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
},
|e| {
let exit_code_join = executor.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
});
let exit_code = async move {
match exit_code_join.await {
Ok(code) => code,
Err(e) => {
log::warn!("Server exited uncleanly: {e}");
ExitCode(1)
},
)
.boxed();
}
}
}
.boxed();

// Select a single stdout/stderr stream.
let stdout_stream = stdout_stream.map_ok(ChildOutput::Stdout);
Expand Down
73 changes: 34 additions & 39 deletions src/rust/process_execution/docker/src/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,50 +279,45 @@ async fn credentials_for_image(
let server = server.to_owned();

executor
.spawn_blocking(
move || {
// Resolve the server as a DNS name to confirm that it is actually a registry.
let Ok(_) = (server.as_ref(), 80)
.to_socket_addrs()
.or_else(|_| server.to_socket_addrs())
else {
.spawn_blocking(move || {
// Resolve the server as a DNS name to confirm that it is actually a registry.
let Ok(_) = (server.as_ref(), 80)
.to_socket_addrs()
.or_else(|_| server.to_socket_addrs())
else {
return Ok(None);
};

// TODO: https://github.com/keirlawson/docker_credential/issues/7 means that this will only
// work for credential helpers and credentials encoded directly in the docker config,
// rather than for general credStore implementations.
let credential = match docker_credential::get_credential(&server) {
Ok(credential) => credential,
Err(e) => {
log::warn!("Failed to retrieve Docker credentials for server `{server}`: {e}");
return Ok(None);
};
}
};

// TODO: https://github.com/keirlawson/docker_credential/issues/7 means that this will only
// work for credential helpers and credentials encoded directly in the docker config,
// rather than for general credStore implementations.
let credential = match docker_credential::get_credential(&server) {
Ok(credential) => credential,
Err(e) => {
log::warn!(
"Failed to retrieve Docker credentials for server `{server}`: {e}"
);
return Ok(None);
let bollard_credentials = match credential {
docker_credential::DockerCredential::IdentityToken(token) => DockerCredentials {
identitytoken: Some(token),
..DockerCredentials::default()
},
docker_credential::DockerCredential::UsernamePassword(username, password) => {
DockerCredentials {
username: Some(username),
password: Some(password),
..DockerCredentials::default()
}
};

let bollard_credentials = match credential {
docker_credential::DockerCredential::IdentityToken(token) => {
DockerCredentials {
identitytoken: Some(token),
..DockerCredentials::default()
}
}
docker_credential::DockerCredential::UsernamePassword(username, password) => {
DockerCredentials {
username: Some(username),
password: Some(password),
..DockerCredentials::default()
}
}
};
}
};

Ok(Some(bollard_credentials))
},
|e| Err(format!("Credentials task failed: {e}")),
)
Ok(Some(bollard_credentials))
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e: String| format!("Credentials task failed: {e}"))
}

/// Pull an image given its name and the image pull policy. This method is debounced by
Expand Down
17 changes: 8 additions & 9 deletions src/rust/process_execution/pe_nailgun/src/nailgun_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,13 @@ impl NailgunProcess {

// Spawn the process and read its port from stdout.
let (child, port) = executor
.spawn_blocking(
{
let workdir = workdir.path().to_owned();
move || spawn_and_read_port(startup_options, workdir)
},
|e| Err(format!("Nailgun spawn task failed: {e}")),
)
.await?;
.spawn_blocking({
let workdir = workdir.path().to_owned();
move || spawn_and_read_port(startup_options, workdir)
})
.await
.map_err(|e| format!("Failed to join Tokio task: {e}"))?
.map_err(|e| format!("Nailgun spawn task failed: {e}"))?;
debug!(
"Created nailgun server process with pid {} and port {}",
child.id(),
Expand Down Expand Up @@ -549,7 +548,7 @@ async fn clear_workdir(
future::try_join_all(moves).await?;

// And drop it in the background.
let fut = executor.native_spawn_blocking(move || std::mem::drop(garbage_dir));
let fut = executor.spawn_blocking(move || std::mem::drop(garbage_dir));
drop(fut);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/rust/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ impl AsyncDropSandbox {
impl Drop for AsyncDropSandbox {
fn drop(&mut self) {
if let Some(sandbox) = self.2.take() {
let _background_cleanup = self.0.native_spawn_blocking(|| std::mem::drop(sandbox));
let _background_cleanup = self.0.spawn_blocking(|| std::mem::drop(sandbox));
}
}
}
Expand Down
Loading
Loading