Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 111 additions & 66 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,25 @@ use {
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
solana_sdk::{pubkey::Pubkey, transport::TransportError},
solana_streamer::sendmmsg::batch_send,
std::{
iter::repeat,
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
},
};

struct ForwardStats {
successful: bool,
attempted_forwarded_packets: usize,
forward_time_us: u64,
}

pub(crate) struct Forwarder {
poh_recorder: Arc<RwLock<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
Expand Down Expand Up @@ -94,7 +100,8 @@ impl Forwarder {
slot_metrics_tracker.increment_forwardable_batches_count(1);

let batched_forwardable_packets_count = forward_batch.len();
let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = self

let (sucessful_forwarded_packets_count, leader_pubkey) = self
.forward_buffered_packets(
&forward_option,
forward_batch.get_forwardable_packets(),
Expand Down Expand Up @@ -135,33 +142,102 @@ impl Forwarder {
}
}

/// Forward `packets` to the appropriate leader if available using `forward_option`.
fn forward<'a>(
&self,
forward_option: &ForwardOption,
addr: &SocketAddr,
packets: impl Iterator<Item = &'a Packet>,
) -> ForwardStats {
self.update_data_budget();
let packet_vec: Vec<_> = packets
.filter(|p| !p.meta().forwarded())
.filter(|p| self.data_budget.take(p.meta().size))
.filter_map(|p| p.data(..).map(|d| d.to_vec()))
.collect();
let attempted_forwarded_packets = packet_vec.len();

// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
let (successful, forward_time_us) = measure_us!(self
.forward_packets_to(forward_option, packet_vec, addr)
.is_ok());

ForwardStats {
successful,
attempted_forwarded_packets,
forward_time_us,
}
}

/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
/// the number of successfully forwarded packets and the leader pubkey if any.
fn forward_buffered_packets<'a>(
&self,
forward_option: &ForwardOption,
forwardable_packets: impl Iterator<Item = &'a Packet>,
banking_stage_stats: &BankingStageStats,
) -> (
std::result::Result<(), TransportError>,
usize,
Option<Pubkey>,
) {
let leader_and_addr = match forward_option {
ForwardOption::NotForward => return (Ok(()), 0, None),
) -> (usize, Option<Pubkey>) {
let Some((leader_pubkey, addr)) = self.get_leader_for_forwarding(forward_option) else {
return (0, None);
};

let ForwardStats {
successful,
attempted_forwarded_packets,
forward_time_us,
} = self.forward(forward_option, &addr, forwardable_packets);

if let ForwardOption::ForwardTpuVote = forward_option {
banking_stage_stats
.forwarded_vote_count
.fetch_add(attempted_forwarded_packets, Ordering::Relaxed);
} else {
banking_stage_stats
.forwarded_transaction_count
.fetch_add(attempted_forwarded_packets, Ordering::Relaxed);
}

inc_new_counter_info!(
"banking_stage-forwarded_packets",
attempted_forwarded_packets
);
inc_new_counter_info!(
"banking_stage-forward-us",
forward_time_us as usize,
1000,
1000
);

if !successful {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
(0, Some(leader_pubkey))
} else {
(attempted_forwarded_packets, Some(leader_pubkey))
}
}

// Get the next leader to forward to, if any
fn get_leader_for_forwarding(
&self,
forward_option: &ForwardOption,
) -> Option<(Pubkey, SocketAddr)> {
match forward_option {
ForwardOption::NotForward => None,
ForwardOption::ForwardTransaction => {
next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder)
}

ForwardOption::ForwardTpuVote => {
next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
}
};
let (leader_pubkey, addr) = match leader_and_addr {
Some(leader_and_addr) => leader_and_addr,
None => return (Ok(()), 0, None),
};
}
}

// Updates the data budget for forwarding packets if enough time has passed
fn update_data_budget(&self) {
const INTERVAL_MS: u64 = 100;
// 12 MB outbound limit per second
const MAX_BYTES_PER_SECOND: usize = 12_000_000;
Expand All @@ -173,59 +249,28 @@ impl Forwarder {
MAX_BYTES_BUDGET,
)
});
}

let packet_vec: Vec<_> = forwardable_packets
.filter_map(|p| {
if !p.meta().forwarded() && self.data_budget.take(p.meta().size) {
Some(p.data(..)?.to_vec())
} else {
None
}
})
.collect();

let packet_vec_len = packet_vec.len();
// TODO: see https://github.com/solana-labs/solana/issues/23819
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len);

let mut measure = Measure::start("banking_stage-forward-us");

let res = if let ForwardOption::ForwardTpuVote = forward_option {
// The vote must be forwarded using only UDP.
banking_stage_stats
.forwarded_vote_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
} else {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
banking_stage_stats
.forwarded_transaction_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let conn = self.connection_cache.get_connection(&addr);
conn.send_data_batch_async(packet_vec)
};

measure.stop();
inc_new_counter_info!(
"banking_stage-forward-us",
measure.as_us() as usize,
1000,
1000
);

if let Err(err) = res {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0, Some(leader_pubkey));
}
fn forward_packets_to(
&self,
forward_option: &ForwardOption,
packet_vec: Vec<Vec<u8>>,
addr: &SocketAddr,
) -> Result<(), TransportError> {
if packet_vec.is_empty() {
return Ok(());
}

(Ok(()), packet_vec_len, Some(leader_pubkey))
if let ForwardOption::ForwardTpuVote = forward_option {
// The vote must be forwarded using only UDP.
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
} else {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
let conn = self.connection_cache.get_connection(addr);
conn.send_data_batch_async(packet_vec)
}
}
}

Expand Down