From 26f620d9a8c992f4c9b0aa7d580992231f0f35bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Wed, 31 Jan 2024 16:36:34 +0400 Subject: [PATCH 1/7] =?UTF-8?q?=F0=9F=97=91=EF=B8=8F=20=20zb:=20drop=20unn?= =?UTF-8?q?eeded=20peer=5Fsid()?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Its usage is gone. --- zbus/src/connection/socket/tcp.rs | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/zbus/src/connection/socket/tcp.rs b/zbus/src/connection/socket/tcp.rs index 0e4dd4fc2..3c8913f0c 100644 --- a/zbus/src/connection/socket/tcp.rs +++ b/zbus/src/connection/socket/tcp.rs @@ -117,24 +117,6 @@ impl ReadHalf for tokio::net::tcp::OwnedReadHalf { ret }) } - - #[cfg(windows)] - fn peer_sid(&self) -> Option { - use crate::win32::{socket_addr_get_pid, ProcessToken}; - - let peer_addr = match self.peer_addr() { - Ok(addr) => addr, - Err(_) => return None, - }; - - if let Ok(pid) = socket_addr_get_pid(&peer_addr) { - if let Ok(process_token) = ProcessToken::open(if pid != 0 { Some(pid) } else { None }) { - return process_token.sid().ok(); - } - } - - None - } } #[cfg(feature = "tokio")] From a11d89204655f5bc2a405fc9342473abd7616310 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Wed, 31 Jan 2024 16:37:56 +0400 Subject: [PATCH 2/7] =?UTF-8?q?=F0=9F=90=9B=20zb:=20fix=20invalid=20value?= =?UTF-8?q?=20in=20tokio/non-unix=20path?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit error[E0423]: expected value, found built-in attribute `path` --- zbus/src/address/transport/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zbus/src/address/transport/mod.rs b/zbus/src/address/transport/mod.rs index fb0b86014..4260f9bd4 100644 --- a/zbus/src/address/transport/mod.rs +++ b/zbus/src/address/transport/mod.rs @@ -131,7 +131,7 @@ impl Transport { #[cfg(not(unix))] { - let _ = path; + let _ = stream; Err(Error::Unsupported) } } From 98c854a480d412497d0144113e65ad6941520492 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Thu, 1 Feb 2024 17:39:33 +0400 Subject: [PATCH 3/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20zb:=20fix=20unused?= =?UTF-8?q?=20ConnectionCredentials=20imports=20in=20some=20cfg?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/src/connection/socket/tcp.rs | 10 ++++------ zbus/src/connection/socket/unix.rs | 24 ++++++++++++------------ 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/zbus/src/connection/socket/tcp.rs b/zbus/src/connection/socket/tcp.rs index 3c8913f0c..e93363028 100644 --- a/zbus/src/connection/socket/tcp.rs +++ b/zbus/src/connection/socket/tcp.rs @@ -1,6 +1,4 @@ #[cfg(not(feature = "tokio"))] -use crate::fdo::ConnectionCredentials; -#[cfg(not(feature = "tokio"))] use async_io::Async; use std::io; #[cfg(unix)] @@ -28,7 +26,7 @@ impl ReadHalf for Arc> { } } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { #[cfg(windows)] let creds = { let stream = self.clone(); @@ -40,7 +38,7 @@ impl ReadHalf for Arc> { let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) .and_then(|process_token| process_token.sid())?; io::Result::Ok( - ConnectionCredentials::default() + crate::fdo::ConnectionCredentials::default() .set_process_id(pid) .set_windows_sid(sid), ) @@ -51,7 +49,7 @@ impl ReadHalf for Arc> { }?; #[cfg(not(windows))] - let creds = ConnectionCredentials::default(); + let creds = crate::fdo::ConnectionCredentials::default(); Ok(creds) } @@ -85,7 +83,7 @@ impl WriteHalf for Arc> { .await } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { ReadHalf::peer_credentials(self).await } } diff --git a/zbus/src/connection/socket/unix.rs b/zbus/src/connection/socket/unix.rs index a7c8d5d9c..02d658221 100644 --- a/zbus/src/connection/socket/unix.rs +++ b/zbus/src/connection/socket/unix.rs @@ -26,7 +26,7 @@ use nix::{ use super::{ReadHalf, RecvmsgResult, WriteHalf}; #[cfg(feature = "tokio")] use super::{Socket, Split}; -use crate::fdo::ConnectionCredentials; + #[cfg(unix)] use crate::utils::FDS_MAX; @@ -56,7 +56,7 @@ impl ReadHalf for Arc> { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self).await } } @@ -106,7 +106,7 @@ impl WriteHalf for Arc> { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self).await } } @@ -154,7 +154,7 @@ impl ReadHalf for tokio::net::unix::OwnedReadHalf { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self.as_ref()).await } } @@ -204,7 +204,7 @@ impl WriteHalf for tokio::net::unix::OwnedWriteHalf { true } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { get_unix_peer_creds(self.as_ref()).await } } @@ -225,7 +225,7 @@ impl ReadHalf for Arc> { } } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { let stream = self.clone(); crate::Task::spawn_blocking( move || { @@ -234,7 +234,7 @@ impl ReadHalf for Arc> { let pid = unix_stream_get_peer_pid(&stream.get_ref())? as _; let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) .and_then(|process_token| process_token.sid())?; - Ok(ConnectionCredentials::default() + Ok(crate::fdo::ConnectionCredentials::default() .set_process_id(pid) .set_windows_sid(sid)) }, @@ -264,7 +264,7 @@ impl WriteHalf for Arc> { .await } - async fn peer_credentials(&mut self) -> io::Result { + async fn peer_credentials(&mut self) -> io::Result { ReadHalf::peer_credentials(self).await } } @@ -323,7 +323,7 @@ fn fd_sendmsg(fd: RawFd, buffer: &[u8], fds: &[BorrowedFd<'_>]) -> io::Result io::Result { +async fn get_unix_peer_creds(fd: &impl AsRawFd) -> io::Result { let fd = fd.as_raw_fd(); // FIXME: Is it likely enough for sending of 1 byte to block, to justify a task (possibly // launching a thread in turn)? @@ -331,7 +331,7 @@ async fn get_unix_peer_creds(fd: &impl AsRawFd) -> io::Result io::Result { +fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result { #[cfg(any(target_os = "android", target_os = "linux"))] { use nix::sys::socket::{getsockopt, sockopt::PeerCredentials}; @@ -342,7 +342,7 @@ fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result getsockopt(&fd, PeerCredentials) .map(|creds| { - ConnectionCredentials::default() + crate::fdo::ConnectionCredentials::default() .set_process_id(creds.pid() as _) .set_unix_user_id(creds.uid()) }) @@ -361,7 +361,7 @@ fn get_unix_peer_creds_blocking(fd: RawFd) -> io::Result let fd = fd.as_raw_fd(); let uid = nix::unistd::getpeereid(fd).map(|(uid, _)| uid.into())?; // FIXME: Handle pid fetching too. - Ok(ConnectionCredentials::default().set_unix_user_id(uid)) + Ok(crate::fdo::ConnectionCredentials::default().set_unix_user_id(uid)) } } From 0d74cfb5b990fc4494d6c591c4e6e6a6f0f19166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Wed, 31 Jan 2024 17:12:02 +0400 Subject: [PATCH 4/7] =?UTF-8?q?=F0=9F=A9=B9=20zb:=20implement=20peer=5Fcre?= =?UTF-8?q?dentials()=20for=20tokio=20win32?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/src/connection/socket/tcp.rs | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/zbus/src/connection/socket/tcp.rs b/zbus/src/connection/socket/tcp.rs index e93363028..331f9f124 100644 --- a/zbus/src/connection/socket/tcp.rs +++ b/zbus/src/connection/socket/tcp.rs @@ -115,6 +115,16 @@ impl ReadHalf for tokio::net::tcp::OwnedReadHalf { ret }) } + + #[cfg(windows)] + async fn peer_credentials(&mut self) -> io::Result { + let peer_addr = self.peer_addr()?.clone(); + crate::Task::spawn_blocking( + move || win32_credentials_from_addr(&peer_addr), + "peer credentials", + ) + .await + } } #[cfg(feature = "tokio")] @@ -141,4 +151,29 @@ impl WriteHalf for tokio::net::tcp::OwnedWriteHalf { async fn close(&mut self) -> io::Result<()> { tokio::io::AsyncWriteExt::shutdown(self).await } + + #[cfg(windows)] + async fn peer_credentials(&mut self) -> io::Result { + let peer_addr = self.peer_addr()?.clone(); + crate::Task::spawn_blocking( + move || win32_credentials_from_addr(&peer_addr), + "peer credentials", + ) + .await + } +} + +#[cfg(feature = "tokio")] +#[cfg(windows)] +fn win32_credentials_from_addr( + addr: &std::net::SocketAddr, +) -> io::Result { + use crate::win32::{socket_addr_get_pid, ProcessToken}; + + let pid = socket_addr_get_pid(addr)? as _; + let sid = ProcessToken::open(if pid != 0 { Some(pid as _) } else { None }) + .and_then(|process_token| process_token.sid())?; + Ok(crate::fdo::ConnectionCredentials::default() + .set_process_id(pid) + .set_windows_sid(sid)) } From 53dd2b16a54dfffbf275a4526a51578af29bc153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Wed, 31 Jan 2024 11:56:54 +0400 Subject: [PATCH 5/7] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20=20zb:=20typo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- zbus/src/connection/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index ca3e8db51..4fc53006c 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -457,7 +457,7 @@ impl<'a> Builder<'a> { } async fn stream_for_target(&mut self) -> Result { - // SAFETY: `self.target` is always `Some` from the beginning and this methos is only called + // SAFETY: `self.target` is always `Some` from the beginning and this method is only called // once. Ok(match self.target.take().unwrap() { #[cfg(not(feature = "tokio"))] From cc3620a41afad5c15ec66585c890ef2c4f940040 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Wed, 31 Jan 2024 14:01:03 +0400 Subject: [PATCH 6/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20zb:=20impl=20From=20for=20BoxedSplit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify a bit the code. --- zbus/src/connection/builder.rs | 20 ++++++++++---------- zbus/src/connection/handshake.rs | 16 ++++++++-------- zbus/src/connection/socket/split.rs | 21 +++++++++++---------- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index 4fc53006c..f9ce72db5 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -161,7 +161,7 @@ impl<'a> Builder<'a> { /// Create a builder for connection that will use the given socket. pub fn socket(socket: S) -> Self { - Self::new(Target::Socket(Split::new_boxed(socket))) + Self::new(Target::Socket(socket.into())) } /// Specify the mechanisms to use during authentication. @@ -461,28 +461,28 @@ impl<'a> Builder<'a> { // once. Ok(match self.target.take().unwrap() { #[cfg(not(feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::UnixStream(stream) => Async::new(stream)?.into(), #[cfg(all(unix, feature = "tokio"))] - Target::UnixStream(stream) => Split::new_boxed(stream), + Target::UnixStream(stream) => stream.into(), #[cfg(all(not(unix), feature = "tokio"))] Target::UnixStream(_) => return Err(Error::Unsupported), #[cfg(not(feature = "tokio"))] - Target::TcpStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::TcpStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio")] - Target::TcpStream(stream) => Split::new_boxed(stream), + Target::TcpStream(stream) => stream.into(), #[cfg(all(feature = "vsock", not(feature = "tokio")))] - Target::VsockStream(stream) => Split::new_boxed(Async::new(stream)?), + Target::VsockStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio-vsock")] - Target::VsockStream(stream) => Split::new_boxed(stream), + Target::VsockStream(stream) => stream.into(), Target::Address(address) => match address.connect().await? { #[cfg(any(unix, not(feature = "tokio")))] - address::transport::Stream::Unix(stream) => Split::new_boxed(stream), - address::transport::Stream::Tcp(stream) => Split::new_boxed(stream), + address::transport::Stream::Unix(stream) => stream.into(), + address::transport::Stream::Tcp(stream) => stream.into(), #[cfg(any( all(feature = "vsock", not(feature = "tokio")), feature = "tokio-vsock" ))] - address::transport::Stream::Vsock(stream) => Split::new_boxed(stream), + address::transport::Stream::Vsock(stream) => stream.into(), }, Target::Socket(stream) => stream, }) diff --git a/zbus/src/connection/handshake.rs b/zbus/src/connection/handshake.rs index b3b33a3fa..0d512cb8c 100644 --- a/zbus/src/connection/handshake.rs +++ b/zbus/src/connection/handshake.rs @@ -1045,7 +1045,7 @@ mod tests { use super::*; - use crate::{connection::socket::Split, Guid, Socket}; + use crate::{Guid, Socket}; fn create_async_socket_pair() -> (impl AsyncWrite + Socket, impl AsyncWrite + Socket) { // Tokio needs us to call the sync function from async context. :shrug: @@ -1071,9 +1071,9 @@ mod tests { let (p0, p1) = create_async_socket_pair(); let guid = OwnedGuid::from(Guid::generate()); - let client = ClientHandshake::new(Split::new_boxed(p0), None, Some(guid.clone())); + let client = ClientHandshake::new(p0.into(), None, Some(guid.clone())); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), guid, Some(Uid::effective().into()), None, @@ -1097,7 +1097,7 @@ mod tests { fn pipelined_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1126,7 +1126,7 @@ mod tests { fn separate_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1153,7 +1153,7 @@ mod tests { fn missing_external_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), None, @@ -1171,7 +1171,7 @@ mod tests { fn anonymous_handshake() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), @@ -1189,7 +1189,7 @@ mod tests { fn separate_anonymous_data() { let (mut p0, p1) = create_async_socket_pair(); let server = ServerHandshake::new( - Split::new_boxed(p1), + p1.into(), Guid::generate().into(), Some(Uid::effective().into()), Some(vec![AuthMechanism::Anonymous].into()), diff --git a/zbus/src/connection/socket/split.rs b/zbus/src/connection/socket/split.rs index dc26082c1..403b97568 100644 --- a/zbus/src/connection/socket/split.rs +++ b/zbus/src/connection/socket/split.rs @@ -8,16 +8,6 @@ pub struct Split { } impl Split { - /// Create a new boxed `Split` from `socket`. - pub fn new_boxed>(socket: S) -> BoxedSplit { - let split = socket.split(); - - Split { - read: Box::new(split.read), - write: Box::new(split.write), - } - } - /// Reference to the read half. pub fn read(&self) -> &R { &self.read @@ -46,3 +36,14 @@ impl Split { /// A boxed `Split`. pub type BoxedSplit = Split, Box>; + +impl From for BoxedSplit { + fn from(socket: S) -> Self { + let split = socket.split(); + + Split { + read: Box::new(split.read), + write: Box::new(split.write), + } + } +} From aca142c3b0d670acb38c16ff2f67c1f783c62b94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Lureau?= Date: Tue, 30 Jan 2024 12:51:24 +0400 Subject: [PATCH 7/7] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20=20zb:=20internal=20re?= =?UTF-8?q?factoring=20of=20address=20guid=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Connect to the target and return the associated guid. As a target address may have multiple addresses in the future, return the actual associated guid used. --- zbus/src/connection/builder.rs | 44 +++++++++++++++++----------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/zbus/src/connection/builder.rs b/zbus/src/connection/builder.rs index f9ce72db5..1ce40adce 100644 --- a/zbus/src/connection/builder.rs +++ b/zbus/src/connection/builder.rs @@ -28,7 +28,7 @@ use crate::{ async_lock::RwLock, names::{InterfaceName, UniqueName, WellKnownName}, object_server::Interface, - Connection, Error, Executor, Guid, Result, + Connection, Error, Executor, Guid, OwnedGuid, Result, }; use super::{ @@ -343,17 +343,11 @@ impl<'a> Builder<'a> { } async fn build_(mut self, executor: Executor<'static>) -> Result { - let mut stream = self.stream_for_target().await?; + let (mut stream, server_guid) = self.target_connect().await?; let mut auth = match self.guid { None => { - let guid = match self.target { - Some(Target::Address(ref addr)) => { - addr.guid().map(|guid| guid.to_owned().into()) - } - _ => None, - }; // SASL Handshake - Authenticated::client(stream, guid, self.auth_mechanisms).await? + Authenticated::client(stream, server_guid, self.auth_mechanisms).await? } Some(guid) => { if !self.p2p { @@ -456,10 +450,10 @@ impl<'a> Builder<'a> { } } - async fn stream_for_target(&mut self) -> Result { + async fn target_connect(&mut self) -> Result<(BoxedSplit, Option)> { // SAFETY: `self.target` is always `Some` from the beginning and this method is only called // once. - Ok(match self.target.take().unwrap() { + let split = match self.target.take().unwrap() { #[cfg(not(feature = "tokio"))] Target::UnixStream(stream) => Async::new(stream)?.into(), #[cfg(all(unix, feature = "tokio"))] @@ -474,18 +468,24 @@ impl<'a> Builder<'a> { Target::VsockStream(stream) => Async::new(stream)?.into(), #[cfg(feature = "tokio-vsock")] Target::VsockStream(stream) => stream.into(), - Target::Address(address) => match address.connect().await? { - #[cfg(any(unix, not(feature = "tokio")))] - address::transport::Stream::Unix(stream) => stream.into(), - address::transport::Stream::Tcp(stream) => stream.into(), - #[cfg(any( - all(feature = "vsock", not(feature = "tokio")), - feature = "tokio-vsock" - ))] - address::transport::Stream::Vsock(stream) => stream.into(), - }, + Target::Address(address) => { + let guid = address.guid().map(|g| g.to_owned().into()); + let split = match address.connect().await? { + #[cfg(any(unix, not(feature = "tokio")))] + address::transport::Stream::Unix(stream) => stream.into(), + address::transport::Stream::Tcp(stream) => stream.into(), + #[cfg(any( + all(feature = "vsock", not(feature = "tokio")), + feature = "tokio-vsock" + ))] + address::transport::Stream::Vsock(stream) => stream.into(), + }; + return Ok((split, guid)); + } Target::Socket(stream) => stream, - }) + }; + + Ok((split, None)) } }