diff --git a/Cargo.lock b/Cargo.lock index 6a93837..31c269d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -209,14 +209,12 @@ name = "nng" version = "1.0.1" dependencies = [ "log", - "nng-sys 0.3.0+1.11.0", + "nng-sys 0.4.0-v2pre.2+v2.0.0-alpha.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "nng-sys" -version = "0.3.0+1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a704f444070c59b169a15d18d8503f474a873b1d49b50ddb183ac576f302a657" +version = "0.4.0-v2pre.2+v2.0.0-alpha.7" dependencies = [ "bindgen", "cc", @@ -229,6 +227,8 @@ dependencies = [ [[package]] name = "nng-sys" version = "0.4.0-v2pre.2+v2.0.0-alpha.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1400b22e8b705ada022d2b2b85b2ee2d026f3e32670473a005aa376ac8d533a9" dependencies = [ "bindgen", "cc", diff --git a/nng/CHANGELOG.adoc b/nng/CHANGELOG.adoc index 8973719..08f33e9 100644 --- a/nng/CHANGELOG.adoc +++ b/nng/CHANGELOG.adoc @@ -8,12 +8,18 @@ The format is based on https://keepachangelog.com/en/1.0.0/[Keep a Changelog] an === Added === +* Added NNG initialization function and associated type. + === Changed === +* The crate now depends on NNGv2. + === Deprecated === === Removed === +* Anything and everything related to options (NOTE TO SELF: make a replacement). + === Fixed === === Security === diff --git a/nng/Cargo.toml b/nng/Cargo.toml index 261aff4..e8760f8 100644 --- a/nng/Cargo.toml +++ b/nng/Cargo.toml @@ -27,7 +27,43 @@ ffi-module = [] log = "0.4.28" [dependencies.nng-sys] -version = "0.3.0" -# TODO(flxo): use path dependency once nng is updated to use nng-sys v0.4 -# path = "../nng-sys" +version = "0.4.0-v2pre.2" default-features = false + +[lints.rust] +improper_ctypes = "forbid" +bare_trait_objects = "deny" +missing_debug_implementations = "deny" +missing_docs = "deny" + +[lints.clippy] +all = { level = "deny", priority = -1 } +wrong_self_convention = { level = "deny", priority = -1 } + +# I would like to be able to keep these on, but due to the nature of the crate +# it just isn't feasible. For example, the "cast_sign_loss" will warn at every +# i32/u32 conversion. Normally, I would like that, but this library is a safe +# wrapper around a Bindgen-based binding of a C library, which means the types +# are a little bit up-in-the-air. +cast_sign_loss = "allow" +empty_enum = "allow" # Revisit after RFC1861 and RFC1216. +cargo_common_metadata = "allow" # Can't control this. +module_name_repetitions = "allow" # Doesn't recognize public re-exports. +cast_possible_wrap = "allow" + +# I want to enable this but it requires bumping the Rustc version and I don't +# want to do that just for a clippy lint. +ptr_as_ptr = "allow" + +# In these cases, I just don't like what Clippy suggests. +use_self = "allow" +if_not_else = "allow" +must_use_candidate = "allow" +missing_const_for_fn = "allow" +option_if_let_else = "allow" # Semantically backwards when used with non-zero error codes +wildcard_imports = "allow" # I don't generally like them either but can be used well +enum_glob_use = "allow" # Same as wildcards +manual_non_exhaustive = "allow" # Not available in v1.36 + +# This isn't smart enough to notice that we're converting to a variable type. +unnecessary_cast = "allow" diff --git a/nng/examples/pair.rs b/nng/examples/pair.rs index d06e471..dd2e5e0 100644 --- a/nng/examples/pair.rs +++ b/nng/examples/pair.rs @@ -6,10 +6,7 @@ //! This example was derived from [this NNG example][1]. //! //! [1]: https://nanomsg.org/gettingstarted/nng/pair.html -use nng::{ - options::{Options, RecvTimeout}, - Error, Message, Protocol, Socket, -}; +use nng::{Error, Message, Protocol, Socket}; use std::{env, io::Write, process, str, thread, time::Duration}; /// Entry point of the application. @@ -44,7 +41,6 @@ fn node1(url: &str) -> Result<(), Error> { /// Sends and receives messages on the socket. fn send_recv(s: &Socket, name: &str) -> Result<(), Error> { - s.set_opt::(Some(Duration::from_millis(100)))?; loop { // Attempt to reuse the message if we can. let mut msg = match s.recv() { diff --git a/nng/examples/pubsub.rs b/nng/examples/pubsub.rs index 5641d81..254009f 100644 --- a/nng/examples/pubsub.rs +++ b/nng/examples/pubsub.rs @@ -2,10 +2,7 @@ //! //! This pattern is used to allow a single broadcaster to publish messages to many subscribers, //! which may choose to limit which messages they receive. -use nng::{ - options::{protocol::pubsub::Subscribe, Options}, - PipeEvent, Protocol, Socket, -}; +use nng::{PipeEvent, Protocol, Socket}; use std::{ convert::TryInto, env, process, @@ -66,8 +63,8 @@ fn subscriber(url: &str) -> Result<(), nng::Error> { s.dial(url)?; println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS"); - let all_topics = vec![]; - s.set_opt::(all_topics)?; + //let all_topics = vec![]; + //s.set_opt::(all_topics)?; loop { let msg = s.recv()?; diff --git a/nng/src/addr.rs b/nng/src/addr.rs index 484d409..bc2153c 100644 --- a/nng/src/addr.rs +++ b/nng/src/addr.rs @@ -20,10 +20,6 @@ pub enum SocketAddr { /// Address for TCP/IP (v6) communication. Inet6(SocketAddrV6), - #[doc(hidden)] - /// Used to represent a ZeroTier address. - ZeroTier(SocketAddrZt), - #[doc(hidden)] /// Used to represent an abstract IPC socket address. Abstract(Box<[u8]>), @@ -44,7 +40,6 @@ impl fmt::Display for SocketAddr { SocketAddr::Ipc(s) => write!(f, "ipc://{}", s.to_string_lossy()), SocketAddr::Inet(s) => write!(f, "tcp://{s}"), SocketAddr::Inet6(s) => write!(f, "tcp://{s}"), - SocketAddr::ZeroTier(s) => write!(f, "zt://{s}"), SocketAddr::Abstract(s) => { write!(f, "abstract://")?; // Quick-and-dirty URI-encoding: @@ -82,9 +77,6 @@ impl From for SocketAddr { let port = addr.s_in6.sa_port; SocketAddr::Inet6(SocketAddrV6::new(v6_addr, port, 0, 0)) } - Some(nng_sys::nng_sockaddr_family::NNG_AF_ZT) => { - SocketAddr::ZeroTier(SocketAddrZt::new(&addr.s_zt)) - } Some(nng_sys::nng_sockaddr_family::NNG_AF_ABSTRACT) => SocketAddr::Abstract( Box::from(&addr.s_abstract.sa_name[..usize::from(addr.s_abstract.sa_len)]), ), @@ -97,35 +89,6 @@ impl From for SocketAddr { } } -/// A ZeroTier socket address. -#[doc(hidden)] -#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] -pub struct SocketAddrZt { - pub family: u16, - pub nwid: u64, - pub nodeid: u64, - pub port: u32, -} -impl SocketAddrZt { - /// Converts an `nng_sockaddr_zt` into its corresponding Rust type. - const fn new(addr: &nng_sys::nng_sockaddr_zt) -> SocketAddrZt { - SocketAddrZt { - family: addr.sa_family, - nwid: addr.sa_nwid, - nodeid: addr.sa_nodeid, - port: addr.sa_port, - } - } -} -impl fmt::Display for SocketAddrZt { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - // I have no idea if this output is meaningful at all. This is just vaguely - // based off the URI format for ZeroTier, ignoring fields that don't appear in - // the specification and guessing how all of the others align. - write!(f, "{}.{}:{}", self.nodeid, self.nwid, self.port) - } -} - /// Creates a `String` from a slice that _probably_ contains UTF-8 and /// _probably_ is null terminated. /// diff --git a/nng/src/aio.rs b/nng/src/aio.rs index 4128502..4b3c556 100644 --- a/nng/src/aio.rs +++ b/nng/src/aio.rs @@ -11,6 +11,7 @@ use std::{ }; use log::error; +use nng_sys::nng_err; use crate::{ ctx::Context, @@ -172,7 +173,7 @@ impl Aio { let res = unsafe { let state = cb_aio.inner.state.load(Ordering::Acquire).into(); let aiop = cb_aio.inner.handle.load(Ordering::Relaxed); - let rv = nng_sys::nng_aio_result(aiop) as u32; + let nng_err(rv) = nng_sys::nng_aio_result(aiop); let res = match (state, rv) { (State::Sending, 0) => AioResult::Send(Ok(())), @@ -218,7 +219,8 @@ impl Aio { let mut aio: *mut nng_sys::nng_aio = ptr::null_mut(); let aiop: *mut *mut nng_sys::nng_aio = &mut aio as _; - let rv = unsafe { nng_sys::nng_aio_alloc(aiop, Some(Aio::trampoline), callback_ptr as _) }; + let nng_err(rv) = + unsafe { nng_sys::nng_aio_alloc(aiop, Some(Aio::trampoline), callback_ptr as _) }; // NNG should never touch the pointer and return a non-zero code at the same // time. That being said, I'm going to be a pessimist and double check. If we do @@ -231,7 +233,7 @@ impl Aio { error!("NNG returned a non-null pointer from a failed function"); return Err(Error::Unknown(0)); } - validate_ptr(rv, aio)?; + validate_ptr(rv as _, aio)?; inner.handle.store(aio, Ordering::Release); inner.callback.store(callback_ptr, Ordering::Relaxed); @@ -342,7 +344,7 @@ impl Aio { let aiop = self.inner.handle.load(Ordering::Relaxed); unsafe { nng_sys::nng_aio_set_msg(aiop, msg.into_ptr().as_ptr()); - nng_sys::nng_send_aio(socket.handle(), aiop); + nng_sys::nng_socket_send(socket.handle(), aiop); } Ok(()) } @@ -358,7 +360,7 @@ impl Aio { let aiop = self.inner.handle.load(Ordering::Relaxed); unsafe { - nng_sys::nng_recv_aio(socket.handle(), aiop); + nng_sys::nng_socket_recv(socket.handle(), aiop); } Ok(()) } diff --git a/nng/src/ctx.rs b/nng/src/ctx.rs index 1b465cc..df20e05 100644 --- a/nng/src/ctx.rs +++ b/nng/src/ctx.rs @@ -11,6 +11,7 @@ use crate::{ message::Message, socket::Socket, }; +use nng_sys::nng_err; /// A socket context. /// @@ -145,30 +146,6 @@ impl Hash for Context { } } -#[rustfmt::skip] -expose_options!{ - Context :: inner.ctx -> nng_sys::nng_ctx; - - GETOPT_BOOL = nng_sys::nng_ctx_get_bool; - GETOPT_INT = nng_sys::nng_ctx_get_int; - GETOPT_MS = nng_sys::nng_ctx_get_ms; - GETOPT_SIZE = nng_sys::nng_ctx_get_size; - GETOPT_SOCKADDR = nng_sys::nng_ctx_get_addr; - GETOPT_STRING = nng_sys::nng_ctx_get_string; - GETOPT_UINT64 = nng_sys::nng_ctx_get_uint64; - - SETOPT = nng_sys::nng_ctx_set; - SETOPT_BOOL = nng_sys::nng_ctx_set_bool; - SETOPT_INT = nng_sys::nng_ctx_set_int; - SETOPT_MS = nng_sys::nng_ctx_set_ms; - SETOPT_PTR = nng_sys::nng_ctx_set_ptr; - SETOPT_SIZE = nng_sys::nng_ctx_set_size; - SETOPT_STRING = nng_sys::nng_ctx_set_string; - - Gets -> [protocol::reqrep::ResendTime, protocol::survey::SurveyTime]; - Sets -> [protocol::reqrep::ResendTime, protocol::survey::SurveyTime]; -} - /// A wrapper around an `nng_ctx`. #[derive(Debug)] struct Inner { @@ -180,7 +157,7 @@ impl Inner { // was never open. Neither of those are an issue for us. let rv = unsafe { nng_sys::nng_ctx_close(self.ctx) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_sys::nng_err::NNG_ECLOSED, "Unexpected error code while closing context ({})", rv ); diff --git a/nng/src/device.rs b/nng/src/device.rs index 15a4545..6578ba6 100644 --- a/nng/src/device.rs +++ b/nng/src/device.rs @@ -1,5 +1,7 @@ use std::num::NonZeroU32; +use nng_sys::nng_err; + use crate::{ error::{Error, Result}, socket::RawSocket, @@ -39,7 +41,7 @@ use crate::{ /// [`MaxTtl`]: options/enum.MaxTtl.html /// [`OutOfMemory`]: enum.Error.html#variant.OutOfMemory pub fn forwarder(s1: RawSocket, s2: RawSocket) -> Result<()> { - let rv = unsafe { nng_sys::nng_device(s1.socket.handle(), s2.socket.handle()) }; + let nng_err(rv) = unsafe { nng_sys::nng_device(s1.socket.handle(), s2.socket.handle()) }; // Appease Clippy. drop(s1); @@ -76,7 +78,7 @@ pub fn forwarder(s1: RawSocket, s2: RawSocket) -> Result<()> { /// [`InvalidInput`]: enum.Error.html#variant.InvalidInput /// [`OutOfMemory`]: enum.Error.html#variant.OutOfMemory pub fn reflector(s1: RawSocket) -> Result<()> { - let rv = unsafe { + let nng_err(rv) = unsafe { nng_sys::nng_device( s1.socket.handle(), nng_sys::nng_socket::NNG_SOCKET_INITIALIZER, diff --git a/nng/src/dialer.rs b/nng/src/dialer.rs index 990865c..3a68ce6 100644 --- a/nng/src/dialer.rs +++ b/nng/src/dialer.rs @@ -5,6 +5,8 @@ use std::{ num::NonZeroU32, }; +use nng_sys::nng_err; + use crate::{ error::{Error, Result}, socket::Socket, @@ -110,7 +112,7 @@ impl Dialer { // both of those mean that the drop was successful. let rv = unsafe { nng_sys::nng_dialer_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_sys::nng_err::NNG_ECLOSED, "Unexpected error code while closing dialer ({})", rv ); @@ -167,39 +169,6 @@ impl Hash for Dialer { } } -#[rustfmt::skip] -expose_options!{ - Dialer :: handle -> nng_sys::nng_dialer; - - GETOPT_BOOL = nng_sys::nng_dialer_get_bool; - GETOPT_INT = nng_sys::nng_dialer_get_int; - GETOPT_MS = nng_sys::nng_dialer_get_ms; - GETOPT_SIZE = nng_sys::nng_dialer_get_size; - GETOPT_SOCKADDR = nng_sys::nng_dialer_get_addr; - GETOPT_STRING = nng_sys::nng_dialer_get_string; - GETOPT_UINT64 = nng_sys::nng_dialer_get_uint64; - - SETOPT = nng_sys::nng_dialer_set; - SETOPT_BOOL = nng_sys::nng_dialer_set_bool; - SETOPT_INT = nng_sys::nng_dialer_set_int; - SETOPT_MS = nng_sys::nng_dialer_set_ms; - SETOPT_PTR = nng_sys::nng_dialer_set_ptr; - SETOPT_SIZE = nng_sys::nng_dialer_set_size; - SETOPT_STRING = nng_sys::nng_dialer_set_string; - - Gets -> [LocalAddr, Raw, ReconnectMinTime, - ReconnectMaxTime, RecvBufferSize, - RecvMaxSize, RecvTimeout, - SendBufferSize, SendTimeout, - SocketName, MaxTtl, Url, - protocol::reqrep::ResendTime, - protocol::survey::SurveyTime, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::websocket::Protocol]; - Sets -> []; -} - /// Configuration utility for NNG dialers. /// /// This object allows for the configuration of dialers before they are @@ -311,52 +280,13 @@ impl DialerBuilder { } } -#[rustfmt::skip] -expose_options!{ - DialerBuilder :: handle -> nng_sys::nng_dialer; - - GETOPT_BOOL = nng_sys::nng_dialer_get_bool; - GETOPT_INT = nng_sys::nng_dialer_get_int; - GETOPT_MS = nng_sys::nng_dialer_get_ms; - GETOPT_SIZE = nng_sys::nng_dialer_get_size; - GETOPT_SOCKADDR = nng_sys::nng_dialer_get_addr; - GETOPT_STRING = nng_sys::nng_dialer_get_string; - GETOPT_UINT64 = nng_sys::nng_dialer_get_uint64; - - SETOPT = nng_sys::nng_dialer_set; - SETOPT_BOOL = nng_sys::nng_dialer_set_bool; - SETOPT_INT = nng_sys::nng_dialer_set_int; - SETOPT_MS = nng_sys::nng_dialer_set_ms; - SETOPT_PTR = nng_sys::nng_dialer_set_ptr; - SETOPT_SIZE = nng_sys::nng_dialer_set_size; - SETOPT_STRING = nng_sys::nng_dialer_set_string; - - Gets -> [LocalAddr, Raw, ReconnectMinTime, - ReconnectMaxTime, RecvBufferSize, - RecvMaxSize, RecvTimeout, - SendBufferSize, SendTimeout, - SocketName, MaxTtl, Url, - protocol::reqrep::ResendTime, - protocol::survey::SurveyTime, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::websocket::Protocol]; - Sets -> [ReconnectMinTime, ReconnectMaxTime, - RecvMaxSize, transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::tls::CaFile, - transport::tls::CertKeyFile, - transport::websocket::RequestHeaders, - transport::websocket::Protocol]; -} - impl Drop for DialerBuilder { fn drop(&mut self) { // Closing the dialer should only ever result in success or ECLOSED and // both of those mean that the drop was successful. let rv = unsafe { nng_sys::nng_dialer_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_sys::nng_err::NNG_ECLOSED, "Unexpected error code while closing dialer ({})", rv ); diff --git a/nng/src/error.rs b/nng/src/error.rs index 99389b5..22cdfc7 100644 --- a/nng/src/error.rs +++ b/nng/src/error.rs @@ -1,5 +1,7 @@ use std::{error, fmt, io, num::NonZeroU32}; +use nng_sys::nng_err; + use crate::message::Message; /// Specialized `Result` type for use with NNG. @@ -133,42 +135,40 @@ impl From for Error { #[rustfmt::skip] fn from(code: NonZeroU32) -> Error { - match code.get() { - nng_sys::NNG_EINTR => Error::Interrupted, - nng_sys::NNG_ENOMEM => Error::OutOfMemory, - nng_sys::NNG_EINVAL => Error::InvalidInput, - nng_sys::NNG_EBUSY => Error::Busy, - nng_sys::NNG_ETIMEDOUT => Error::TimedOut, - nng_sys::NNG_ECONNREFUSED => Error::ConnectionRefused, - nng_sys::NNG_ECLOSED => Error::Closed, - nng_sys::NNG_EAGAIN => Error::TryAgain, - nng_sys::NNG_ENOTSUP => Error::NotSupported, - nng_sys::NNG_EADDRINUSE => Error::AddressInUse, - nng_sys::NNG_ESTATE => Error::IncorrectState, - nng_sys::NNG_ENOENT => Error::EntryNotFound, - nng_sys::NNG_EPROTO => Error::Protocol, - nng_sys::NNG_EUNREACHABLE => Error::DestUnreachable, - nng_sys::NNG_EADDRINVAL => Error::AddressInvalid, - nng_sys::NNG_EPERM => Error::PermissionDenied, - nng_sys::NNG_EMSGSIZE => Error::MessageTooLarge, - nng_sys::NNG_ECONNABORTED => Error::ConnectionAborted, - nng_sys::NNG_ECONNRESET => Error::ConnectionReset, - nng_sys::NNG_ECANCELED => Error::Canceled, - nng_sys::NNG_ENOFILES => Error::OutOfFiles, - nng_sys::NNG_ENOSPC => Error::OutOfSpace, - nng_sys::NNG_EEXIST => Error::ResourceExists, - nng_sys::NNG_EREADONLY => Error::ReadOnly, - nng_sys::NNG_EWRITEONLY => Error::WriteOnly, - nng_sys::NNG_ECRYPTO => Error::Crypto, - nng_sys::NNG_EPEERAUTH => Error::PeerAuth, - nng_sys::NNG_ENOARG => Error::NoArgument, - nng_sys::NNG_EAMBIGUOUS => Error::Ambiguous, - nng_sys::NNG_EBADTYPE => Error::BadType, - nng_sys::NNG_ECONNSHUT => Error::ConnectionShutdown, - nng_sys::NNG_EINTERNAL => Error::Internal, - c if c & nng_sys::NNG_ESYSERR != 0 => Error::SystemErr(c & !nng_sys::NNG_ESYSERR), - c if c & nng_sys::NNG_ETRANERR != 0 => Error::TransportErr(c & !nng_sys::NNG_ETRANERR), - c => Error::Unknown(c), + match nng_err(code.get() as _) { + nng_err::NNG_EINTR => Error::Interrupted, + nng_err::NNG_ENOMEM => Error::OutOfMemory, + nng_err::NNG_EINVAL => Error::InvalidInput, + nng_err::NNG_EBUSY => Error::Busy, + nng_err::NNG_ETIMEDOUT => Error::TimedOut, + nng_err::NNG_ECONNREFUSED => Error::ConnectionRefused, + nng_err::NNG_ECLOSED => Error::Closed, + nng_err::NNG_EAGAIN => Error::TryAgain, + nng_err::NNG_ENOTSUP => Error::NotSupported, + nng_err::NNG_EADDRINUSE => Error::AddressInUse, + nng_err::NNG_ESTATE => Error::IncorrectState, + nng_err::NNG_ENOENT => Error::EntryNotFound, + nng_err::NNG_EPROTO => Error::Protocol, + nng_err::NNG_EUNREACHABLE => Error::DestUnreachable, + nng_err::NNG_EADDRINVAL => Error::AddressInvalid, + nng_err::NNG_EPERM => Error::PermissionDenied, + nng_err::NNG_EMSGSIZE => Error::MessageTooLarge, + nng_err::NNG_ECONNABORTED => Error::ConnectionAborted, + nng_err::NNG_ECONNRESET => Error::ConnectionReset, + nng_err::NNG_ECANCELED => Error::Canceled, + nng_err::NNG_ENOFILES => Error::OutOfFiles, + nng_err::NNG_ENOSPC => Error::OutOfSpace, + nng_err::NNG_EEXIST => Error::ResourceExists, + nng_err::NNG_EREADONLY => Error::ReadOnly, + nng_err::NNG_EWRITEONLY => Error::WriteOnly, + nng_err::NNG_ECRYPTO => Error::Crypto, + nng_err::NNG_EPEERAUTH => Error::PeerAuth, + nng_err::NNG_EBADTYPE => Error::BadType, + nng_err::NNG_ECONNSHUT => Error::ConnectionShutdown, + nng_err::NNG_EINTERNAL => Error::Internal, + nng_err(c) if c & nng_err::NNG_ESYSERR.0 != 0 => Error::SystemErr(c & !nng_err::NNG_ESYSERR.0), + nng_err(c) if c & nng_err::NNG_ETRANERR.0 != 0 => Error::TransportErr(c & !nng_err::NNG_ETRANERR.0), + c => Error::Unknown(c.0), } } } diff --git a/nng/src/lib.rs b/nng/src/lib.rs index f663817..6879662 100644 --- a/nng/src/lib.rs +++ b/nng/src/lib.rs @@ -122,47 +122,7 @@ //! [4]: https://nanomsg.github.io/nng/man/v1.2.2/nng_rep.7 //! [5]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-patch-section //! [6]: https://github.com/rust-lang/cargo/issues/2980 - -// The following lints are of critical importance. -#![forbid(improper_ctypes)] -// Utilize Clippy to try and keep this crate clean. At some point (cargo#5034, I think?) this -// specification should be possible in either the Clippy TOML file or in the Cargo TOML file. These -// should be moved there once possible. -#![deny(bare_trait_objects)] -#![deny(missing_debug_implementations)] -#![deny(missing_docs)] -#![deny(clippy::all)] -#![deny(clippy::wrong_self_convention)] -// Clippy doesn't enable these with "all". Best to keep them warnings. -#![warn(clippy::nursery)] -#![warn(clippy::pedantic)] -#![warn(clippy::cargo)] -#![warn(clippy::clone_on_ref_ptr)] -#![warn(clippy::decimal_literal_representation)] -#![warn(clippy::print_stdout)] -#![warn(clippy::unimplemented)] -#![warn(clippy::use_debug)] -// I would like to be able to keep these on, but due to the nature of the crate it just isn't -// feasible. For example, the "cast_sign_loss" will warn at every i32/u32 conversion. Normally, I -// would like that, but this library is a safe wrapper around a Bindgen-based binding of a C -// library, which means the types are a little bit up-in-the-air. -#![allow(clippy::cast_sign_loss)] -#![allow(clippy::empty_enum)] // Revisit after RFC1861 and RFC1216. -#![allow(clippy::cargo_common_metadata)] // Can't control this. -#![allow(clippy::module_name_repetitions)] // Doesn't recognize public re-exports. -#![allow(clippy::cast_possible_wrap)] -// I want to enable this but it requires bumping the Rustc version and I don't want to do that just -// for a clippy lint. -#![allow(clippy::ptr_as_ptr)] -// In these cases, I just don't like what Clippy suggests. -#![allow(clippy::use_self)] -#![allow(clippy::if_not_else)] -#![allow(clippy::must_use_candidate)] -#![allow(clippy::missing_const_for_fn)] -#![allow(clippy::option_if_let_else)] // Semantically backwards when used with non-zero error codes -#![allow(clippy::wildcard_imports)] // I don't generally like them either but can be used well -#![allow(clippy::enum_glob_use)] // Same as wildcards -#![allow(clippy::manual_non_exhaustive)] // Not available in v1.36 +use std::num::NonZeroU16; #[macro_use] mod util; @@ -179,8 +139,6 @@ mod pipe; mod protocol; mod socket; -pub mod options; - #[cfg(feature = "ffi-module")] /// Raw NNG foreign function interface. pub use nng_sys as ffi; @@ -200,3 +158,102 @@ pub use crate::{ protocol::Protocol, socket::{RawSocket, Socket}, }; + +/// A handle to the NNG resources. +/// +/// This type can be used to configure the parameters of NNG resources or to +/// control the life of said resources. Every [`Socket`] has an implicit handle +/// to those resources and will create it with the default values. If one of +/// these is created before any sockets, it can be used to set runtime tunables +/// for NNG. +#[non_exhaustive] +#[derive(Debug, Default)] +pub struct Params { + /// The number of threads to use for tasks. + /// + /// These tasks are principally for callback completion. This number cannot + /// exceed [`Init::max_task_threads`]. + pub num_task_threads: Option, + + /// The maximum number of threads to use for tasks. + /// + /// These tasks are principally used for callback completion. This value can + /// be used to provide an upper limit while still allowing the number of + /// task threads to be dynamically calculated. + pub max_task_threads: Option, + + /// The number of threads used for expiring operations. + /// + /// Using a larger value will reduce contention on some common locks, and + /// may improve performance. This number cannot exceed + /// [`Init::max_expire_threads`]. + pub num_expire_threads: Option, + + /// The maximum number of threads used for expiring operations. + /// + /// Using a larger value will reduce contention on some common locks, and + /// may improve performance. This can be used to provide an upper limit + /// while still allowing a dynamic count. + pub max_expire_threads: Option, + + /// The number of threads to use for performing I/O. + /// + /// Not all configurations will support this. Cannot exceed + /// [`Init::max_poller_threads`]. + pub num_poller_threads: Option, + + /// The maximum number of threads to use for performing I/O. + /// + /// Not all configurations will support this. This allows an upper limit on + /// the number of polling threads while still allowing the count to be + /// dynamically calculated. + pub max_poller_threads: Option, + + /// The number of threads used for asynchronous DNS lookup. + pub num_resolver_threads: Option, +} + +impl Params { + /// Initializes NNG with the specified tunables. + /// + /// This initializes NNG with the provided parameters and will prevent NNG + /// from releasing certain resources until after the provided handle is + /// dropped. Each [`Socket`] implicitly has their own handle and the + /// resources will not be released until all [`Socket`] objects are dropped + /// as well. + /// + /// # Errors + /// + /// If NNG has already been initialized, this will return [`Error::Busy`]. + pub fn init(self) -> Result { + let params = self.into(); + + // SAFETY: We know that this is a valid pointer. + let rv = unsafe { nng_sys::nng_init(¶ms) }; + + struct Defer; + impl Drop for Defer { + fn drop(&mut self) { + // SAFETY: We did the initialization above. + unsafe { nng_sys::nng_fini() } + } + } + + rv2res!(rv.0, Defer) + } +} + +impl From for nng_sys::nng_init_params { + fn from(params: Params) -> Self { + let convert = |n: NonZeroU16| n.get() as i16; + nng_sys::nng_init_params { + num_task_threads: params.num_task_threads.map_or(0, convert), + max_task_threads: params.max_task_threads.map_or(0, convert), + num_expire_threads: params.num_expire_threads.map_or(0, convert), + max_expire_threads: params.max_expire_threads.map_or(0, convert), + num_poller_threads: params.num_poller_threads.map_or(0, convert), + max_poller_threads: params.max_poller_threads.map_or(0, convert), + num_resolver_threads: params.num_resolver_threads.map_or(0, convert), + } + } +} diff --git a/nng/src/listener.rs b/nng/src/listener.rs index d2b7da3..a42bc5b 100644 --- a/nng/src/listener.rs +++ b/nng/src/listener.rs @@ -5,6 +5,8 @@ use std::{ num::NonZeroU32, }; +use nng_sys::nng_err; + use crate::{ error::{Error, Result}, socket::Socket, @@ -87,7 +89,7 @@ impl Listener { // and both of those mean that the drop was successful. let rv = unsafe { nng_sys::nng_listener_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_err::NNG_ECLOSED, "Unexpected error code while closing listener ({})", rv ); @@ -144,38 +146,6 @@ impl Hash for Listener { } } -#[rustfmt::skip] -expose_options!{ - Listener :: handle -> nng_sys::nng_listener; - - GETOPT_BOOL = nng_sys::nng_listener_get_bool; - GETOPT_INT = nng_sys::nng_listener_get_int; - GETOPT_MS = nng_sys::nng_listener_get_ms; - GETOPT_SIZE = nng_sys::nng_listener_get_size; - GETOPT_SOCKADDR = nng_sys::nng_listener_get_addr; - GETOPT_STRING = nng_sys::nng_listener_get_string; - GETOPT_UINT64 = nng_sys::nng_listener_get_uint64; - - SETOPT = nng_sys::nng_listener_set; - SETOPT_BOOL = nng_sys::nng_listener_set_bool; - SETOPT_INT = nng_sys::nng_listener_set_int; - SETOPT_MS = nng_sys::nng_listener_set_ms; - SETOPT_PTR = nng_sys::nng_listener_set_ptr; - SETOPT_SIZE = nng_sys::nng_listener_set_size; - SETOPT_STRING = nng_sys::nng_listener_set_string; - - Gets -> [LocalAddr, Raw, RecvBufferSize, - RecvTimeout, SendBufferSize, Url, - SendTimeout, SocketName, MaxTtl, - protocol::reqrep::ResendTime, - protocol::survey::SurveyTime, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::tcp::BoundPort, - transport::websocket::Protocol]; - Sets -> []; -} - /// Configuration utility for nanomsg-next-generation listeners. /// /// This object allows for the configuration of listeners before they are @@ -262,57 +232,13 @@ impl ListenerBuilder { } } -#[rustfmt::skip] -expose_options!{ - ListenerBuilder :: handle -> nng_sys::nng_listener; - - GETOPT_BOOL = nng_sys::nng_listener_get_bool; - GETOPT_INT = nng_sys::nng_listener_get_int; - GETOPT_MS = nng_sys::nng_listener_get_ms; - GETOPT_SIZE = nng_sys::nng_listener_get_size; - GETOPT_SOCKADDR = nng_sys::nng_listener_get_addr; - GETOPT_STRING = nng_sys::nng_listener_get_string; - GETOPT_UINT64 = nng_sys::nng_listener_get_uint64; - - SETOPT = nng_sys::nng_listener_setopt; - SETOPT_BOOL = nng_sys::nng_listener_setopt_bool; - SETOPT_INT = nng_sys::nng_listener_setopt_int; - SETOPT_MS = nng_sys::nng_listener_setopt_ms; - SETOPT_PTR = nng_sys::nng_listener_setopt_ptr; - SETOPT_SIZE = nng_sys::nng_listener_setopt_size; - SETOPT_STRING = nng_sys::nng_listener_setopt_string; - - Gets -> [LocalAddr, Raw, RecvBufferSize, - RecvTimeout, SendBufferSize, Url, - SendTimeout, SocketName, MaxTtl, - protocol::reqrep::ResendTime, - protocol::survey::SurveyTime, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::websocket::Protocol]; - Sets -> [RecvMaxSize, transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::tls::CaFile, - transport::tls::CertKeyFile, - transport::websocket::ResponseHeaders, - transport::websocket::Protocol]; -} - -#[cfg(unix)] -mod unix_impls { - use super::*; - use crate::options::transport::ipc; - - impl crate::options::SetOpt for ListenerBuilder {} -} - impl Drop for ListenerBuilder { fn drop(&mut self) { // Closing the listener should only ever result in success or ECLOSED // and both of those mean that the drop was successful. let rv = unsafe { nng_sys::nng_listener_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_err::NNG_ECLOSED, "Unexpected error code while closing listener ({})", rv ); diff --git a/nng/src/options/mod.rs b/nng/src/options/mod.rs deleted file mode 100644 index 5101f3c..0000000 --- a/nng/src/options/mod.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! Options available to configure NNG constructs. -//! -//! Many of the options are transport or protocol specific. Additionally, even -//! though the Socket does not have a specific transport, it is able to accept -//! transport options to be used as defaults for any new Dialers or Listeners. -//! -//! Additionally, a Dialer or Listener is able to read options from the -//! underlying Socket but they are unable to write options unless they are -//! directly supported. -use crate::error::Result; - -mod types; -pub use self::types::*; - -pub(crate) mod private; - -/// Trait for getting and setting options. -/// -/// This trait allows for the getting and setting of options as long as that -/// option is available. An example of this would be the `Raw` option - it is a -/// read-only option that is available exclusively to sockets. So the following -/// code will work: -/// -/// ``` -/// use nng::options::{Options, Raw}; -/// use nng::*; -/// -/// let socket = Socket::new(Protocol::Pub0).unwrap(); -/// let raw = socket.get_opt::().unwrap(); -/// assert!(!raw); -/// ``` -/// -/// But all this is a compile error: -/// -/// ```compile_fail -/// use nng::options::{Options, Raw}; -/// use nng::*; -/// -/// let socket = Socket::new(Protocol::Pub0).unwrap(); -/// socket.set_opt::(true).unwrap(); // Won't compile -/// ``` -pub trait Options: private::HasOpts { - /// Reads the specified option from the object. - #[allow(clippy::missing_errors_doc)] - fn get_opt(&self) -> Result - where - Self: GetOpt, - { - T::get(self) - } - - /// Writes the specified option to the object. - #[allow(clippy::missing_errors_doc)] - fn set_opt(&self, val: T::OptType) -> Result<()> - where - Self: SetOpt, - { - T::set(self, val) - } -} -impl Options for T {} - -/// Marks the type as an NNG option. -pub trait Opt { - /// The type that the option read and writes. - type OptType; -} - -/// Marks that a type can get the specific NNG option. -pub trait GetOpt: private::HasOpts {} - -/// Marks that a type can set the specific NNG option. -pub trait SetOpt: private::HasOpts {} diff --git a/nng/src/options/private.rs b/nng/src/options/private.rs deleted file mode 100644 index e4a7d34..0000000 --- a/nng/src/options/private.rs +++ /dev/null @@ -1,195 +0,0 @@ -use std::{ - ffi::{CStr, CString}, - mem::MaybeUninit, - os::raw::{c_char, c_int, c_void}, - ptr, - time::Duration, -}; - -use crate::{ - addr::SocketAddr, - error::{Error, Result}, - util::validate_ptr, -}; - -/// Exposes the ability to get and set the option. -/// -/// This trait does not enforce the availability of any specific options on -/// any specific type. Calling these methods incorrectly will result in NNG -/// returning an error code. -pub trait OptOps: super::Opt { - /// Get the value of the option using the specified type. - fn get(s: &T) -> Result; - - /// Set the value of the option using the specified type. - fn set(s: &T, val: Self::OptType) -> Result<()>; -} - -/// Marks a type that can get and set NNG options. -pub trait HasOpts: Sized { - /// Underlying NNG type. - type Handle; - - /// Raw NNG function for getting a boolean option. - const GETOPT_BOOL: unsafe extern "C" fn(Self::Handle, *const c_char, *mut bool) -> c_int; - /// Rawn NNG funcion for getting an integer otion. - const GETOPT_INT: unsafe extern "C" fn(Self::Handle, *const c_char, *mut c_int) -> c_int; - /// Raw NNG function to get an `nng_duration`. - const GETOPT_MS: unsafe extern "C" fn( - Self::Handle, - *const c_char, - *mut nng_sys::nng_duration, - ) -> c_int; - /// Raw NNG function for getting a `size_t` option. - const GETOPT_SIZE: unsafe extern "C" fn(Self::Handle, *const c_char, *mut usize) -> c_int; - /// Raw NNG function for getting an `nng_sockaddr` option. - const GETOPT_SOCKADDR: unsafe extern "C" fn( - Self::Handle, - *const c_char, - *mut nng_sys::nng_sockaddr, - ) -> c_int; - /// Raw NNG function for getting a string value. - const GETOPT_STRING: unsafe extern "C" fn( - Self::Handle, - *const c_char, - *mut *mut c_char, - ) -> c_int; - /// Raw NNG function for getting a u64. - const GETOPT_UINT64: unsafe extern "C" fn(Self::Handle, *const c_char, *mut u64) -> c_int; - - /// Raw NNG function for setting opaque data. - const SETOPT: unsafe extern "C" fn(Self::Handle, *const c_char, *const c_void, usize) -> c_int; - /// Raw NNG function for setting a boolean. - const SETOPT_BOOL: unsafe extern "C" fn(Self::Handle, *const c_char, bool) -> c_int; - /// Raw NNG function to set an integer. - const SETOPT_INT: unsafe extern "C" fn(Self::Handle, *const c_char, c_int) -> c_int; - /// Raw NNG function to set an `nng_duration`. - const SETOPT_MS: unsafe extern "C" fn( - Self::Handle, - *const c_char, - nng_sys::nng_duration, - ) -> c_int; - /// Raw NNG function to set a pointer option. - const SETOPT_PTR: unsafe extern "C" fn(Self::Handle, *const c_char, *mut c_void) -> c_int; - /// Raw NNG function to set a `size_t` option. - const SETOPT_SIZE: unsafe extern "C" fn(Self::Handle, *const c_char, usize) -> c_int; - /// Raw NNG function to set a string value. - const SETOPT_STRING: unsafe extern "C" fn(Self::Handle, *const c_char, *const c_char) -> c_int; - - /// Returns the underlying NNG type. - fn handle(&self) -> Self::Handle; - - /// Get the boolean option. - fn getopt_bool(&self, opt: *const c_char) -> Result { - let mut raw = false; - let rv = unsafe { (Self::GETOPT_BOOL)(self.handle(), opt, ptr::from_mut(&mut raw)) }; - - rv2res!(rv, raw) - } - - /// Get an integer option. - fn getopt_int(&self, opt: *const c_char) -> Result { - let mut res = 0; - let rv = unsafe { (Self::GETOPT_INT)(self.handle(), opt, ptr::from_mut(&mut res)) }; - - rv2res!(rv, res) - } - - /// Get the duration from the option. - fn getopt_ms(&self, opt: *const c_char) -> Result> { - let mut dur: nng_sys::nng_duration = 0; - let rv = unsafe { (Self::GETOPT_MS)(self.handle(), opt, ptr::from_mut(&mut dur)) }; - - rv2res!(rv, crate::util::nng_to_duration(dur)) - } - - /// Get the `size_t` option. - fn getopt_size(&self, opt: *const c_char) -> Result { - let mut sz = 0; - let rv = unsafe { (Self::GETOPT_SIZE)(self.handle(), opt, ptr::from_mut(&mut sz)) }; - - rv2res!(rv, sz) - } - - /// Get the specified socket address option. - fn getopt_sockaddr(&self, opt: *const c_char) -> Result { - unsafe { - let mut addr: MaybeUninit = MaybeUninit::uninit(); - let rv = (Self::GETOPT_SOCKADDR)(self.handle(), opt, addr.as_mut_ptr()); - - rv2res!(rv, addr.assume_init().into()) - } - } - - /// Get the string value of the specified option. - fn getopt_string(&self, opt: *const c_char) -> Result { - unsafe { - let mut ptr: *mut c_char = ptr::null_mut(); - let rv = (Self::GETOPT_STRING)(self.handle(), opt, &raw mut ptr); - let ptr = validate_ptr(rv, ptr)?; - - let name = CStr::from_ptr(ptr.as_ptr()).to_string_lossy().into_owned(); - nng_sys::nng_strfree(ptr.as_ptr()); - - Ok(name) - } - } - - /// The the `u64` option. - fn getopt_uint64(&self, opt: *const c_char) -> Result { - let mut res = 0; - let rv = unsafe { (Self::GETOPT_UINT64)(self.handle(), opt, ptr::from_mut(&mut res)) }; - - rv2res!(rv, res) - } - - /// Sets the value of opaque data. - fn setopt(&self, opt: *const c_char, val: &[u8]) -> Result<()> { - let rv = unsafe { (Self::SETOPT)(self.handle(), opt, val.as_ptr() as _, val.len()) }; - - rv2res!(rv) - } - - /// Sets the value of a boolean option. - fn setopt_bool(&self, opt: *const c_char, val: bool) -> Result<()> { - let rv = unsafe { (Self::SETOPT_BOOL)(self.handle(), opt, val) }; - - rv2res!(rv) - } - - /// Set the value of an integer option. - fn setopt_int(&self, opt: *const c_char, val: i32) -> Result<()> { - let rv = unsafe { (Self::SETOPT_INT)(self.handle(), opt, val) }; - - rv2res!(rv) - } - - /// Set the duration to the option. - fn setopt_ms(&self, opt: *const c_char, dur: Option) -> Result<()> { - let ms = crate::util::duration_to_nng(dur); - - let rv = unsafe { (Self::SETOPT_MS)(self.handle(), opt, ms) }; - rv2res!(rv) - } - - /// Set the value of the pointer to the option. - unsafe fn setopt_ptr(&self, opt: *const c_char, val: *mut c_void) -> Result<()> { - let rv = (Self::SETOPT_PTR)(self.handle(), opt, val); - rv2res!(rv) - } - - /// Set the value of a `size` option. - fn setopt_size(&self, opt: *const c_char, val: usize) -> Result<()> { - let rv = unsafe { (Self::SETOPT_SIZE)(self.handle(), opt, val) }; - - rv2res!(rv) - } - - /// Set the value of the option to the value of the string. - fn setopt_string(&self, opt: *const c_char, val: &str) -> Result<()> { - let cval = CString::new(val).map_err(|_| Error::InvalidInput)?; - let rv = unsafe { (Self::SETOPT_STRING)(self.handle(), opt, cval.as_ptr()) }; - - rv2res!(rv) - } -} diff --git a/nng/src/options/types.rs b/nng/src/options/types.rs deleted file mode 100644 index 12f0870..0000000 --- a/nng/src/options/types.rs +++ /dev/null @@ -1,658 +0,0 @@ -#[cfg(unix)] -use std::os::unix::io::RawFd; -use std::time::Duration; - -use crate::addr::SocketAddr; - -create_option! { - /// The local address used for communication. - /// - /// ## Support - /// - /// * Dialers can read from this with the IPC transport. - /// * Listeners can read from this on the following transports: - /// * TCP - /// * ZeroTier - /// * IPC - /// * TLS - /// * Pipes can read from this on the following transports: - /// * IPC - /// * `InProc` - /// * TCP - /// * `ZeroTier` - /// * WebSocket - /// * TLS - LocalAddr -> SocketAddr: - Get s = s.getopt_sockaddr(std::ptr::from_ref(nng_sys::NNG_OPT_LOCADDR) as _); -} - -create_option! { - /// The remote address of the peer. - /// - /// The availability of this option is dependent on the transport. - /// - /// ## Support - /// - /// * Pipes can read from this on the following transports: - /// * IPC - /// * `InProc` - /// * TCP - /// * `ZeroTier` - /// * WebSocket - /// * TLS - RemAddr -> SocketAddr: - Get s = s.getopt_sockaddr(std::ptr::from_ref(nng_sys::NNG_OPT_REMADDR) as _); -} - -create_option! { - /// Whether or not the socket is in "raw" mode. - /// - /// Raw mode sockets generally do not have any protocol-specific semantics - /// applied to them; instead the application is expected to perform such - /// semantics itself. (For example, in "cooked" mode a _rep_ socket would - /// automatically copy message headers from a received message to the - /// corresponding reply, whereas in "raw" mode this is not done.) - /// - /// See [raw mode][1] for more details. - /// - /// ## Support - /// - /// * Sockets can read this option. - /// * Dialers and Listeners can retrieve this from their owning Socket. - /// - /// [1]: https://nanomsg.github.io/nng/man/v1.2.2/nng.7.html#raw_mode - Raw -> bool: - Get s = s.getopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_RAW) as _); -} - -create_option! { - /// The minimum amount of time to wait before attempting to establish a - /// connection after a previous attempt has failed. - /// - /// If set on a `Socket`, this value becomes the default for new dialers. - /// Individual dialers can then override the setting. - /// - /// ## Support - /// - /// * Dialers can use this option. - /// * Sockets can use this option to create a new default value. - ReconnectMinTime -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECONNMINT) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECONNMINT) as _, val); -} - -create_option! { - /// The maximum amount of time to wait before attempting to establish a - /// connection after a previous attempt has failed. - /// - /// If this is non-zero, then the time between successive connection - /// attempts will start at the value of `ReconnectMinTime`, and grow - /// exponentially, until it reaches this value. If this value is zero, then - /// no exponential back-off between connection attempts is done, and each - /// attempt will wait the time specified by `ReconnectMinTime`. This can be - /// set on a socket, but it can also be overridden on an individual dialer. - /// - /// ## Support - /// - /// * Dialers can use this option. - /// * Sockets can use this option to create a new default value. - ReconnectMaxTime -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECONNMAXT) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECONNMAXT) as _, val); -} - -create_option! { - /// The depth of the socket's receive buffer as a number of messages. - /// - /// Messages received by the transport may be buffered until the - /// application has accepted them for delivery. - /// - /// ## Support - /// - /// * Sockets can read and write this option. - /// * Dialers and Listeners can retrieve it from their owning Socket. - RecvBufferSize -> i32: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_RECVBUF) as _); - Set s val = s.setopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_RECVBUF) as _, val); -} - -#[cfg(unix)] -create_option! { - /// A raw file descriptor that can be used to poll for receiving on a socket. - /// - /// This descriptor will be _readable_ when a message is available for - /// receiving on the socket. When no message is ready for receiving, it will - /// _not_ be readable. - /// - /// While this may be useful for integrating into existing polling loops, the - /// use of asynchronous I/O objects will be more efficient. - /// - /// Applications should **never** attempt to read or write to the file - /// descriptor. - /// - /// ## Support - /// - /// * All sockets. - RecvFd -> RawFd: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_RECVFD) as _); -} - -create_option! { - /// The maximum message size that the will be accepted from a remote peer. - /// - /// If a peer attempts to send a message larger than this, then the message - /// will be discarded. If the value of this is zero, then no limit on - /// message sizes is enforced. This option exists to prevent certain kinds - /// of denial-of-service attacks, where a malicious agent can claim to want - /// to send an extraordinarily large message, without sending any data. - /// This option can be set for the socket, but may be overridden for on a - /// per-dialer or per-listener basis. - /// - /// Note that some transports may have further message size restrictions. - /// - /// ## Support - /// - /// * Dialers and Listeners can use this with the following transports: - /// * TCP - /// * ZeroTier - /// * IPC - /// * TLS - /// * WebSocket - /// * Pipes can read this value on the following transports: - /// * ZeroTier - /// * Sockets can utilize this to set a new default value. - RecvMaxSize -> usize: - Get s = s.getopt_size(std::ptr::from_ref(nng_sys::NNG_OPT_RECVMAXSZ) as _); - Set s val = s.setopt_size(std::ptr::from_ref(nng_sys::NNG_OPT_RECVMAXSZ) as _, val); -} - -create_option! { - /// The socket receive timeout. - /// - /// When no message is available for receiving at the socket for this period - /// of time, receive operations will fail with `ErrorKind::TimedOut`. - /// - /// ## Support - /// - /// * Sockets can utilize this value. - /// * Dialers and Listeners can retrieve it from their owning Socket. - RecvTimeout -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECVTIMEO) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_RECVTIMEO) as _, val); -} - -create_option! { - /// The depth of the socket send buffer as a number of messages. - /// - /// Messages sent by an application may be buffered by the socket until a - /// transport is ready to accept them for delivery. This value must be an - /// integer between 0 and 8192, inclusive. - /// - /// ## Support - /// - /// * Sockets can utilize this value. - /// * Dialers and Listeners can retrieve it from their owning Socket. - SendBufferSize -> i32: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_SENDBUF) as _); - Set s val = s.setopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_SENDBUF) as _, val); -} - -#[cfg(unix)] -create_option! { - /// A raw file descriptor that can be used to poll for sending on a socket. - /// - /// This descriptor will be _readable_ when a message is available for sending - /// a message without blocking. When the socket can no longer accept messages - /// without blocking, the descriptor will _not_ be readable. - /// - /// While this may be useful for integrating into existing polling loops, the - /// use of asynchronous I/O objects will be more efficient. - /// - /// Applications should **never** attempt to read or write to the file - /// descriptor. - /// - /// ## Support - /// - /// * All sockets. - SendFd -> RawFd: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_SENDFD) as _); -} - -create_option! { - /// The socket send timeout. - /// - /// When a message cannot be queued for delivery by the socket for this - /// period of time (such as if send buffers are full), the operation will - /// fail with `ErrorKind::TimedOut`. - /// - /// ## Support - /// - /// * Sockets can utilize this value. - /// * Dialers and Listeners can retrieve it from their owning Socket. - SendTimeout -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_SENDTIMEO) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_SENDTIMEO) as _, val); -} - -create_option! { - /// The socket name. - /// - /// By default this is a string corresponding to the value of the socket. - /// The string must fit within 63-bytes but it can be changed for other - /// application uses. - /// - /// ## Support - /// - /// * Sockets can utilize this value. - /// * Dialers and Listeners can retrieve it from their owning Socket. - SocketName -> String: - Get s = s.getopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_SOCKNAME) as _); - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_SOCKNAME) as _, &val); -} - -create_option! { - /// The maximum number of "hops" a message may traverse. - /// - /// The intention here is to prevent forwarding loops in [device chains][1]. - /// Note that not all protocols support this option and those that do - /// generally have a default value of 8. - /// - /// Each node along a forwarding path may have its own value for the - /// maximum time-to-live, and performs its own checks before forwarding a - /// message. Therefore it is helpful if all nodes in the topology use the - /// same value for this option. - /// - /// ## Support - /// - /// * Sockets can use this with the following protocols: - /// * Pair v1 - /// * Rep v0 - /// * Req v0 - /// * Surveyor v0 - /// * Respondent v0 - /// * Dialers and Listeners can retrieve it from their owning Socket, if applicable. - /// - /// [1]: https://nanomsg.github.io/nng/man/v1.2.2/nng_device.3.html - MaxTtl -> u8: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_MAXTTL) as _).map(|v| v as u8); - Set s val = s.setopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_MAXTTL) as _, val.into()); -} - -create_option! { - /// The URL with which a listener or dialer was configured. - /// - /// Note that some transports will canonify URLs before returning them to - /// the application. - /// - /// ## Support - /// - /// * Dialers and Listeners can read this value. - Url -> String: - Get s = s.getopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_URL) as _); -} - -/// Options relating to the socket protocol. -pub mod protocol { - /// Options dealing with the PAIR protocol. - pub mod pair { - create_option! { - /// Enables or disables the use of _polyamorous_ mode. - /// - /// Normally pair sockets are for one-to-one communication and a given peer will reject - /// new connections if it already has an active connection to another peer. In - /// _polyamorous_ mode, which is only available with Version 1, a socket can support - /// many one-to-one connections. - /// - /// In this mode, the application must choose the remote peer to receive an outgoing - /// message by setting the `Pipe` for the `Message`. Most often the value of the - /// outgoing pipe will be obtained from an incoming message, such as when replying to an - /// incoming message. - /// - /// In order to prevent head-of-line blocking, if the peer on the given pipe is not able - /// to receive (or if the pipe is no longer available, such as if the peer has - /// disconnected), then the message will be discarded with no notification to the - /// sender. - /// - /// ## Support - /// - /// * Sockets are able to read and write this value if they are using the `Pair1` - /// protocol. - #[doc(hidden)] - #[deprecated(since = "1.0.0", note = "Polyamorous mode will be removed from Pair1 eventually")] - Polyamorous -> bool: - Get s = s.getopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_PAIR1_POLY) as _); - Set s v = s.setopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_PAIR1_POLY) as _, v); - } - } - - /// Options dealing with the PUBSUB protocol. - pub mod pubsub { - create_option! { - /// Register a topic that the subscriber is interested in. - /// - /// This option takes an array of bytes, of arbitrary size. Each - /// incoming message is checked against the list of subscribed - /// topics. If the body begins with the entire set of bytes in the - /// topic, then the message is accepted. If no topic matches, then - /// the message is discarded. - /// - /// To receive all messages, an empty topic (zero length) can be - /// used. To receive any messages, at least one subscription must - /// exist. - /// - /// ## Support - /// - /// * Sockets can set this option when using the Sub v0 protocol. - Subscribe -> Vec: - Set s val = s.setopt(std::ptr::from_ref(nng_sys::NNG_OPT_SUB_SUBSCRIBE) as _, &val); - } - - create_option! { - /// Remove a topic from the subscription list. - /// - /// Note that if the topic was not previously subscribed via the - /// `Subscribe` option, then using this option will result in - /// `ErrorKind::EntryNotFound`. - /// - /// ## Support - /// - /// * Sockets can set this option when using the Sub v0 protocol. - Unsubscribe -> Vec: - Set s val = s.setopt(std::ptr::from_ref(nng_sys::NNG_OPT_SUB_UNSUBSCRIBE) as _, &val); - } - } - - /// Options dealing with the REQREP protocol. - pub mod reqrep { - use std::time::Duration; - - create_option! { - /// Amount of time to wait before sending a new request. - /// - /// When a new request is started, a timer of this duration is also - /// started. If no reply is received before this timer expires, - /// then the request will be resent. (Requests are also - /// automatically resent if the peer to whom the original request - /// was sent disconnects, or if a peer becomes available while the - /// requester is waiting for an available peer.) - /// - /// ## Support - /// - /// * Sockets can read and write this value when using the following protocols: - /// * Req v0 - /// * Dialers and Listeners can retrieve it from their owning Socket, if applicable. - ResendTime -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_REQ_RESENDTIME) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_REQ_RESENDTIME) as _, val); - } - } - - /// Options dealing with the survey protocol. - pub mod survey { - use std::time::Duration; - - create_option! { - /// Amount of time that the following surveys will last. - /// - /// When a new survey is started, a timer of this duration is also - /// started. Any responses arriving this time will be discarded. - /// Attempts to receive after the timer expires with no other - /// surveys started will result in `ErrorKind::IncorrectState`. - /// Attempts to receive when this timer expires will result in - /// `ErrorKind::TimedOut`. - /// - /// ## Support - /// - /// * Sockets can read and write this value when using the following protocols: - /// * Surveyor v0 - /// * Dialers and Listeners can retrieve it from their owning Socket, if applicable. - SurveyTime -> Option: - Get s = s.getopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_SURVEYOR_SURVEYTIME) as _); - Set s val = s.setopt_ms(std::ptr::from_ref(nng_sys::NNG_OPT_SURVEYOR_SURVEYTIME) as _, val); - } - } -} - -/// Options dealing with the underlying transport. -pub mod transport { - /// Options related to transports built on top of IPC. - pub mod ipc { - #[cfg(unix)] - create_option! { - /// Configures the permissions used on the UNIX domain socket. - /// - /// This value represents the normal permission bits of the file. The default is - /// system-specific but is most often `0644`. Note that not all systems will respect - /// this value. In particular, illumos and Solaris are known to ignore these permission - /// settings. It is also important to note that the _umask_ of the process is **not** - /// applied to these bits. - /// - /// The best practice for limiting access is to place the socket in a directory writable - /// only by the server, and only readable and searchable by clients. All mainstream - /// POSIX systems will fail to permit a client to connect to a socket located in a - /// directory for which the client lacks search (execute) permission. - /// - /// Also consider using the `PeerId` property from within the pipe notify callback to - /// validate peer credentials. - /// - /// ## Support - /// - /// * Listeners that are using the IPC protocol. - Permissions -> u32: - Set s val = s.setopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_IPC_PERMISSIONS) as _, val as _); - } - - #[cfg(unix)] - create_option! { - /// Returns the peer user ID from a pipe. - /// - /// This is the effective user id of the peer when either the underlying `listen()` or - /// `connect()` calls were made, and is not forgeable. - /// - /// ## Supports - /// - /// * Pipes that are using the IPC protocol. - PeerUid -> u64: - Get s = s.getopt_uint64(std::ptr::from_ref(nng_sys::NNG_OPT_IPC_PEER_UID) as _); - } - - #[cfg(unix)] - create_option! { - /// Returns the peer primary group ID from a pipe. - /// - /// This is the effective group id of the peer when either the underlying `listen()` or - /// `connect()` calls were made, and is not forgeable. - /// - /// ## Supports - /// - /// * Pipes that are using the IPC protocol. - PeerGid -> u64: - Get s = s.getopt_uint64(std::ptr::from_ref(nng_sys::NNG_OPT_IPC_PEER_GID) as _); - } - - create_option! { - /// Returns the process ID of the peer. - /// - /// Applications should not assume that the process ID does not change, as it is - /// possible (although unsupported!) for a nefarious process to pass a file descriptor - /// between processes. However, it is not possible for a nefarious application to forge - /// the identity of a well-behaved one using this method. - /// - /// ## Supports - /// - /// * Pipes that are using the IPC protocol. - PeerPid -> u64: - Get s = s.getopt_uint64(std::ptr::from_ref(nng_sys::NNG_OPT_IPC_PEER_PID) as _); - } - } - - /// Options related to transports built on top of TCP. - pub mod tcp { - create_option! { - /// Disable (or enable) the use of Nagle's algorithm for TCP - /// connections. - /// - /// When `true` (the default), messages are sent immediately by the - /// underlying TCP stream without waiting to gather more data. When - /// `false`, Nagle's algorithm is enabled, and the TCP stream may wait - /// briefly in attempt to coalesce messages. Nagle's algorithm is - /// useful on low-bandwidth connections to reduce overhead, but it - /// comes at a cost to latency. - /// - /// ## Support - /// - /// * Dialers and Listeners can use this option with the following transports: - /// * TCP - /// * TLS - /// * Pipes can read this value on the following transports: - /// * TCP - /// * TLS - /// * Sockets can use this to set a default value. - NoDelay -> bool: - Get s = s.getopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_TCP_NODELAY) as _); - Set s val = s.setopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_TCP_NODELAY) as _, val); - } - - create_option! { - /// Enable the sending of keep-alive messages on the underlying TCP stream. - /// - /// This option is `false` by default. When enabled, if no messages are - /// seen for a period of time, then a zero length TCP message is sent - /// with the ACK flag set in an attempt to tickle some traffic from the - /// peer. If none is still seen (after some platform-specific number of - /// retries and timeouts), then the remote peer is presumed dead, and - /// the connection is closed. - /// - /// This option has two purposes. First, it can be used to detect dead - /// peers on an otherwise quiescent network. Second, it can be used to - /// keep connection table entries in NAT and other middleware from - /// being expiring due to lack of activity. - /// - /// ## Support - /// - /// * Dialers and Listeners can use this option with the following transports: - /// * TCP - /// * TLS - /// * Pipes can read this value on the following transports: - /// * TCP - /// * TLS - /// * Sockets can use this to set a default value. - KeepAlive -> bool: - Get s = s.getopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_TCP_KEEPALIVE) as _); - Set s val = s.setopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_TCP_KEEPALIVE) as _, val); - } - - create_option! { - /// Get the local TCP port number. - /// - /// This is used on a listener and is inteded to be used after starting the listener on - /// a wildcard (0) local port. The returned value is the emphemeral port that was - /// selected and bound. - /// - /// ## Support - /// - /// * Listeners using the TCP or TLS transports. - BoundPort -> u16: - Get s = s.getopt_int(std::ptr::from_ref(nng_sys::NNG_OPT_TCP_BOUND_PORT) as _).map(|v| v as u16); - } - } - - /// Options related to the TLS transport. - pub mod tls { - create_option! { - /// Used to load certificates associated associated private key from a - /// file. - /// - /// See the [CA Config][1] documentation for more information. - /// - /// ## Support - /// - /// * Dialers and Listeners can set this option with the following transports: - /// * TLS - /// * WebSocket (Secure) - /// * Sockets can set this to set a default value. - /// - /// [1]: https://nanomsg.github.io/nng/man/v1.2.2/nng_tls.7.html - CaFile -> String: - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_TLS_CA_FILE) as _, &val); - } - - create_option! { - /// Used to load the local certificate and associated private key from - /// a file. - /// - /// The private key used must be unencrypted. See the [nng docs][1] for - /// more information. - /// - /// ## Support - /// - /// * Dialers and Listeners can set this option with the following transports: - /// * TLS - /// * WebSocket (Secure) - /// * Sockets can use this to set a default value. - /// - /// [1]: https://nanomsg.github.io/nng/man/v1.2.2/nng_tls.7.html - CertKeyFile -> String: - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_TLS_CERT_KEY_FILE) as _, &val); - } - - create_option! { - /// Indicates whether the remote peer has been properly verified using TLS - /// authentication. - /// - /// This may return incorrect results if peer authentication is disabled. - /// - /// ## Support - /// - /// * Pipes can read this option on the following transports: - /// * WebSocket - /// * TLS - /// - /// [1]: https://nanomsg.github.io/nng/man/v1.2.2/nng_tls.7.html - Verified -> bool: - Get s = s.getopt_bool(std::ptr::from_ref(nng_sys::NNG_OPT_TLS_VERIFIED) as _); - } - } - - /// Options related to the WebSocket and Secure WebSocket transports. - pub mod websocket { - create_option! { - /// A multiline string terminated by CRLF sequences, that can be used - /// to add further headers to the HTTP request sent when connecting. - /// - /// ## Support - /// - /// * Dialers can set this when using the WebSocket transport. - /// * Pipes can read this value when using the WebSocket transport. - /// * Sockets can set this to set a default value. - RequestHeaders -> String: - Get s = s.getopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_REQUEST_HEADERS) as _); - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_REQUEST_HEADERS) as _, &val); - } - - create_option! { - /// A multiline string terminated by CRLF sequences, that can be used - /// to add further headers to the HTTP response sent when connecting. - /// - /// ## Support - /// - /// * Listeners can set this when using the WebSocket transport. - /// * Pipes can read this when using the WebSocket transport. - /// * Sockets can set this to set a default value. - ResponseHeaders -> String: - Get s = s.getopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_RESPONSE_HEADERS) as _); - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_RESPONSE_HEADERS) as _, &val); - } - - create_option! { - /// The Websocket protocol, also known as the Sec-WebSocket-Protocol header. - /// - /// ## Support - /// - /// * Listeners and dialers can get/set this when using the WebSocket protocol. - Protocol -> String: - Get s = s.getopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_PROTOCOL) as _); - Set s val = s.setopt_string(std::ptr::from_ref(nng_sys::NNG_OPT_WS_PROTOCOL) as _, &val); - } - } -} diff --git a/nng/src/pipe.rs b/nng/src/pipe.rs index eba3767..ac942a8 100644 --- a/nng/src/pipe.rs +++ b/nng/src/pipe.rs @@ -3,6 +3,8 @@ use std::{ hash::{Hash, Hasher}, }; +use nng_sys::nng_err; + use crate::{dialer::Dialer, listener::Listener}; /// An NNG communication pipe. @@ -66,9 +68,9 @@ impl Pipe { // The pipe either closes succesfully, was already closed, or was never open. In // any of those scenarios, the pipe is in the desired state. As such, we don't // care about the return value. - let rv = unsafe { nng_sys::nng_pipe_close(self.handle) }; + let nng_err(rv) = unsafe { nng_sys::nng_pipe_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv) == nng_sys::nng_err::NNG_ECLOSED, "Unexpected error code while closing pipe ({})", rv ); @@ -130,45 +132,6 @@ impl Hash for Pipe { } } -#[rustfmt::skip] -expose_options!{ - Pipe :: handle -> nng_sys::nng_pipe; - - GETOPT_BOOL = nng_sys::nng_pipe_get_bool; - GETOPT_INT = nng_sys::nng_pipe_get_int; - GETOPT_MS = nng_sys::nng_pipe_get_ms; - GETOPT_SIZE = nng_sys::nng_pipe_get_size; - GETOPT_SOCKADDR = nng_sys::nng_pipe_get_addr; - GETOPT_STRING = nng_sys::nng_pipe_get_string; - GETOPT_UINT64 = nng_sys::nng_pipe_get_uint64; - - SETOPT = crate::util::fake_genopt; - SETOPT_BOOL = crate::util::fake_opt; - SETOPT_INT = crate::util::fake_opt; - SETOPT_MS = crate::util::fake_opt; - SETOPT_PTR = crate::util::fake_opt; - SETOPT_SIZE = crate::util::fake_opt; - SETOPT_STRING =crate::util::fake_opt; - - Gets -> [LocalAddr, RemAddr, RecvMaxSize, - transport::ipc::PeerPid, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::tls::Verified, - transport::websocket::RequestHeaders, - transport::websocket::ResponseHeaders]; - Sets -> []; -} - -#[cfg(unix)] -mod unix_impls { - use super::*; - use crate::options::{transport::ipc, SetOpt}; - - impl SetOpt for Pipe {} - impl SetOpt for Pipe {} -} - /// An event that happens on a [`Pipe`] instance. /// /// diff --git a/nng/src/socket.rs b/nng/src/socket.rs index dad9d81..3098ec0 100644 --- a/nng/src/socket.rs +++ b/nng/src/socket.rs @@ -11,6 +11,8 @@ use std::{ sync::{Arc, RwLock}, }; +use nng_sys::nng_err; + use crate::{ aio::Aio, error::{Error, Result, SendResult}, @@ -70,15 +72,18 @@ impl Socket { } }; - rv2res!( - rv, + rv2res!(rv, { + // SAFETY: Null pointers are valid. + unsafe { + nng_sys::nng_init(ptr::null()); + } Socket { inner: Arc::new(Inner { handle: socket, - pipe_notify: RwLock::new(None) - }) + pipe_notify: RwLock::new(None), + }), } - ) + }) } /// Initiates a remote connection to a listener. @@ -450,7 +455,7 @@ impl Socket { &raw const *self.inner as _, ) }) - .map(|rv| rv2res!(rv)) + .map(|nng_err(rv)| rv2res!(rv)) .fold(Ok(()), std::result::Result::and) } @@ -565,58 +570,6 @@ impl Hash for Socket { } } -#[rustfmt::skip] -expose_options!{ - Socket :: inner.handle -> nng_sys::nng_socket; - - GETOPT_BOOL = nng_sys::nng_socket_get_bool; - GETOPT_INT = nng_sys::nng_socket_get_int; - GETOPT_MS = nng_sys::nng_socket_get_ms; - GETOPT_SIZE = nng_sys::nng_socket_get_size; - GETOPT_SOCKADDR = nng_sys::nng_socket_get_addr; - GETOPT_STRING = nng_sys::nng_socket_get_string; - GETOPT_UINT64 = nng_sys::nng_socket_get_uint64; - - SETOPT = nng_sys::nng_socket_set; - SETOPT_BOOL = nng_sys::nng_socket_set_bool; - SETOPT_INT = nng_sys::nng_socket_set_int; - SETOPT_MS = nng_sys::nng_socket_set_ms; - SETOPT_PTR = nng_sys::nng_socket_set_ptr; - SETOPT_SIZE = nng_sys::nng_socket_set_size; - SETOPT_STRING = nng_sys::nng_socket_set_string; - - Gets -> [Raw, MaxTtl, RecvBufferSize, - RecvTimeout, SendBufferSize, - SendTimeout, SocketName, - protocol::pair::Polyamorous, - protocol::reqrep::ResendTime, - protocol::survey::SurveyTime]; - Sets -> [ReconnectMinTime, ReconnectMaxTime, - RecvBufferSize, RecvMaxSize, - RecvTimeout, SendBufferSize, - SendTimeout, SocketName, MaxTtl, - protocol::pair::Polyamorous, - protocol::reqrep::ResendTime, - protocol::pubsub::Subscribe, - protocol::pubsub::Unsubscribe, - protocol::survey::SurveyTime, - transport::tcp::NoDelay, - transport::tcp::KeepAlive, - transport::tls::CaFile, - transport::tls::CertKeyFile, - transport::websocket::RequestHeaders, - transport::websocket::ResponseHeaders]; -} - -#[cfg(unix)] -mod unix_impls { - use super::*; - use crate::options::{GetOpt, RecvFd, SendFd}; - - impl GetOpt for Socket {} - impl GetOpt for Socket {} -} - /// A wrapper type around the underlying `nng_socket`. /// /// This allows us to have mutliple Rust socket types that won't clone the C @@ -634,9 +587,9 @@ impl Inner { // of those mean we have nothing to drop. However, just to be sane // about it all, we'll warn the user if we see something odd. If that // ever happens, hopefully it will make its way to a bug report. - let rv = unsafe { nng_sys::nng_close(self.handle) }; + let rv = unsafe { nng_sys::nng_socket_close(self.handle) }; assert!( - rv == 0 || rv == nng_sys::NNG_ECLOSED as i32, + rv == 0 || nng_err(rv as _) == nng_sys::nng_err::NNG_ECLOSED, "Unexpected error code while closing socket ({})", rv ); @@ -654,6 +607,10 @@ impl fmt::Debug for Inner { impl Drop for Inner { fn drop(&mut self) { + // SAFETY: This is paired with an init when the socket was created. + unsafe { + nng_sys::nng_fini(); + } self.close(); } } @@ -732,12 +689,13 @@ impl TryFrom for RawSocket { type Error = CookedSocketError; fn try_from(socket: Socket) -> std::result::Result { - use crate::options::{Options, Raw}; + let mut raw = false; - if socket - .get_opt::() - .expect("Socket should have \"raw\" option available") - { + // SAFETY: The handle and pointer are both valid. + unsafe { + nng_sys::nng_socket_raw(socket.inner.handle, &mut raw); + } + if raw { Ok(RawSocket { socket, _priv: () }) } else { Err(CookedSocketError) diff --git a/nng/src/util.rs b/nng/src/util.rs index 484c06b..ac05fbf 100644 --- a/nng/src/util.rs +++ b/nng/src/util.rs @@ -1,8 +1,4 @@ -use std::{ - os::raw::{c_char, c_int, c_void}, - ptr::NonNull, - time::Duration, -}; +use std::{os::raw::c_int, ptr::NonNull, time::Duration}; use crate::error::{Error, Result}; @@ -20,155 +16,6 @@ macro_rules! rv2res { }; } -/// Utility macro for creating a new option type. -/// -/// This is 90% me just playing around with macros. It is probably a terrible -/// way to go around doing this task but, then again, this whole options -/// business has been a complete mess. -macro_rules! create_option -{ - ( - $(#[$attr:meta])* - $opt:ident -> $ot:ty: - Get $g:ident = $gexpr:stmt; - Set $s:ident $v:ident = $sexpr:stmt; - ) => { - $(#[$attr])* - #[allow(missing_debug_implementations)] - #[allow(missing_copy_implementations)] - #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] - pub enum $opt {} - #[allow(deprecated)] - impl $crate::options::Opt for $opt - { - type OptType = $ot; - } - #[allow(deprecated)] - #[allow(clippy::cast_possible_truncation)] - impl $crate::options::private::OptOps for $opt - { - fn get($g: &T) -> $crate::error::Result { $gexpr } - fn set($s: &T, $v: Self::OptType) -> $crate::error::Result<()> { $sexpr } - } - #[allow(deprecated)] - #[allow(clippy::use_debug)] - impl std::fmt::Display for $opt - { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result - { - write!(f, "{:?}", self) - } - } - }; - - ( - $(#[$attr:meta])* - $opt:ident -> $ot:ty: - Set $s:ident $v:ident = $sexpr:stmt; - ) => { - create_option!( - $(#[$attr])* - $opt -> $ot: - Get _g = unreachable!("should not have been implemented - option is write-only"); - Set $s $v = $sexpr; - ); - }; - - ( - $(#[$attr:meta])* - $opt:ident -> $ot:ty: - Get $g:ident = $gexpr:stmt; - ) => { - create_option!( - $(#[$attr])* - $opt -> $ot: - Get $g = $gexpr; - Set _s _v = unreachable!("should not have been implemented - option is read-only"); - ); - }; -} - -/// Implements the specified options for the type. -macro_rules! expose_options -{ - ( - $struct:ident :: $($member:ident).+ -> $handle:ty; - GETOPT_BOOL = $go_b:path; - GETOPT_INT = $go_i:path; - GETOPT_MS = $go_ms:path; - GETOPT_SIZE = $go_sz:path; - GETOPT_SOCKADDR = $go_sa:path; - GETOPT_STRING = $go_str:path; - GETOPT_UINT64 = $go_uint64:path; - - SETOPT = $so:path; - SETOPT_BOOL = $so_b:path; - SETOPT_INT = $so_i:path; - SETOPT_MS = $so_ms:path; - SETOPT_PTR = $so_ptr:path; - SETOPT_SIZE = $so_sz:path; - SETOPT_STRING = $so_str:path; - - Gets -> [$($($getters:ident)::+),*]; - Sets -> [$($($setters:ident)::+),*]; - ) => { - impl $crate::options::private::HasOpts for $struct - { - type Handle = $handle; - fn handle(&self) -> Self::Handle { self.$($member).+ } - - const GETOPT_BOOL: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut bool) -> std::os::raw::c_int = $go_b; - const GETOPT_INT: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut std::os::raw::c_int) -> std::os::raw::c_int = $go_i; - const GETOPT_MS: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut nng_sys::nng_duration) -> std::os::raw::c_int = $go_ms; - const GETOPT_SIZE: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut usize) -> std::os::raw::c_int = $go_sz; - const GETOPT_SOCKADDR: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut nng_sys::nng_sockaddr) -> std::os::raw::c_int = $go_sa; - const GETOPT_STRING: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut *mut std::os::raw::c_char) -> std::os::raw::c_int = $go_str; - const GETOPT_UINT64: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut u64) -> std::os::raw::c_int = $go_uint64; - - const SETOPT: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *const std::os::raw::c_void, usize) -> std::os::raw::c_int = $so; - const SETOPT_BOOL: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, bool) -> std::os::raw::c_int = $so_b; - const SETOPT_INT: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, std::os::raw::c_int) -> std::os::raw::c_int = $so_i; - const SETOPT_MS: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, nng_sys::nng_duration) -> std::os::raw::c_int = $so_ms; - const SETOPT_PTR: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *mut std::os::raw::c_void) -> std::os::raw::c_int = $so_ptr; - const SETOPT_SIZE: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, usize) -> std::os::raw::c_int = $so_sz; - const SETOPT_STRING: unsafe extern "C" fn(Self::Handle, *const std::os::raw::c_char, *const std::os::raw::c_char) -> std::os::raw::c_int = $so_str; - } - - $( - #[allow(deprecated)] - impl $crate::options::GetOpt<$crate::options::$($getters)::+> for $struct {} - )* - $( - #[allow(deprecated)] - impl $crate::options::SetOpt<$crate::options::$($setters)::+> for $struct {} - )* - } -} - -/// A catch-all function for unsupported options operations. -#[allow(clippy::unimplemented)] -pub unsafe extern "C" fn fake_opt(_: H, _: *const c_char, _: T) -> c_int { - unimplemented!( - "{} does not support the option operation on {}", - stringify!(H), - stringify!(T) - ) -} - -/// A catch-all function for unsupported generic options operations. -#[allow(clippy::unimplemented)] -pub unsafe extern "C" fn fake_genopt( - _: H, - _: *const c_char, - _: *const c_void, - _: usize, -) -> c_int { - unimplemented!( - "{} does not support the generic option operation", - stringify!(H) - ) -} - /// Converts a Rust `Duration` into an `nng_duration`. #[allow(clippy::cast_possible_truncation)] pub fn duration_to_nng(dur: Option) -> nng_sys::nng_duration { @@ -190,17 +37,6 @@ pub fn duration_to_nng(dur: Option) -> nng_sys::nng_duration { } } -/// Converts an `nng_duration` into a Rust `Duration`. -pub fn nng_to_duration(ms: nng_sys::nng_duration) -> Option { - if ms == nng_sys::NNG_DURATION_INFINITE { - None - } else if ms >= 0 { - Some(Duration::from_millis(ms as u64)) - } else { - panic!("Unexpected value for `nng_duration` ({})", ms) - } -} - /// Checks an NNG return code and validates the pointer, returning a /// `NonNull`. #[inline]