Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions nng/CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ The format is based on https://keepachangelog.com/en/1.0.0/[Keep a Changelog] an

=== Added ===

* Added NNG initialization function and associated type.

=== Changed ===

* The crate now depends on NNGv2.

=== Deprecated ===

=== Removed ===

* Anything and everything related to options (NOTE TO SELF: make a replacement).
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Internal note should be removed or converted to a TODO issue.

The text (NOTE TO SELF: make a replacement) appears to be a personal reminder that shouldn't be in the public changelog.

📝 Suggested options

Either remove the note:

-* Anything and everything related to options (NOTE TO SELF: make a replacement).
+* Removed the entire `options` module and related option accessor APIs.

Or create a tracking issue and reference it in the changelog entry.

Would you like me to help draft a more detailed changelog entry describing what was removed, or open an issue to track the options replacement work?

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
* Anything and everything related to options (NOTE TO SELF: make a replacement).
* Removed the entire `options` module and related option accessor APIs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@nng/CHANGELOG.adoc` at line 21, Remove the personal reminder "(NOTE TO SELF:
make a replacement)" from the changelog entry "* Anything and everything related
to options (NOTE TO SELF: make a replacement)" and either delete that
parenthetical entirely or replace it with a public tracking reference (e.g.,
"see issue `#NNNN`" or "tracked by TODO-ISSUE") so the entry is professional; if
you prefer creation of an issue, open one to track the options replacement work
and reference its identifier in the modified entry.


=== Fixed ===

=== Security ===
Expand Down
42 changes: 39 additions & 3 deletions nng/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,43 @@ ffi-module = []
log = "0.4.28"

[dependencies.nng-sys]
version = "0.3.0"
# TODO(flxo): use path dependency once nng is updated to use nng-sys v0.4
# path = "../nng-sys"
version = "0.4.0-v2pre.2"
default-features = false

[lints.rust]
improper_ctypes = "forbid"
bare_trait_objects = "deny"
missing_debug_implementations = "deny"
missing_docs = "deny"

[lints.clippy]
all = { level = "deny", priority = -1 }
wrong_self_convention = { level = "deny", priority = -1 }

# I would like to be able to keep these on, but due to the nature of the crate
# it just isn't feasible. For example, the "cast_sign_loss" will warn at every
# i32/u32 conversion. Normally, I would like that, but this library is a safe
# wrapper around a Bindgen-based binding of a C library, which means the types
# are a little bit up-in-the-air.
cast_sign_loss = "allow"
empty_enum = "allow" # Revisit after RFC1861 and RFC1216.
cargo_common_metadata = "allow" # Can't control this.
module_name_repetitions = "allow" # Doesn't recognize public re-exports.
cast_possible_wrap = "allow"

# I want to enable this but it requires bumping the Rustc version and I don't
# want to do that just for a clippy lint.
ptr_as_ptr = "allow"

# In these cases, I just don't like what Clippy suggests.
use_self = "allow"
if_not_else = "allow"
must_use_candidate = "allow"
missing_const_for_fn = "allow"
option_if_let_else = "allow" # Semantically backwards when used with non-zero error codes
wildcard_imports = "allow" # I don't generally like them either but can be used well
enum_glob_use = "allow" # Same as wildcards
manual_non_exhaustive = "allow" # Not available in v1.36

# This isn't smart enough to notice that we're converting to a variable type.
unnecessary_cast = "allow"
6 changes: 1 addition & 5 deletions nng/examples/pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
//! This example was derived from [this NNG example][1].
//!
//! [1]: https://nanomsg.org/gettingstarted/nng/pair.html
use nng::{
options::{Options, RecvTimeout},
Error, Message, Protocol, Socket,
};
use nng::{Error, Message, Protocol, Socket};
use std::{env, io::Write, process, str, thread, time::Duration};

/// Entry point of the application.
Expand Down Expand Up @@ -44,7 +41,6 @@ fn node1(url: &str) -> Result<(), Error> {

/// Sends and receives messages on the socket.
fn send_recv(s: &Socket, name: &str) -> Result<(), Error> {
s.set_opt::<RecvTimeout>(Some(Duration::from_millis(100)))?;
loop {
// Attempt to reuse the message if we can.
let mut msg = match s.recv() {
Expand Down
9 changes: 3 additions & 6 deletions nng/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
//!
//! This pattern is used to allow a single broadcaster to publish messages to many subscribers,
//! which may choose to limit which messages they receive.
use nng::{
options::{protocol::pubsub::Subscribe, Options},
PipeEvent, Protocol, Socket,
};
use nng::{PipeEvent, Protocol, Socket};
use std::{
convert::TryInto,
env, process,
Expand Down Expand Up @@ -66,8 +63,8 @@ fn subscriber(url: &str) -> Result<(), nng::Error> {
s.dial(url)?;

println!("SUBSCRIBER: SUBSCRIBING TO ALL TOPICS");
let all_topics = vec![];
s.set_opt::<Subscribe>(all_topics)?;
//let all_topics = vec![];
//s.set_opt::<Subscribe>(all_topics)?;

loop {
let msg = s.recv()?;
Expand Down
37 changes: 0 additions & 37 deletions nng/src/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ pub enum SocketAddr {
/// Address for TCP/IP (v6) communication.
Inet6(SocketAddrV6),

#[doc(hidden)]
/// Used to represent a ZeroTier address.
ZeroTier(SocketAddrZt),

#[doc(hidden)]
/// Used to represent an abstract IPC socket address.
Abstract(Box<[u8]>),
Expand All @@ -44,7 +40,6 @@ impl fmt::Display for SocketAddr {
SocketAddr::Ipc(s) => write!(f, "ipc://{}", s.to_string_lossy()),
SocketAddr::Inet(s) => write!(f, "tcp://{s}"),
SocketAddr::Inet6(s) => write!(f, "tcp://{s}"),
SocketAddr::ZeroTier(s) => write!(f, "zt://{s}"),
SocketAddr::Abstract(s) => {
write!(f, "abstract://")?;
// Quick-and-dirty URI-encoding:
Expand Down Expand Up @@ -82,9 +77,6 @@ impl From<nng_sys::nng_sockaddr> for SocketAddr {
let port = addr.s_in6.sa_port;
SocketAddr::Inet6(SocketAddrV6::new(v6_addr, port, 0, 0))
}
Some(nng_sys::nng_sockaddr_family::NNG_AF_ZT) => {
SocketAddr::ZeroTier(SocketAddrZt::new(&addr.s_zt))
}
Some(nng_sys::nng_sockaddr_family::NNG_AF_ABSTRACT) => SocketAddr::Abstract(
Box::from(&addr.s_abstract.sa_name[..usize::from(addr.s_abstract.sa_len)]),
),
Expand All @@ -97,35 +89,6 @@ impl From<nng_sys::nng_sockaddr> for SocketAddr {
}
}

/// A ZeroTier socket address.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct SocketAddrZt {
pub family: u16,
pub nwid: u64,
pub nodeid: u64,
pub port: u32,
}
impl SocketAddrZt {
/// Converts an `nng_sockaddr_zt` into its corresponding Rust type.
const fn new(addr: &nng_sys::nng_sockaddr_zt) -> SocketAddrZt {
SocketAddrZt {
family: addr.sa_family,
nwid: addr.sa_nwid,
nodeid: addr.sa_nodeid,
port: addr.sa_port,
}
}
}
impl fmt::Display for SocketAddrZt {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// I have no idea if this output is meaningful at all. This is just vaguely
// based off the URI format for ZeroTier, ignoring fields that don't appear in
// the specification and guessing how all of the others align.
write!(f, "{}.{}:{}", self.nodeid, self.nwid, self.port)
}
}

/// Creates a `String` from a slice that _probably_ contains UTF-8 and
/// _probably_ is null terminated.
///
Expand Down
12 changes: 7 additions & 5 deletions nng/src/aio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
};

use log::error;
use nng_sys::nng_err;

use crate::{
ctx::Context,
Expand Down Expand Up @@ -172,7 +173,7 @@ impl Aio {
let res = unsafe {
let state = cb_aio.inner.state.load(Ordering::Acquire).into();
let aiop = cb_aio.inner.handle.load(Ordering::Relaxed);
let rv = nng_sys::nng_aio_result(aiop) as u32;
let nng_err(rv) = nng_sys::nng_aio_result(aiop);

let res = match (state, rv) {
(State::Sending, 0) => AioResult::Send(Ok(())),
Expand Down Expand Up @@ -218,7 +219,8 @@ impl Aio {

let mut aio: *mut nng_sys::nng_aio = ptr::null_mut();
let aiop: *mut *mut nng_sys::nng_aio = &mut aio as _;
let rv = unsafe { nng_sys::nng_aio_alloc(aiop, Some(Aio::trampoline), callback_ptr as _) };
let nng_err(rv) =
unsafe { nng_sys::nng_aio_alloc(aiop, Some(Aio::trampoline), callback_ptr as _) };

// NNG should never touch the pointer and return a non-zero code at the same
// time. That being said, I'm going to be a pessimist and double check. If we do
Expand All @@ -231,7 +233,7 @@ impl Aio {
error!("NNG returned a non-null pointer from a failed function");
return Err(Error::Unknown(0));
}
validate_ptr(rv, aio)?;
validate_ptr(rv as _, aio)?;
inner.handle.store(aio, Ordering::Release);
inner.callback.store(callback_ptr, Ordering::Relaxed);

Expand Down Expand Up @@ -342,7 +344,7 @@ impl Aio {
let aiop = self.inner.handle.load(Ordering::Relaxed);
unsafe {
nng_sys::nng_aio_set_msg(aiop, msg.into_ptr().as_ptr());
nng_sys::nng_send_aio(socket.handle(), aiop);
nng_sys::nng_socket_send(socket.handle(), aiop);
}
Ok(())
}
Expand All @@ -358,7 +360,7 @@ impl Aio {

let aiop = self.inner.handle.load(Ordering::Relaxed);
unsafe {
nng_sys::nng_recv_aio(socket.handle(), aiop);
nng_sys::nng_socket_recv(socket.handle(), aiop);
}
Ok(())
}
Expand Down
27 changes: 2 additions & 25 deletions nng/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
message::Message,
socket::Socket,
};
use nng_sys::nng_err;

/// A socket context.
///
Expand Down Expand Up @@ -145,30 +146,6 @@ impl Hash for Context {
}
}

#[rustfmt::skip]
expose_options!{
Context :: inner.ctx -> nng_sys::nng_ctx;

GETOPT_BOOL = nng_sys::nng_ctx_get_bool;
GETOPT_INT = nng_sys::nng_ctx_get_int;
GETOPT_MS = nng_sys::nng_ctx_get_ms;
GETOPT_SIZE = nng_sys::nng_ctx_get_size;
GETOPT_SOCKADDR = nng_sys::nng_ctx_get_addr;
GETOPT_STRING = nng_sys::nng_ctx_get_string;
GETOPT_UINT64 = nng_sys::nng_ctx_get_uint64;

SETOPT = nng_sys::nng_ctx_set;
SETOPT_BOOL = nng_sys::nng_ctx_set_bool;
SETOPT_INT = nng_sys::nng_ctx_set_int;
SETOPT_MS = nng_sys::nng_ctx_set_ms;
SETOPT_PTR = nng_sys::nng_ctx_set_ptr;
SETOPT_SIZE = nng_sys::nng_ctx_set_size;
SETOPT_STRING = nng_sys::nng_ctx_set_string;

Gets -> [protocol::reqrep::ResendTime, protocol::survey::SurveyTime];
Sets -> [protocol::reqrep::ResendTime, protocol::survey::SurveyTime];
}

/// A wrapper around an `nng_ctx`.
#[derive(Debug)]
struct Inner {
Expand All @@ -180,7 +157,7 @@ impl Inner {
// was never open. Neither of those are an issue for us.
let rv = unsafe { nng_sys::nng_ctx_close(self.ctx) };
assert!(
rv == 0 || rv == nng_sys::NNG_ECLOSED as i32,
rv == 0 || nng_err(rv as _) == nng_sys::nng_err::NNG_ECLOSED,
"Unexpected error code while closing context ({})",
rv
);
Expand Down
6 changes: 4 additions & 2 deletions nng/src/device.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::num::NonZeroU32;

use nng_sys::nng_err;

use crate::{
error::{Error, Result},
socket::RawSocket,
Expand Down Expand Up @@ -39,7 +41,7 @@ use crate::{
/// [`MaxTtl`]: options/enum.MaxTtl.html
/// [`OutOfMemory`]: enum.Error.html#variant.OutOfMemory
pub fn forwarder(s1: RawSocket, s2: RawSocket) -> Result<()> {
let rv = unsafe { nng_sys::nng_device(s1.socket.handle(), s2.socket.handle()) };
let nng_err(rv) = unsafe { nng_sys::nng_device(s1.socket.handle(), s2.socket.handle()) };

// Appease Clippy.
drop(s1);
Expand Down Expand Up @@ -76,7 +78,7 @@ pub fn forwarder(s1: RawSocket, s2: RawSocket) -> Result<()> {
/// [`InvalidInput`]: enum.Error.html#variant.InvalidInput
/// [`OutOfMemory`]: enum.Error.html#variant.OutOfMemory
pub fn reflector(s1: RawSocket) -> Result<()> {
let rv = unsafe {
let nng_err(rv) = unsafe {
nng_sys::nng_device(
s1.socket.handle(),
nng_sys::nng_socket::NNG_SOCKET_INITIALIZER,
Expand Down
Loading
Loading