diff --git a/Cargo.lock b/Cargo.lock index 81ced602..10adacbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3740,6 +3740,7 @@ version = "0.1.0" dependencies = [ "chrono", "colored", + "mg-common", "progenitor 0.13.0", "rdb-types 0.1.0", "reqwest 0.13.2", @@ -3810,6 +3811,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ddm-types", + "mg-common", "oxide-tokio-rt", "oxnet", "reqwest 0.13.2", @@ -3852,6 +3854,7 @@ version = "0.1.0" dependencies = [ "anyhow", "ddm-admin-client", + "mg-common", "slog", "slog-async", "slog-envlogger", @@ -9339,6 +9342,7 @@ version = "0.1.0" dependencies = [ "anyhow", "clap", + "mg-common", "tokio", ] diff --git a/bfd/src/lib.rs b/bfd/src/lib.rs index e469c2d8..0476a048 100644 --- a/bfd/src/lib.rs +++ b/bfd/src/lib.rs @@ -251,6 +251,7 @@ pub enum AddPeerError { #[cfg(test)] mod test { use super::*; + use mg_common::eprintln_nopipe; use pretty_assertions::assert_eq; use slog::Drain; use std::net::IpAddr; @@ -311,11 +312,11 @@ mod test { tx.send((addr, msg)).unwrap(); } None => { - eprintln!("no egress for {}", addr); + eprintln_nopipe!("no egress for {}", addr); } }, Err(e) => { - eprintln!("recv: {}", e); + eprintln_nopipe!("recv: {}", e); } } } diff --git a/bgp/src/connection_channel.rs b/bgp/src/connection_channel.rs index 75981aa7..d7715eac 100644 --- a/bgp/src/connection_channel.rs +++ b/bgp/src/connection_channel.rs @@ -84,6 +84,7 @@ impl std::fmt::Display for Network { #[derive(Debug)] struct Listener { rx: Receiver<(SocketAddr, Endpoint)>, + addr: SocketAddr, } impl Listener { @@ -98,6 +99,12 @@ impl Listener { } } +impl Drop for Listener { + fn drop(&mut self) { + NET.unbind(&self.addr); + } +} + // NOTE: this is not designed to be a full fidelity TCP/IP drop in. It gives // us enough functionality to pass messages between BGP routers to test // state machine transitions above TCP connection tracking. That's all we're @@ -113,7 +120,12 @@ impl Network { fn bind(&self, sa: SocketAddr) -> Listener { let (tx, rx) = mpsc_channel(); lock!(self.endpoints).insert(sa, tx); - Listener { rx } + Listener { rx, addr: sa } + } + + /// Remove a bound address from the network. + fn unbind(&self, addr: &SocketAddr) { + lock!(self.endpoints).remove(addr); } /// Send a copy of the provided endpoint to the endpoint identified by the diff --git a/bgp/src/connection_tcp.rs b/bgp/src/connection_tcp.rs index 6ee434a2..25cba52c 100644 --- a/bgp/src/connection_tcp.rs +++ b/bgp/src/connection_tcp.rs @@ -559,6 +559,8 @@ impl BgpConnectionTcp { direction: ConnectionDirection, config: &SessionInfo, ) -> Result { + conn.set_nodelay(true)?; + let id = ConnectionId::new(source, peer); let dropped = Arc::new(AtomicBool::new(false)); diff --git a/bgp/src/messages.rs b/bgp/src/messages.rs index 4208e163..de654376 100644 --- a/bgp/src/messages.rs +++ b/bgp/src/messages.rs @@ -5949,7 +5949,7 @@ impl From for Option { #[cfg(test)] mod tests { use super::*; - use mg_common::{cidr, ip, parse}; + use mg_common::{cidr, ip, parse, println_nopipe}; use pretty_assertions::assert_eq; use pretty_hex::*; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -6004,7 +6004,7 @@ mod tests { }; let buf = h0.to_wire(); - println!("buf: {}", buf.hex_dump()); + println_nopipe!("buf: {}", buf.hex_dump()); assert_eq!( buf, @@ -6025,7 +6025,7 @@ mod tests { let om0 = OpenMessage::new4(395849, 0x1234, 0xaabbccdd, false); let buf = om0.to_wire().expect("open message to wire"); - println!("buf: {}", buf.hex_dump()); + println_nopipe!("buf: {}", buf.hex_dump()); let om1 = OpenMessage::from_wire(&buf).expect("open message from wire"); assert_eq!(om0, om1); @@ -6036,7 +6036,7 @@ mod tests { let om0 = OpenMessage::new4(395849, 0x1234, 0xaabbccdd, true); let buf = om0.to_wire().expect("open message to wire"); - println!("buf: {}", buf.hex_dump()); + println_nopipe!("buf: {}", buf.hex_dump()); let om1 = OpenMessage::from_wire(&buf).expect("open message from wire"); assert_eq!(om0, om1); @@ -6085,7 +6085,7 @@ mod tests { }; let buf = um0.to_wire().expect("update message to wire"); - println!("buf: {}", buf.hex_dump()); + println_nopipe!("buf: {}", buf.hex_dump()); let um1 = UpdateMessage::from_wire(&buf).expect("update message from wire"); diff --git a/bgp/src/policy.rs b/bgp/src/policy.rs index c956b182..562965a2 100644 --- a/bgp/src/policy.rs +++ b/bgp/src/policy.rs @@ -235,12 +235,13 @@ pub fn new_rhai_engine() -> Engine { #[cfg(debug_assertions)] { - println!("Functions registered:"); + use mg_common::println_nopipe; + println_nopipe!("Functions registered:"); engine .gen_fn_signatures(false) .into_iter() - .for_each(|func| println!("{func}")); - println!(); + .for_each(|func| println_nopipe!("{func}")); + println_nopipe!(); } engine diff --git a/bgp/src/test.rs b/bgp/src/test.rs index df8077c3..af2ebf14 100644 --- a/bgp/src/test.rs +++ b/bgp/src/test.rs @@ -125,12 +125,12 @@ impl TestRouter { let d = self.dispatcher.clone(); let listen_addr = self.dispatcher.listen_addr().to_string(); let listen_addr_for_log = listen_addr.clone(); - eprintln!("Spawning Dispatcher thread for {}", listen_addr); + eprintln_nopipe!("Spawning Dispatcher thread for {}", listen_addr); Builder::new() .name(format!("bgp-listener-{}", listen_addr)) .spawn(move || { d.run::(); - eprintln!( + eprintln_nopipe!( "Dispatcher thread for {} exiting", listen_addr_for_log ); @@ -352,12 +352,12 @@ where let d = dispatcher.clone(); let listen_addr = dispatcher.listen_addr().to_string(); let listen_addr_for_log = listen_addr.clone(); - eprintln!("Spawning Dispatcher thread for {}", listen_addr); + eprintln_nopipe!("Spawning Dispatcher thread for {}", listen_addr); Builder::new() .name(format!("bgp-listener-{}", listen_addr)) .spawn(move || { d.run::(); - eprintln!( + eprintln_nopipe!( "Dispatcher thread for {} exiting", listen_addr_for_log ); @@ -604,7 +604,7 @@ fn basic_peering_helper< wait_for_eq!( { let state = r1_session.state(); - println!("r1_session.state(): {state}"); + println_nopipe!("r1_session.state(): {state}"); state }, FsmStateKind::Established, @@ -1219,7 +1219,7 @@ fn test_neighbor_thread_lifecycle_no_leaks() { let count = mg_common::test::count_threads_with_prefix("bgp-") .expect("couldn't collect thread count"); if count > 0 { - eprintln!( + eprintln_nopipe!( "Waiting for baseline to stabilize (current: {count})" ); } @@ -1228,7 +1228,7 @@ fn test_neighbor_thread_lifecycle_no_leaks() { "Baseline BGP thread count should reach 0" ); let baseline = 0; - eprintln!("=== Baseline BGP thread count: {baseline} ==="); + eprintln_nopipe!("=== Baseline BGP thread count: {baseline} ==="); let r1_peer_config = PeerConfig { name: "r2".into(), @@ -1304,7 +1304,7 @@ fn test_neighbor_thread_lifecycle_no_leaks() { let after_establish = mg_common::test::count_threads_with_prefix("bgp-") .expect("couldn't collect thread count"); - eprintln!( + eprintln_nopipe!( "=== After establishment BGP thread count: {after_establish} (baseline: {baseline}, delta: +{}) ===", after_establish - baseline ); @@ -1349,18 +1349,18 @@ fn test_neighbor_thread_lifecycle_no_leaks() { mg_common::test::count_threads_with_prefix("bgp-") .expect("couldn't get bgp thread count"); if after_shutdown != baseline { - eprintln!( + eprintln_nopipe!( "BGP thread count after shutdown ({after_shutdown} != baseline {baseline})" ); // Dump detailed thread stacks match mg_common::test::dump_thread_stacks() { Ok(stacks) => { - eprintln!("=== Thread stack traces ==="); - eprintln!("{stacks}"); + eprintln_nopipe!("=== Thread stack traces ==="); + eprintln_nopipe!("{stacks}"); } Err(e) => { - eprintln!("Could not dump thread stacks: {e}"); + eprintln_nopipe!("Could not dump thread stacks: {e}"); } } } @@ -1786,6 +1786,8 @@ fn unnumbered_peering_helper( Arc>, Arc, Vec>>, + Vec>>, + Vec>>, ) { let log = init_file_logger(&format!("{}.log", test_name)); @@ -2018,7 +2020,16 @@ fn unnumbered_peering_helper( .expect("start session2"); } - (router1, mock_ndp1, sessions1, router2, mock_ndp2, sessions2) + ( + router1, + mock_ndp1, + sessions1, + router2, + mock_ndp2, + sessions2, + dispatchers1, + dispatchers2, + ) } /// Test: Session survives NDP neighbor changes and reconnects via new neighbor after reset. @@ -2031,24 +2042,39 @@ fn unnumbered_peering_helper( /// - After AdminEvent::Reset, session reconnects using new NDP neighbor #[test] fn test_unnumbered_session_survives_peer_change() { - let (router1, mock_ndp1, sessions1, router2, _mock_ndp2, sessions2) = - unnumbered_peering_helper( - "unnumbered_peer_change", - vec![("eth0".to_string(), 2)], - RouteExchange::Ipv4 { nexthop: None }, - ); + let scope_id = next_scope_id(); + let ( + router1, + mock_ndp1, + sessions1, + router2, + _mock_ndp2, + sessions2, + disps1, + disps2, + ) = unnumbered_peering_helper( + "unnumbered_peer_change", + vec![("eth0".to_string(), scope_id)], + RouteExchange::Ipv4 { nexthop: None }, + ); let session1 = &sessions1[0]; let session2 = &sessions2[0]; // Debug: check what's happening sleep(Duration::from_secs(5)); - eprintln!("Session1 state: {:?}", session1.state()); - eprintln!("Session2 state: {:?}", session2.state()); - eprintln!("Session1 peer addr: {:?}", session1.get_peer_socket_addr()); - eprintln!("Session2 peer addr: {:?}", session2.get_peer_socket_addr()); - eprintln!("Session1 is_unnumbered: {}", session1.is_unnumbered()); - eprintln!("Session2 is_unnumbered: {}", session2.is_unnumbered()); + eprintln_nopipe!("Session1 state: {:?}", session1.state()); + eprintln_nopipe!("Session2 state: {:?}", session2.state()); + eprintln_nopipe!( + "Session1 peer addr: {:?}", + session1.get_peer_socket_addr() + ); + eprintln_nopipe!( + "Session2 peer addr: {:?}", + session2.get_peer_socket_addr() + ); + eprintln_nopipe!("Session1 is_unnumbered: {}", session1.is_unnumbered()); + eprintln_nopipe!("Session2 is_unnumbered: {}", session2.is_unnumbered()); // Wait for both sessions to reach Established wait_for_eq!(session1.state(), FsmStateKind::Established); @@ -2059,7 +2085,7 @@ fn test_unnumbered_session_survives_peer_change() { "fe80::2".parse().unwrap(), TEST_BGP_PORT, 0, - 2, + scope_id, )); assert_eq!(session1.get_peer_socket_addr(), Some(peer2_addr)); @@ -2068,8 +2094,12 @@ fn test_unnumbered_session_survives_peer_change() { mock_ndp1.discover_peer("eth0", new_peer_ip).unwrap(); // Router 1's query now returns the new peer - let new_peer_addr = - SocketAddr::V6(SocketAddrV6::new(new_peer_ip, TEST_BGP_PORT, 0, 2)); + let new_peer_addr = SocketAddr::V6(SocketAddrV6::new( + new_peer_ip, + TEST_BGP_PORT, + 0, + scope_id, + )); wait_for!(session1.get_peer_socket_addr() == Some(new_peer_addr)); // CRITICAL: Session must stay Established (NDP change doesn't affect FSM) @@ -2099,6 +2129,12 @@ fn test_unnumbered_session_survives_peer_change() { // Clean up router1.shutdown(); router2.shutdown(); + for d in &disps1 { + d.shutdown(); + } + for d in &disps2 { + d.shutdown(); + } } /// Test: Session handles peer expiry and rediscovery. @@ -2111,12 +2147,21 @@ fn test_unnumbered_session_survives_peer_change() { /// - Session remains Established throughout #[test] fn test_unnumbered_peer_expiry_and_rediscovery() { - let (router1, mock_ndp1, sessions1, router2, _mock_ndp2, sessions2) = - unnumbered_peering_helper( - "unnumbered_expiry", - vec![("eth0".to_string(), 2)], - RouteExchange::Ipv4 { nexthop: None }, - ); + let scope_id = next_scope_id(); + let ( + router1, + mock_ndp1, + sessions1, + router2, + _mock_ndp2, + sessions2, + disps1, + disps2, + ) = unnumbered_peering_helper( + "unnumbered_expiry", + vec![("eth0".to_string(), scope_id)], + RouteExchange::Ipv4 { nexthop: None }, + ); let session1 = &sessions1[0]; let session2 = &sessions2[0]; @@ -2154,7 +2199,7 @@ fn test_unnumbered_peer_expiry_and_rediscovery() { // Router 1's query returns the peer again let peer2_addr = - SocketAddr::V6(SocketAddrV6::new(peer2_ip, TEST_BGP_PORT, 0, 2)); + SocketAddr::V6(SocketAddrV6::new(peer2_ip, TEST_BGP_PORT, 0, scope_id)); wait_for!(session1.get_peer_socket_addr() == Some(peer2_addr)); // Sessions should still be Established @@ -2164,6 +2209,12 @@ fn test_unnumbered_peer_expiry_and_rediscovery() { // Clean up router1.shutdown(); router2.shutdown(); + for d in &disps1 { + d.shutdown(); + } + for d in &disps2 { + d.shutdown(); + } } /// Test: Multiple unnumbered sessions on different interfaces work independently. @@ -2175,12 +2226,25 @@ fn test_unnumbered_peer_expiry_and_rediscovery() { /// - Both sessions stay Established when eth0's NDP changes #[test] fn test_multiple_unnumbered_sessions() { - let (router1, mock_ndp1, sessions1, router2, _mock_ndp2, sessions2) = - unnumbered_peering_helper( - "multiple_unnumbered", - vec![("eth0".to_string(), 2), ("eth1".to_string(), 3)], - RouteExchange::Ipv4 { nexthop: None }, - ); + let scope_eth0 = next_scope_id(); + let scope_eth1 = next_scope_id(); + let ( + router1, + mock_ndp1, + sessions1, + router2, + _mock_ndp2, + sessions2, + disps1, + disps2, + ) = unnumbered_peering_helper( + "multiple_unnumbered", + vec![ + ("eth0".to_string(), scope_eth0), + ("eth1".to_string(), scope_eth1), + ], + RouteExchange::Ipv4 { nexthop: None }, + ); let session1_eth0 = &sessions1[0]; let session1_eth1 = &sessions1[1]; @@ -2196,8 +2260,12 @@ fn test_multiple_unnumbered_sessions() { // Change Router 1's eth0 NDP neighbor let new_peer_ip: Ipv6Addr = "fe80::99".parse().unwrap(); mock_ndp1.discover_peer("eth0", new_peer_ip).unwrap(); - let new_peer = - SocketAddr::V6(SocketAddrV6::new(new_peer_ip, TEST_BGP_PORT, 0, 2)); + let new_peer = SocketAddr::V6(SocketAddrV6::new( + new_peer_ip, + TEST_BGP_PORT, + 0, + scope_eth0, + )); wait_for!(session1_eth0.get_peer_socket_addr() == Some(new_peer)); // CRITICAL: All sessions must stay Established @@ -2227,7 +2295,7 @@ fn test_multiple_unnumbered_sessions() { "fe80::2".parse().unwrap(), TEST_BGP_PORT, 0, - 3, + scope_eth1, )); assert_eq!( session1_eth1.get_peer_socket_addr(), @@ -2238,6 +2306,12 @@ fn test_multiple_unnumbered_sessions() { // Clean up router1.shutdown(); router2.shutdown(); + for d in &disps1 { + d.shutdown(); + } + for d in &disps2 { + d.shutdown(); + } } /// Test: Same link-local address on multiple interfaces. @@ -2272,12 +2346,25 @@ fn test_multiple_unnumbered_sessions() { /// each interface's scope_id correctly identifies which physical link to use. #[test] fn test_same_linklocal_multiple_interfaces() { - let (router1, mock_ndp1, sessions1, router2, _mock_ndp2, sessions2) = - unnumbered_peering_helper( - "same_linklocal", - vec![("eth0".to_string(), 2), ("eth1".to_string(), 3)], - RouteExchange::Ipv4 { nexthop: None }, - ); + let scope_eth0 = next_scope_id(); + let scope_eth1 = next_scope_id(); + let ( + router1, + mock_ndp1, + sessions1, + router2, + _mock_ndp2, + sessions2, + disps1, + disps2, + ) = unnumbered_peering_helper( + "same_linklocal", + vec![ + ("eth0".to_string(), scope_eth0), + ("eth1".to_string(), scope_eth1), + ], + RouteExchange::Ipv4 { nexthop: None }, + ); // The helper already discovers fe80::2 on both interfaces of mock_ndp1. // The point of this test is to verify that the same peer IP (fe80::2) on @@ -2289,13 +2376,13 @@ fn test_same_linklocal_multiple_interfaces() { peer_ip, TEST_BGP_PORT, 0, - 2, // scope_id 2 for eth0 + scope_eth0, )); let peer_eth1 = SocketAddr::V6(SocketAddrV6::new( peer_ip, // SAME IP as eth0 TEST_BGP_PORT, 0, - 3, // scope_id 3 for eth1 + scope_eth1, )); let session1_eth0 = &sessions1[0]; @@ -2347,11 +2434,15 @@ fn test_same_linklocal_multiple_interfaces() { // Verify they're truly independent: change eth0's peer let new_peer_ip: Ipv6Addr = "fe80::99".parse().unwrap(); mock_ndp1.discover_peer("eth0", new_peer_ip).unwrap(); - let new_peer_eth0 = - SocketAddr::V6(SocketAddrV6::new(new_peer_ip, TEST_BGP_PORT, 0, 2)); + let new_peer_eth0 = SocketAddr::V6(SocketAddrV6::new( + new_peer_ip, + TEST_BGP_PORT, + 0, + scope_eth0, + )); wait_for!(session1_eth0.get_peer_socket_addr() == Some(new_peer_eth0)); - // eth1 should still see fe80::2 with scope 3 + // eth1 should still see fe80::2 with its own scope_id assert_eq!( session1_eth1.get_peer_socket_addr(), Some(peer_eth1), @@ -2367,6 +2458,12 @@ fn test_same_linklocal_multiple_interfaces() { // Clean up router1.shutdown(); router2.shutdown(); + for d in &disps1 { + d.shutdown(); + } + for d in &disps2 { + d.shutdown(); + } } // ========================================================================= @@ -3736,7 +3833,7 @@ fn test_unnumbered_interface_lifecycle() { let mock_ndp1 = UnnumberedManagerMock::new(); let mock_ndp2 = UnnumberedManagerMock::new(); - let scope_id = 2u32; + let scope_id = next_scope_id(); // ONLY configure interface mapping - do NOT add to system yet mock_ndp1.configure_interface("eth0".to_string(), scope_id); diff --git a/clippy.toml b/clippy.toml index 90bc2078..5ca9d2d3 100644 --- a/clippy.toml +++ b/clippy.toml @@ -2,3 +2,23 @@ path = "tokio::main" reason = "prefer `oxide_tokio_rt` for production software" replacement = "oxide_tokio_rt::run" + +[[disallowed-macros]] +path = "std::println" +reason = "panics on broken pipe; use `mg_common::println_nopipe!` to exit cleanly" +replacement = "mg_common::println_nopipe" + +[[disallowed-macros]] +path = "std::print" +reason = "panics on broken pipe; use `mg_common::print_nopipe!` to exit cleanly" +replacement = "mg_common::print_nopipe" + +[[disallowed-macros]] +path = "std::eprintln" +reason = "panics on broken pipe; use `mg_common::eprintln_nopipe!` to exit cleanly" +replacement = "mg_common::eprintln_nopipe" + +[[disallowed-macros]] +path = "std::eprint" +reason = "panics on broken pipe; use `mg_common::eprint_nopipe!` to exit cleanly" +replacement = "mg_common::eprint_nopipe" diff --git a/ddmadm/src/main.rs b/ddmadm/src/main.rs index 800315d8..8c5c89bd 100644 --- a/ddmadm/src/main.rs +++ b/ddmadm/src/main.rs @@ -90,7 +90,15 @@ struct Peer { } fn main() -> Result<()> { - oxide_tokio_rt::run(run()) + match oxide_tokio_rt::run(run()) { + Ok(()) => Ok(()), + Err(e) => { + if mg_common::is_broken_pipe(&e) { + std::process::exit(0); + } + Err(e) + } + } } async fn run() -> Result<()> { diff --git a/mg-admin-client/Cargo.toml b/mg-admin-client/Cargo.toml index 26a2838f..c264c67e 100644 --- a/mg-admin-client/Cargo.toml +++ b/mg-admin-client/Cargo.toml @@ -15,6 +15,7 @@ progenitor.workspace = true schemars.workspace = true chrono.workspace = true uuid.workspace = true +mg-common.workspace = true rdb-types.workspace = true tabwriter.workspace = true colored.workspace = true diff --git a/mg-admin-client/src/lib.rs b/mg-admin-client/src/lib.rs index 62ceb635..211095de 100644 --- a/mg-admin-client/src/lib.rs +++ b/mg-admin-client/src/lib.rs @@ -28,6 +28,7 @@ progenitor::generate_api!( ); use colored::*; +use mg_common::{eprintln_nopipe, println_nopipe}; use rdb_types::{AddressFamily, Prefix, ProtocolFilter}; use std::collections::BTreeMap; use std::io::{Write, stdout}; @@ -52,7 +53,7 @@ pub fn print_rib( let pfx: Prefix = match prefix.parse() { Ok(p) => p, Err(e) => { - eprintln!("failed to parse prefix [{prefix}]: {e}"); + eprintln_nopipe!("failed to parse prefix [{prefix}]: {e}"); continue; } }; @@ -131,8 +132,8 @@ fn print_static_routes(routes: &BTreeMap>, title: &str) { } } - println!("{}", title.dimmed()); - println!("{}", "=".repeat(title.len()).dimmed()); + println_nopipe!("{}", title.dimmed()); + println_nopipe!("{}", "=".repeat(title.len()).dimmed()); tw.flush().unwrap(); } @@ -181,7 +182,7 @@ fn print_bgp_routes(routes: &BTreeMap>, title: &str) { } } - println!("{}", title.dimmed()); - println!("{}", "=".repeat(title.len()).dimmed()); + println_nopipe!("{}", title.dimmed()); + println_nopipe!("{}", "=".repeat(title.len()).dimmed()); tw.flush().unwrap(); } diff --git a/mg-common/src/lib.rs b/mg-common/src/lib.rs index 6d8be10f..01c02551 100644 --- a/mg-common/src/lib.rs +++ b/mg-common/src/lib.rs @@ -39,6 +39,115 @@ pub fn format_duration_human(d: Duration) -> String { } } +/// Like `println!`, but silently exits on broken pipe (EPIPE) instead of +/// panicking. Other I/O errors still panic. +#[macro_export] +macro_rules! println_nopipe { + () => { + { + use std::io::Write; + let r = writeln!(std::io::stdout()); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stdout: {e}"), + } + } + }; + ($($arg:tt)*) => { + { + use std::io::Write; + let r = writeln!(std::io::stdout(), $($arg)*); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stdout: {e}"), + } + } + }; +} + +/// Like `print!`, but silently exits on broken pipe (EPIPE) instead of +/// panicking. Other I/O errors still panic. +#[macro_export] +macro_rules! print_nopipe { + ($($arg:tt)*) => { + { + use std::io::Write; + let r = write!(std::io::stdout(), $($arg)*); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stdout: {e}"), + } + } + }; +} + +/// Like `eprintln!`, but silently exits on broken pipe (EPIPE) instead of +/// panicking. Other I/O errors still panic. +#[macro_export] +macro_rules! eprintln_nopipe { + () => { + { + use std::io::Write; + let r = writeln!(std::io::stderr()); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stderr: {e}"), + } + } + }; + ($($arg:tt)*) => { + { + use std::io::Write; + let r = writeln!(std::io::stderr(), $($arg)*); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stderr: {e}"), + } + } + }; +} + +/// Like `eprint!`, but silently exits on broken pipe (EPIPE) instead of +/// panicking. Other I/O errors still panic. +#[macro_export] +macro_rules! eprint_nopipe { + ($($arg:tt)*) => { + { + use std::io::Write; + let r = write!(std::io::stderr(), $($arg)*); + match r { + Ok(_) => {}, + Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => { + std::process::exit(0); + }, + Err(e) => panic!("failed printing to stderr: {e}"), + } + } + }; +} + +/// Returns `true` if the root cause of `err` is a broken pipe (EPIPE). +pub fn is_broken_pipe(err: &anyhow::Error) -> bool { + err.root_cause() + .downcast_ref::() + .is_some_and(|e| e.kind() == std::io::ErrorKind::BrokenPipe) +} + #[macro_export] macro_rules! lock { ($mtx:expr) => { diff --git a/mg-common/src/test.rs b/mg-common/src/test.rs index 8e1e568c..5c8fe6d9 100644 --- a/mg-common/src/test.rs +++ b/mg-common/src/test.rs @@ -162,7 +162,10 @@ macro_rules! sockaddr { struct ManagedIp { address: IpAddr, - installed: bool, + /// Number of IpAllocations within this process currently using this IP. + /// The system-level install/uninstall only happens when this transitions + /// between 0 and 1. use_count > 0 implies the IP is installed. + use_count: u32, lockfile: Option, } @@ -196,13 +199,13 @@ impl LoopbackIpManager { } } - pub fn add(&mut self, addresses: &[IpAddr]) { + fn add(&mut self, addresses: &[IpAddr]) { for addr in addresses { // Only add if not already present if !self.ips.iter().any(|ip| ip.address == *addr) { self.ips.push(ManagedIp { address: *addr, - installed: false, + use_count: 0, lockfile: None, }); } @@ -219,7 +222,10 @@ impl LoopbackIpManager { { let mut mgr = lock!(manager); mgr.add(addresses); - mgr.install()?; + if let Err(e) = mgr.install(addresses) { + mgr.uninstall_addresses(addresses); + return Err(e); + } } // Return guard that will clean up on drop @@ -230,6 +236,13 @@ impl LoopbackIpManager { } } +/// Returns true for addresses that are always present on loopback interfaces +/// and should never be installed or removed by the manager. +fn is_always_present(addr: IpAddr) -> bool { + addr == IpAddr::V4(std::net::Ipv4Addr::LOCALHOST) + || addr == IpAddr::V6(std::net::Ipv6Addr::LOCALHOST) +} + // Helper functions for lockfile-based reference counting fn flock(path: &str) -> std::io::Result { let file = OpenOptions::new() @@ -258,12 +271,12 @@ fn write_refcount(file: &mut File, count: u32) -> std::io::Result<()> { } impl LoopbackIpManager { - pub fn install(&mut self) -> Result<(), std::io::Error> { + fn install(&mut self, addresses: &[IpAddr]) -> Result<(), std::io::Error> { let ifname = self.ifname.clone(); let log = self.log.clone(); for ip in &mut self.ips { - if !ip.installed { + if addresses.contains(&ip.address) { Self::install_single_ip_static(&ifname, &log, ip)?; } } @@ -277,14 +290,30 @@ impl LoopbackIpManager { log: &Logger, ip: &mut ManagedIp, ) -> Result<(), std::io::Error> { - // Skip 127.0.0.1/::1 as they're always present on loopback interfaces by default - let ip_str = ip.address.to_string(); - if ip_str == "127.0.0.1" || ip_str == "::1" { - info!(log, "skipping {ip_str} (always present on loopback)"); - ip.installed = true; // Mark as installed but don't create lockfile + if is_always_present(ip.address) { + info!(log, "skipping {} (always present on loopback)", ip.address); + ip.use_count += 1; return Ok(()); } + // If already installed by another allocation in this process, nothing + // more to do — the system IP and lockfile refcount are already set up. + // Increment use_count now since this path can't fail. + if ip.use_count > 0 { + ip.use_count += 1; + info!( + log, + "{}: already installed, use_count now {}", + ip.address, + ip.use_count, + ); + return Ok(()); + } + + // First use in this process: acquire the cross-process lockfile and + // install the IP if no other process has done so yet. + // Don't increment use_count until all fallible work succeeds. + // 1. Acquire lock for this IP let lockfile_path = format!("/tmp/maghemite-ip-{}.lock", ip.address); let mut lockfile = flock(&lockfile_path)?; @@ -305,8 +334,8 @@ impl LoopbackIpManager { ); write_refcount(&mut lockfile, new_refcount)?; - // 5. Update our state - ip.installed = true; + // 5. All fallible work done — update our state + ip.use_count += 1; ip.lockfile = Some(lockfile); Ok(()) @@ -336,6 +365,7 @@ impl LoopbackIpManager { let cmd = [ "ipadm", "create-addr", + "-t", "-T", "static", "-a", @@ -385,27 +415,17 @@ impl LoopbackIpManager { } } - /// Uninstall all managed addresses - pub fn uninstall(&mut self) { - let addresses: Vec = - self.ips.iter().map(|ip| ip.address).collect(); - self.uninstall_addresses(&addresses); - } - /// Uninstall a single IP address with proper refcount management /// Skips 127.0.0.1 as it should always remain on loopback interfaces fn uninstall_single_ip(&mut self, target_addr: IpAddr) { - // Skip 127.0.0.1 as it should always remain on loopback interfaces - if target_addr.to_string() == "127.0.0.1" { + if is_always_present(target_addr) { info!( self.log, - "skipping 127.0.0.1 cleanup (always present on loopback)" + "skipping {target_addr} cleanup (always present on loopback)" ); - // Just mark as uninstalled in our tracking, but don't touch the system for ip in &mut self.ips { if ip.address == target_addr { - ip.installed = false; - ip.lockfile = None; // No lockfile was created for 127.0.0.1 + ip.use_count = ip.use_count.saturating_sub(1); break; } } @@ -413,7 +433,22 @@ impl LoopbackIpManager { } for ip in &mut self.ips { - if ip.address == target_addr && ip.installed { + if ip.address == target_addr && ip.use_count > 0 { + ip.use_count -= 1; + + if ip.use_count > 0 { + // Other allocations in this process still need this IP. + info!( + self.log, + "{}: use_count now {}, keeping installed", + ip.address, + ip.use_count, + ); + break; + } + + // Last in-process user: decrement the cross-process refcount + // and remove the IP from the system if no other process needs it. if let Some(mut lockfile) = ip.lockfile.take() { let lockfile_path = format!("/tmp/maghemite-ip-{}.lock", ip.address); @@ -434,7 +469,7 @@ impl LoopbackIpManager { Self::remove_ip_from_system_static( &self.ifname, &self.log, - ip, + ip.address, ); // Remove the lockfile completely when refcount reaches 0 @@ -457,9 +492,6 @@ impl LoopbackIpManager { } } - // Always update our state - ip.installed = false; - ip.lockfile = None; info!(self.log, "uninstalled {}", ip.address); break; } @@ -467,20 +499,16 @@ impl LoopbackIpManager { } /// Remove IP from the system using platform-specific commands - fn remove_ip_from_system_static( - ifname: &str, - log: &Logger, - ip: &ManagedIp, - ) { + fn remove_ip_from_system_static(ifname: &str, log: &Logger, addr: IpAddr) { #[cfg(target_os = "illumos")] let output = { - let v = match ip.address { + let v = match addr { IpAddr::V4(_) => "v4", IpAddr::V6(_) => "v6", }; - let mut ip_descr = format!("{v}{}", ip.address); + let mut ip_descr = format!("{v}{addr}"); ip_descr.retain(|c| c.is_alphanumeric()); - let addr_obj = format!("{}/test{}", ifname, ip_descr); + let addr_obj = format!("{ifname}/{ip_descr}"); Command::new("pfexec") .args(["ipadm", "delete-addr", &addr_obj]) .output() @@ -488,11 +516,11 @@ impl LoopbackIpManager { #[cfg(target_os = "linux")] let output = { - let mask = match ip.address { + let mask = match addr { IpAddr::V4(_) => 32, IpAddr::V6(_) => 128, }; - let addr_str = format!("{}/{mask}", ip.address); + let addr_str = format!("{addr}/{mask}"); Command::new("sudo") .args(["ip", "addr", "del", &addr_str, "dev", ifname]) .output() @@ -500,18 +528,12 @@ impl LoopbackIpManager { #[cfg(target_os = "macos")] let output = { - let af = match ip.address { + let af = match addr { IpAddr::V4(_) => "inet", IpAddr::V6(_) => "inet6", }; Command::new("sudo") - .args([ - "ifconfig", - ifname, - af, - &ip.address.to_string(), - "-alias", - ]) + .args(["ifconfig", ifname, af, &addr.to_string(), "-alias"]) .output() }; @@ -526,19 +548,15 @@ impl LoopbackIpManager { { error!( log, - "failed to remove {} from system: {stderr}", - ip.address + "failed to remove {addr} from system: {stderr}" ); return; } } - info!(log, "removed {} from system", ip.address); + info!(log, "removed {addr} from system"); } Err(e) => { - error!( - log, - "failed to execute remove command for {}: {e}", ip.address - ); + error!(log, "failed to execute remove command for {addr}: {e}"); } } } diff --git a/mg-ddm-verify/Cargo.toml b/mg-ddm-verify/Cargo.toml index 4cee4d5e..cdbb2a68 100644 --- a/mg-ddm-verify/Cargo.toml +++ b/mg-ddm-verify/Cargo.toml @@ -13,3 +13,4 @@ serde.workspace = true serde_json.workspace = true oxnet.workspace = true ddm-types.workspace = true +mg-common.workspace = true diff --git a/mg-ddm-verify/src/main.rs b/mg-ddm-verify/src/main.rs index 5b420391..5f3197fa 100644 --- a/mg-ddm-verify/src/main.rs +++ b/mg-ddm-verify/src/main.rs @@ -4,6 +4,7 @@ use anyhow::Result; use ddm_types::exchange::PathVector; +use mg_common::{eprintln_nopipe, println_nopipe}; use oxnet::Ipv6Net; use serde::Deserialize; use std::collections::HashMap; @@ -78,9 +79,11 @@ async fn run() -> Result<()> { for prefix in advertised_prefixes { if !other_sled_prefixes.iter().any(|x| x.destination == *prefix) { - eprintln!( + eprintln_nopipe!( "sled {} advertised {:?} but {} didn't receive it!", - sled, prefix, other_sled, + sled, + prefix, + other_sled, ); missed_direction .entry(sled.to_string()) @@ -92,10 +95,10 @@ async fn run() -> Result<()> { } // show all missed directions - println!("missed directions:"); + println_nopipe!("missed directions:"); for (source, dests) in &missed_direction { for dest in dests { - println!(" {} -> {}", source, dest); + println_nopipe!(" {} -> {}", source, dest); } } diff --git a/mgadm/src/bgp.rs b/mgadm/src/bgp.rs index 725dfc8f..f381ed76 100644 --- a/mgadm/src/bgp.rs +++ b/mgadm/src/bgp.rs @@ -13,6 +13,7 @@ use mg_admin_client::{ Ipv6UnicastConfig, NeighborResetRequest, }, }; +use mg_common::{print_nopipe, println_nopipe}; use rdb::types::{PeerId, Prefix4, Prefix6}; use std::{ fs::read_to_string, @@ -994,7 +995,7 @@ pub async fn commands(command: Commands, c: Client) -> Result<()> { async fn read_routers(c: Client) -> Result<()> { let routers = c.read_routers().await?.into_inner(); - println!("{routers:#?}"); + println_nopipe!("{routers:#?}"); Ok(()) } @@ -1022,7 +1023,7 @@ async fn update_router(cfg: RouterConfig, c: Client) -> Result<()> { async fn read_router(asn: u32, c: Client) -> Result<()> { let response = c.read_router(asn).await?; - println!("{response:#?}"); + println_nopipe!("{response:#?}"); Ok(()) } @@ -1078,8 +1079,7 @@ fn display_neighbors_summary( "State Duration".dimmed(), "Hold".dimmed(), "Keepalive".dimmed(), - ) - .unwrap(); + )?; for (addr, info) in neighbors.iter() { writeln!( @@ -1093,10 +1093,9 @@ fn display_neighbors_summary( format_duration_human(info.timers.hold.negotiated), format_duration_human(info.timers.keepalive.configured), format_duration_human(info.timers.keepalive.negotiated), - ) - .unwrap(); + )?; } - tw.flush().unwrap(); + tw.flush()?; Ok(()) } @@ -1105,174 +1104,211 @@ fn display_neighbors_detail( ) -> Result<()> { for (i, (addr, info)) in neighbors.iter().enumerate() { if i > 0 { - println!(); + println_nopipe!(); } - println!("{}", "=".repeat(80)); - println!("{}", format!("Neighbor: {}", addr).bold()); - println!("{}", "=".repeat(80)); + println_nopipe!("{}", "=".repeat(80)); + println_nopipe!("{}", format!("Neighbor: {}", addr).bold()); + println_nopipe!("{}", "=".repeat(80)); - println!("\n{}", "Basic Information:".bold()); - println!(" Name: {}", info.name); - println!(" Peer Group: {}", info.peer_group); - println!(" FSM State: {:?}", info.fsm_state); - println!( + println_nopipe!("\n{}", "Basic Information:".bold()); + println_nopipe!(" Name: {}", info.name); + println_nopipe!(" Peer Group: {}", info.peer_group); + println_nopipe!(" FSM State: {:?}", info.fsm_state); + println_nopipe!( " FSM State Duration: {}", format_duration_human(info.fsm_state_duration) ); if let Some(asn) = info.asn { - println!(" Peer ASN: {}", asn); + println_nopipe!(" Peer ASN: {}", asn); } if let Some(id) = info.id { - println!(" Peer Router ID: {}", Ipv4Addr::from(id)); + println_nopipe!(" Peer Router ID: {}", Ipv4Addr::from(id)); } - println!("\n{}", "Connection:".bold()); - println!(" Local: {}:{}", info.local_ip, info.local_tcp_port); - println!(" Remote: {}:{}", info.remote_ip, info.remote_tcp_port); + println_nopipe!("\n{}", "Connection:".bold()); + println_nopipe!(" Local: {}:{}", info.local_ip, info.local_tcp_port); + println_nopipe!( + " Remote: {}:{}", + info.remote_ip, + info.remote_tcp_port + ); - println!("\n{}", "Address Families:".bold()); - println!(" IPv4 Unicast:"); - println!(" Import Policy: {:?}", info.ipv4_unicast.import_policy); - println!(" Export Policy: {:?}", info.ipv4_unicast.export_policy); + println_nopipe!("\n{}", "Address Families:".bold()); + println_nopipe!(" IPv4 Unicast:"); + println_nopipe!( + " Import Policy: {:?}", + info.ipv4_unicast.import_policy + ); + println_nopipe!( + " Export Policy: {:?}", + info.ipv4_unicast.export_policy + ); if let Some(nh) = info.ipv4_unicast.nexthop { - println!(" Nexthop: {}", nh); + println_nopipe!(" Nexthop: {}", nh); } - println!(" IPv6 Unicast:"); - println!(" Import Policy: {:?}", info.ipv6_unicast.import_policy); - println!(" Export Policy: {:?}", info.ipv6_unicast.export_policy); + println_nopipe!(" IPv6 Unicast:"); + println_nopipe!( + " Import Policy: {:?}", + info.ipv6_unicast.import_policy + ); + println_nopipe!( + " Export Policy: {:?}", + info.ipv6_unicast.export_policy + ); if let Some(nh) = info.ipv6_unicast.nexthop { - println!(" Nexthop: {}", nh); + println_nopipe!(" Nexthop: {}", nh); } - println!("\n{}", "Timers:".bold()); - println!( + println_nopipe!("\n{}", "Timers:".bold()); + println_nopipe!( " Hold Time: configured={}, negotiated={}, remaining={}", format_duration_human(info.timers.hold.configured), format_duration_human(info.timers.hold.negotiated), format_duration_human(info.timers.hold.remaining), ); - println!( + println_nopipe!( " Keepalive: configured={}, negotiated={}, remaining={}", format_duration_human(info.timers.keepalive.configured), format_duration_human(info.timers.keepalive.negotiated), format_duration_human(info.timers.keepalive.remaining), ); - println!( + println_nopipe!( " Connect Retry: configured={}, remaining={}", format_duration_human(info.timers.connect_retry.configured), format_duration_human(info.timers.connect_retry.remaining), ); match &info.timers.connect_retry_jitter { Some(jitter) => { - println!(" Jitter: {}-{}", jitter.min, jitter.max) + println_nopipe!(" Jitter: {}-{}", jitter.min, jitter.max) } - None => println!(" Jitter: none"), + None => println_nopipe!(" Jitter: none"), } - println!( + println_nopipe!( " Idle Hold: configured={}, remaining={}", format_duration_human(info.timers.idle_hold.configured), format_duration_human(info.timers.idle_hold.remaining), ); match &info.timers.idle_hold_jitter { Some(jitter) => { - println!(" Jitter: {}-{}", jitter.min, jitter.max) + println_nopipe!(" Jitter: {}-{}", jitter.min, jitter.max) } - None => println!(" Jitter: none"), + None => println_nopipe!(" Jitter: none"), } - println!( + println_nopipe!( " Delay Open: configured={}, remaining={}", format_duration_human(info.timers.delay_open.configured), format_duration_human(info.timers.delay_open.remaining), ); if !info.received_capabilities.is_empty() { - println!("\n{}", "Received Capabilities:".bold()); + println_nopipe!("\n{}", "Received Capabilities:".bold()); for cap in &info.received_capabilities { - println!(" {:?}", cap); + println_nopipe!(" {:?}", cap); } } - println!("\n{}", "Counters:".bold()); - println!(" Prefixes:"); - println!(" Advertised: {}", info.counters.prefixes_advertised); - println!(" Imported: {}", info.counters.prefixes_imported); - - println!(" Messages Sent:"); - println!(" Opens: {}", info.counters.opens_sent); - println!(" Updates: {}", info.counters.updates_sent); - println!(" Keepalives: {}", info.counters.keepalives_sent); - println!(" Route Refresh: {}", info.counters.route_refresh_sent); - println!(" Notifications: {}", info.counters.notifications_sent); - - println!(" Messages Received:"); - println!(" Opens: {}", info.counters.opens_received); - println!(" Updates: {}", info.counters.updates_received); - println!(" Keepalives: {}", info.counters.keepalives_received); - println!( + println_nopipe!("\n{}", "Counters:".bold()); + println_nopipe!(" Prefixes:"); + println_nopipe!( + " Advertised: {}", + info.counters.prefixes_advertised + ); + println_nopipe!(" Imported: {}", info.counters.prefixes_imported); + + println_nopipe!(" Messages Sent:"); + println_nopipe!(" Opens: {}", info.counters.opens_sent); + println_nopipe!(" Updates: {}", info.counters.updates_sent); + println_nopipe!(" Keepalives: {}", info.counters.keepalives_sent); + println_nopipe!( + " Route Refresh: {}", + info.counters.route_refresh_sent + ); + println_nopipe!( + " Notifications: {}", + info.counters.notifications_sent + ); + + println_nopipe!(" Messages Received:"); + println_nopipe!(" Opens: {}", info.counters.opens_received); + println_nopipe!(" Updates: {}", info.counters.updates_received); + println_nopipe!( + " Keepalives: {}", + info.counters.keepalives_received + ); + println_nopipe!( " Route Refresh: {}", info.counters.route_refresh_received ); - println!( + println_nopipe!( " Notifications: {}", info.counters.notifications_received ); - println!(" FSM Transitions:"); - println!( + println_nopipe!(" FSM Transitions:"); + println_nopipe!( " To Established: {}", info.counters.transitions_to_established ); - println!(" To Idle: {}", info.counters.transitions_to_idle); - println!(" To Connect: {}", info.counters.transitions_to_connect); + println_nopipe!(" To Idle: {}", info.counters.transitions_to_idle); + println_nopipe!( + " To Connect: {}", + info.counters.transitions_to_connect + ); - println!(" Connections:"); - println!( + println_nopipe!(" Connections:"); + println_nopipe!( " Active Accepted: {}", info.counters.active_connections_accepted ); - println!( + println_nopipe!( " Active Declined: {}", info.counters.active_connections_declined ); - println!( + println_nopipe!( " Passive Accepted: {}", info.counters.passive_connections_accepted ); - println!( + println_nopipe!( " Passive Declined: {}", info.counters.passive_connections_declined ); - println!( + println_nopipe!( " Connection Retries: {}", info.counters.connection_retries ); // Error Counters - println!("\n{}", "Error Counters:".bold()); - println!( + println_nopipe!("\n{}", "Error Counters:".bold()); + println_nopipe!( " TCP Connection Failures: {}", info.counters.tcp_connection_failure ); - println!(" MD5 Auth Failures: {}", info.counters.md5_auth_failures); - println!( + println_nopipe!( + " MD5 Auth Failures: {}", + info.counters.md5_auth_failures + ); + println_nopipe!( " Hold Timer Expirations: {}", info.counters.hold_timer_expirations ); - println!( + println_nopipe!( " Update Nexthop Missing: {}", info.counters.update_nexhop_missing ); - println!( + println_nopipe!( " Open Handle Failures: {}", info.counters.open_handle_failures ); - println!( + println_nopipe!( " Notification Send Failures: {}", info.counters.notification_send_failure ); - println!(" Connector Panics: {}", info.counters.connector_panics); + println_nopipe!( + " Connector Panics: {}", + info.counters.connector_panics + ); } Ok(()) @@ -1300,7 +1336,7 @@ async fn get_exported( .await? .into_inner(); - println!("{exported:#?}"); + println_nopipe!("{exported:#?}"); Ok(()) } @@ -1310,7 +1346,7 @@ async fn list_nbr(asn: u32, c: Client) -> Result<()> { let unnumbered = c.read_unnumbered_neighbors_v2(asn).await?.into_inner(); if numbered.is_empty() && unnumbered.is_empty() { - println!("No neighbors configured for ASN {}", asn); + println_nopipe!("No neighbors configured for ASN {}", asn); return Ok(()); } @@ -1364,14 +1400,14 @@ async fn read_nbr(asn: u32, peer: String, c: Client) -> Result<()> { PeerType::Numbered(addr) => { let nbr = c.read_neighbor(asn, &addr.to_string()).await?.into_inner(); - println!("{nbr:#?}"); + println_nopipe!("{nbr:#?}"); } PeerType::Unnumbered(interface) => { let nbr = c .read_unnumbered_neighbor_v2(asn, &interface) .await? .into_inner(); - println!("{nbr:#?}"); + println_nopipe!("{nbr:#?}"); } } Ok(()) @@ -1465,7 +1501,7 @@ async fn delete_origin4(asn: u32, c: Client) -> Result<()> { async fn read_origin4(asn: u32, c: Client) -> Result<()> { let o4 = c.read_origin4(asn).await?; - println!("{o4:#?}"); + println_nopipe!("{o4:#?}"); Ok(()) } @@ -1504,7 +1540,7 @@ async fn delete_origin6(asn: u32, c: Client) -> Result<()> { async fn read_origin6(asn: u32, c: Client) -> Result<()> { let o6 = c.read_origin6(asn).await?; - println!("{o6:#?}"); + println_nopipe!("{o6:#?}"); Ok(()) } @@ -1528,7 +1564,7 @@ async fn create_chk(filename: String, asn: u32, c: Client) -> Result<()> { async fn read_chk(asn: u32, c: Client) -> Result<()> { let result = c.read_checker(asn).await?; - print!("{result:#?}"); + print_nopipe!("{result:#?}"); Ok(()) } @@ -1560,7 +1596,7 @@ async fn create_shp(filename: String, asn: u32, c: Client) -> Result<()> { async fn read_shp(asn: u32, c: Client) -> Result<()> { let result = c.read_shaper(asn).await?; - print!("{result:#?}"); + print_nopipe!("{result:#?}"); Ok(()) } @@ -1616,9 +1652,9 @@ async fn get_fsm_history( if result.by_peer.is_empty() { if let Some(peer_str) = peer { - println!("No FSM history found for peer {}", peer_str); + println_nopipe!("No FSM history found for peer {}", peer_str); } else { - println!("No FSM history found for ASN {}", asn); + println_nopipe!("No FSM history found for ASN {}", asn); } return Ok(()); } @@ -1637,7 +1673,7 @@ async fn get_fsm_history( // Display FSM history in tabular format for (peer_addr, events) in result.by_peer.iter() { if events.is_empty() { - println!( + println_nopipe!( "\n{}", format!( "FSM Event History - Peer: {} - {} (empty)", @@ -1648,7 +1684,7 @@ async fn get_fsm_history( continue; } - println!( + println_nopipe!( "\n{}", format!( "FSM Event History - Peer: {} - {}", @@ -1656,8 +1692,8 @@ async fn get_fsm_history( ) .dimmed() ); - println!("{}", "=".repeat(100).dimmed()); - println!( + println_nopipe!("{}", "=".repeat(100).dimmed()); + println_nopipe!( "Showing {} of {} events\n", events.len().min(limit), events.len() @@ -1706,7 +1742,7 @@ async fn get_fsm_history( tw.flush()?; if events.len() > limit { - println!( + println_nopipe!( "\n... ({} more events not shown, use --limit all to see everything)", events.len() - limit ); @@ -1758,7 +1794,11 @@ async fn get_message_history( .into_inner(); if result.by_peer.is_empty() { - println!("No message history found for ASN {} peer {}", asn, peer); + println_nopipe!( + "No message history found for ASN {} peer {}", + asn, + peer + ); return Ok(()); } @@ -1797,12 +1837,12 @@ async fn get_message_history( all_messages.iter().rev().take(limit).collect::>(); let total_count = all_messages.len(); - println!( + println_nopipe!( "\n{}", format!("BGP Message History - Peer: {}", peer).dimmed() ); - println!("{}", "=".repeat(80).dimmed()); - println!( + println_nopipe!("{}", "=".repeat(80).dimmed()); + println_nopipe!( "Showing {} of {} messages ({} RX, {} TX)\n", messages_to_show.len(), total_count, @@ -1830,7 +1870,7 @@ async fn get_message_history( msg_content }; - println!( + println_nopipe!( "{} {} [{}] {}", ts_str.to_string().dimmed(), if *direction == "RX" { diff --git a/mgadm/src/main.rs b/mgadm/src/main.rs index 3ceec95c..340179f5 100644 --- a/mgadm/src/main.rs +++ b/mgadm/src/main.rs @@ -63,7 +63,15 @@ enum Commands { } fn main() -> Result<()> { - oxide_tokio_rt::run(run()) + match oxide_tokio_rt::run(run()) { + Ok(()) => Ok(()), + Err(e) => { + if mg_common::is_broken_pipe(&e) { + std::process::exit(0); + } + Err(e) + } + } } async fn run() -> Result<()> { diff --git a/mgadm/src/ndp.rs b/mgadm/src/ndp.rs index de2a8f37..f4a1dc52 100644 --- a/mgadm/src/ndp.rs +++ b/mgadm/src/ndp.rs @@ -6,6 +6,7 @@ use anyhow::Result; use clap::{Args, Subcommand}; use colored::Colorize; use mg_admin_client::Client; +use mg_common::println_nopipe; use std::io::{Write, stdout}; use tabwriter::TabWriter; @@ -59,9 +60,9 @@ pub async fn commands(command: Commands, c: Client) -> Result<()> { async fn ndp_manager_status(asn: u32, c: Client) -> Result<()> { let state = c.get_ndp_manager_state(asn).await?.into_inner(); - println!("NDP Manager State (ASN {})", asn); - println!("{}", "=".repeat(60)); - println!(); + println_nopipe!("NDP Manager State (ASN {})", asn); + println_nopipe!("{}", "=".repeat(60)); + println_nopipe!(); // Monitor thread status let monitor_status = if state.monitor_thread_running { @@ -69,33 +70,34 @@ async fn ndp_manager_status(asn: u32, c: Client) -> Result<()> { } else { "Stopped".red() }; - println!("Monitor Thread: {}", monitor_status); - println!(); + println_nopipe!("Monitor Thread: {}", monitor_status); + println_nopipe!(); // Pending interfaces - println!( + println_nopipe!( "Pending Interfaces (configured, waiting for system): {}", state.pending_interfaces.len() ); if state.pending_interfaces.is_empty() { - println!(" (none)"); + println_nopipe!(" (none)"); } else { for pending in &state.pending_interfaces { - println!( + println_nopipe!( " {} (router_lifetime: {}s)", - pending.interface, pending.router_lifetime + pending.interface, + pending.router_lifetime ); } } - println!(); + println_nopipe!(); // Active interfaces - println!("Active Interfaces: {}", state.active_interfaces.len()); + println_nopipe!("Active Interfaces: {}", state.active_interfaces.len()); if state.active_interfaces.is_empty() { - println!(" (none)"); + println_nopipe!(" (none)"); } else { for iface in &state.active_interfaces { - println!(" {}", iface); + println_nopipe!(" {}", iface); } } @@ -106,7 +108,7 @@ async fn ndp_interfaces(asn: u32, c: Client) -> Result<()> { let interfaces = c.get_ndp_interfaces(asn).await?.into_inner(); if interfaces.is_empty() { - println!("No NDP-managed interfaces found for ASN {}", asn); + println_nopipe!("No NDP-managed interfaces found for ASN {}", asn); return Ok(()); } @@ -181,22 +183,22 @@ async fn ndp_interface_detail( .await? .into_inner(); - println!("NDP State: {}", interface); - println!("{}", "=".repeat(60)); - println!(); + println_nopipe!("NDP State: {}", interface); + println_nopipe!("{}", "=".repeat(60)); + println_nopipe!(); - println!("Interface Information:"); - println!(" Name: {}", detail.interface); - println!(" Local Address: {}", detail.local_address); - println!(" Scope ID: {}", detail.scope_id); - println!( + println_nopipe!("Interface Information:"); + println_nopipe!(" Name: {}", detail.interface); + println_nopipe!(" Local Address: {}", detail.local_address); + println_nopipe!(" Scope ID: {}", detail.scope_id); + println_nopipe!( " Router Lifetime (advertised): {}s", detail.router_lifetime ); - println!(); + println_nopipe!(); // Thread state - println!("Thread State:"); + println_nopipe!("Thread State:"); if let Some(ts) = &detail.thread_state { let tx_status = if ts.tx_running { "Running".green() @@ -208,41 +210,41 @@ async fn ndp_interface_detail( } else { "Stopped".red() }; - println!(" TX Loop: {}", tx_status); - println!(" RX Loop: {}", rx_status); + println_nopipe!(" TX Loop: {}", tx_status); + println_nopipe!(" RX Loop: {}", rx_status); } else { - println!(" TX Loop: {}", "Unknown".dimmed()); - println!(" RX Loop: {}", "Unknown".dimmed()); + println_nopipe!(" TX Loop: {}", "Unknown".dimmed()); + println_nopipe!(" RX Loop: {}", "Unknown".dimmed()); } - println!(); + println_nopipe!(); if let Some(peer) = detail.discovered_peer { if peer.expired { - println!("{}", "Discovered Peer (EXPIRED):".red()); + println_nopipe!("{}", "Discovered Peer (EXPIRED):".red()); } else { - println!("Discovered Peer:"); + println_nopipe!("Discovered Peer:"); } - println!(" Address: {}", peer.address); - println!(" Discovered At: {}", peer.discovered_at); - println!(" Last Advertisement: {}", peer.last_advertisement); - println!(" Router Lifetime: {}s", peer.router_lifetime); - println!(" Reachable Time: {}ms", peer.reachable_time); - println!(" Retrans Timer: {}ms", peer.retrans_timer); + println_nopipe!(" Address: {}", peer.address); + println_nopipe!(" Discovered At: {}", peer.discovered_at); + println_nopipe!(" Last Advertisement: {}", peer.last_advertisement); + println_nopipe!(" Router Lifetime: {}s", peer.router_lifetime); + println_nopipe!(" Reachable Time: {}ms", peer.reachable_time); + println_nopipe!(" Retrans Timer: {}ms", peer.retrans_timer); if peer.expired { - println!(" Expired: {}", "Yes".red()); + println_nopipe!(" Expired: {}", "Yes".red()); if let Some(time_since) = peer.time_until_expiry { - println!(" Time Since Expiry: {}", time_since); + println_nopipe!(" Time Since Expiry: {}", time_since); } } else { - println!(" Expired: {}", "No".green()); + println_nopipe!(" Expired: {}", "No".green()); if let Some(time_until) = peer.time_until_expiry { - println!(" Time Until Expiry: {}", time_until); + println_nopipe!(" Time Until Expiry: {}", time_until); } } } else { - println!("Discovered Peer: None"); + println_nopipe!("Discovered Peer: None"); } Ok(()) diff --git a/mgadm/src/rib.rs b/mgadm/src/rib.rs index 9b76aba8..dd2216d6 100644 --- a/mgadm/src/rib.rs +++ b/mgadm/src/rib.rs @@ -2,13 +2,13 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use std::num::NonZeroU8; - use anyhow::Result; use clap::{Args, Subcommand}; use mg_admin_client::types::BestpathFanoutRequest; use mg_admin_client::{Client, print_rib}; +use mg_common::println_nopipe; use rdb::types::{AddressFamily, ProtocolFilter}; +use std::num::NonZeroU8; #[derive(Subcommand, Debug)] pub enum Commands { @@ -120,13 +120,13 @@ async fn get_selected( async fn read_bestpath_fanout(c: Client) -> Result<()> { let result = c.read_bestpath_fanout().await?; - println!("{}", result.into_inner().fanout); + println_nopipe!("{}", result.into_inner().fanout); Ok(()) } async fn update_bestpath_fanout(fanout: NonZeroU8, c: Client) -> Result<()> { c.update_bestpath_fanout(&BestpathFanoutRequest { fanout }) .await?; - println!("Updated bestpath fanout to: {}", fanout); + println_nopipe!("Updated bestpath fanout to: {}", fanout); Ok(()) } diff --git a/mgadm/src/static_routing.rs b/mgadm/src/static_routing.rs index 761f187e..28ccec44 100644 --- a/mgadm/src/static_routing.rs +++ b/mgadm/src/static_routing.rs @@ -5,6 +5,7 @@ use anyhow::Result; use clap::{Args, Subcommand}; use mg_admin_client::{Client, types}; +use mg_common::println_nopipe; use oxnet::{Ipv4Net, Ipv6Net}; use rdb::{DEFAULT_RIB_PRIORITY_STATIC, Prefix4, Prefix6}; use std::net::{Ipv4Addr, Ipv6Addr}; @@ -43,7 +44,7 @@ pub async fn commands(command: Commands, client: Client) -> Result<()> { match command { Commands::GetV4Routes => { let routes = client.static_list_v4_routes().await?; - println!("{:#?}", routes); + println_nopipe!("{:#?}", routes); } Commands::AddV4Route(route) => { let arg = types::AddStaticRoute4Request { @@ -79,7 +80,7 @@ pub async fn commands(command: Commands, client: Client) -> Result<()> { } Commands::GetV6Routes => { let routes = client.static_list_v6_routes().await?; - println!("{:#?}", routes); + println_nopipe!("{:#?}", routes); } Commands::AddV6Route(route) => { let arg = types::AddStaticRoute6Request { diff --git a/mgd/src/bgp_admin.rs b/mgd/src/bgp_admin.rs index f7cb8f49..3427b3f5 100644 --- a/mgd/src/bgp_admin.rs +++ b/mgd/src/bgp_admin.rs @@ -2545,7 +2545,7 @@ mod tests { admin::HandlerContext, bfd_admin::BfdContext, bgp_admin::BgpContext, }; use bgp::params::{ApplyRequestV1, BgpPeerConfigV1, BgpPeerParametersV1}; - use mg_common::stats::MgLowerStats; + use mg_common::{println_nopipe, stats::MgLowerStats}; use rdb::test::get_test_db; #[cfg(all(feature = "mg-lower", target_os = "illumos"))] use std::net::Ipv6Addr; @@ -2568,7 +2568,7 @@ mod tests { remove_dir_all(&tmpdir).unwrap(); } create_dir_all(&tmpdir).unwrap(); - println!("tmpdir is {tmpdir}"); + println_nopipe!("tmpdir is {tmpdir}"); let log = mg_common::log::init_file_logger("apply_remove_entire_group.log"); diff --git a/rdb/src/db.rs b/rdb/src/db.rs index a0ad6708..d905562c 100644 --- a/rdb/src/db.rs +++ b/rdb/src/db.rs @@ -1479,6 +1479,7 @@ mod test { Prefix6, StaticRouteKey, db::Db, test::TestDb, types::PrefixDbKey, types::test_helpers::path_vecs_equal, }; + use mg_common::eprintln_nopipe; use mg_common::log::*; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; @@ -1496,15 +1497,15 @@ mod test { ) -> bool { let curr_rib_in_paths = db.get_prefix_paths(prefix); if !path_vecs_equal(&curr_rib_in_paths, &rib_in_paths) { - eprintln!("curr_rib_in_paths: {:?}", curr_rib_in_paths); - eprintln!("rib_in_paths: {:?}", rib_in_paths); + eprintln_nopipe!("curr_rib_in_paths: {:?}", curr_rib_in_paths); + eprintln_nopipe!("rib_in_paths: {:?}", rib_in_paths); return false; } let curr_loc_rib_paths = db.get_selected_prefix_paths(prefix); if !path_vecs_equal(&curr_loc_rib_paths, &loc_rib_paths) { - eprintln!("curr_loc_rib_paths: {:?}", curr_loc_rib_paths); - eprintln!("loc_rib_paths: {:?}", loc_rib_paths); + eprintln_nopipe!("curr_loc_rib_paths: {:?}", curr_loc_rib_paths); + eprintln_nopipe!("loc_rib_paths: {:?}", loc_rib_paths); return false; } true diff --git a/rdb/src/test.rs b/rdb/src/test.rs index 8e4b27b3..59db5700 100644 --- a/rdb/src/test.rs +++ b/rdb/src/test.rs @@ -5,6 +5,7 @@ //! Test utilities for rdb tests. use crate::{Db, error::Error}; +use mg_common::eprintln_nopipe; use slog::Logger; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -60,7 +61,7 @@ impl Drop for TestDb { if !std::thread::panicking() { let _ = std::fs::remove_dir_all(&self.path); } else { - eprintln!("Test failed - database left at: {}", self.path); + eprintln_nopipe!("Test failed - database left at: {}", self.path); } } } diff --git a/rdb/src/types.rs b/rdb/src/types.rs index 47b2cb7f..753356e0 100644 --- a/rdb/src/types.rs +++ b/rdb/src/types.rs @@ -186,13 +186,10 @@ impl From for BgpPathPropertiesV1 { // Convert PeerId to IpAddr - only Ip variant is valid for V1 API peer: match value.peer { PeerId::Ip(ip) => ip, - PeerId::Interface(iface) => { - // This shouldn't happen in pre-UNNUMBERED versions - // Log warning and use unspecified address as fallback - eprintln!( - "Warning: Interface peer '{}' in V1 API context", - iface - ); + PeerId::Interface(_) => { + // This shouldn't happen in pre-UNNUMBERED versions; fall + // back to the unspecified address so V1 clients see + // something well-formed. IpAddr::V6(Ipv6Addr::UNSPECIFIED) } }, @@ -434,7 +431,7 @@ impl FromStr for PolicyAction { fn from_str(s: &str) -> Result { match s { "allow" | "Allow" => Ok(Self::Allow), - "deny" | "Deny" => Ok(Self::Allow), + "deny" | "Deny" => Ok(Self::Deny), _ => Err("Unknown policy action, must be allow or deny".into()), } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index fbfc83c9..dfbc0404 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -8,6 +8,7 @@ zone = { git = "https://github.com/oxidecomputer/zone" } ddm-admin-client = { path = "../ddm-admin-client" } anyhow.workspace = true +mg-common.workspace = true slog.workspace = true slog-term.workspace = true slog-envlogger.workspace = true diff --git a/tests/src/ddm.rs b/tests/src/ddm.rs index 4e0afdd0..558f6068 100644 --- a/tests/src/ddm.rs +++ b/tests/src/ddm.rs @@ -5,6 +5,7 @@ use anyhow::{Result, anyhow}; use ddm_admin_client::Client; use ddm_admin_client::types::TunnelOrigin; +use mg_common::{eprintln_nopipe, println_nopipe}; use slog::{Drain, Logger}; use std::env; use std::net::Ipv6Addr; @@ -112,16 +113,17 @@ impl<'a> SoftnpuZone<'a> { impl Drop for SoftnpuZone<'_> { fn drop(&mut self) { if let Err(e) = self.zone.zexec("pkill softnpu") { - eprintln!("failed to stop softnpu: {}", e); + eprintln_nopipe!("failed to stop softnpu: {}", e); } if let Err(e) = self.zfs.copy_from_zone( &self.zone.name, "opt/softnpu.log", &format!("/work/{}-softnpu.log", self.zone.name), ) { - eprintln!( + eprintln_nopipe!( "failed to copy zone log file for {}: {}", - self.zone.name, e, + self.zone.name, + e, ); } } @@ -223,7 +225,7 @@ impl<'a> RouterZone<'a> { self.zone.zexec("svcadm refresh dendrite:default")?; self.zone.zexec("svcadm enable dendrite:default")?; // wait for dendrite to come up - println!("wait 10s for dendrite to come up ..."); + println_nopipe!("wait 10s for dendrite to come up ..."); sleep(Duration::from_secs(10)); self.zone.zexec( "svccfg -s tfport setprop config/pkt_source = none", @@ -250,7 +252,7 @@ impl<'a> RouterZone<'a> { } fn setup(&self, index: u8) -> Result<()> { - println!("running zone {} setup", self.zone.name); + println_nopipe!("running zone {} setup", self.zone.name); let z = Zlogin::new(&self.zone.name); self.zone.wait_for_network()?; @@ -297,7 +299,9 @@ impl<'a> RouterZone<'a> { // Wait for these files to show up in the zone. Testing has shown // that this is not instant and subsequent steps can fail if the // copy is not complete. - println!("waiting 3s for copy of files to zone to complete ..."); + println_nopipe!( + "waiting 3s for copy of files to zone to complete ..." + ); sleep(Duration::from_secs(3)); self.zone.zcmd( &z, @@ -328,16 +332,17 @@ impl std::ops::Deref for RouterZone<'_> { impl Drop for RouterZone<'_> { fn drop(&mut self) { if let Err(e) = self.zone.zexec("pkill ddmd") { - eprintln!("failed to stop ddmd: {}", e); + eprintln_nopipe!("failed to stop ddmd: {}", e); } if let Err(e) = self.zfs.copy_from_zone( &self.zone.name, "opt/ddmd.log", &format!("/work/{}.log", self.zone.name), ) { - eprintln!( + eprintln_nopipe!( "failed to copy zone log file for {}: {}", - self.zone.name, e, + self.zone.name, + e, ); } if self.transit @@ -347,9 +352,10 @@ impl Drop for RouterZone<'_> { &format!("/work/{}-dpd.log", self.zone.name), ) { - eprintln!( + eprintln_nopipe!( "failed to copy zone dpd log file for {}: {}", - self.zone.name, e, + self.zone.name, + e, ); } } @@ -360,11 +366,11 @@ macro_rules! run_topo { if env::var("TEST_INTERACTIVE").is_err() { $fn } else { - println!("running interactive test"); + println_nopipe!("running interactive test"); let mut line = String::new(); let result = $fn; - println!("test result {:?}", result); - println!("press enter to continue"); + println_nopipe!("test result {:?}", result); + println_nopipe!("press enter to continue"); std::io::stdin().read_line(&mut line).unwrap(); result } @@ -420,11 +426,11 @@ async fn test_trio() -> Result<()> { "trio", )?; - println!("start zone s1"); + println_nopipe!("start zone s1"); let s1 = RouterZone::server("s1.trio", &zfs, &mg2.name, &[&sl0_sw0.end_a])?; - println!("start zone s2"); + println_nopipe!("start zone s2"); let s2 = RouterZone::server("s2.trio", &zfs, &mg3.name, &[&sl1_sw1.end_a])?; - println!("start zone t1"); + println_nopipe!("start zone t1"); let t1 = RouterZone::transit( "t1.trio", &zfs, @@ -433,7 +439,7 @@ async fn test_trio() -> Result<()> { "trio", )?; - println!("waiting for zones to come up"); + println_nopipe!("waiting for zones to come up"); sleep(Duration::from_secs(10)); sidecar.setup()?; @@ -460,7 +466,7 @@ async fn run_trio_tests( wait_for_eq!(s2.get_peers().await.map_or(99, |x| x.len()), 1); wait_for_eq!(t1.get_peers().await.map_or(99, |x| x.len()), 2); - println!("initial peering test passed"); + println_nopipe!("initial peering test passed"); s1.advertise_prefixes(&vec!["fd00:1::/64".parse().unwrap()]) .await?; @@ -469,7 +475,7 @@ async fn run_trio_tests( wait_for_eq!(prefix_count(&s2).await?, 1); wait_for_eq!(prefix_count(&t1).await?, 1); - println!("advertise from one passed"); + println_nopipe!("advertise from one passed"); s2.advertise_prefixes(&vec!["fd00:2::/64".parse().unwrap()]) .await?; @@ -478,12 +484,12 @@ async fn run_trio_tests( wait_for_eq!(prefix_count(&s2).await?, 1); wait_for_eq!(prefix_count(&t1).await?, 2); - println!("advertise from two passed"); + println_nopipe!("advertise from two passed"); retry_cmd!(zs1.zexec("ping fd00:2::1"), 1, 10); retry_cmd!(zs2.zexec("ping fd00:1::1"), 1, 10); - println!("connectivity test passed"); + println_nopipe!("connectivity test passed"); zt1.stop_router()?; wait_for_eq!(prefix_count(&s1).await?, 0); @@ -495,7 +501,7 @@ async fn run_trio_tests( retry_cmd!(zs1.zexec("ping fd00:2::1"), 1, 10); retry_cmd!(zs2.zexec("ping fd00:1::1"), 1, 10); - println!("transit router restart passed"); + println_nopipe!("transit router restart passed"); softnpu_dump!(softnpu); zs1.stop_router()?; @@ -537,7 +543,7 @@ async fn run_trio_tests( ); retry_cmd!(zs2.zexec("ping fd00:1::1"), 1, 10); - println!("server router restart passed"); + println_nopipe!("server router restart passed"); let peers = t1.get_peers().await?; let p0: Ipv6Addr = peers @@ -565,7 +571,7 @@ async fn run_trio_tests( wait_for_eq!(prefix_count(&s2).await?, 1); wait_for_eq!(prefix_count(&t1).await?, 2); - println!("peer expiration recovery passed"); + println_nopipe!("peer expiration recovery passed"); s2.advertise_prefixes(&vec![ "fd00:2::/64".parse().unwrap(), @@ -580,7 +586,7 @@ async fn run_trio_tests( let kernel_count = zs1.zexec("netstat -nrf inet6 | grep fd00 | wc -l")?; assert_eq!(kernel_count, "3"); - println!("redundant advertise passed"); + println_nopipe!("redundant advertise passed"); wait_for_eq!(tunnel_originated_endpoint_count(&t1).await?, 0); @@ -597,7 +603,7 @@ async fn run_trio_tests( wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); wait_for_eq!(tunnel_endpoint_count(&s2).await?, 1); - println!("tunnel endpoint advertise passed"); + println_nopipe!("tunnel endpoint advertise passed"); // redundant advertise should not change things @@ -616,7 +622,7 @@ async fn run_trio_tests( wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); wait_for_eq!(tunnel_endpoint_count(&s2).await?, 1); - println!("redundant tunnel endpoint advertise passed"); + println_nopipe!("redundant tunnel endpoint advertise passed"); zs1.stop_router()?; sleep(Duration::from_secs(5)); @@ -625,7 +631,7 @@ async fn run_trio_tests( let s1 = Client::new("http://10.0.0.1:8000", log.clone()); wait_for_eq!(tunnel_endpoint_count(&s1).await?, 1); - println!("tunnel router restart passed"); + println_nopipe!("tunnel router restart passed"); t1.withdraw_tunnel_endpoints(&vec![TunnelOrigin { overlay_prefix: "203.0.113.0/24".parse().unwrap(), @@ -640,7 +646,7 @@ async fn run_trio_tests( wait_for_eq!(tunnel_endpoint_count(&s1).await?, 0); wait_for_eq!(tunnel_endpoint_count(&s2).await?, 0); - println!("tunnel endpoint withdraw passed"); + println_nopipe!("tunnel endpoint withdraw passed"); Ok(()) } @@ -704,16 +710,16 @@ async fn test_quartet() -> Result<()> { "quartet", )?; - println!("start zone s1"); + println_nopipe!("start zone s1"); let s1 = RouterZone::server("s1.quartet", &zfs, &mgs1.name, &[&sl0_sw0.end_a])?; - println!("start zone s2"); + println_nopipe!("start zone s2"); let s2 = RouterZone::server("s2.quartet", &zfs, &mgs2.name, &[&sl1_sw1.end_a])?; - println!("start zone s3"); + println_nopipe!("start zone s3"); let s3 = RouterZone::server("s3.quartet", &zfs, &mgs3.name, &[&sl2_sw2.end_a])?; - println!("start zone t1"); + println_nopipe!("start zone t1"); let t1 = RouterZone::transit( "t1.quartet", &zfs, @@ -722,7 +728,7 @@ async fn test_quartet() -> Result<()> { "quartet", )?; - println!("waiting for zones to come up"); + println_nopipe!("waiting for zones to come up"); sleep(Duration::from_secs(10)); sidecar.setup()?; @@ -754,7 +760,7 @@ async fn run_quartet_tests( wait_for_eq!(s3.get_peers().await.map_or(99, |x| x.len()), 1); wait_for_eq!(t1.get_peers().await.map_or(99, |x| x.len()), 3); - println!("initial peering test passed"); + println_nopipe!("initial peering test passed"); s1.advertise_prefixes(&vec!["fd00:1::/64".parse().unwrap()]) .await?; diff --git a/xtask/Cargo.toml b/xtask/Cargo.toml index 0b38f98c..428a20b8 100644 --- a/xtask/Cargo.toml +++ b/xtask/Cargo.toml @@ -6,4 +6,5 @@ edition = "2024" [dependencies] anyhow.workspace = true clap.workspace = true +mg-common.workspace = true tokio.workspace = true diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 66ca436e..96726a49 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -5,6 +5,7 @@ // Copyright 2025 Oxide Computer Company use clap::{Parser, Subcommand}; +use mg_common::eprintln_nopipe; mod external; @@ -34,7 +35,7 @@ async fn main() { XtaskCommands::Openapi(external) => external .exec_bin("maghemite-dropshot-apis", "maghemite-dropshot-apis"), } { - eprintln!("failed: {e}"); + eprintln_nopipe!("failed: {e}"); std::process::exit(-1); } }