From 60142c5204211c3a8edba42316d52d020f4adc17 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Wed, 7 Jan 2026 14:03:51 +0800 Subject: [PATCH 01/11] signal: use eventfd instead of UnixStream for signal notification fallback to UnixStream on non-Linux platform --- tokio/src/runtime/io/driver/signal.rs | 7 +- tokio/src/runtime/signal/mod.rs | 32 +--- tokio/src/signal/unix.rs | 203 +++++++++++++++++++++++++- 3 files changed, 205 insertions(+), 37 deletions(-) diff --git a/tokio/src/runtime/io/driver/signal.rs b/tokio/src/runtime/io/driver/signal.rs index ea3ef07b611..bed31386f9d 100644 --- a/tokio/src/runtime/io/driver/signal.rs +++ b/tokio/src/runtime/io/driver/signal.rs @@ -1,12 +1,11 @@ +use crate::signal::unix::pipe; + use super::{Driver, Handle, TOKEN_SIGNAL}; use std::io; impl Handle { - pub(crate) fn register_signal_receiver( - &self, - receiver: &mut mio::net::UnixStream, - ) -> io::Result<()> { + pub(crate) fn register_signal_receiver(&self, receiver: &mut pipe::Receiver) -> io::Result<()> { self.registry .register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; Ok(()) diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index 8055c0965a6..e14d166a4de 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -4,9 +4,9 @@ use crate::runtime::{driver, io}; use crate::signal::registry::globals; +use crate::signal::unix::pipe; -use mio::net::UnixStream; -use std::io::{self as std_io, Read}; +use std::io::{self as std_io}; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -21,7 +21,7 @@ pub(crate) struct Driver { io: io::Driver, /// A pipe for receiving wake events from the signal handler - receiver: UnixStream, + receiver: pipe::Receiver, /// Shared state. The driver keeps a strong ref and the handle keeps a weak /// ref. The weak ref is used to check if the driver is still active before @@ -41,9 +41,6 @@ pub(crate) struct Handle { impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result { - use std::mem::ManuallyDrop; - use std::os::unix::io::{AsRawFd, FromRawFd}; - // NB: We give each driver a "fresh" receiver file descriptor to avoid // the issues described in alexcrichton/tokio-process#42. // @@ -63,14 +60,7 @@ impl Driver { // safe as each dup is registered with separate reactors **and** we // only expect at least one dup to receive the notification. - // Manually drop as we don't actually own this instance of UnixStream. - let receiver_fd = globals().receiver.as_raw_fd(); - - // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. - let original = - ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); - let mut receiver = UnixStream::from_std(original.try_clone()?); - + let mut receiver = globals().receiver(); io_handle.register_signal_receiver(&mut receiver)?; Ok(Self { @@ -109,18 +99,8 @@ impl Driver { return; } - // Drain the pipe completely so we can receive a new readiness event - // if another signal has come in. - let mut buf = [0; 128]; - #[allow(clippy::unused_io_amount)] - loop { - match self.receiver.read(&mut buf) { - Ok(0) => panic!("EOF on self-pipe"), - Ok(_) => continue, // Keep reading - Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, - Err(e) => panic!("Bad read on self-pipe: {e}"), - } - } + // consume value + self.receiver.read(); // Broadcast any signals which were received globals().broadcast(); diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index ea75e821512..0d2278219f5 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -12,8 +12,7 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Storage}; use crate::signal::RxFuture; use crate::sync::watch; -use mio::net::UnixStream; -use std::io::{self, Error, ErrorKind, Write}; +use std::io::{self, Error, ErrorKind}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; @@ -61,20 +60,211 @@ impl Storage for OsStorage { } } +#[cfg(any(target_os = "linux", target_os = "illumos"))] +pub(crate) mod pipe { + use std::{ + io, + os::fd::{AsRawFd, FromRawFd, OwnedFd}, + }; + + use mio::{event, unix::SourceFd}; + + #[derive(Debug)] + pub(crate) struct Sender { + fd: OwnedFd, + } + + #[derive(Debug)] + pub(crate) struct Receiver { + fd: OwnedFd, + } + + impl event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).deregister(registry) + } + } + + impl Sender { + pub(crate) fn new() -> Self { + let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }; + if fd < 0 { + panic!("eventfd failed: {}", io::Error::last_os_error()); + } + Sender { + fd: unsafe { OwnedFd::from_raw_fd(fd) }, + } + } + + pub(crate) fn receiver(&self) -> Receiver { + Receiver { + fd: self.fd.try_clone().unwrap(), + } + } + } + + impl Sender { + pub(crate) fn write(&self) { + unsafe { + libc::eventfd_write(self.fd.as_raw_fd(), 1); + } + } + } + + impl Receiver { + pub(crate) fn read(&self) -> libc::c_int { + let fd = &self.fd; + let mut value: libc::eventfd_t = 0; + + unsafe { libc::eventfd_read(fd.as_raw_fd(), &mut value as *mut libc::eventfd_t) } + } + } +} + +#[cfg(not(any(target_os = "linux", target_os = "illumos")))] +pub(crate) mod pipe { + use mio::net::UnixStream; + use std::io::{self, Read, Write}; + use std::mem::ManuallyDrop; + use std::os::unix::io::{AsRawFd, FromRawFd}; + + #[derive(Debug)] + pub(crate) struct Sender { + inner: UnixStream, + } + + #[derive(Debug)] + pub(crate) struct Receiver { + inner: UnixStream, + } + + impl Clone for Receiver { + fn clone(&self) -> Self { + let receiver_fd = self.inner.as_raw_fd(); + let original = ManuallyDrop::new(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) + }); + let inner = + UnixStream::from_std(original.try_clone().expect("failed to clone UnixStream")); + Self { inner } + } + } + + impl mio::event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + self.inner.deregister(registry) + } + } + + impl Sender { + pub(crate) fn write(&self) { + let _ = (&self.inner).write(&[1]); + } + } + + impl Receiver { + pub(crate) fn read(&self) -> libc::c_int { + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + #[allow(clippy::unused_io_amount)] + loop { + match self.inner.read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {e}"), + } + } + 0 + } + } + + pub(crate) fn channel() -> (Sender, Receiver) { + let (sender, receiver) = UnixStream::pair().expect("failed to create UnixStream"); + (Sender { inner: sender }, Receiver { inner: receiver }) + } +} + +#[cfg(any(target_os = "linux", target_os = "illumos"))] #[derive(Debug)] pub(crate) struct OsExtraData { - sender: UnixStream, - pub(crate) receiver: UnixStream, + sender: pipe::Sender, } +#[cfg(any(target_os = "linux", target_os = "illumos"))] impl Default for OsExtraData { fn default() -> Self { - let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream"); + let sender = pipe::Sender::new(); + Self { sender } + } +} + +#[cfg(any(target_os = "linux", target_os = "illumos"))] +impl OsExtraData { + pub(crate) fn receiver(&self) -> pipe::Receiver { + self.sender.receiver() + } +} +#[cfg(not(any(target_os = "linux", target_os = "illumos")))] +#[derive(Debug)] +pub(crate) struct OsExtraData { + sender: pipe::Sender, + receiver: pipe::Receiver, +} + +#[cfg(not(any(target_os = "linux", target_os = "illumos")))] +impl Default for OsExtraData { + fn default() -> Self { + let (sender, receiver) = pipe::channel(); Self { sender, receiver } } } +#[cfg(not(any(target_os = "linux", target_os = "illumos")))] +impl OsExtraData { + pub(crate) fn receiver(&self) -> pipe::Receiver { + self.receiver.clone() + } +} + /// Represents the specific kind of signal to listen for. #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct SignalKind(libc::c_int); @@ -268,8 +458,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) { // Send a wakeup, ignore any errors (anything reasonably possible is // full pipe and then it will wake up anyway). - let mut sender = &globals.sender; - drop(sender.write(&[1])); + globals.sender.write(); } /// Enables this module to receive signal notifications for the `signal` From f7cbb0aec76cd43ed7aa6654ee6c5833d1531248 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Fri, 9 Jan 2026 12:39:08 +0800 Subject: [PATCH 02/11] fix compilation on non Linux platform --- tokio/src/signal/unix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 0d2278219f5..eb652a56a2d 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -208,7 +208,7 @@ pub(crate) mod pipe { match self.inner.read(&mut buf) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading - Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, Err(e) => panic!("Bad read on self-pipe: {e}"), } } From 76f49b81e64eb97d8da7e947ed7d055eb77e4b71 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Fri, 9 Jan 2026 13:34:09 +0800 Subject: [PATCH 03/11] use &mut self for read --- tokio/src/signal/unix.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index eb652a56a2d..305f2b99fae 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -130,7 +130,7 @@ pub(crate) mod pipe { } impl Receiver { - pub(crate) fn read(&self) -> libc::c_int { + pub(crate) fn read(&mut self) -> libc::c_int { let fd = &self.fd; let mut value: libc::eventfd_t = 0; @@ -199,7 +199,7 @@ pub(crate) mod pipe { } impl Receiver { - pub(crate) fn read(&self) -> libc::c_int { + pub(crate) fn read(&mut self) -> libc::c_int { // Drain the pipe completely so we can receive a new readiness event // if another signal has come in. let mut buf = [0; 128]; From af29bd616f4f62259b9df469404e2ca760e2d1a1 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Tue, 13 Jan 2026 02:24:28 +0800 Subject: [PATCH 04/11] separate implementation --- tokio/src/signal/pipe/eventfd.rs | 96 ++++++++++++ tokio/src/signal/pipe/unixstream.rs | 98 ++++++++++++ tokio/src/signal/unix.rs | 232 ++++------------------------ 3 files changed, 220 insertions(+), 206 deletions(-) create mode 100644 tokio/src/signal/pipe/eventfd.rs create mode 100644 tokio/src/signal/pipe/unixstream.rs diff --git a/tokio/src/signal/pipe/eventfd.rs b/tokio/src/signal/pipe/eventfd.rs new file mode 100644 index 00000000000..cbf031b24a1 --- /dev/null +++ b/tokio/src/signal/pipe/eventfd.rs @@ -0,0 +1,96 @@ +use std::{ + io, + os::fd::{AsRawFd, FromRawFd, OwnedFd}, +}; + +use mio::{event, unix::SourceFd}; + +#[derive(Debug)] +pub(crate) struct Sender { + fd: OwnedFd, +} + +#[derive(Debug)] +pub(crate) struct Receiver { + fd: OwnedFd, +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).deregister(registry) + } +} + +impl Sender { + pub(crate) fn new() -> Self { + let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }; + if fd < 0 { + panic!("eventfd failed: {}", io::Error::last_os_error()); + } + Sender { + fd: unsafe { OwnedFd::from_raw_fd(fd) }, + } + } + + pub(crate) fn receiver(&self) -> Receiver { + Receiver { + fd: self.fd.try_clone().unwrap(), + } + } +} + +impl Sender { + pub(crate) fn write(&self) { + unsafe { + libc::eventfd_write(self.fd.as_raw_fd(), 1); + } + } +} + +impl Receiver { + pub(crate) fn read(&mut self) -> libc::c_int { + let fd = &self.fd; + let mut value: libc::eventfd_t = 0; + + unsafe { libc::eventfd_read(fd.as_raw_fd(), &mut value as *mut libc::eventfd_t) } + } +} + +pub(crate) struct OsExtraData { + sender: Sender, +} + +impl Default for OsExtraData { + fn default() -> Self { + let sender = Sender::new(); + Self { sender } + } +} + +impl OsExtraData { + pub(crate) fn receiver(&self) -> Receiver { + self.sender.receiver() + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } +} diff --git a/tokio/src/signal/pipe/unixstream.rs b/tokio/src/signal/pipe/unixstream.rs new file mode 100644 index 00000000000..1399094606e --- /dev/null +++ b/tokio/src/signal/pipe/unixstream.rs @@ -0,0 +1,98 @@ +use mio::net::UnixStream; +use std::io::{self, Read, Write}; +use std::mem::ManuallyDrop; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +pub(crate) struct Sender { + inner: UnixStream, +} + +#[derive(Debug)] +pub(crate) struct Receiver { + inner: UnixStream, +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + let receiver_fd = self.inner.as_raw_fd(); + let original = + ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); + let inner = UnixStream::from_std(original.try_clone().expect("failed to clone UnixStream")); + Self { inner } + } +} + +impl mio::event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Sender { + pub(crate) fn write(&self) { + let _ = (&self.inner).write(&[1]); + } +} + +impl Receiver { + pub(crate) fn read(&mut self) -> libc::c_int { + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + #[allow(clippy::unused_io_amount)] + loop { + match self.inner.read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {e}"), + } + } + 0 + } +} + +pub(crate) fn channel() -> (Sender, Receiver) { + let (sender, receiver) = UnixStream::pair().expect("failed to create UnixStream"); + (Sender { inner: sender }, Receiver { inner: receiver }) +} + +pub(crate) struct OsExtraData { + sender: Sender, + receiver: Receiver, +} + +impl Default for OsExtraData { + fn default() -> Self { + let (sender, receiver) = channel(); + Self { sender, receiver } + } +} + +impl OsExtraData { + pub(crate) fn receiver(&self) -> Receiver { + self.receiver.clone() + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } +} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index 305f2b99fae..a5498853805 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -17,6 +17,31 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +#[cfg_attr( + any( + target_os = "android", + target_os = "espidf", + target_os = "fuchsia", + target_os = "hermit", + target_os = "illumos", + target_os = "linux", + ), + path = "pipe/eventfd.rs" +)] +#[cfg_attr( + not(any( + target_os = "android", + target_os = "espidf", + target_os = "fuchsia", + target_os = "hermit", + target_os = "illumos", + target_os = "linux", + )), + path = "pipe/unixstream.rs" +)] +pub(crate) mod pipe; +pub(crate) use pipe::OsExtraData; + #[cfg(not(any(target_os = "linux", target_os = "illumos")))] pub(crate) struct OsStorage([SignalInfo; 33]); @@ -60,211 +85,6 @@ impl Storage for OsStorage { } } -#[cfg(any(target_os = "linux", target_os = "illumos"))] -pub(crate) mod pipe { - use std::{ - io, - os::fd::{AsRawFd, FromRawFd, OwnedFd}, - }; - - use mio::{event, unix::SourceFd}; - - #[derive(Debug)] - pub(crate) struct Sender { - fd: OwnedFd, - } - - #[derive(Debug)] - pub(crate) struct Receiver { - fd: OwnedFd, - } - - impl event::Source for Receiver { - fn register( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> io::Result<()> { - SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests) - } - - fn reregister( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> io::Result<()> { - SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests) - } - - fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { - SourceFd(&self.fd.as_raw_fd()).deregister(registry) - } - } - - impl Sender { - pub(crate) fn new() -> Self { - let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }; - if fd < 0 { - panic!("eventfd failed: {}", io::Error::last_os_error()); - } - Sender { - fd: unsafe { OwnedFd::from_raw_fd(fd) }, - } - } - - pub(crate) fn receiver(&self) -> Receiver { - Receiver { - fd: self.fd.try_clone().unwrap(), - } - } - } - - impl Sender { - pub(crate) fn write(&self) { - unsafe { - libc::eventfd_write(self.fd.as_raw_fd(), 1); - } - } - } - - impl Receiver { - pub(crate) fn read(&mut self) -> libc::c_int { - let fd = &self.fd; - let mut value: libc::eventfd_t = 0; - - unsafe { libc::eventfd_read(fd.as_raw_fd(), &mut value as *mut libc::eventfd_t) } - } - } -} - -#[cfg(not(any(target_os = "linux", target_os = "illumos")))] -pub(crate) mod pipe { - use mio::net::UnixStream; - use std::io::{self, Read, Write}; - use std::mem::ManuallyDrop; - use std::os::unix::io::{AsRawFd, FromRawFd}; - - #[derive(Debug)] - pub(crate) struct Sender { - inner: UnixStream, - } - - #[derive(Debug)] - pub(crate) struct Receiver { - inner: UnixStream, - } - - impl Clone for Receiver { - fn clone(&self) -> Self { - let receiver_fd = self.inner.as_raw_fd(); - let original = ManuallyDrop::new(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) - }); - let inner = - UnixStream::from_std(original.try_clone().expect("failed to clone UnixStream")); - Self { inner } - } - } - - impl mio::event::Source for Receiver { - fn register( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> io::Result<()> { - self.inner.register(registry, token, interests) - } - - fn reregister( - &mut self, - registry: &mio::Registry, - token: mio::Token, - interests: mio::Interest, - ) -> io::Result<()> { - self.inner.reregister(registry, token, interests) - } - - fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { - self.inner.deregister(registry) - } - } - - impl Sender { - pub(crate) fn write(&self) { - let _ = (&self.inner).write(&[1]); - } - } - - impl Receiver { - pub(crate) fn read(&mut self) -> libc::c_int { - // Drain the pipe completely so we can receive a new readiness event - // if another signal has come in. - let mut buf = [0; 128]; - #[allow(clippy::unused_io_amount)] - loop { - match self.inner.read(&mut buf) { - Ok(0) => panic!("EOF on self-pipe"), - Ok(_) => continue, // Keep reading - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, - Err(e) => panic!("Bad read on self-pipe: {e}"), - } - } - 0 - } - } - - pub(crate) fn channel() -> (Sender, Receiver) { - let (sender, receiver) = UnixStream::pair().expect("failed to create UnixStream"); - (Sender { inner: sender }, Receiver { inner: receiver }) - } -} - -#[cfg(any(target_os = "linux", target_os = "illumos"))] -#[derive(Debug)] -pub(crate) struct OsExtraData { - sender: pipe::Sender, -} - -#[cfg(any(target_os = "linux", target_os = "illumos"))] -impl Default for OsExtraData { - fn default() -> Self { - let sender = pipe::Sender::new(); - Self { sender } - } -} - -#[cfg(any(target_os = "linux", target_os = "illumos"))] -impl OsExtraData { - pub(crate) fn receiver(&self) -> pipe::Receiver { - self.sender.receiver() - } -} - -#[cfg(not(any(target_os = "linux", target_os = "illumos")))] -#[derive(Debug)] -pub(crate) struct OsExtraData { - sender: pipe::Sender, - receiver: pipe::Receiver, -} - -#[cfg(not(any(target_os = "linux", target_os = "illumos")))] -impl Default for OsExtraData { - fn default() -> Self { - let (sender, receiver) = pipe::channel(); - Self { sender, receiver } - } -} - -#[cfg(not(any(target_os = "linux", target_os = "illumos")))] -impl OsExtraData { - pub(crate) fn receiver(&self) -> pipe::Receiver { - self.receiver.clone() - } -} - /// Represents the specific kind of signal to listen for. #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct SignalKind(libc::c_int); @@ -458,7 +278,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) { // Send a wakeup, ignore any errors (anything reasonably possible is // full pipe and then it will wake up anyway). - globals.sender.write(); + globals.sender().write(); } /// Enables this module to receive signal notifications for the `signal` From 9a36beab1de1548ac2951d3ac904496f0b63fe53 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Wed, 14 Jan 2026 03:38:41 +0800 Subject: [PATCH 05/11] propagate low level io error --- tokio/src/runtime/signal/mod.rs | 4 +-- tokio/src/signal/pipe/eventfd.rs | 51 ++++++++++++++++++----------- tokio/src/signal/pipe/unixstream.rs | 45 ++++++++++++------------- tokio/src/signal/registry.rs | 6 ++-- tokio/src/signal/unix.rs | 2 +- 5 files changed, 58 insertions(+), 50 deletions(-) diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index e14d166a4de..06ce446f051 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -60,7 +60,7 @@ impl Driver { // safe as each dup is registered with separate reactors **and** we // only expect at least one dup to receive the notification. - let mut receiver = globals().receiver(); + let mut receiver = globals().receiver()?; io_handle.register_signal_receiver(&mut receiver)?; Ok(Self { @@ -100,7 +100,7 @@ impl Driver { } // consume value - self.receiver.read(); + let _ = self.receiver.read(); // Broadcast any signals which were received globals().broadcast(); diff --git a/tokio/src/signal/pipe/eventfd.rs b/tokio/src/signal/pipe/eventfd.rs index cbf031b24a1..febfa28ae5b 100644 --- a/tokio/src/signal/pipe/eventfd.rs +++ b/tokio/src/signal/pipe/eventfd.rs @@ -40,37 +40,49 @@ impl event::Source for Receiver { } impl Sender { - pub(crate) fn new() -> Self { + pub(crate) fn new() -> std::io::Result { + // SAFETY: it's ok to call libc API let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }; - if fd < 0 { - panic!("eventfd failed: {}", io::Error::last_os_error()); + if fd == -1 { + return Err(io::Error::last_os_error()); } - Sender { + Ok(Sender { + // SAFETY: fd just opened by the above libc::eventfd fd: unsafe { OwnedFd::from_raw_fd(fd) }, - } + }) } - pub(crate) fn receiver(&self) -> Receiver { - Receiver { - fd: self.fd.try_clone().unwrap(), - } + pub(crate) fn receiver(&self) -> std::io::Result { + Ok(Receiver { + fd: self.fd.try_clone()?, + }) } } impl Sender { - pub(crate) fn write(&self) { - unsafe { - libc::eventfd_write(self.fd.as_raw_fd(), 1); + pub(crate) fn write(&self) -> std::io::Result { + // SAFETY: it's ok to call libc API + let r = unsafe { libc::eventfd_write(self.fd.as_raw_fd(), 1) }; + if r == 0 { + Ok(0) + } else { + Err(std::io::Error::last_os_error()) } } } impl Receiver { - pub(crate) fn read(&mut self) -> libc::c_int { - let fd = &self.fd; + pub(crate) fn read(&mut self) -> std::io::Result { + let fd = self.fd.as_raw_fd(); let mut value: libc::eventfd_t = 0; - unsafe { libc::eventfd_read(fd.as_raw_fd(), &mut value as *mut libc::eventfd_t) } + // SAFETY: it's ok to call libc API + let r = unsafe { libc::eventfd_read(fd, &mut value as *mut libc::eventfd_t) }; + if r == 0 { + Ok(0) + } else { + Err(std::io::Error::last_os_error()) + } } } @@ -78,15 +90,14 @@ pub(crate) struct OsExtraData { sender: Sender, } -impl Default for OsExtraData { - fn default() -> Self { - let sender = Sender::new(); - Self { sender } +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + Sender::new().map(|sender| Self { sender }) } } impl OsExtraData { - pub(crate) fn receiver(&self) -> Receiver { + pub(crate) fn receiver(&self) -> std::io::Result { self.sender.receiver() } diff --git a/tokio/src/signal/pipe/unixstream.rs b/tokio/src/signal/pipe/unixstream.rs index 1399094606e..ee97af90d71 100644 --- a/tokio/src/signal/pipe/unixstream.rs +++ b/tokio/src/signal/pipe/unixstream.rs @@ -12,16 +12,6 @@ pub(crate) struct Receiver { inner: UnixStream, } -impl Clone for Receiver { - fn clone(&self) -> Self { - let receiver_fd = self.inner.as_raw_fd(); - let original = - ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); - let inner = UnixStream::from_std(original.try_clone().expect("failed to clone UnixStream")); - Self { inner } - } -} - impl mio::event::Source for Receiver { fn register( &mut self, @@ -47,13 +37,13 @@ impl mio::event::Source for Receiver { } impl Sender { - pub(crate) fn write(&self) { - let _ = (&self.inner).write(&[1]); + pub(crate) fn write(&self) -> std::io::Result { + (&self.inner).write(&[1]) } } impl Receiver { - pub(crate) fn read(&mut self) -> libc::c_int { + pub(crate) fn read(&mut self) -> std::io::Result { // Drain the pipe completely so we can receive a new readiness event // if another signal has come in. let mut buf = [0; 128]; @@ -63,16 +53,18 @@ impl Receiver { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, - Err(e) => panic!("Bad read on self-pipe: {e}"), + Err(e) => { + return Err(e); + } } } - 0 + Ok(0) } } -pub(crate) fn channel() -> (Sender, Receiver) { - let (sender, receiver) = UnixStream::pair().expect("failed to create UnixStream"); - (Sender { inner: sender }, Receiver { inner: receiver }) +pub(crate) fn channel() -> std::io::Result<(Sender, Receiver)> { + let (sender, receiver) = UnixStream::pair()?; + Ok((Sender { inner: sender }, Receiver { inner: receiver })) } pub(crate) struct OsExtraData { @@ -80,16 +72,21 @@ pub(crate) struct OsExtraData { receiver: Receiver, } -impl Default for OsExtraData { - fn default() -> Self { - let (sender, receiver) = channel(); - Self { sender, receiver } +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + let (sender, receiver) = channel()?; + Ok(Self { sender, receiver }) } } impl OsExtraData { - pub(crate) fn receiver(&self) -> Receiver { - self.receiver.clone() + pub(crate) fn receiver(&self) -> std::io::Result { + let receiver_fd = self.receiver.inner.as_raw_fd(); + // SAFETY: fd owned by receiver is opened + let original = + ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); + let inner = UnixStream::from_std(original.try_clone()?); + Ok(Receiver { inner }) } pub(crate) fn sender(&self) -> &Sender { diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 2a3eea3982d..4584c717677 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -144,18 +144,18 @@ impl Globals { fn globals_init() -> Globals where - OsExtraData: 'static + Send + Sync + Default, + OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { Globals { - extra: OsExtraData::default(), + extra: OsExtraData::new().expect("failed to initialize OsExtraData"), registry: Registry::new(OsStorage::default()), } } pub(crate) fn globals() -> &'static Globals where - OsExtraData: 'static + Send + Sync + Default, + OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { static GLOBALS: OnceLock = OnceLock::new(); diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index a5498853805..7cf4488fba2 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -278,7 +278,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) { // Send a wakeup, ignore any errors (anything reasonably possible is // full pipe and then it will wake up anyway). - globals.sender().write(); + let _ = globals.sender().write(); } /// Enables this module to receive signal notifications for the `signal` From 3b62b56bf209099d232b9ebf0ec8315a87702962 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Wed, 14 Jan 2026 04:12:32 +0800 Subject: [PATCH 06/11] try to fix for Windows --- tokio/src/signal/windows/sys.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index 518560ecfc2..01656fb5c51 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -84,6 +84,12 @@ impl Storage for OsStorage { #[derive(Debug, Default)] pub(crate) struct OsExtraData {} +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + Ok(OsExtraData{}) + } +} + fn global_init() -> io::Result<()> { static INIT: Once = Once::new(); From da7a4f1f4cb79c267501f1888f1a7a545c85747b Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Wed, 14 Jan 2026 04:16:11 +0800 Subject: [PATCH 07/11] fix code format --- tokio/src/signal/windows/sys.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index 01656fb5c51..d3912078d80 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -86,7 +86,7 @@ pub(crate) struct OsExtraData {} impl OsExtraData { pub(crate) fn new() -> std::io::Result { - Ok(OsExtraData{}) + Ok(OsExtraData {}) } } From b48c950e844aedc41e324336aa6a48cb30cbe6a0 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Thu, 15 Jan 2026 20:40:31 +0800 Subject: [PATCH 08/11] fix use std::io --- tokio/src/runtime/signal/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index 06ce446f051..8a1310a50b6 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -6,7 +6,7 @@ use crate::runtime::{driver, io}; use crate::signal::registry::globals; use crate::signal::unix::pipe; -use std::io::{self as std_io}; +use std::io as std_io; use std::sync::{Arc, Weak}; use std::time::Duration; From e7e66d051dcd85c9fe1fceda7f9fe5e84f986275 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Thu, 15 Jan 2026 20:56:28 +0800 Subject: [PATCH 09/11] remove unused import --- tokio/src/signal/unix.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index ce0c1cf36d5..d5730586cc7 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -12,7 +12,7 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Storage}; use crate::signal::RxFuture; use crate::sync::watch; -use std::io::{self, Error, ErrorKind, Write}; +use std::io::{self, Error, ErrorKind}; use std::sync::OnceLock; use std::task::{Context, Poll}; From 5d05e272f9902d724da92c2a9b276735e30f1b63 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Mon, 26 Jan 2026 17:05:29 +0800 Subject: [PATCH 10/11] propagate errros instead of panic --- tokio/src/runtime/signal/mod.rs | 9 ++++++--- tokio/src/signal/registry.rs | 17 ++++++++++------- tokio/src/signal/unix.rs | 4 ++-- tokio/src/signal/windows/sys.rs | 7 +++++-- 4 files changed, 23 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index 8a1310a50b6..2aa48fd2fcb 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -60,7 +60,7 @@ impl Driver { // safe as each dup is registered with separate reactors **and** we // only expect at least one dup to receive the notification. - let mut receiver = globals().receiver()?; + let mut receiver = globals()?.receiver()?; io_handle.register_signal_receiver(&mut receiver)?; Ok(Self { @@ -102,8 +102,11 @@ impl Driver { // consume value let _ = self.receiver.read(); - // Broadcast any signals which were received - globals().broadcast(); + // We do a best-effort broadcast here + if let Ok(globals) = globals() { + // Broadcast any signals which were received. + globals.broadcast(); + } } } diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 4584c717677..ed0a0473d8a 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -142,25 +142,28 @@ impl Globals { } } -fn globals_init() -> Globals +fn globals_init() -> std::io::Result where OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { - Globals { - extra: OsExtraData::new().expect("failed to initialize OsExtraData"), + Ok(Globals { + extra: OsExtraData::new()?, registry: Registry::new(OsStorage::default()), - } + }) } -pub(crate) fn globals() -> &'static Globals +pub(crate) fn globals() -> std::io::Result<&'static Globals> where OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { - static GLOBALS: OnceLock = OnceLock::new(); + static GLOBALS: OnceLock> = OnceLock::new(); - GLOBALS.get_or_init(globals_init) + match GLOBALS.get_or_init(globals_init) { + Ok(globals) => Ok(globals), + Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())), + } } #[cfg(all(test, not(loom)))] diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index d5730586cc7..0acc245c35f 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -287,7 +287,7 @@ fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { // Check that we have a signal driver running handle.check_inner()?; - let globals = globals(); + let globals = globals()?; let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")), @@ -424,7 +424,7 @@ pub(crate) fn signal_with_handle( // Turn the signal delivery on once we are ready for it signal_enable(kind, handle)?; - Ok(globals().register_listener(kind.0 as EventId)) + Ok(globals()?.register_listener(kind.0 as EventId)) } impl Signal { diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index f58f343438f..d109d39ce81 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -29,7 +29,7 @@ pub(super) fn ctrl_shutdown() -> io::Result { fn new(signum: u32) -> io::Result { global_init()?; - let rx = globals().register_listener(signum as EventId); + let rx = globals()?.register_listener(signum as EventId)?; Ok(RxFuture::new(rx)) } @@ -110,7 +110,10 @@ fn global_init() -> io::Result<()> { } unsafe extern "system" fn handler(ty: u32) -> BOOL { - let globals = globals(); + let globals = match globals() { + Ok(globals) => globals, + Err(_) => return 0, + }; globals.record_event(ty as EventId); // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine From 0fc12427bcf3b4b7b596e8db8f879baf8ec43ed2 Mon Sep 17 00:00:00 2001 From: "winter.loo" Date: Mon, 26 Jan 2026 23:56:32 +0800 Subject: [PATCH 11/11] fix Windows compilation --- tokio/src/signal/windows/sys.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index d109d39ce81..c563a88b189 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -29,7 +29,7 @@ pub(super) fn ctrl_shutdown() -> io::Result { fn new(signum: u32) -> io::Result { global_init()?; - let rx = globals()?.register_listener(signum as EventId)?; + let rx = globals()?.register_listener(signum as EventId); Ok(RxFuture::new(rx)) }