diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index ab78b4006b8..39ff4721259 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -9,7 +9,7 @@ use { }, solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection}, solana_gossip::cluster_info::ClusterInfo, - solana_measure::measure::Measure, + solana_measure::measure_us, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, @@ -135,24 +135,24 @@ impl Forwarder { } } - /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns - /// the number of successfully forwarded packets in second part of tuple - fn forward_buffered_packets<'a>( + /// Forwards all valid, unprocessed packets in the iterator, up to a rate limit. + /// Returns whether forwarding succeeded, the number of attempted forwarded packets + /// if any, the time spent forwarding in us, and the leader pubkey if any. + fn forward_packets<'a>( &self, forward_option: &ForwardOption, forwardable_packets: impl Iterator, - banking_stage_stats: &BankingStageStats, ) -> ( std::result::Result<(), TransportError>, usize, + u64, Option, ) { let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else { - return (Ok(()), 0, None); + return (Ok(()), 0, 0, None); }; self.update_data_budget(); - let packet_vec: Vec<_> = forwardable_packets .filter(|p| !p.meta().forwarded()) .filter(|p| self.data_budget.take(p.meta().size)) @@ -164,43 +164,50 @@ impl Forwarder { // fix this so returns the correct number of succeeded packets // when there's an error sending the batch. This was left as-is for now // in favor of shipping Quic support, which was considered higher-priority - if !packet_vec.is_empty() { - inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len); + let (res, forward_us) = if !packet_vec.is_empty() { + measure_us!(self.forward(forward_option, packet_vec, &addr)) + } else { + (Ok(()), 0) + }; - let mut measure = Measure::start("banking_stage-forward-us"); + (res, packet_vec_len, forward_us, Some(leader_pubkey)) + } - let res = if let ForwardOption::ForwardTpuVote = forward_option { - // The vote must be forwarded using only UDP. + /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns + /// the number of successfully forwarded packets in second part of tuple + fn forward_buffered_packets<'a>( + &self, + forward_option: &ForwardOption, + forwardable_packets: impl Iterator, + banking_stage_stats: &BankingStageStats, + ) -> ( + std::result::Result<(), TransportError>, + usize, + Option, + ) { + let (res, num_packets, forward_us, leader_pubkey) = + self.forward_packets(forward_option, forwardable_packets); + + if num_packets > 0 { + inc_new_counter_info!("banking_stage-forwarded_packets", num_packets); + if let ForwardOption::ForwardTpuVote = forward_option { banking_stage_stats .forwarded_vote_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect(); - batch_send(&self.socket, &pkts).map_err(|err| err.into()) + .fetch_add(num_packets, Ordering::Relaxed); } else { - // All other transactions can be forwarded using QUIC, get_connection() will use - // system wide setting to pick the correct connection object. banking_stage_stats .forwarded_transaction_count - .fetch_add(packet_vec_len, Ordering::Relaxed); - let conn = self.connection_cache.get_connection(&addr); - conn.send_data_batch_async(packet_vec) - }; + .fetch_add(num_packets, Ordering::Relaxed); + } - measure.stop(); - inc_new_counter_info!( - "banking_stage-forward-us", - measure.as_us() as usize, - 1000, - 1000 - ); + inc_new_counter_info!("banking_stage-forward-us", forward_us as usize, 1000, 1000); - if let Err(err) = res { + if res.is_err() { inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); - return (Err(err), 0, Some(leader_pubkey)); } } - (Ok(()), packet_vec_len, Some(leader_pubkey)) + (res, num_packets, leader_pubkey) } /// Get the pubkey and socket address for the leader to forward to @@ -231,6 +238,28 @@ impl Forwarder { ) }); } + + fn forward( + &self, + forward_option: &ForwardOption, + packet_vec: Vec>, + addr: &SocketAddr, + ) -> Result<(), TransportError> { + match forward_option { + ForwardOption::ForwardTpuVote => { + // The vote must be forwarded using only UDP. + let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect(); + batch_send(&self.socket, &pkts).map_err(|err| err.into()) + } + ForwardOption::ForwardTransaction => { + // All other transactions can be forwarded using QUIC, get_connection() will use + // system wide setting to pick the correct connection object. + let conn = self.connection_cache.get_connection(addr); + conn.send_data_batch_async(packet_vec) + } + ForwardOption::NotForward => panic!("should not forward"), + } + } } #[cfg(test)]