From f67dbbb53b3e606ab43d8bf4041833945fa0c256 Mon Sep 17 00:00:00 2001 From: tahmid-23 <60953955+tahmid-23@users.noreply.github.com> Date: Thu, 8 Jan 2026 22:18:09 -0500 Subject: [PATCH] fs: implement tokio::fs::hard_link via io_uring --- tokio/src/fs/hard_link.rs | 15 +++ tokio/src/io/uring/link.rs | 58 +++++++++++ tokio/src/io/uring/mod.rs | 1 + tokio/src/runtime/driver/op.rs | 2 + tokio/tests/fs_uring_link.rs | 172 +++++++++++++++++++++++++++++++++ 5 files changed, 248 insertions(+) create mode 100644 tokio/src/io/uring/link.rs create mode 100644 tokio/tests/fs_uring_link.rs diff --git a/tokio/src/fs/hard_link.rs b/tokio/src/fs/hard_link.rs index d0f6041cd9a..f9078f50b9a 100644 --- a/tokio/src/fs/hard_link.rs +++ b/tokio/src/fs/hard_link.rs @@ -40,5 +40,20 @@ pub async fn hard_link(original: impl AsRef, link: impl AsRef) -> io let original = original.as_ref().to_owned(); let link = link.as_ref().to_owned(); + #[cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" + ))] + { + let handle = crate::runtime::Handle::current(); + let driver_handle = handle.inner.driver().io(); + if driver_handle.check_and_init(io_uring::opcode::LinkAt::CODE)? { + return crate::runtime::driver::op::Op::link(&original, &link)?.await; + } + } + asyncify(move || std::fs::hard_link(original, link)).await } diff --git a/tokio/src/io/uring/link.rs b/tokio/src/io/uring/link.rs new file mode 100644 index 00000000000..8fee1c45875 --- /dev/null +++ b/tokio/src/io/uring/link.rs @@ -0,0 +1,58 @@ +use super::utils::cstr; + +use crate::runtime::driver::op::{CancelData, Cancellable, Completable, CqeResult, Op}; + +use io_uring::{opcode, types}; +use std::ffi::CString; +use std::io; +use std::io::Error; +use std::path::Path; + +#[derive(Debug)] +pub(crate) struct Link { + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + #[allow(dead_code)] + original: CString, + /// This field will be read by the kernel during the operation, so we + /// need to ensure it is valid for the entire duration of the operation. + #[allow(dead_code)] + link: CString, +} + +impl Completable for Link { + type Output = io::Result<()>; + + fn complete(self, cqe: CqeResult) -> Self::Output { + cqe.result.map(|_| ()) + } + + fn complete_with_error(self, err: Error) -> Self::Output { + Err(err) + } +} + +impl Cancellable for Link { + fn cancel(self) -> CancelData { + CancelData::Link(self) + } +} + +impl Op { + /// Submit a request to create a hard link. + pub(crate) fn link(original: &Path, link: &Path) -> io::Result { + let original = cstr(original)?; + let link = cstr(link)?; + + let link_op = opcode::LinkAt::new( + types::Fd(libc::AT_FDCWD), + original.as_ptr(), + types::Fd(libc::AT_FDCWD), + link.as_ptr(), + ) + .build(); + + // SAFETY: Parameters are valid for the entire duration of the operation + Ok(unsafe { Op::new(link_op, Link { original, link }) }) + } +} diff --git a/tokio/src/io/uring/mod.rs b/tokio/src/io/uring/mod.rs index facad596f63..183d7bef0d5 100644 --- a/tokio/src/io/uring/mod.rs +++ b/tokio/src/io/uring/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod link; pub(crate) mod open; pub(crate) mod read; pub(crate) mod utils; diff --git a/tokio/src/runtime/driver/op.rs b/tokio/src/runtime/driver/op.rs index d2b9289ceee..fdd30e0d35d 100644 --- a/tokio/src/runtime/driver/op.rs +++ b/tokio/src/runtime/driver/op.rs @@ -1,3 +1,4 @@ +use crate::io::uring::link::Link; use crate::io::uring::open::Open; use crate::io::uring::read::Read; use crate::io::uring::write::Write; @@ -16,6 +17,7 @@ use std::task::{Context, Poll, Waker}; #[allow(dead_code)] #[derive(Debug)] pub(crate) enum CancelData { + Link(Link), Open(Open), Write(Write), Read(Read), diff --git a/tokio/tests/fs_uring_link.rs b/tokio/tests/fs_uring_link.rs new file mode 100644 index 00000000000..ca0e76c27b7 --- /dev/null +++ b/tokio/tests/fs_uring_link.rs @@ -0,0 +1,172 @@ +//! Uring hard link operations tests. + +#![cfg(all( + tokio_unstable, + feature = "io-uring", + feature = "rt", + feature = "fs", + target_os = "linux" +))] + +use futures::FutureExt; +use std::future::poll_fn; +use std::future::Future; +use std::os::unix::fs::MetadataExt; +use std::path::PathBuf; +use std::pin::pin; +use std::sync::mpsc; +use std::task::Poll; +use std::time::Duration; +use tempfile::TempDir; +use tokio::runtime::{Builder, Runtime}; +use tokio::task::JoinSet; +use tokio_test::assert_pending; +use tokio_util::task::TaskTracker; + +fn multi_rt(n: usize) -> Box Runtime> { + Box::new(move || { + Builder::new_multi_thread() + .worker_threads(n) + .enable_all() + .build() + .unwrap() + }) +} + +fn current_rt() -> Box Runtime> { + Box::new(|| Builder::new_current_thread().enable_all().build().unwrap()) +} + +fn rt_combinations() -> Vec Runtime>> { + vec![ + current_rt(), + multi_rt(1), + multi_rt(2), + multi_rt(8), + multi_rt(64), + multi_rt(256), + ] +} + +#[test] +fn shutdown_runtime_while_performing_io_uring_ops() { + fn run(rt: Runtime) { + let (done_tx, done_rx) = mpsc::channel(); + let (workdir, target) = create_tmp_dir(); + + rt.spawn(async move { + // spawning a bunch of uring operations. + for i in 0..usize::MAX { + let link = workdir.path().join(format!("{i}")); + let target = target.clone(); + tokio::spawn(async move { + let mut fut = pin!(tokio::fs::hard_link(target, &link)); + + poll_fn(|cx| { + assert_pending!(fut.as_mut().poll(cx)); + Poll::<()>::Pending + }) + .await; + + fut.await.unwrap(); + }); + + // Avoid busy looping. + tokio::task::yield_now().await; + } + }); + + std::thread::spawn(move || { + rt.shutdown_timeout(Duration::from_millis(300)); + done_tx.send(()).unwrap(); + }); + + done_rx.recv().unwrap(); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[test] +fn hard_link_many_files() { + fn run(rt: Runtime) { + let (workdir, target) = create_tmp_dir(); + + rt.block_on(async move { + const N_LINKS: usize = 10_000; + + let tracker = TaskTracker::new(); + + for i in 0..N_LINKS { + let target = target.clone(); + let link = workdir.path().join(format!("{i}")); + tracker.spawn(async move { + tokio::fs::hard_link(&target, &link).await.unwrap(); + }); + } + tracker.close(); + tracker.wait().await; + + let mut metadata_tasks = JoinSet::new(); + for i in 0..N_LINKS { + let link = workdir.path().join(format!("{i}")); + metadata_tasks.spawn(async move { tokio::fs::metadata(&link).await.unwrap() }); + } + + let target_metadata = tokio::fs::metadata(&target).await.unwrap(); + while let Some(metadata_result) = metadata_tasks.join_next().await { + let link_metadata = metadata_result.unwrap(); + assert_eq!(target_metadata.dev(), link_metadata.dev()); + assert_eq!(target_metadata.ino(), link_metadata.ino()); + } + }); + } + + for rt in rt_combinations() { + run(rt()); + } +} + +#[tokio::test] +async fn cancel_op_future() { + let (workdir, target) = create_tmp_dir(); + + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + poll_fn(|cx| { + let link = workdir.path().join("link"); + let fut = tokio::fs::hard_link(&target, &link); + + // If io_uring is enabled (and not falling back to the thread pool), + // the first poll should return Pending. + let _pending = pin!(fut).poll_unpin(cx); + + tx.send(()).unwrap(); + + Poll::<()>::Pending + }) + .await; + }); + + // Wait for the first poll + rx.recv().await.unwrap(); + + handle.abort(); + + let res = handle.await.unwrap_err(); + assert!(res.is_cancelled()); +} + +fn create_tmp_dir() -> (TempDir, PathBuf) { + let workdir = tempfile::tempdir().unwrap(); + let target = workdir.path().join("target"); + std::fs::OpenOptions::new() + .create_new(true) + .write(true) + .open(&target) + .unwrap(); + + (workdir, target) +}