diff --git a/tokio/src/runtime/io/driver/signal.rs b/tokio/src/runtime/io/driver/signal.rs index ea3ef07b611..bed31386f9d 100644 --- a/tokio/src/runtime/io/driver/signal.rs +++ b/tokio/src/runtime/io/driver/signal.rs @@ -1,12 +1,11 @@ +use crate::signal::unix::pipe; + use super::{Driver, Handle, TOKEN_SIGNAL}; use std::io; impl Handle { - pub(crate) fn register_signal_receiver( - &self, - receiver: &mut mio::net::UnixStream, - ) -> io::Result<()> { + pub(crate) fn register_signal_receiver(&self, receiver: &mut pipe::Receiver) -> io::Result<()> { self.registry .register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; Ok(()) diff --git a/tokio/src/runtime/signal/mod.rs b/tokio/src/runtime/signal/mod.rs index 8055c0965a6..2aa48fd2fcb 100644 --- a/tokio/src/runtime/signal/mod.rs +++ b/tokio/src/runtime/signal/mod.rs @@ -4,9 +4,9 @@ use crate::runtime::{driver, io}; use crate::signal::registry::globals; +use crate::signal::unix::pipe; -use mio::net::UnixStream; -use std::io::{self as std_io, Read}; +use std::io as std_io; use std::sync::{Arc, Weak}; use std::time::Duration; @@ -21,7 +21,7 @@ pub(crate) struct Driver { io: io::Driver, /// A pipe for receiving wake events from the signal handler - receiver: UnixStream, + receiver: pipe::Receiver, /// Shared state. The driver keeps a strong ref and the handle keeps a weak /// ref. The weak ref is used to check if the driver is still active before @@ -41,9 +41,6 @@ pub(crate) struct Handle { impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. pub(crate) fn new(io: io::Driver, io_handle: &io::Handle) -> std_io::Result { - use std::mem::ManuallyDrop; - use std::os::unix::io::{AsRawFd, FromRawFd}; - // NB: We give each driver a "fresh" receiver file descriptor to avoid // the issues described in alexcrichton/tokio-process#42. // @@ -63,14 +60,7 @@ impl Driver { // safe as each dup is registered with separate reactors **and** we // only expect at least one dup to receive the notification. - // Manually drop as we don't actually own this instance of UnixStream. - let receiver_fd = globals().receiver.as_raw_fd(); - - // safety: there is nothing unsafe about this, but the `from_raw_fd` fn is marked as unsafe. - let original = - ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); - let mut receiver = UnixStream::from_std(original.try_clone()?); - + let mut receiver = globals()?.receiver()?; io_handle.register_signal_receiver(&mut receiver)?; Ok(Self { @@ -109,21 +99,14 @@ impl Driver { return; } - // Drain the pipe completely so we can receive a new readiness event - // if another signal has come in. - let mut buf = [0; 128]; - #[allow(clippy::unused_io_amount)] - loop { - match self.receiver.read(&mut buf) { - Ok(0) => panic!("EOF on self-pipe"), - Ok(_) => continue, // Keep reading - Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, - Err(e) => panic!("Bad read on self-pipe: {e}"), - } - } + // consume value + let _ = self.receiver.read(); - // Broadcast any signals which were received - globals().broadcast(); + // We do a best-effort broadcast here + if let Ok(globals) = globals() { + // Broadcast any signals which were received. + globals.broadcast(); + } } } diff --git a/tokio/src/signal/pipe/eventfd.rs b/tokio/src/signal/pipe/eventfd.rs new file mode 100644 index 00000000000..febfa28ae5b --- /dev/null +++ b/tokio/src/signal/pipe/eventfd.rs @@ -0,0 +1,107 @@ +use std::{ + io, + os::fd::{AsRawFd, FromRawFd, OwnedFd}, +}; + +use mio::{event, unix::SourceFd}; + +#[derive(Debug)] +pub(crate) struct Sender { + fd: OwnedFd, +} + +#[derive(Debug)] +pub(crate) struct Receiver { + fd: OwnedFd, +} + +impl event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + SourceFd(&self.fd.as_raw_fd()).deregister(registry) + } +} + +impl Sender { + pub(crate) fn new() -> std::io::Result { + // SAFETY: it's ok to call libc API + let fd = unsafe { libc::eventfd(0, libc::EFD_NONBLOCK | libc::EFD_CLOEXEC) }; + if fd == -1 { + return Err(io::Error::last_os_error()); + } + Ok(Sender { + // SAFETY: fd just opened by the above libc::eventfd + fd: unsafe { OwnedFd::from_raw_fd(fd) }, + }) + } + + pub(crate) fn receiver(&self) -> std::io::Result { + Ok(Receiver { + fd: self.fd.try_clone()?, + }) + } +} + +impl Sender { + pub(crate) fn write(&self) -> std::io::Result { + // SAFETY: it's ok to call libc API + let r = unsafe { libc::eventfd_write(self.fd.as_raw_fd(), 1) }; + if r == 0 { + Ok(0) + } else { + Err(std::io::Error::last_os_error()) + } + } +} + +impl Receiver { + pub(crate) fn read(&mut self) -> std::io::Result { + let fd = self.fd.as_raw_fd(); + let mut value: libc::eventfd_t = 0; + + // SAFETY: it's ok to call libc API + let r = unsafe { libc::eventfd_read(fd, &mut value as *mut libc::eventfd_t) }; + if r == 0 { + Ok(0) + } else { + Err(std::io::Error::last_os_error()) + } + } +} + +pub(crate) struct OsExtraData { + sender: Sender, +} + +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + Sender::new().map(|sender| Self { sender }) + } +} + +impl OsExtraData { + pub(crate) fn receiver(&self) -> std::io::Result { + self.sender.receiver() + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } +} diff --git a/tokio/src/signal/pipe/unixstream.rs b/tokio/src/signal/pipe/unixstream.rs new file mode 100644 index 00000000000..ee97af90d71 --- /dev/null +++ b/tokio/src/signal/pipe/unixstream.rs @@ -0,0 +1,95 @@ +use mio::net::UnixStream; +use std::io::{self, Read, Write}; +use std::mem::ManuallyDrop; +use std::os::unix::io::{AsRawFd, FromRawFd}; + +pub(crate) struct Sender { + inner: UnixStream, +} + +#[derive(Debug)] +pub(crate) struct Receiver { + inner: UnixStream, +} + +impl mio::event::Source for Receiver { + fn register( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.register(registry, token, interests) + } + + fn reregister( + &mut self, + registry: &mio::Registry, + token: mio::Token, + interests: mio::Interest, + ) -> io::Result<()> { + self.inner.reregister(registry, token, interests) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + self.inner.deregister(registry) + } +} + +impl Sender { + pub(crate) fn write(&self) -> std::io::Result { + (&self.inner).write(&[1]) + } +} + +impl Receiver { + pub(crate) fn read(&mut self) -> std::io::Result { + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + #[allow(clippy::unused_io_amount)] + loop { + match self.inner.read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => { + return Err(e); + } + } + } + Ok(0) + } +} + +pub(crate) fn channel() -> std::io::Result<(Sender, Receiver)> { + let (sender, receiver) = UnixStream::pair()?; + Ok((Sender { inner: sender }, Receiver { inner: receiver })) +} + +pub(crate) struct OsExtraData { + sender: Sender, + receiver: Receiver, +} + +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + let (sender, receiver) = channel()?; + Ok(Self { sender, receiver }) + } +} + +impl OsExtraData { + pub(crate) fn receiver(&self) -> std::io::Result { + let receiver_fd = self.receiver.inner.as_raw_fd(); + // SAFETY: fd owned by receiver is opened + let original = + ManuallyDrop::new(unsafe { std::os::unix::net::UnixStream::from_raw_fd(receiver_fd) }); + let inner = UnixStream::from_std(original.try_clone()?); + Ok(Receiver { inner }) + } + + pub(crate) fn sender(&self) -> &Sender { + &self.sender + } +} diff --git a/tokio/src/signal/registry.rs b/tokio/src/signal/registry.rs index 2a3eea3982d..ed0a0473d8a 100644 --- a/tokio/src/signal/registry.rs +++ b/tokio/src/signal/registry.rs @@ -142,25 +142,28 @@ impl Globals { } } -fn globals_init() -> Globals +fn globals_init() -> std::io::Result where - OsExtraData: 'static + Send + Sync + Default, + OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { - Globals { - extra: OsExtraData::default(), + Ok(Globals { + extra: OsExtraData::new()?, registry: Registry::new(OsStorage::default()), - } + }) } -pub(crate) fn globals() -> &'static Globals +pub(crate) fn globals() -> std::io::Result<&'static Globals> where - OsExtraData: 'static + Send + Sync + Default, + OsExtraData: 'static + Send + Sync, OsStorage: 'static + Send + Sync + Default, { - static GLOBALS: OnceLock = OnceLock::new(); + static GLOBALS: OnceLock> = OnceLock::new(); - GLOBALS.get_or_init(globals_init) + match GLOBALS.get_or_init(globals_init) { + Ok(globals) => Ok(globals), + Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())), + } } #[cfg(all(test, not(loom)))] diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index c58771ee96c..0acc245c35f 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -12,11 +12,35 @@ use crate::signal::registry::{globals, EventId, EventInfo, Globals, Storage}; use crate::signal::RxFuture; use crate::sync::watch; -use mio::net::UnixStream; -use std::io::{self, Error, ErrorKind, Write}; +use std::io::{self, Error, ErrorKind}; use std::sync::OnceLock; use std::task::{Context, Poll}; +#[cfg_attr( + any( + target_os = "android", + target_os = "espidf", + target_os = "fuchsia", + target_os = "hermit", + target_os = "illumos", + target_os = "linux", + ), + path = "pipe/eventfd.rs" +)] +#[cfg_attr( + not(any( + target_os = "android", + target_os = "espidf", + target_os = "fuchsia", + target_os = "hermit", + target_os = "illumos", + target_os = "linux", + )), + path = "pipe/unixstream.rs" +)] +pub(crate) mod pipe; +pub(crate) use pipe::OsExtraData; + #[cfg(not(any(target_os = "linux", target_os = "illumos")))] pub(crate) struct OsStorage([SignalInfo; 33]); @@ -60,20 +84,6 @@ impl Storage for OsStorage { } } -#[derive(Debug)] -pub(crate) struct OsExtraData { - sender: UnixStream, - pub(crate) receiver: UnixStream, -} - -impl Default for OsExtraData { - fn default() -> Self { - let (receiver, sender) = UnixStream::pair().expect("failed to create UnixStream"); - - Self { sender, receiver } - } -} - /// Represents the specific kind of signal to listen for. #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct SignalKind(libc::c_int); @@ -257,8 +267,7 @@ fn action(globals: &'static Globals, signal: libc::c_int) { // Send a wakeup, ignore any errors (anything reasonably possible is // full pipe and then it will wake up anyway). - let mut sender = &globals.sender; - drop(sender.write(&[1])); + let _ = globals.sender().write(); } /// Enables this module to receive signal notifications for the `signal` @@ -278,7 +287,7 @@ fn signal_enable(signal: SignalKind, handle: &Handle) -> io::Result<()> { // Check that we have a signal driver running handle.check_inner()?; - let globals = globals(); + let globals = globals()?; let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, None => return Err(io::Error::new(io::ErrorKind::Other, "signal too large")), @@ -415,7 +424,7 @@ pub(crate) fn signal_with_handle( // Turn the signal delivery on once we are ready for it signal_enable(kind, handle)?; - Ok(globals().register_listener(kind.0 as EventId)) + Ok(globals()?.register_listener(kind.0 as EventId)) } impl Signal { diff --git a/tokio/src/signal/windows/sys.rs b/tokio/src/signal/windows/sys.rs index 9fe1261ff32..c563a88b189 100644 --- a/tokio/src/signal/windows/sys.rs +++ b/tokio/src/signal/windows/sys.rs @@ -29,7 +29,7 @@ pub(super) fn ctrl_shutdown() -> io::Result { fn new(signum: u32) -> io::Result { global_init()?; - let rx = globals().register_listener(signum as EventId); + let rx = globals()?.register_listener(signum as EventId); Ok(RxFuture::new(rx)) } @@ -84,6 +84,12 @@ impl Storage for OsStorage { #[derive(Debug, Default)] pub(crate) struct OsExtraData {} +impl OsExtraData { + pub(crate) fn new() -> std::io::Result { + Ok(OsExtraData {}) + } +} + fn global_init() -> io::Result<()> { static INIT: OnceLock>> = OnceLock::new(); @@ -104,7 +110,10 @@ fn global_init() -> io::Result<()> { } unsafe extern "system" fn handler(ty: u32) -> BOOL { - let globals = globals(); + let globals = match globals() { + Ok(globals) => globals, + Err(_) => return 0, + }; globals.record_event(ty as EventId); // According to https://docs.microsoft.com/en-us/windows/console/handlerroutine