From 76df41dd48574d3a5bfbc475ea3b403f7a0a69be Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 2 Jun 2026 20:06:47 +0300 Subject: [PATCH 1/3] p2p: rename backend_timeouts to backend_config --- dns-server/src/main.rs | 2 +- node-lib/src/config_files/p2p.rs | 2 +- .../src/block_announcement.rs | 2 +- p2p/src/config.rs | 10 +++---- p2p/src/lib.rs | 2 +- p2p/src/net/default_backend/backend.rs | 2 +- .../default_backend/peer/handshake_handler.rs | 2 +- p2p/src/net/default_backend/peer/mod.rs | 4 +-- .../tests/addr_list_response_caching.rs | 2 +- p2p/src/peer_manager/tests/addresses.rs | 2 +- p2p/src/peer_manager/tests/ban.rs | 4 +-- p2p/src/peer_manager/tests/connections.rs | 30 +++++++++---------- p2p/src/peer_manager/tests/discouragement.rs | 4 +-- p2p/src/peer_manager/tests/eviction.rs | 2 +- p2p/src/peer_manager/tests/peer_types.rs | 2 +- p2p/src/peer_manager/tests/ping.rs | 2 +- p2p/src/peer_manager/tests/whitelist.rs | 2 +- p2p/src/sync/tests/block_announcement.rs | 6 ++-- p2p/src/sync/tests/block_response.rs | 6 ++-- p2p/src/sync/tests/header_list_request.rs | 2 +- p2p/src/sync/tests/header_list_response.rs | 2 +- p2p/src/sync/tests/network_sync.rs | 8 ++--- p2p/src/sync/tests/tx_announcement.rs | 4 +-- p2p/src/test_helpers.rs | 8 ++--- p2p/src/tests/bad_time_diff.rs | 4 +-- .../connection_lockup_when_socket_not_read.rs | 2 +- p2p/src/tests/min_peer_software_version.rs | 2 +- p2p/src/tests/peer_discovery_on_stale_tip.rs | 2 +- wallet/wallet-node-client/tests/call_tests.rs | 2 +- wallet/wallet-test-node/src/lib.rs | 2 +- 30 files changed, 63 insertions(+), 63 deletions(-) diff --git a/dns-server/src/main.rs b/dns-server/src/main.rs index b87af0e3b..d7dd4ae15 100644 --- a/dns-server/src/main.rs +++ b/dns-server/src/main.rs @@ -85,7 +85,7 @@ async fn run(options: DnsServerRunOptions) -> anyhow::Result { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/node-lib/src/config_files/p2p.rs b/node-lib/src/config_files/p2p.rs index 058eba819..d982326a0 100644 --- a/node-lib/src/config_files/p2p.rs +++ b/node-lib/src/config_files/p2p.rs @@ -186,7 +186,7 @@ impl From for P2pConfig { min_peer_software_version, }, - backend_timeouts: BackendTimeoutsConfig { + backend_config: BackendConfig { outbound_connection_timeout: outbound_connection_timeout .map(|t| Duration::from_secs(t.into())) .into(), diff --git a/p2p/backend-test-suite/src/block_announcement.rs b/p2p/backend-test-suite/src/block_announcement.rs index 995286278..1ba2bf3c8 100644 --- a/p2p/backend-test-suite/src/block_announcement.rs +++ b/p2p/backend-test-suite/src/block_announcement.rs @@ -193,7 +193,7 @@ where sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let shutdown = Arc::new(SeqCstAtomicBool::new(false)); diff --git a/p2p/src/config.rs b/p2p/src/config.rs index dec9d23f1..aede13703 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -121,8 +121,8 @@ pub struct P2pConfig { /// Various limits related to the protocol; these should only be overridden in tests. pub protocol_config: ProtocolConfig, - /// Various timeouts used by the backend. - pub backend_timeouts: BackendTimeoutsConfig, + /// Various settings specific to the backend. + pub backend_config: BackendConfig, /// If set, this text will be sent to banned peers as part of the DisconnectionReason. pub custom_disconnection_reason_for_banning: Option, @@ -134,13 +134,13 @@ impl P2pConfig { /// It is calculated as the max clock diff setting plus handshake timeout to allow for /// imprecisions caused by the network latency. pub fn effective_max_clock_diff(&self) -> Duration { - *self.max_clock_diff + *self.backend_timeouts.peer_handshake_timeout + *self.max_clock_diff + *self.backend_config.peer_handshake_timeout } } -/// Part of P2pConfig containing various timeouts used by the backend. +/// Part of P2pConfig containing various settings specific to the backend. #[derive(Default, Debug, Clone)] -pub struct BackendTimeoutsConfig { +pub struct BackendConfig { /// The outbound connection timeout value. pub outbound_connection_timeout: OutboundConnectionTimeout, diff --git a/p2p/src/lib.rs b/p2p/src/lib.rs index e9ed269f0..eaccd5fdc 100644 --- a/p2p/src/lib.rs +++ b/p2p/src/lib.rs @@ -344,7 +344,7 @@ pub fn make_p2p( // // For more details, see `Peer::handshake` and `P2pConfig`. assert_eq!( - *p2p_config.backend_timeouts.peer_handshake_timeout, + *p2p_config.backend_config.peer_handshake_timeout, Duration::from_secs(10), "Handshake timeout changed" ); diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index feb70ef19..3c7e24a80 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -732,7 +732,7 @@ where local_services_override, } => { let connection_fut = timeout( - *self.p2p_config.backend_timeouts.outbound_connection_timeout, + *self.p2p_config.backend_config.outbound_connection_timeout, self.transport.connect(address.socket_addr()), ); diff --git a/p2p/src/net/default_backend/peer/handshake_handler.rs b/p2p/src/net/default_backend/peer/handshake_handler.rs index 0c05b12ed..451127885 100644 --- a/p2p/src/net/default_backend/peer/handshake_handler.rs +++ b/p2p/src/net/default_backend/peer/handshake_handler.rs @@ -328,7 +328,7 @@ impl HandshakeHandler { socket_writer: &mut MessageWriter, ) -> crate::Result { // handshake with remote peer and send peer's info to backend - let handshake_timeout = *self.p2p_config.backend_timeouts.peer_handshake_timeout; + let handshake_timeout = *self.p2p_config.backend_config.peer_handshake_timeout; let handshake_res = timeout( handshake_timeout, self.handshake_impl(peer_event_sender, socket_reader, socket_writer), diff --git a/p2p/src/net/default_backend/peer/mod.rs b/p2p/src/net/default_backend/peer/mod.rs index 39325b370..3466415a2 100644 --- a/p2p/src/net/default_backend/peer/mod.rs +++ b/p2p/src/net/default_backend/peer/mod.rs @@ -356,7 +356,7 @@ where match send_result { Ok(()) => { let disconnect_result = tokio::time::timeout( - *p2p_config.backend_timeouts.disconnection_timeout, + *p2p_config.backend_config.disconnection_timeout, async { match writer_event_receiver.recv().await { Some(WriterEvent::WriterClosed(result)) => { @@ -483,7 +483,7 @@ async fn writer_loop( } tokio::time::timeout( - *p2p_config.backend_timeouts.socket_write_timeout, + *p2p_config.backend_config.socket_write_timeout, socket_writer.send(*message), ) .await diff --git a/p2p/src/peer_manager/tests/addr_list_response_caching.rs b/p2p/src/peer_manager/tests/addr_list_response_caching.rs index 11d1ba164..a92eb6dd2 100644 --- a/p2p/src/peer_manager/tests/addr_list_response_caching.rs +++ b/p2p/src/peer_manager/tests/addr_list_response_caching.rs @@ -264,7 +264,7 @@ fn make_p2p_config() -> P2pConfig { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } diff --git a/p2p/src/peer_manager/tests/addresses.rs b/p2p/src/peer_manager/tests/addresses.rs index bd710b2bb..fa737d987 100644 --- a/p2p/src/peer_manager/tests/addresses.rs +++ b/p2p/src/peer_manager/tests/addresses.rs @@ -735,7 +735,7 @@ async fn dont_use_dns_seed_if_connections_exist(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (cmd_sender, mut cmd_receiver) = tokio::sync::mpsc::unbounded_channel(); diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index d6a064aad..4cee165e1 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -419,7 +419,7 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -545,7 +545,7 @@ async fn banned_address_not_in_addr_response(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index bcea4efda..5ed8b56fa 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -48,7 +48,7 @@ use utils_networking::IpOrSocketAddress; use crate::{ PeerManagerEvent, - config::{BackendTimeoutsConfig, P2pConfig}, + config::{BackendConfig, P2pConfig}, disconnection_reason::DisconnectionReason, error::{ConnectionValidationError, DialError, P2pError, ProtocolError}, message::AddrListRequest, @@ -1004,7 +1004,7 @@ async fn connection_timeout_rpc_notified( let config = Arc::new(config::create_unit_test_config()); let p2p_config = Arc::new(P2pConfig { - backend_timeouts: BackendTimeoutsConfig { + backend_config: BackendConfig { outbound_connection_timeout: Duration::from_secs(1).into(), peer_handshake_timeout: Default::default(), disconnection_timeout: Default::default(), @@ -1162,7 +1162,7 @@ where sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender, _shutdown_sender, _subscribers_sender) = run_peer_manager::( @@ -1207,7 +1207,7 @@ where sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender, _shutdown_sender, _subscribers_sender) = run_peer_manager::( @@ -1332,7 +1332,7 @@ where user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender1, _shutdown_sender, _subscribers_sender) = run_peer_manager::( @@ -1377,7 +1377,7 @@ where user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender2, _shutdown_sender, _subscribers_sender) = run_peer_manager::( @@ -1409,7 +1409,7 @@ where user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender3, _shutdown_sender, _subscribers_sender) = run_peer_manager::( @@ -1554,7 +1554,7 @@ async fn discovered_node_2_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender1, _shutdown_sender, _subscribers_sender) = @@ -1600,7 +1600,7 @@ async fn discovered_node_2_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender2, _shutdown_sender, _subscribers_sender) = @@ -1633,7 +1633,7 @@ async fn discovered_node_2_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender3, _shutdown_sender, _subscribers_sender) = @@ -1730,7 +1730,7 @@ async fn discovered_node_separate_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender1, _shutdown_sender, _subscribers_sender) = @@ -1776,7 +1776,7 @@ async fn discovered_node_separate_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender2, _shutdown_sender, _subscribers_sender) = @@ -1809,7 +1809,7 @@ async fn discovered_node_separate_groups(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let (peer_mgr_event_sender3, _shutdown_sender, _subscribers_sender) = @@ -2125,7 +2125,7 @@ mod feeler_connections_test_utils { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } @@ -2212,7 +2212,7 @@ async fn reject_connection_to_existing_ip(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/peer_manager/tests/discouragement.rs b/p2p/src/peer_manager/tests/discouragement.rs index 69221f17a..eee7c866b 100644 --- a/p2p/src/peer_manager/tests/discouragement.rs +++ b/p2p/src/peer_manager/tests/discouragement.rs @@ -492,7 +492,7 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -617,7 +617,7 @@ async fn discouraged_address_not_in_addr_response(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/peer_manager/tests/eviction.rs b/p2p/src/peer_manager/tests/eviction.rs index 7462342c5..a028c86b2 100644 --- a/p2p/src/peer_manager/tests/eviction.rs +++ b/p2p/src/peer_manager/tests/eviction.rs @@ -141,7 +141,7 @@ mod dont_evict_if_blocks_in_flight { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/peer_manager/tests/peer_types.rs b/p2p/src/peer_manager/tests/peer_types.rs index bbda9e200..94368c714 100644 --- a/p2p/src/peer_manager/tests/peer_types.rs +++ b/p2p/src/peer_manager/tests/peer_types.rs @@ -71,7 +71,7 @@ fn validate_services(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/peer_manager/tests/ping.rs b/p2p/src/peer_manager/tests/ping.rs index 0f27716e5..bf543fb6e 100644 --- a/p2p/src/peer_manager/tests/ping.rs +++ b/p2p/src/peer_manager/tests/ping.rs @@ -76,7 +76,7 @@ async fn ping_timeout(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let ping_check_period = *p2p_config.ping_check_period; diff --git a/p2p/src/peer_manager/tests/whitelist.rs b/p2p/src/peer_manager/tests/whitelist.rs index 598b15618..88b72a489 100644 --- a/p2p/src/peer_manager/tests/whitelist.rs +++ b/p2p/src/peer_manager/tests/whitelist.rs @@ -72,7 +72,7 @@ fn p2p_config_with_whitelisted(whitelisted_addresses: Vec) -> P2pConfig sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } diff --git a/p2p/src/sync/tests/block_announcement.rs b/p2p/src/sync/tests/block_announcement.rs index 644de28ea..ff141429f 100644 --- a/p2p/src/sync/tests/block_announcement.rs +++ b/p2p/src/sync/tests/block_announcement.rs @@ -540,7 +540,7 @@ async fn send_headers_connected_to_previously_sent_headers(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -643,7 +643,7 @@ async fn send_headers_connected_to_block_which_is_being_downloaded(#[case] seed: user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -743,7 +743,7 @@ async fn correct_pending_headers_update(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/sync/tests/block_response.rs b/p2p/src/sync/tests/block_response.rs index d970f3d49..4822664c3 100644 --- a/p2p/src/sync/tests/block_response.rs +++ b/p2p/src/sync/tests/block_response.rs @@ -174,7 +174,7 @@ async fn block_responses_in_wrong_order(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -302,7 +302,7 @@ async fn disconnect(#[case] seed: Seed) { user_agent: "test".try_into().unwrap(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let mut node = TestNode::builder(protocol_version) @@ -369,7 +369,7 @@ async fn slow_response(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/sync/tests/header_list_request.rs b/p2p/src/sync/tests/header_list_request.rs index 361428223..65d3b8a64 100644 --- a/p2p/src/sync/tests/header_list_request.rs +++ b/p2p/src/sync/tests/header_list_request.rs @@ -159,7 +159,7 @@ async fn respond_with_empty_header_list_when_in_ibd() { user_agent: mintlayer_core_user_agent(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/sync/tests/header_list_response.rs b/p2p/src/sync/tests/header_list_response.rs index 4cf8b4b12..b2d96a0c1 100644 --- a/p2p/src/sync/tests/header_list_response.rs +++ b/p2p/src/sync/tests/header_list_response.rs @@ -239,7 +239,7 @@ async fn disconnect() { user_agent: "test".try_into().unwrap(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let mut node = TestNode::builder(protocol_version) diff --git a/p2p/src/sync/tests/network_sync.rs b/p2p/src/sync/tests/network_sync.rs index ffac1a09f..a8e42fe68 100644 --- a/p2p/src/sync/tests/network_sync.rs +++ b/p2p/src/sync/tests/network_sync.rs @@ -36,7 +36,7 @@ use utils::atomics::SeqCstAtomicU64; use crate::{ PeerManagerEvent, - config::{BackendTimeoutsConfig, P2pConfig}, + config::{BackendConfig, P2pConfig}, message::{BlockListRequest, BlockResponse, BlockSyncMessage, HeaderList, HeaderListRequest}, protocol::ProtocolConfig, sync::tests::helpers::{ @@ -85,7 +85,7 @@ async fn basic(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -327,7 +327,7 @@ async fn block_announcement_disconnected_headers(#[case] seed: Seed) { user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -429,7 +429,7 @@ async fn send_block_from_the_future_again(#[case] seed: Seed) { let p2p_config = Arc::new(P2pConfig { // Minimize the time block sync manager spends in wait_for_clock_diff. max_clock_diff: Duration::from_secs(1).into(), - backend_timeouts: BackendTimeoutsConfig { + backend_config: BackendConfig { peer_handshake_timeout: Duration::from_secs(1).into(), outbound_connection_timeout: Default::default(), disconnection_timeout: Default::default(), diff --git a/p2p/src/sync/tests/tx_announcement.rs b/p2p/src/sync/tests/tx_announcement.rs index 89634f554..8fcd27c09 100644 --- a/p2p/src/sync/tests/tx_announcement.rs +++ b/p2p/src/sync/tests/tx_announcement.rs @@ -188,7 +188,7 @@ async fn no_transaction_service(#[case] seed: Seed) { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let mut node = TestNode::builder(protocol_version) @@ -263,7 +263,7 @@ async fn too_many_announcements(#[case] seed: Seed) { user_agent: "test".try_into().unwrap(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); let mut node = TestNode::builder(protocol_version) diff --git a/p2p/src/test_helpers.rs b/p2p/src/test_helpers.rs index f1efcfafa..eee9478b3 100644 --- a/p2p/src/test_helpers.rs +++ b/p2p/src/test_helpers.rs @@ -195,7 +195,7 @@ pub fn test_p2p_config() -> P2pConfig { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } @@ -219,7 +219,7 @@ pub fn test_p2p_config_with_protocol_config(protocol_config: ProtocolConfig) -> user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } @@ -243,7 +243,7 @@ pub fn test_p2p_config_with_peer_mgr_config(peer_manager_config: PeerManagerConf user_agent: mintlayer_core_user_agent(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } @@ -292,7 +292,7 @@ pub fn test_p2p_config_with_ban_config(ban_config: BanConfig) -> P2pConfig { sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } diff --git a/p2p/src/tests/bad_time_diff.rs b/p2p/src/tests/bad_time_diff.rs index 34c88b7e1..11a73a1ca 100644 --- a/p2p/src/tests/bad_time_diff.rs +++ b/p2p/src/tests/bad_time_diff.rs @@ -73,7 +73,7 @@ where sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); @@ -209,7 +209,7 @@ where sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }); diff --git a/p2p/src/tests/connection_lockup_when_socket_not_read.rs b/p2p/src/tests/connection_lockup_when_socket_not_read.rs index 19830eda6..736a86040 100644 --- a/p2p/src/tests/connection_lockup_when_socket_not_read.rs +++ b/p2p/src/tests/connection_lockup_when_socket_not_read.rs @@ -285,7 +285,7 @@ async fn timeout_when_socket_not_read( let time_getter = BasicTestTimeGetter::new(); let chain_config = Arc::new(chain::config::create_unit_test_config()); let p2p_config = Arc::new(P2pConfig { - backend_timeouts: BackendTimeoutsConfig { + backend_config: BackendConfig { socket_write_timeout: socket_write_timeout.into(), disconnection_timeout: disconnection_timeout.into(), diff --git a/p2p/src/tests/min_peer_software_version.rs b/p2p/src/tests/min_peer_software_version.rs index 7f914e462..53818314d 100644 --- a/p2p/src/tests/min_peer_software_version.rs +++ b/p2p/src/tests/min_peer_software_version.rs @@ -144,7 +144,7 @@ fn make_p2p_config(test_params: &TestParams) -> P2pConfig { allow_discover_private_ips: Default::default(), sync_stalling_timeout: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } diff --git a/p2p/src/tests/peer_discovery_on_stale_tip.rs b/p2p/src/tests/peer_discovery_on_stale_tip.rs index 91f417adb..e98cd156a 100644 --- a/p2p/src/tests/peer_discovery_on_stale_tip.rs +++ b/p2p/src/tests/peer_discovery_on_stale_tip.rs @@ -519,7 +519,7 @@ pub fn make_p2p_config(peer_manager_config: PeerManagerConfig) -> P2pConfig { allow_discover_private_ips: Default::default(), user_agent: mintlayer_core_user_agent(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), } } diff --git a/wallet/wallet-node-client/tests/call_tests.rs b/wallet/wallet-node-client/tests/call_tests.rs index 4714ba389..eaba0982c 100644 --- a/wallet/wallet-node-client/tests/call_tests.rs +++ b/wallet/wallet-node-client/tests/call_tests.rs @@ -68,7 +68,7 @@ pub async fn start_subsystems( sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }; let mempool_config = MempoolConfig::new(); diff --git a/wallet/wallet-test-node/src/lib.rs b/wallet/wallet-test-node/src/lib.rs index 3472c5534..f022cc0cd 100644 --- a/wallet/wallet-test-node/src/lib.rs +++ b/wallet/wallet-test-node/src/lib.rs @@ -179,7 +179,7 @@ pub async fn start_node(chain_config: Arc) -> (subsystem::Manager, sync_stalling_timeout: Default::default(), peer_manager_config: Default::default(), protocol_config: Default::default(), - backend_timeouts: Default::default(), + backend_config: Default::default(), custom_disconnection_reason_for_banning: Default::default(), }; let rpc_creds = RpcCreds::basic(RPC_USERNAME, RPC_PASSWORD).unwrap(); From 5b36fc39a5a22a456274a9c74f41626f557698e9 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Tue, 2 Jun 2026 21:09:17 +0300 Subject: [PATCH 2/3] p2p: limit the number of pending inbound connections --- CHANGELOG.md | 2 + .../wrapped_transport/wrapped_listener.rs | 14 +- node-lib/src/config_files/p2p.rs | 3 +- p2p/src/config.rs | 16 +- p2p/src/net/default_backend/backend.rs | 116 +++++++-- p2p/src/net/default_backend/peer/mod.rs | 7 + p2p/src/net/default_backend/types.rs | 9 + p2p/src/peer_manager/tests/connections.rs | 1 + p2p/src/sync/tests/network_sync.rs | 1 + .../connection_lockup_when_socket_not_read.rs | 3 +- p2p/src/tests/helpers/mod.rs | 10 + p2p/src/tests/helpers/test_node.rs | 57 +++++ p2p/src/tests/mod.rs | 1 + .../pending_inbound_connections_limit.rs | 232 ++++++++++++++++++ 14 files changed, 450 insertions(+), 22 deletions(-) create mode 100644 p2p/src/tests/pending_inbound_connections_limit.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 91b6bb766..4e8ee268b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/ - Adding an already existing relayable local transaction to the mempool will cause p2p to re-announce it. - Transactions are now announced in batches at irregular intervals (previously, a random delay was added before each individual transaction announcement). + - Security improvements: + - The number of pending inbound connections is now limited. - Mempool: - Various optimizations were made. diff --git a/networking/src/transport/impls/stream_adapter/wrapped_transport/wrapped_listener.rs b/networking/src/transport/impls/stream_adapter/wrapped_transport/wrapped_listener.rs index f6ed14361..f2401aed7 100644 --- a/networking/src/transport/impls/stream_adapter/wrapped_transport/wrapped_listener.rs +++ b/networking/src/transport/impls/stream_adapter/wrapped_transport/wrapped_listener.rs @@ -21,6 +21,8 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; +use logging::log; + use crate::{ Result, transport::{TransportListener, TransportSocket, impls::stream_adapter::traits::StreamAdapter}, @@ -63,7 +65,10 @@ impl, T: TransportSocket> TransportListener for Adap match handshake_res { (Ok(handshake), addr) => return Ok((handshake, addr)), (Err(err), addr) => { - logging::log::warn!("Handshake with {addr} failed: {err}"); + // Note: failed transport handshakes are peer-triggerable. In particular, + // a malicious peer can deliberately and repeatedly fail the Noise handshake, + // so we keep this at debug level to avoid flooding default logs. + log::debug!("Handshake with {addr} failed: {err}"); continue; }, } @@ -80,7 +85,12 @@ impl, T: TransportSocket> TransportListener for Adap self.handshakes.push(handshake_with_addr); }, Err(err) => { - logging::log::error!("Accept failed unexpectedly: {err}"); + // Note: this can also be peer-triggered, though less reliably than the + // handshake error above. E.g. a malicious peer may open TCP connections + // and close them immediately, attempting to flood the node's logs. + // But in any case, this error is propagated to the p2p backend, which logs + // it again, so there is no point in using a level higher than debug here. + log::debug!("Accept failed unexpectedly: {err}"); return Err(err); }, } diff --git a/node-lib/src/config_files/p2p.rs b/node-lib/src/config_files/p2p.rs index d982326a0..a343daf88 100644 --- a/node-lib/src/config_files/p2p.rs +++ b/node-lib/src/config_files/p2p.rs @@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize}; use common::primitives::{semver::SemVer, user_agent::mintlayer_core_user_agent}; use p2p::{ ban_config::BanConfig, - config::{BackendTimeoutsConfig, NodeType, P2pConfig}, + config::{BackendConfig, NodeType, P2pConfig}, peer_manager::config::PeerManagerConfig, }; use utils_networking::IpOrSocketAddress; @@ -193,6 +193,7 @@ impl From for P2pConfig { peer_handshake_timeout: Default::default(), disconnection_timeout: Default::default(), socket_write_timeout: Default::default(), + max_pending_inbound_connections: Default::default(), }, protocol_config: Default::default(), custom_disconnection_reason_for_banning, diff --git a/p2p/src/config.rs b/p2p/src/config.rs index aede13703..3ddb55a9c 100644 --- a/p2p/src/config.rs +++ b/p2p/src/config.rs @@ -37,6 +37,7 @@ make_config_setting!(SyncStallingTimeout, Duration, Duration::from_secs(25)); make_config_setting!(PeerHandshakeTimeout, Duration, Duration::from_secs(10)); make_config_setting!(DisconnectionTimeout, Duration, Duration::from_secs(10)); make_config_setting!(SoketWriteTimeout, Duration, Duration::from_secs(60)); +make_config_setting!(MaxPendingInboundConnections, usize, 100); /// A node type. #[derive(Debug, Copy, Clone)] @@ -144,12 +145,21 @@ pub struct BackendConfig { /// The outbound connection timeout value. pub outbound_connection_timeout: OutboundConnectionTimeout, - /// Timeout for initial peer handshake + /// Timeout for initial peer handshake. pub peer_handshake_timeout: PeerHandshakeTimeout, - /// Timeout for disconnection + /// Timeout for disconnection. pub disconnection_timeout: DisconnectionTimeout, - /// Timeout for the socket write call + /// Timeout for the socket write call. pub socket_write_timeout: SoketWriteTimeout, + + /// The maximum number of pending inbound connections that can exist at the same time. + /// + /// Note: the presence of this limit is important for security, but its specific value is not - + /// it just defines backend's burst tolerance for transport-accepted inbound connections that + /// have not completed the P2P handshake yet. But it makes sense to keep it in the same order + /// as `MAX_CONCURRENT_HANDSHAKES` used by wrapped transport layer's `AdaptedListener`, which + /// limits the number of simultaneous transport-level inbound handshakes. + pub max_pending_inbound_connections: MaxPendingInboundConnections, } diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 3c7e24a80..e10532078 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -39,8 +39,8 @@ use networking::{ use p2p_types::socket_address::SocketAddress; use randomness::{RngExt as _, make_pseudo_rng}; use utils::{ - atomics::SeqCstAtomicBool, eventhandler::EventsController, set_flag::SetFlag, - shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span, + atomics::SeqCstAtomicBool, debug_panic_or_log, eventhandler::EventsController, + set_flag::SetFlag, shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span, }; use crate::{ @@ -108,6 +108,7 @@ struct PeerContext { } /// Pending peer data (until handshake message is received) +#[derive(Debug)] struct PendingPeerContext { handle: tokio::task::JoinHandle<()>, @@ -149,6 +150,9 @@ pub struct Backend { /// Pending connections pending_peers: HashMap, + /// The number of inbound peers in `pending_peers` + pending_inbound_peer_count: usize, + /// Map of streams for receiving events from peers. peer_event_stream_map: StreamMap>, @@ -210,6 +214,7 @@ where syncing_event_sender, peers: HashMap::new(), pending_peers: HashMap::new(), + pending_inbound_peer_count: 0, peer_event_stream_map: StreamMap::new(), command_queue: FuturesUnordered::new(), shutdown, @@ -347,21 +352,51 @@ where res = self.socket.accept() => { match res { Ok((stream, address)) => { + // Note: this execution path is peer-triggerable, so we use debug level logging + // until after `pending_inbound_peer_count` has been checked, to prevent malicious + // peers from flooding the default logs. if !self.networking_enabled { - log::info!("Ignoring incoming connection from {address:?} because networking is disabled"); + log::debug!("Ignoring incoming connection from {address:?} because networking is disabled"); } else { - self.create_pending_peer( - stream, - PeerId::new(), - ConnectionInfo::Inbound, - address.into(), - )?; + let max_pending_inbound_conn_count = *self.p2p_config.backend_config.max_pending_inbound_connections; + + if self.pending_inbound_peer_count >= max_pending_inbound_conn_count { + log::debug!( + concat!( + "Ignoring incoming connection from {:?} because the maximum number ", + "of pending incoming connections has been reached {}" + ), + address, + max_pending_inbound_conn_count + ); + } else { + self.create_pending_peer( + stream, + PeerId::new(), + ConnectionInfo::Inbound, + address.into(), + )?; + } } }, Err(err) => { - // Just log the error and let the node continue working + // Note: this execution path is also peer-triggerable, though less reliably than the + // successful path above, so we use debug-level logging here too. See also a similar + // comment in `AdaptedListener::accept` in the transport layer. + // TODO: malicious peers can still flood the default logs making successful connections + // and dropping them immediately (though it will be less severe and more costly for the + // attacker). Consider: + // a) Using debug-level logging specifically for incoming connections (while keeping it + // at "info" for outbound ones) in all (or most) connectivity-related messages (e.g. + // "Peer disconnected", "Assigning peer id", "New peer accepted"). + // b) Implementing a rate-limiter for the log (e.g. using a custom tracing layer or filter), + // either based on the source code location (Bitcoin does this) or on the specified + // log target. + // c) (In the case when some errors have been omitted or printed via debug!), printing + // some kind of info/warn-level summary at regular intervals, e.g. "X incoming connections + // ignored in the last Y seconds". if self.networking_enabled { - log::error!("Accepting a new connection failed unexpectedly: {err}") + log::debug!("Accepting a new connection failed unexpectedly: {err}") } else { log::debug!( "Ignoring failed incoming connection because networking is disabled (err = {err})", @@ -426,7 +461,7 @@ where &format!("Peer[id={peer_id}]"), ); - self.pending_peers.insert( + self.insert_new_pending_peer( peer_id, PendingPeerContext { handle, @@ -456,7 +491,7 @@ where bind_address, connection_info, backend_event_sender, - } = match self.pending_peers.remove(&peer_id) { + } = match self.remove_pending_peer(peer_id) { Some(pending_peer) => pending_peer, // Could be removed if self-connection was detected earlier None => return Ok(()), @@ -568,7 +603,7 @@ where .map(|(peer_id, _pending)| *peer_id); if let Some(peer_id) = pending_outbound_peer_id { - let peer_ctx = self.pending_peers.remove(&peer_id).expect("peer must exist"); + let peer_ctx = self.remove_pending_peer(peer_id).expect("peer must exist"); log::info!( "self-connection detected on address {:?}", @@ -649,7 +684,7 @@ where } PeerEvent::ConnectionClosed => { - if let Some(pending_peer) = self.pending_peers.remove(&peer_id) { + if let Some(pending_peer) = self.remove_pending_peer(peer_id) { // Note: we'll get here if handshake has failed, so no need to use log levels // higher that debug, because the error should have been logged properly already. match pending_peer.connection_info { @@ -802,6 +837,57 @@ where Err(_) => log::error!("sending syncing event from the backend failed unexpectedly"), } } + + fn insert_new_pending_peer(&mut self, peer_id: PeerId, peer_context: PendingPeerContext) { + let is_inbound = peer_context.connection_info.is_inbound(); + + if let Some(old_context) = self.pending_peers.insert(peer_id, peer_context) { + debug_panic_or_log!( + "Pending peer context already exists for a new peer {peer_id}: {old_context:?}" + ); + } else if is_inbound { + self.pending_inbound_peer_count += 1; + + #[cfg(test)] + self.assert_pending_inbound_peer_count_consistency(); + } + + if let Some(observer) = &self.observer { + observer.on_pending_peer_created(peer_id); + } + } + + fn remove_pending_peer(&mut self, peer_id: PeerId) -> Option { + let peer_context = self.pending_peers.remove(&peer_id); + + if let Some(peer_context) = &peer_context { + if peer_context.connection_info.is_inbound() { + self.pending_inbound_peer_count -= 1; + + #[cfg(test)] + self.assert_pending_inbound_peer_count_consistency(); + } + + if let Some(observer) = &self.observer { + observer.on_pending_peer_removed(peer_id); + } + } + + peer_context + } + + #[cfg(test)] + fn assert_pending_inbound_peer_count_consistency(&self) { + let actual_pending_inbound_peer_count = self + .pending_peers + .values() + .filter(|context| context.connection_info.is_inbound()) + .count(); + assert_eq!( + self.pending_inbound_peer_count, + actual_pending_inbound_peer_count + ); + } } // Some boilerplate types and a function for blocking tasks handling diff --git a/p2p/src/net/default_backend/peer/mod.rs b/p2p/src/net/default_backend/peer/mod.rs index 3466415a2..0309df600 100644 --- a/p2p/src/net/default_backend/peer/mod.rs +++ b/p2p/src/net/default_backend/peer/mod.rs @@ -73,6 +73,13 @@ impl ConnectionInfo { ConnectionInfo::Outbound { .. } => ConnectionDirection::Outbound, } } + + pub fn is_inbound(&self) -> bool { + match self { + ConnectionInfo::Inbound => true, + ConnectionInfo::Outbound { .. } => false, + } + } } pub struct Peer { diff --git a/p2p/src/net/default_backend/types.rs b/p2p/src/net/default_backend/types.rs index bf327e5d6..d11303e31 100644 --- a/p2p/src/net/default_backend/types.rs +++ b/p2p/src/net/default_backend/types.rs @@ -417,6 +417,15 @@ pub trait BackendObserver { /// Called after the message has been read from the socket. fn on_message_read(&self, peer_id: PeerId, msg: &Message); + + /// Called after a new peer has been created (i.e. after the peer task has been spawned and + /// `PendingPeerContext` added to the corresponding collection). + fn on_pending_peer_created(&self, peer_id: PeerId); + + /// Called when a `PendingPeerContext` has been removed from the corresponding collection + /// (i.e. either the peer has completed the handshake successfully and is no longer pending, + /// or it failed the handshake and got disconnected). + fn on_pending_peer_removed(&self, peer_id: PeerId); } #[cfg(test)] diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index 5ed8b56fa..f97ad341d 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -1009,6 +1009,7 @@ async fn connection_timeout_rpc_notified( peer_handshake_timeout: Default::default(), disconnection_timeout: Default::default(), socket_write_timeout: Default::default(), + max_pending_inbound_connections: Default::default(), }, bind_addresses: Default::default(), diff --git a/p2p/src/sync/tests/network_sync.rs b/p2p/src/sync/tests/network_sync.rs index a8e42fe68..910c0e9e8 100644 --- a/p2p/src/sync/tests/network_sync.rs +++ b/p2p/src/sync/tests/network_sync.rs @@ -434,6 +434,7 @@ async fn send_block_from_the_future_again(#[case] seed: Seed) { outbound_connection_timeout: Default::default(), disconnection_timeout: Default::default(), socket_write_timeout: Default::default(), + max_pending_inbound_connections: Default::default(), }, bind_addresses: Default::default(), diff --git a/p2p/src/tests/connection_lockup_when_socket_not_read.rs b/p2p/src/tests/connection_lockup_when_socket_not_read.rs index 736a86040..b32194190 100644 --- a/p2p/src/tests/connection_lockup_when_socket_not_read.rs +++ b/p2p/src/tests/connection_lockup_when_socket_not_read.rs @@ -37,7 +37,7 @@ use test_utils::{ }; use crate::{ - config::{BackendTimeoutsConfig, P2pConfig}, + config::{BackendConfig, P2pConfig}, message::{HeaderList, HeaderListRequest}, net::{ default_backend::types::{HandshakeMessage, Message, MessageTag, P2pTimestamp}, @@ -291,6 +291,7 @@ async fn timeout_when_socket_not_read( outbound_connection_timeout: Default::default(), peer_handshake_timeout: Default::default(), + max_pending_inbound_connections: Default::default(), }, bind_addresses: Default::default(), diff --git a/p2p/src/tests/helpers/mod.rs b/p2p/src/tests/helpers/mod.rs index 78d40e9ff..c9496e2b3 100644 --- a/p2p/src/tests/helpers/mod.rs +++ b/p2p/src/tests/helpers/mod.rs @@ -149,6 +149,8 @@ impl PeerManagerObserver for PeerManagerObserverImpl { pub enum BackendNotification { MessageRead { peer_id: PeerId, message: Message }, MessageWritten { peer_id: PeerId, message: Message }, + PendingPeerCreated { peer_id: PeerId }, + PendingPeerRemoved { peer_id: PeerId }, } pub struct BackendObserverImpl { @@ -185,6 +187,14 @@ impl BackendObserver for BackendObserverImpl { message: message.clone(), }); } + + fn on_pending_peer_created(&self, peer_id: PeerId) { + self.send_notification(BackendNotification::PendingPeerCreated { peer_id }); + } + + fn on_pending_peer_removed(&self, peer_id: PeerId) { + self.send_notification(BackendNotification::PendingPeerRemoved { peer_id }); + } } #[derive(Debug, PartialEq, Eq)] diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 2a9c36969..545e7e0d3 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -19,6 +19,7 @@ use std::{ sync::{Arc, Mutex}, }; +use logging::log; use tokio::{ sync::{ mpsc::{self}, @@ -375,6 +376,62 @@ where } } + pub async fn wait_for_next_pending_peer_creation_in_backend(&mut self) -> PeerId { + loop { + if let BackendNotification::PendingPeerCreated { peer_id } = + self.backend_notification_receiver.recv().await.unwrap() + { + return peer_id; + } + } + } + + pub async fn wait_for_next_pending_peer_removal_in_backend( + &mut self, + expected_peer_id: PeerId, + ) { + loop { + if let BackendNotification::PendingPeerRemoved { peer_id } = + self.backend_notification_receiver.recv().await.unwrap() + { + assert_eq!(peer_id, expected_peer_id); + return; + } + } + } + + pub async fn assert_no_pending_peer_creation_in_backend(&mut self) { + log::debug!("Asserting no pending peer creation in backend"); + + time::timeout(SHORT_TIMEOUT, async { + loop { + if let BackendNotification::PendingPeerCreated { .. } = + self.backend_notification_receiver.recv().await.unwrap() + { + break; + } + } + }) + .await + .unwrap_err(); + } + + pub async fn assert_no_pending_peer_removal_in_backend(&mut self) { + log::debug!("Asserting no pending peer removal in backend"); + + time::timeout(SHORT_TIMEOUT, async { + loop { + if let BackendNotification::PendingPeerRemoved { .. } = + self.backend_notification_receiver.recv().await.unwrap() + { + break; + } + } + }) + .await + .unwrap_err(); + } + pub async fn get_peers_info(&self) -> TestPeersInfo { query_peer_manager(&self.peer_mgr_event_sender, |peer_mgr| { TestPeersInfo::from_peer_mgr_peer_contexts(peer_mgr.peers()) diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index 62cb6502c..bf4f575f5 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -26,6 +26,7 @@ mod min_peer_software_version; mod misbehavior; mod peer_discovery_on_stale_tip; mod peer_mgr_events; +mod pending_inbound_connections_limit; mod same_handshake_nonce; mod unsupported_message; mod unsupported_version; diff --git a/p2p/src/tests/pending_inbound_connections_limit.rs b/p2p/src/tests/pending_inbound_connections_limit.rs new file mode 100644 index 000000000..803d47b38 --- /dev/null +++ b/p2p/src/tests/pending_inbound_connections_limit.rs @@ -0,0 +1,232 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use rstest::rstest; + +use chainstate::ChainstateConfig; +use common::primitives::user_agent::mintlayer_core_user_agent; +use networking::{ + test_helpers::{ + TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, + }, + transport::{TransportSocket, new_message_stream}, +}; +use p2p_test_utils::{SHORT_TIMEOUT, run_with_timeout}; +use randomness::RngExt as _; +use test_utils::{ + BasicTestTimeGetter, assert_matches, + random::{Seed, make_seedable_rng}, +}; + +use crate::{ + config::{BackendConfig, P2pConfig}, + net::default_backend::types::{HandshakeMessage, Message, P2pTimestamp}, + test_helpers::TEST_PROTOCOL_VERSION, + tests::helpers::TestNode, +}; + +// Check that `BackendConfig::max_pending_inbound_connections` actually limits the number of pending +// incoming connections. +async fn pending_inbound_connections_limit(seed: Seed) +where + TTM: TestTransportMaker, + TTM::Transport: TransportSocket, +{ + let mut rng = make_seedable_rng(seed); + + let max_pending_inbound_connections = 5; + let good_peers_count = rng.random_range(1..=3); + + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + + let p2p_config = Arc::new(P2pConfig { + backend_config: BackendConfig { + max_pending_inbound_connections: max_pending_inbound_connections.into(), + + outbound_connection_timeout: Default::default(), + peer_handshake_timeout: Default::default(), + disconnection_timeout: Default::default(), + socket_write_timeout: Default::default(), + }, + + bind_addresses: Default::default(), + socks5_proxy: Default::default(), + disable_noise: Default::default(), + boot_nodes: Default::default(), + reserved_nodes: Default::default(), + whitelisted_addresses: Default::default(), + ban_config: Default::default(), + ping_check_period: Default::default(), + ping_timeout: Default::default(), + max_clock_diff: Default::default(), + node_type: Default::default(), + allow_discover_private_ips: Default::default(), + user_agent: mintlayer_core_user_agent(), + sync_stalling_timeout: Default::default(), + peer_manager_config: Default::default(), + protocol_config: Default::default(), + custom_disconnection_reason_for_banning: Default::default(), + }); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TTM::make_transport(), + TTM::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + make_seedable_rng(rng.random()), + ) + .await; + + let transport = TTM::make_transport(); + + let peer_time = P2pTimestamp::from_time(time_getter.get_time_getter().get_time()); + + let mut pending_peers_readers_writers = Vec::new(); + + // A `max_pending_inbound_connections` number of inbound peers connect, but don't initiate + // the handshake. Each peer is registered as pending. + for _ in 0..max_pending_inbound_connections { + let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); + let (msg_reader, msg_writer) = new_message_stream::<_, Message>( + stream, + Some(*p2p_config.protocol_config.max_message_size), + ); + let peer_id = test_node.wait_for_next_pending_peer_creation_in_backend().await; + + pending_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + } + + test_node.assert_no_pending_peer_removal_in_backend().await; + + // A few more inbound peers attempt to connect. These are not registered as pending and are + // dropped immediately because the limit for pending peers has been reached. + for _ in 0..3 { + let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); + let (mut msg_reader, _) = new_message_stream::<_, Message>( + stream, + Some(*p2p_config.protocol_config.max_message_size), + ); + + test_node.assert_no_pending_peer_creation_in_backend().await; + tokio::time::timeout(SHORT_TIMEOUT, msg_reader.recv()) + .await + .unwrap() + .unwrap_err(); + } + + let mut good_peers_readers_writers = Vec::new(); + + // A few of the pending peers decide to initiate the handshake, after which they're no longer pending. + for _ in 0..good_peers_count { + let peer_idx = rng.random_range(0..pending_peers_readers_writers.len()); + + let (peer_id, mut msg_reader, mut msg_writer) = + pending_peers_readers_writers.remove(peer_idx); + + msg_writer + .send(Message::Handshake(HandshakeMessage::Hello { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: peer_time, + handshake_nonce: 0, + })) + .await + .unwrap(); + + let msg = msg_reader.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::HelloAck { .. })); + + test_node.wait_for_next_pending_peer_removal_in_backend(peer_id).await; + + // Don't drop the readers/writers to make sure the peer continues to exist. + good_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + } + + // Since `good_peers_count` peers are no longer pending, the node will accept this number + // of new peers and they will be registered as pending. + for _ in 0..good_peers_count { + let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); + let (msg_reader, msg_writer) = new_message_stream::<_, Message>( + stream, + Some(*p2p_config.protocol_config.max_message_size), + ); + let peer_id = test_node.wait_for_next_pending_peer_creation_in_backend().await; + + pending_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + } + + // A few more inbound peers attempt to connect. These are not registered as pending and are + // dropped immediately because the limit for pending peers has been reached again. + for _ in 0..3 { + let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); + let (mut msg_reader, _) = new_message_stream::<_, Message>( + stream, + Some(*p2p_config.protocol_config.max_message_size), + ); + + test_node.assert_no_pending_peer_creation_in_backend().await; + tokio::time::timeout(SHORT_TIMEOUT, msg_reader.recv()) + .await + .unwrap() + .unwrap_err(); + } + + test_node.join().await; +} + +#[tracing::instrument(skip(seed))] +#[rstest] +#[case(Seed::from_entropy())] +#[trace] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pending_inbound_connections_limit_tcp(#[case] seed: Seed) { + run_with_timeout(pending_inbound_connections_limit::(seed)).await; +} + +#[tracing::instrument(skip(seed))] +#[rstest] +#[case(Seed::from_entropy())] +#[trace] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pending_inbound_connections_limit_channels(#[case] seed: Seed) { + run_with_timeout(pending_inbound_connections_limit::( + seed, + )) + .await; +} + +#[tracing::instrument(skip(seed))] +#[rstest] +#[case(Seed::from_entropy())] +#[trace] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pending_inbound_connections_limit_noise(#[case] seed: Seed) { + run_with_timeout(pending_inbound_connections_limit::( + seed, + )) + .await; +} From fe0d7b0aca967eac8d83e0e109d77d19a1321755 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Thu, 4 Jun 2026 14:44:07 +0300 Subject: [PATCH 3/3] p2p: improve pending_inbound_connections_limit test --- p2p/src/tests/helpers/test_node.rs | 4 ++ .../pending_inbound_connections_limit.rs | 57 ++++++++++++++----- 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 545e7e0d3..be5b14f7c 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -377,6 +377,8 @@ where } pub async fn wait_for_next_pending_peer_creation_in_backend(&mut self) -> PeerId { + log::debug!("Waiting for next pending peer creation in backend"); + loop { if let BackendNotification::PendingPeerCreated { peer_id } = self.backend_notification_receiver.recv().await.unwrap() @@ -390,6 +392,8 @@ where &mut self, expected_peer_id: PeerId, ) { + log::debug!("Waiting for next pending peer removal in backend"); + loop { if let BackendNotification::PendingPeerRemoved { peer_id } = self.backend_notification_receiver.recv().await.unwrap() diff --git a/p2p/src/tests/pending_inbound_connections_limit.rs b/p2p/src/tests/pending_inbound_connections_limit.rs index 803d47b38..db5b8f599 100644 --- a/p2p/src/tests/pending_inbound_connections_limit.rs +++ b/p2p/src/tests/pending_inbound_connections_limit.rs @@ -13,17 +13,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use rstest::rstest; use chainstate::ChainstateConfig; use common::primitives::user_agent::mintlayer_core_user_agent; +use logging::log; use networking::{ test_helpers::{ TestTransportChannel, TestTransportMaker, TestTransportNoise, TestTransportTcp, }, - transport::{TransportSocket, new_message_stream}, + transport::{TransportListener as _, TransportSocket, new_message_stream}, }; use p2p_test_utils::{SHORT_TIMEOUT, run_with_timeout}; use randomness::RngExt as _; @@ -49,7 +50,7 @@ where let mut rng = make_seedable_rng(seed); let max_pending_inbound_connections = 5; - let good_peers_count = rng.random_range(1..=3); + let good_inbound_peers_count = rng.random_range(1..=3); let time_getter = BasicTestTimeGetter::new(); let chain_config = Arc::new(common::chain::config::create_unit_test_config()); @@ -57,9 +58,10 @@ where let p2p_config = Arc::new(P2pConfig { backend_config: BackendConfig { max_pending_inbound_connections: max_pending_inbound_connections.into(), + // Make sure pending connections don't time out during the test. + peer_handshake_timeout: Duration::from_secs(3600).into(), outbound_connection_timeout: Default::default(), - peer_handshake_timeout: Default::default(), disconnection_timeout: Default::default(), socket_write_timeout: Default::default(), }, @@ -101,10 +103,11 @@ where let peer_time = P2pTimestamp::from_time(time_getter.get_time_getter().get_time()); - let mut pending_peers_readers_writers = Vec::new(); + let mut pending_inbound_peers_readers_writers = Vec::new(); // A `max_pending_inbound_connections` number of inbound peers connect, but don't initiate // the handshake. Each peer is registered as pending. + log::debug!("Adding the first batch of inbound peers"); for _ in 0..max_pending_inbound_connections { let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); let (msg_reader, msg_writer) = new_message_stream::<_, Message>( @@ -113,13 +116,14 @@ where ); let peer_id = test_node.wait_for_next_pending_peer_creation_in_backend().await; - pending_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + pending_inbound_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); } test_node.assert_no_pending_peer_removal_in_backend().await; // A few more inbound peers attempt to connect. These are not registered as pending and are // dropped immediately because the limit for pending peers has been reached. + log::debug!("Adding the second batch of inbound peers"); for _ in 0..3 { let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); let (mut msg_reader, _) = new_message_stream::<_, Message>( @@ -134,14 +138,35 @@ where .unwrap_err(); } - let mut good_peers_readers_writers = Vec::new(); + let mut outbound_peers_data = Vec::new(); + + // Sanity check: outbound connections still create pending peers. This will also ensure + // that the presence of pending outbound peers has no effect on the pending inbound peer + // limiting. + log::debug!("Adding outbound peers"); + for _ in 0..rng.random_range(1..=3) { + let mut listener = transport.bind(vec![TTM::make_address()]).await.unwrap(); + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + let (stream, _) = listener.accept().await.unwrap(); + let _peer_id = test_node.wait_for_next_pending_peer_creation_in_backend().await; + + // Avoid dropping the stream and the result receiver to ensure that the peer stays pending. + outbound_peers_data.push((stream, connect_result_receiver)); + } + + test_node.assert_no_pending_peer_removal_in_backend().await; + + let mut good_inbound_peers_readers_writers = Vec::new(); - // A few of the pending peers decide to initiate the handshake, after which they're no longer pending. - for _ in 0..good_peers_count { - let peer_idx = rng.random_range(0..pending_peers_readers_writers.len()); + // A few of the pending inbound peers decide to initiate the handshake, after which they're + // no longer pending. + log::debug!("Good inbound peers initiate handshake"); + for _ in 0..good_inbound_peers_count { + let peer_idx = rng.random_range(0..pending_inbound_peers_readers_writers.len()); let (peer_id, mut msg_reader, mut msg_writer) = - pending_peers_readers_writers.remove(peer_idx); + pending_inbound_peers_readers_writers.remove(peer_idx); msg_writer .send(Message::Handshake(HandshakeMessage::Hello { @@ -163,12 +188,13 @@ where test_node.wait_for_next_pending_peer_removal_in_backend(peer_id).await; // Don't drop the readers/writers to make sure the peer continues to exist. - good_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + good_inbound_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); } - // Since `good_peers_count` peers are no longer pending, the node will accept this number + // Since `good_inbound_peers_count` peers are no longer pending, the node will accept this number // of new peers and they will be registered as pending. - for _ in 0..good_peers_count { + log::debug!("Adding the third batch of inbound peers"); + for _ in 0..good_inbound_peers_count { let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); let (msg_reader, msg_writer) = new_message_stream::<_, Message>( stream, @@ -176,11 +202,12 @@ where ); let peer_id = test_node.wait_for_next_pending_peer_creation_in_backend().await; - pending_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); + pending_inbound_peers_readers_writers.push((peer_id, msg_reader, msg_writer)); } // A few more inbound peers attempt to connect. These are not registered as pending and are // dropped immediately because the limit for pending peers has been reached again. + log::debug!("Adding the fourth batch of inbound peers"); for _ in 0..3 { let stream = transport.connect(test_node.local_address().socket_addr()).await.unwrap(); let (mut msg_reader, _) = new_message_stream::<_, Message>(