diff --git a/zbus/src/address/transport/mod.rs b/zbus/src/address/transport/mod.rs index fb0b86014..4260f9bd4 100644 --- a/zbus/src/address/transport/mod.rs +++ b/zbus/src/address/transport/mod.rs @@ -131,7 +131,7 @@ impl Transport { #[cfg(not(unix))] { - let _ = path; + let _ = stream; Err(Error::Unsupported) } } diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index ca3e8db51..1ce40adce 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -28,7 +28,7 @@ use crate::{ async_lock::RwLock, names::{InterfaceName, UniqueName, WellKnownName}, object_server::Interface, - Connection, Error, Executor, Guid, Result, + Connection, Error, Executor, Guid, OwnedGuid, Result, }; use super::{ @@ -161,7 +161,7 @@ impl<'a> Builder<'a> { /// Create a builder for connection that will use the given socket. pub fn socket(socket: S) -> Self { - Self::new(Target::Socket(Split::new_boxed(socket))) + Self::new(Target::Socket(socket.into())) } /// Specify the mechanisms to use during authentication. @@ -343,17 +343,11 @@ impl<'a> Builder<'a> { } async fn build_(mut self, executor: Executor<'static>) -> Result { - let mut stream = self.stream_for_target().await?; + let (mut stream, server_guid) = self.target_connect().await?; let mut auth = match self.guid { None => { - let guid = match self.target { - Some(Target::Address(ref addr)) => { - addr.guid().map(|guid| guid.to_owned().into()) - } - _ => None, - }; // SASL Handshake - Authenticated::client(stream, guid, self.auth_mechanisms).await? + Authenticated::client(stream, server_guid, self.auth_mechanisms).await? } Some(guid) => { if !self.p2p { @@ -456,36 +450,42 @@ impl<'a> Builder<'a> { } } - async fn stream_for_target(&mut self) -> Result { - // SAFETY: `self.target` is always `Some` from the beginning and this methos is only called + async fn target_connect(&mut self) -> Result<(BoxedSplit, Option)> { + // SAFETY: `self.target` is always `Some` from the beginning and this method is only called // once. - Ok(match self.target.take().unwrap() { + let split = match self.target.take().unwrap() { #[cfg(not(feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::UnixStream(stream) => Async::new(stream)?.into(), #[cfg(all(unix, feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(stream), + Target::UnixStream(stream) => stream.into(), #[cfg(all(not(unix), feature = "tokio"))] Target::UnixStream(_) => return Err(Error::Unsupported), #[cfg(not(feature = "tokio"))] - Target::TcpStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::TcpStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio")] - Target::TcpStream(stream) => Split::new_boxed(stream), + Target::TcpStream(stream) => stream.into(), #[cfg(all(feature = "vsock", not(feature = "tokio")))] - Target::VsockStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::VsockStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio-vsock")] - Target::VsockStream(stream) => Split::new_boxed(stream), - Target::Address(address) => match address.connect().await? { - #[cfg(any(unix, not(feature = "tokio")))] - address::transport::Stream::Unix(stream) => Split::new_boxed(stream), - address::transport::Stream::Tcp(stream) => Split::new_boxed(stream), - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - address::transport::Stream::Vsock(stream) => Split::new_boxed(stream), - }, + Target::VsockStream(stream) => stream.into(), + Target::Address(address) => { + let guid = address.guid().map(|g| g.to_owned().into()); + let split = match address.connect().await? { + #[cfg(any(unix, not(feature = "tokio")))] + address::transport::Stream::Unix(stream) => stream.into(), + address::transport::Stream::Tcp(stream) => stream.into(), + #[cfg(any( + all(feature = "vsock", not(feature = "tokio")), + feature = "tokio-vsock" + ))] + address::transport::Stream::Vsock(stream) => stream.into(), + }; + return Ok((split, guid)); + } Target::Socket(stream) => stream, - }) + }; + + Ok((split, None)) } } diff --git a/zbus/src/connection/handshake.rs b/zbus/src/connection/handshake.rs index b3b33a3fa..0d512cb8c 100644 --- a/zbus/src/connection/handshake.rs +++ b/zbus/src/connection/handshake.rs @@ -1045,7 +1045,7 @@ mod tests { use super::*; - use crate::{connection::socket::Split, Guid, Socket}; + use crate::{Guid, Socket}; fn create_async_socket_pair() -> (impl AsyncWrite + Socket, impl AsyncWrite + Socket) { // Tokio needs us to call the sync function from async context. :shrug: @@ -1071,9 +1071,9 @@ mod tests { let (p0, p1) = create_async_socket_pair(); let guid = OwnedGuid::from(Guid::generate()); - let client = ClientHandshake::new(Split::new_boxed(p0), None, Some(guid.clone())); + let client = ClientHandshake::new(p0.into(), None, Some(guid.clone())); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), guid, Some(Uid::effective().into()), None, @@ -1097,7 +1097,7 @@ mod tests { fn pipelined_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1126,7 +1126,7 @@ mod tests { fn separate_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1153,7 +1153,7 @@ mod tests { fn missing_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1171,7 +1171,7 @@ mod tests { fn anonymous_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), @@ -1189,7 +1189,7 @@ mod tests { fn separate_anonymous_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), diff --git a/zbus/src/connection/socket/split.rs b/zbus/src/connection/socket/split.rs index dc26082c1..403b97568 100644 --- a/zbus/src/connection/socket/split.rs +++ b/zbus/src/connection/socket/split.rs @@ -8,16 +8,6 @@ pub struct Split { } impl Split { - /// Create a new boxed `Split` from `socket`. - pub fn new_boxed>(socket: S) -> BoxedSplit { - let split = socket.split(); - - Split { - read: Box::new(split.read), - write: Box::new(split.write), - } - } - /// Reference to the read half. pub fn read(&self) -> &R { &self.read @@ -46,3 +36,14 @@ impl Split { /// A boxed `Split`. pub type BoxedSplit = Split, Box>; + +impl From for BoxedSplit { + fn from(socket: S) -> Self { + let split = socket.split(); + + Split { + read: Box::new(split.read), + write: Box::new(split.write), + } + } +} diff --git a/zbus/src/connection/socket/tcp.rs b/zbus/src/connection/socket/tcp.rs index 0e4dd4fc2..331f9f124 100644 --- a/zbus/src/connection/socket/tcp.rs +++ b/zbus/src/connection/socket/tcp.rs @@ -1,6 +1,4 @@ #[cfg(not(feature = "tokio"))] -use crate::fdo::ConnectionCredentials; -#[cfg(not(feature = "tokio"))] use async_io::Async; use std::io; #[cfg(unix)] @@ -28,7 +26,7 @@ impl ReadHalf for Arc> { } } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { #[cfg(windows)] let creds = { let stream = self.clone(); @@ -40,7 +38,7 @@ impl ReadHalf for Arc> { let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) .and_then(|process_token| process_token.sid())?; io::Result::Ok( - ConnectionCredentials::default() + crate::fdo::ConnectionCredentials::default() .set_process_id(pid) .set_windows_sid(sid), ) @@ -51,7 +49,7 @@ impl ReadHalf for Arc> { }?; #[cfg(not(windows))] - let creds = ConnectionCredentials::default(); + let creds = crate::fdo::ConnectionCredentials::default(); Ok(creds) } @@ -85,7 +83,7 @@ impl WriteHalf for Arc> { .await } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { ReadHalf::peer_credentials(self).await } } @@ -119,21 +117,13 @@ impl ReadHalf for tokio::net::tcp::OwnedReadHalf { } #[cfg(windows)] - fn peer_sid(&self) -> Option { - use crate::win32::{socket_addr_get_pid, ProcessToken}; - - let peer_addr = match self.peer_addr() { - Ok(addr) => addr, - Err(_) => return None, - }; - - if let Ok(pid) = socket_addr_get_pid(&peer_addr) { - if let Ok(process_token) = ProcessToken::open(if pid != 0 { Some(pid) } else { None }) { - return process_token.sid().ok(); - } - } - - None + async fn peer_credentials(&mut self) -> io::Result { + let peer_addr = self.peer_addr()?.clone(); + crate::Task::spawn_blocking( + move || win32_credentials_from_addr(&peer_addr), + "peer credentials", + ) + .await } } @@ -161,4 +151,29 @@ impl WriteHalf for tokio::net::tcp::OwnedWriteHalf { async fn close(&mut self) -> io::Result<()> { tokio::io::AsyncWriteExt::shutdown(self).await } + + #[cfg(windows)] + async fn peer_credentials(&mut self) -> io::Result { + let peer_addr = self.peer_addr()?.clone(); + crate::Task::spawn_blocking( + move || win32_credentials_from_addr(&peer_addr), + "peer credentials", + ) + .await + } +} + +#[cfg(feature = "tokio")] +#[cfg(windows)] +fn win32_credentials_from_addr( + addr: &std::net::SocketAddr, +) -> io::Result { + use crate::win32::{socket_addr_get_pid, ProcessToken}; + + let pid = socket_addr_get_pid(addr)? as _; + let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) + .and_then(|process_token| process_token.sid())?; + Ok(crate::fdo::ConnectionCredentials::default() + .set_process_id(pid) + .set_windows_sid(sid)) } diff --git a/zbus/src/connection/socket/unix.rs b/zbus/src/connection/socket/unix.rs index a7c8d5d9c..02d658221 100644 --- a/zbus/src/connection/socket/unix.rs +++ b/zbus/src/connection/socket/unix.rs @@ -26,7 +26,7 @@ use nix::{ use super::{ReadHalf, RecvmsgResult, WriteHalf}; #[cfg(feature = "tokio")] use super::{Socket, Split}; -use crate::fdo::ConnectionCredentials; + #[cfg(unix)] use crate::utils::FDS_MAX; @@ -56,7 +56,7 @@ impl ReadHalf for Arc> { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self).await } } @@ -106,7 +106,7 @@ impl WriteHalf for Arc> { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self).await } } @@ -154,7 +154,7 @@ impl ReadHalf for tokio::net::unix::OwnedReadHalf { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self.as_ref()).await } } @@ -204,7 +204,7 @@ impl WriteHalf for tokio::net::unix::OwnedWriteHalf { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self.as_ref()).await } } @@ -225,7 +225,7 @@ impl ReadHalf for Arc> { } } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { let stream = self.clone(); crate::Task::spawn_blocking( move || { @@ -234,7 +234,7 @@ impl ReadHalf for Arc> { let pid = unix_stream_get_peer_pid(&stream.get_ref())? as _; let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) .and_then(|process_token| process_token.sid())?; - Ok(ConnectionCredentials::default() + Ok(crate::fdo::ConnectionCredentials::default() .set_process_id(pid) .set_windows_sid(sid)) }, @@ -264,7 +264,7 @@ impl WriteHalf for Arc> { .await } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { ReadHalf::peer_credentials(self).await } } @@ -323,7 +323,7 @@ fn fd_sendmsg(fd: RawFd, buffer: &[u8], fds: &[BorrowedFd<'_>]) -> io::Result io::Result { +async fn get_unix_peer_creds(fd: &impl AsRawFd) -> io::Result { let fd = fd.as_raw_fd(); // FIXME: Is it likely enough for sending of 1 byte to block, to justify a task (possibly // launching a thread in turn)? @@ -331,7 +331,7 @@ async fn get_unix_peer_creds(fd: &impl AsRawFd) -> io::Result io::Result { +fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result { #[cfg(any(target_os = "android", target_os = "linux"))] { use nix::sys::socket::{getsockopt, sockopt::PeerCredentials}; @@ -342,7 +342,7 @@ fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result getsockopt(&fd, PeerCredentials) .map(|creds| { - ConnectionCredentials::default() + crate::fdo::ConnectionCredentials::default() .set_process_id(creds.pid() as _) .set_unix_user_id(creds.uid()) }) @@ -361,7 +361,7 @@ fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result let fd = fd.as_raw_fd(); let uid = nix::unistd::getpeereid(fd).map(|(uid, _)| uid.into())?; // FIXME: Handle pid fetching too. - Ok(ConnectionCredentials::default().set_unix_user_id(uid)) + Ok(crate::fdo::ConnectionCredentials::default().set_unix_user_id(uid)) } }