diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs
index c41aad09742..895f12acaa2 100644
--- a/core/src/banking_stage/forwarder.rs
+++ b/core/src/banking_stage/forwarder.rs
@@ -9,7 +9,7 @@ use {
     },
     solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
     solana_gossip::cluster_info::ClusterInfo,
-    solana_measure::measure::Measure,
+    solana_measure::measure_us,
     solana_perf::{data_budget::DataBudget, packet::Packet},
     solana_poh::poh_recorder::PohRecorder,
     solana_runtime::bank_forks::BankForks,
@@ -17,11 +17,17 @@ use {
     solana_streamer::sendmmsg::batch_send,
     std::{
         iter::repeat,
-        net::UdpSocket,
+        net::{SocketAddr, UdpSocket},
         sync::{atomic::Ordering, Arc, RwLock},
     },
 };
 
+struct ForwardStats {
+    successful: bool,
+    attempted_forwarded_packets: usize,
+    forward_time_us: u64,
+}
+
 pub(crate) struct Forwarder {
     poh_recorder: Arc<RwLock<PohRecorder>>,
     bank_forks: Arc<RwLock<BankForks>>,
@@ -94,7 +100,8 @@ impl Forwarder {
                 slot_metrics_tracker.increment_forwardable_batches_count(1);
 
                 let batched_forwardable_packets_count = forward_batch.len();
-                let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = self
+
+                let (sucessful_forwarded_packets_count, leader_pubkey) = self
                     .forward_buffered_packets(
                         &forward_option,
                         forward_batch.get_forwardable_packets(),
@@ -135,20 +142,90 @@ impl Forwarder {
         }
     }
 
+    /// Forward `packets` to the appropriate leader if available using `forward_option`.
+    fn forward<'a>(
+        &self,
+        forward_option: &ForwardOption,
+        addr: &SocketAddr,
+        packets: impl Iterator<Item = &'a Packet>,
+    ) -> ForwardStats {
+        self.update_data_budget();
+        let packet_vec: Vec<_> = packets
+            .filter(|p| !p.meta().forwarded())
+            .filter(|p| self.data_budget.take(p.meta().size))
+            .filter_map(|p| p.data(..).map(|d| d.to_vec()))
+            .collect();
+        let attempted_forwarded_packets = packet_vec.len();
+
+        // TODO: see https://github.com/solana-labs/solana/issues/23819
+        // fix this so returns the correct number of succeeded packets
+        // when there's an error sending the batch. This was left as-is for now
+        // in favor of shipping Quic support, which was considered higher-priority
+        let (successful, forward_time_us) = measure_us!(self
+            .forward_packets_to(forward_option, packet_vec, addr)
+            .is_ok());
+
+        ForwardStats {
+            successful,
+            attempted_forwarded_packets,
+            forward_time_us,
+        }
+    }
+
     /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
-    /// the number of successfully forwarded packets in second part of tuple
+    /// the number of successfully forwarded packets and the leader pubkey if any.
     fn forward_buffered_packets<'a>(
         &self,
         forward_option: &ForwardOption,
         forwardable_packets: impl Iterator<Item = &'a Packet>,
         banking_stage_stats: &BankingStageStats,
-    ) -> (
-        std::result::Result<(), TransportError>,
-        usize,
-        Option<Pubkey>,
-    ) {
-        let leader_and_addr = match forward_option {
-            ForwardOption::NotForward => return (Ok(()), 0, None),
+    ) -> (usize, Option<Pubkey>) {
+        let Some((leader_pubkey, addr)) = self.get_leader_for_forwarding(forward_option) else {
+            return (0, None);
+        };
+
+        let ForwardStats {
+            successful,
+            attempted_forwarded_packets,
+            forward_time_us,
+        } = self.forward(forward_option, &addr, forwardable_packets);
+
+        if let ForwardOption::ForwardTpuVote = forward_option {
+            banking_stage_stats
+                .forwarded_vote_count
+                .fetch_add(attempted_forwarded_packets, Ordering::Relaxed);
+        } else {
+            banking_stage_stats
+                .forwarded_transaction_count
+                .fetch_add(attempted_forwarded_packets, Ordering::Relaxed);
+        }
+
+        inc_new_counter_info!(
+            "banking_stage-forwarded_packets",
+            attempted_forwarded_packets
+        );
+        inc_new_counter_info!(
+            "banking_stage-forward-us",
+            forward_time_us as usize,
+            1000,
+            1000
+        );
+
+        if !successful {
+            inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
+            (0, Some(leader_pubkey))
+        } else {
+            (attempted_forwarded_packets, Some(leader_pubkey))
+        }
+    }
+
+    // Get the next leader to forward to, if any
+    fn get_leader_for_forwarding(
+        &self,
+        forward_option: &ForwardOption,
+    ) -> Option<(Pubkey, SocketAddr)> {
+        match forward_option {
+            ForwardOption::NotForward => None,
             ForwardOption::ForwardTransaction => {
                 next_leader_tpu_forwards(&self.cluster_info, &self.poh_recorder)
             }
@@ -156,12 +233,11 @@ impl Forwarder {
             ForwardOption::ForwardTpuVote => {
                 next_leader_tpu_vote(&self.cluster_info, &self.poh_recorder)
             }
-        };
-        let (leader_pubkey, addr) = match leader_and_addr {
-            Some(leader_and_addr) => leader_and_addr,
-            None => return (Ok(()), 0, None),
-        };
+        }
+    }
 
+    // Updates the data budget for forwarding packets if enough time has passed
+    fn update_data_budget(&self) {
         const INTERVAL_MS: u64 = 100;
         // 12 MB outbound limit per second
         const MAX_BYTES_PER_SECOND: usize = 12_000_000;
@@ -173,59 +249,28 @@ impl Forwarder {
                 MAX_BYTES_BUDGET,
             )
         });
+    }
 
-        let packet_vec: Vec<_> = forwardable_packets
-            .filter_map(|p| {
-                if !p.meta().forwarded() && self.data_budget.take(p.meta().size) {
-                    Some(p.data(..)?.to_vec())
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        let packet_vec_len = packet_vec.len();
-        // TODO: see https://github.com/solana-labs/solana/issues/23819
-        // fix this so returns the correct number of succeeded packets
-        // when there's an error sending the batch. This was left as-is for now
-        // in favor of shipping Quic support, which was considered higher-priority
-        if !packet_vec.is_empty() {
-            inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len);
-
-            let mut measure = Measure::start("banking_stage-forward-us");
-
-            let res = if let ForwardOption::ForwardTpuVote = forward_option {
-                // The vote must be forwarded using only UDP.
-                banking_stage_stats
-                    .forwarded_vote_count
-                    .fetch_add(packet_vec_len, Ordering::Relaxed);
-                let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
-                batch_send(&self.socket, &pkts).map_err(|err| err.into())
-            } else {
-                // All other transactions can be forwarded using QUIC, get_connection() will use
-                // system wide setting to pick the correct connection object.
-                banking_stage_stats
-                    .forwarded_transaction_count
-                    .fetch_add(packet_vec_len, Ordering::Relaxed);
-                let conn = self.connection_cache.get_connection(&addr);
-                conn.send_data_batch_async(packet_vec)
-            };
-
-            measure.stop();
-            inc_new_counter_info!(
-                "banking_stage-forward-us",
-                measure.as_us() as usize,
-                1000,
-                1000
-            );
-
-            if let Err(err) = res {
-                inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
-                return (Err(err), 0, Some(leader_pubkey));
-            }
+
+    fn forward_packets_to(
+        &self,
+        forward_option: &ForwardOption,
+        packet_vec: Vec<Vec<u8>>,
+        addr: &SocketAddr,
+    ) -> Result<(), TransportError> {
+        if packet_vec.is_empty() {
+            return Ok(());
         }
 
-        (Ok(()), packet_vec_len, Some(leader_pubkey))
+        if let ForwardOption::ForwardTpuVote = forward_option {
+            // The vote must be forwarded using only UDP.
+            let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
+            batch_send(&self.socket, &pkts).map_err(|err| err.into())
+        } else {
+            // All other transactions can be forwarded using QUIC, get_connection() will use
+            // system wide setting to pick the correct connection object.
+            let conn = self.connection_cache.get_connection(addr);
+            conn.send_data_batch_async(packet_vec)
+        }
     }
 }