Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 60 additions & 31 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure::Measure,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
Expand Down Expand Up @@ -135,24 +135,24 @@ impl Forwarder {
}
}

/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets<'a>(
/// Forwards all valid, unprocessed packets in the iterator, up to a rate limit.
/// Returns whether forwarding succeeded, the number of attempted forwarded packets
/// if any, the time spent forwarding in us, and the leader pubkey if any.
fn forward_packets<'a>(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this fn higher since in the worker PR it will become pub(crate)

&self,
forward_option: &ForwardOption,
forwardable_packets: impl Iterator<Item = &'a Packet>,
banking_stage_stats: &BankingStageStats,
) -> (
std::result::Result<(), TransportError>,
usize,
u64,
Option<Pubkey>,
) {
let Some((leader_pubkey, addr)) = self.get_leader_and_addr(forward_option) else {
return (Ok(()), 0, None);
return (Ok(()), 0, 0, None);
};

self.update_data_budget();

let packet_vec: Vec<_> = forwardable_packets
.filter(|p| !p.meta().forwarded())
.filter(|p| self.data_budget.take(p.meta().size))
Expand All @@ -164,43 +164,50 @@ impl Forwarder {
// fix this so returns the correct number of succeeded packets
// when there's an error sending the batch. This was left as-is for now
// in favor of shipping Quic support, which was considered higher-priority
if !packet_vec.is_empty() {
inc_new_counter_info!("banking_stage-forwarded_packets", packet_vec_len);
let (res, forward_us) = if !packet_vec.is_empty() {
measure_us!(self.forward(forward_option, packet_vec, &addr))
} else {
(Ok(()), 0)
};

let mut measure = Measure::start("banking_stage-forward-us");
(res, packet_vec_len, forward_us, Some(leader_pubkey))
}

let res = if let ForwardOption::ForwardTpuVote = forward_option {
// The vote must be forwarded using only UDP.
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets<'a>(
&self,
forward_option: &ForwardOption,
forwardable_packets: impl Iterator<Item = &'a Packet>,
banking_stage_stats: &BankingStageStats,
) -> (
std::result::Result<(), TransportError>,
usize,
Option<Pubkey>,
) {
let (res, num_packets, forward_us, leader_pubkey) =
self.forward_packets(forward_option, forwardable_packets);

if num_packets > 0 {
inc_new_counter_info!("banking_stage-forwarded_packets", num_packets);
if let ForwardOption::ForwardTpuVote = forward_option {
banking_stage_stats
.forwarded_vote_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
.fetch_add(num_packets, Ordering::Relaxed);
} else {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
banking_stage_stats
.forwarded_transaction_count
.fetch_add(packet_vec_len, Ordering::Relaxed);
let conn = self.connection_cache.get_connection(&addr);
conn.send_data_batch_async(packet_vec)
};
.fetch_add(num_packets, Ordering::Relaxed);
}

measure.stop();
inc_new_counter_info!(
"banking_stage-forward-us",
measure.as_us() as usize,
1000,
1000
);
inc_new_counter_info!("banking_stage-forward-us", forward_us as usize, 1000, 1000);

if let Err(err) = res {
if res.is_err() {
inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1);
return (Err(err), 0, Some(leader_pubkey));
}
}

(Ok(()), packet_vec_len, Some(leader_pubkey))
(res, num_packets, leader_pubkey)
}

/// Get the pubkey and socket address for the leader to forward to
Expand Down Expand Up @@ -231,6 +238,28 @@ impl Forwarder {
)
});
}

fn forward(
&self,
forward_option: &ForwardOption,
packet_vec: Vec<Vec<u8>>,
addr: &SocketAddr,
) -> Result<(), TransportError> {
match forward_option {
ForwardOption::ForwardTpuVote => {
// The vote must be forwarded using only UDP.
let pkts: Vec<_> = packet_vec.into_iter().zip(repeat(*addr)).collect();
batch_send(&self.socket, &pkts).map_err(|err| err.into())
}
ForwardOption::ForwardTransaction => {
// All other transactions can be forwarded using QUIC, get_connection() will use
// system wide setting to pick the correct connection object.
let conn = self.connection_cache.get_connection(addr);
conn.send_data_batch_async(packet_vec)
}
ForwardOption::NotForward => panic!("should not forward"),
Copy link
Copy Markdown
Contributor Author

@apfitzge apfitzge Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only called within forward_packets which exits early on ForwardOption::NotForward due to get_leader_and_addr checks.

}
}
}

#[cfg(test)]
Expand Down