Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/
- Adding an already existing relayable local transaction to the mempool will cause p2p to re-announce it.
- Transactions are now announced in batches at irregular intervals (previously, a random delay was added
before each individual transaction announcement).
- Security improvements:
- The number of pending inbound connections is now limited.

- Mempool:
- Various optimizations were made.
Expand Down
2 changes: 1 addition & 1 deletion dns-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn run(options: DnsServerRunOptions) -> anyhow::Result<Never> {
sync_stalling_timeout: Default::default(),
peer_manager_config: Default::default(),
protocol_config: Default::default(),
backend_timeouts: Default::default(),
backend_config: Default::default(),
custom_disconnection_reason_for_banning: Default::default(),
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
};

use logging::log;

use crate::{
Result,
transport::{TransportListener, TransportSocket, impls::stream_adapter::traits::StreamAdapter},
Expand Down Expand Up @@ -63,7 +65,10 @@ impl<S: StreamAdapter<T::Stream>, T: TransportSocket> TransportListener for Adap
match handshake_res {
(Ok(handshake), addr) => return Ok((handshake, addr)),
(Err(err), addr) => {
logging::log::warn!("Handshake with {addr} failed: {err}");
// Note: failed transport handshakes are peer-triggerable. In particular,
// a malicious peer can deliberately and repeatedly fail the Noise handshake,
// so we keep this at debug level to avoid flooding default logs.
log::debug!("Handshake with {addr} failed: {err}");
continue;
},
}
Expand All @@ -80,7 +85,12 @@ impl<S: StreamAdapter<T::Stream>, T: TransportSocket> TransportListener for Adap
self.handshakes.push(handshake_with_addr);
},
Err(err) => {
logging::log::error!("Accept failed unexpectedly: {err}");
// Note: this can also be peer-triggered, though less reliably than the
// handshake error above. E.g. a malicious peer may open TCP connections
// and close them immediately, attempting to flood the node's logs.
// But in any case, this error is propagated to the p2p backend, which logs
// it again, so there is no point in using a level higher than debug here.
log::debug!("Accept failed unexpectedly: {err}");
return Err(err);
},
}
Expand Down
5 changes: 3 additions & 2 deletions node-lib/src/config_files/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use serde::{Deserialize, Serialize};
use common::primitives::{semver::SemVer, user_agent::mintlayer_core_user_agent};
use p2p::{
ban_config::BanConfig,
config::{BackendTimeoutsConfig, NodeType, P2pConfig},
config::{BackendConfig, NodeType, P2pConfig},
peer_manager::config::PeerManagerConfig,
};
use utils_networking::IpOrSocketAddress;
Expand Down Expand Up @@ -186,13 +186,14 @@ impl From<P2pConfigFile> for P2pConfig {

min_peer_software_version,
},
backend_timeouts: BackendTimeoutsConfig {
backend_config: BackendConfig {
outbound_connection_timeout: outbound_connection_timeout
.map(|t| Duration::from_secs(t.into()))
.into(),
peer_handshake_timeout: Default::default(),
disconnection_timeout: Default::default(),
socket_write_timeout: Default::default(),
max_pending_inbound_connections: Default::default(),
},
protocol_config: Default::default(),
custom_disconnection_reason_for_banning,
Expand Down
2 changes: 1 addition & 1 deletion p2p/backend-test-suite/src/block_announcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ where
sync_stalling_timeout: Default::default(),
peer_manager_config: Default::default(),
protocol_config: Default::default(),
backend_timeouts: Default::default(),
backend_config: Default::default(),
custom_disconnection_reason_for_banning: Default::default(),
});
let shutdown = Arc::new(SeqCstAtomicBool::new(false));
Expand Down
26 changes: 18 additions & 8 deletions p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ make_config_setting!(SyncStallingTimeout, Duration, Duration::from_secs(25));
make_config_setting!(PeerHandshakeTimeout, Duration, Duration::from_secs(10));
make_config_setting!(DisconnectionTimeout, Duration, Duration::from_secs(10));
make_config_setting!(SoketWriteTimeout, Duration, Duration::from_secs(60));
make_config_setting!(MaxPendingInboundConnections, usize, 100);

/// A node type.
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -121,8 +122,8 @@ pub struct P2pConfig {
/// Various limits related to the protocol; these should only be overridden in tests.
pub protocol_config: ProtocolConfig,

/// Various timeouts used by the backend.
pub backend_timeouts: BackendTimeoutsConfig,
/// Various settings specific to the backend.
pub backend_config: BackendConfig,

/// If set, this text will be sent to banned peers as part of the DisconnectionReason.
pub custom_disconnection_reason_for_banning: Option<String>,
Expand All @@ -134,22 +135,31 @@ impl P2pConfig {
/// It is calculated as the max clock diff setting plus handshake timeout to allow for
/// imprecisions caused by the network latency.
pub fn effective_max_clock_diff(&self) -> Duration {
*self.max_clock_diff + *self.backend_timeouts.peer_handshake_timeout
*self.max_clock_diff + *self.backend_config.peer_handshake_timeout
}
}

/// Part of P2pConfig containing various timeouts used by the backend.
/// Part of P2pConfig containing various settings specific to the backend.
#[derive(Default, Debug, Clone)]
pub struct BackendTimeoutsConfig {
pub struct BackendConfig {
/// The outbound connection timeout value.
pub outbound_connection_timeout: OutboundConnectionTimeout,

/// Timeout for initial peer handshake
/// Timeout for initial peer handshake.
pub peer_handshake_timeout: PeerHandshakeTimeout,

/// Timeout for disconnection
/// Timeout for disconnection.
pub disconnection_timeout: DisconnectionTimeout,

/// Timeout for the socket write call
/// Timeout for the socket write call.
pub socket_write_timeout: SoketWriteTimeout,

/// The maximum number of pending inbound connections that can exist at the same time.
///
/// Note: the presence of this limit is important for security, but its specific value is not -
/// it just defines the backend's burst tolerance for transport-accepted inbound connections that
/// have not completed the P2P handshake yet. Still, it makes sense to keep it within the same order
/// of magnitude as `MAX_CONCURRENT_HANDSHAKES` used by the wrapped transport layer's
/// `AdaptedListener`, which limits the number of simultaneous transport-level inbound handshakes.
pub max_pending_inbound_connections: MaxPendingInboundConnections,
}
2 changes: 1 addition & 1 deletion p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub fn make_p2p<S: PeerDbStorage + 'static>(
//
// For more details, see `Peer::handshake` and `P2pConfig`.
assert_eq!(
*p2p_config.backend_timeouts.peer_handshake_timeout,
*p2p_config.backend_config.peer_handshake_timeout,
Duration::from_secs(10),
"Handshake timeout changed"
);
Expand Down
118 changes: 102 additions & 16 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use networking::{
use p2p_types::socket_address::SocketAddress;
use randomness::{RngExt as _, make_pseudo_rng};
use utils::{
atomics::SeqCstAtomicBool, eventhandler::EventsController, set_flag::SetFlag,
shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span,
atomics::SeqCstAtomicBool, debug_panic_or_log, eventhandler::EventsController,
set_flag::SetFlag, shallow_clone::ShallowClone, tokio_spawn_in_current_tracing_span,
};

use crate::{
Expand Down Expand Up @@ -108,6 +108,7 @@ struct PeerContext {
}

/// Pending peer data (until handshake message is received)
#[derive(Debug)]
struct PendingPeerContext {
handle: tokio::task::JoinHandle<()>,

Expand Down Expand Up @@ -149,6 +150,9 @@ pub struct Backend<T: TransportSocket> {
/// Pending connections
pending_peers: HashMap<PeerId, PendingPeerContext>,

/// The number of inbound peers in `pending_peers`
pending_inbound_peer_count: usize,

/// Map of streams for receiving events from peers.
peer_event_stream_map: StreamMap<PeerId, ReceiverStream<PeerEvent>>,

Expand Down Expand Up @@ -210,6 +214,7 @@ where
syncing_event_sender,
peers: HashMap::new(),
pending_peers: HashMap::new(),
pending_inbound_peer_count: 0,
peer_event_stream_map: StreamMap::new(),
command_queue: FuturesUnordered::new(),
shutdown,
Expand Down Expand Up @@ -347,21 +352,51 @@ where
res = self.socket.accept() => {
match res {
Ok((stream, address)) => {
// Note: this execution path is peer-triggerable, so we use debug level logging
// until after `pending_inbound_peer_count` has been checked, to prevent malicious
// peers from flooding the default logs.
if !self.networking_enabled {
log::info!("Ignoring incoming connection from {address:?} because networking is disabled");
log::debug!("Ignoring incoming connection from {address:?} because networking is disabled");
} else {
self.create_pending_peer(
stream,
PeerId::new(),
ConnectionInfo::Inbound,
address.into(),
)?;
let max_pending_inbound_conn_count = *self.p2p_config.backend_config.max_pending_inbound_connections;

if self.pending_inbound_peer_count >= max_pending_inbound_conn_count {
log::debug!(
concat!(
"Ignoring incoming connection from {:?} because the maximum number ",
"of pending incoming connections has been reached {}"
),
address,
max_pending_inbound_conn_count
);
} else {
self.create_pending_peer(
stream,
PeerId::new(),
ConnectionInfo::Inbound,
address.into(),
)?;
}
}
},
Err(err) => {
// Just log the error and let the node continue working
// Note: this execution path is also peer-triggerable, though less reliably than the
// successful path above, so we use debug-level logging here too. See also a similar
// comment in `AdaptedListener::accept` in the transport layer.
// TODO: malicious peers can still flood the default logs by making successful connections
// and dropping them immediately (though it will be less severe and more costly for the
// attacker). Consider:
// a) Using debug-level logging specifically for incoming connections (while keeping it
// at "info" for outbound ones) in all (or most) connectivity-related messages (e.g.
// "Peer disconnected", "Assigning peer id", "New peer accepted").
// b) Implementing a rate-limiter for the log (e.g. using a custom tracing layer or filter),
// either based on the source code location (Bitcoin does this) or on the specified
// log target.
// c) (In the case when some errors have been omitted or printed via debug!), printing
// some kind of info/warn-level summary at regular intervals, e.g. "X incoming connections
// ignored in the last Y seconds".
if self.networking_enabled {
log::error!("Accepting a new connection failed unexpectedly: {err}")
log::debug!("Accepting a new connection failed unexpectedly: {err}")
} else {
log::debug!(
"Ignoring failed incoming connection because networking is disabled (err = {err})",
Expand Down Expand Up @@ -426,7 +461,7 @@ where
&format!("Peer[id={peer_id}]"),
);

self.pending_peers.insert(
self.insert_new_pending_peer(
peer_id,
PendingPeerContext {
handle,
Expand Down Expand Up @@ -456,7 +491,7 @@ where
bind_address,
connection_info,
backend_event_sender,
} = match self.pending_peers.remove(&peer_id) {
} = match self.remove_pending_peer(peer_id) {
Some(pending_peer) => pending_peer,
// Could be removed if self-connection was detected earlier
None => return Ok(()),
Expand Down Expand Up @@ -568,7 +603,7 @@ where
.map(|(peer_id, _pending)| *peer_id);

if let Some(peer_id) = pending_outbound_peer_id {
let peer_ctx = self.pending_peers.remove(&peer_id).expect("peer must exist");
let peer_ctx = self.remove_pending_peer(peer_id).expect("peer must exist");

log::info!(
"self-connection detected on address {:?}",
Expand Down Expand Up @@ -649,7 +684,7 @@ where
}

PeerEvent::ConnectionClosed => {
if let Some(pending_peer) = self.pending_peers.remove(&peer_id) {
if let Some(pending_peer) = self.remove_pending_peer(peer_id) {
// Note: we'll get here if handshake has failed, so no need to use log levels
// higher than debug, because the error should have been logged properly already.
match pending_peer.connection_info {
Expand Down Expand Up @@ -732,7 +767,7 @@ where
local_services_override,
} => {
let connection_fut = timeout(
*self.p2p_config.backend_timeouts.outbound_connection_timeout,
*self.p2p_config.backend_config.outbound_connection_timeout,
self.transport.connect(address.socket_addr()),
);

Expand Down Expand Up @@ -802,6 +837,57 @@ where
Err(_) => log::error!("sending syncing event from the backend failed unexpectedly"),
}
}

/// Stores `peer_context` in `pending_peers` under `peer_id`, keeps
/// `pending_inbound_peer_count` in step with the map and notifies the observer (if any)
/// that a pending peer has been created.
///
/// An already existing context for `peer_id` is not expected. If one is found, it is
/// replaced by the new context and the situation is reported via `debug_panic_or_log!`.
fn insert_new_pending_peer(&mut self, peer_id: PeerId, peer_context: PendingPeerContext) {
    let is_inbound = peer_context.connection_info.is_inbound();

    match self.pending_peers.insert(peer_id, peer_context) {
        Some(old_context) => {
            // The old entry is gone from the map now, so the counter must be corrected by
            // the difference between the old and the new entry's direction. Otherwise it
            // would drift away from the real number of inbound entries in `pending_peers`,
            // which would either shrink the effective inbound limit or make a later
            // removal decrement the counter below the real value.
            match (old_context.connection_info.is_inbound(), is_inbound) {
                (true, false) => {
                    self.pending_inbound_peer_count =
                        self.pending_inbound_peer_count.saturating_sub(1);
                }
                (false, true) => {
                    self.pending_inbound_peer_count += 1;
                }
                (true, true) | (false, false) => {}
            }

            debug_panic_or_log!(
                "Pending peer context already exists for a new peer {peer_id}: {old_context:?}"
            );
        }
        None => {
            if is_inbound {
                self.pending_inbound_peer_count += 1;
            }
        }
    }

    // The counter must match the map after every insertion, not only after inbound ones.
    #[cfg(test)]
    self.assert_pending_inbound_peer_count_consistency();

    if let Some(observer) = &self.observer {
        observer.on_pending_peer_created(peer_id);
    }
}

fn remove_pending_peer(&mut self, peer_id: PeerId) -> Option<PendingPeerContext> {
let peer_context = self.pending_peers.remove(&peer_id);

if let Some(peer_context) = &peer_context {
if peer_context.connection_info.is_inbound() {
self.pending_inbound_peer_count -= 1;

#[cfg(test)]
self.assert_pending_inbound_peer_count_consistency();
}

if let Some(observer) = &self.observer {
observer.on_pending_peer_removed(peer_id);
}
}

peer_context
}

/// Test-only sanity check: recounts the inbound entries of `pending_peers` and asserts
/// that the result equals the cached `pending_inbound_peer_count`.
#[cfg(test)]
fn assert_pending_inbound_peer_count_consistency(&self) {
    let mut recounted_inbound_peers = 0usize;

    for pending_peer in self.pending_peers.values() {
        if pending_peer.connection_info.is_inbound() {
            recounted_inbound_peers += 1;
        }
    }

    assert_eq!(self.pending_inbound_peer_count, recounted_inbound_peers);
}
}

// Some boilerplate types and a function for blocking tasks handling
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/net/default_backend/peer/handshake_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl HandshakeHandler {
socket_writer: &mut MessageWriter<S, Message>,
) -> crate::Result<CommonProtocolVersion> {
// handshake with remote peer and send peer's info to backend
let handshake_timeout = *self.p2p_config.backend_timeouts.peer_handshake_timeout;
let handshake_timeout = *self.p2p_config.backend_config.peer_handshake_timeout;
let handshake_res = timeout(
handshake_timeout,
self.handshake_impl(peer_event_sender, socket_reader, socket_writer),
Expand Down
11 changes: 9 additions & 2 deletions p2p/src/net/default_backend/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ impl ConnectionInfo {
ConnectionInfo::Outbound { .. } => ConnectionDirection::Outbound,
}
}

pub fn is_inbound(&self) -> bool {
match self {
ConnectionInfo::Inbound => true,
ConnectionInfo::Outbound { .. } => false,
}
}
}

pub struct Peer<T: TransportSocket> {
Expand Down Expand Up @@ -356,7 +363,7 @@ where
match send_result {
Ok(()) => {
let disconnect_result = tokio::time::timeout(
*p2p_config.backend_timeouts.disconnection_timeout,
*p2p_config.backend_config.disconnection_timeout,
async {
match writer_event_receiver.recv().await {
Some(WriterEvent::WriterClosed(result)) => {
Expand Down Expand Up @@ -483,7 +490,7 @@ async fn writer_loop<S: PeerStream>(
}

tokio::time::timeout(
*p2p_config.backend_timeouts.socket_write_timeout,
*p2p_config.backend_config.socket_write_timeout,
socket_writer.send(*message),
)
.await
Expand Down
Loading
Loading