diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index eb1e2f0e40e..d0de4acb1a2 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -7,7 +7,7 @@ use crate::{ block_storage::tracing::{observe_block, BlockStage}, quorum_store, }; -use aptos_consensus_types::{block::Block, pipelined_block::PipelinedBlock}; +use aptos_consensus_types::{block::Block, common::Author, pipelined_block::PipelinedBlock}; use aptos_crypto::HashValue; use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorError}; use aptos_logger::prelude::warn; @@ -129,6 +129,24 @@ pub static COMMITTED_OPT_BLOCKS_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// Number of opt proposals this validator spawned as leader +pub static SELF_OPT_PROPOSAL_SPAWNED: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_consensus_self_opt_proposal_spawned", + "Number of opt proposals this validator spawned as leader" + ) + .unwrap() +}); + +/// Number of this validator's opt proposals that committed successfully +pub static SELF_OPT_PROPOSAL_COMMITTED: Lazy = Lazy::new(|| { + register_int_counter!( + "aptos_consensus_self_opt_proposal_committed", + "Number of opt proposals this validator spawned that committed" + ) + .unwrap() +}); + /// Count of the committed transactions since last restart. pub static COMMITTED_TXNS_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( @@ -1353,7 +1371,7 @@ pub static FETCH_COMMIT_HISTORY_DURATION: Lazy = Lazy::new(|| ) }); -pub fn update_counters_for_block(block: &Block) { +pub fn update_counters_for_block(block: &Block, my_author: Author) { observe_block(block.timestamp_usecs(), BlockStage::COMMITTED); NUM_BYTES_PER_BLOCK.observe(block.payload().map_or(0, |payload| payload.size()) as f64); COMMITTED_BLOCKS_COUNT.inc(); @@ -1362,6 +1380,9 @@ pub fn update_counters_for_block(block: &Block) { observe_block(block.timestamp_usecs(), BlockStage::COMMITTED_OPT_BLOCK); COMMITTED_OPT_BLOCKS_COUNT.inc(); LAST_COMMITTED_OPT_BLOCK_ROUND.set(block.round() as i64); + if block.author() == Some(my_author) { + SELF_OPT_PROPOSAL_COMMITTED.inc(); + } } let failed_rounds = block .block_data() @@ -1371,7 +1392,7 @@ pub fn update_counters_for_block(block: &Block) { if failed_rounds > 0 { COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64); } - quorum_store::counters::update_batch_stats(block); + quorum_store::counters::update_batch_stats(block, my_author); } pub fn update_counters_for_compute_result(compute_result: &StateComputeResult) { @@ -1405,9 +1426,12 @@ pub fn update_counters_for_compute_result(compute_result: &StateComputeResult) { } /// Update various counters for committed blocks -pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc]) { +pub fn update_counters_for_committed_blocks( + blocks_to_commit: &[Arc], + my_author: Author, +) { for block in blocks_to_commit { - update_counters_for_block(block.block()); + update_counters_for_block(block.block(), my_author); update_counters_for_compute_result(&block.compute_result()); } } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index ff52f9b3d8a..b03ee96bbdb 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -663,6 +663,7 @@ impl PipelineBuilder { self.payload_manager.clone(), block_store_callback, block.clone(), + self.signer.author(), ), None, ); @@ -1302,6 +1303,7 @@ impl PipelineBuilder { dyn FnOnce(WrappedLedgerInfo, LedgerInfoWithSignatures) + Send + Sync, >, block: Arc, + my_author: Author, ) -> TaskResult { let mut tracker = Tracker::start_waiting("post_commit_ledger", &block); parent_post_commit.await?; @@ -1310,7 +1312,7 @@ impl PipelineBuilder { notify_state_sync_fut.await?; tracker.start_working(); - update_counters_for_block(&block); + update_counters_for_block(&block, my_author); update_counters_for_compute_result(&compute_result); let payload = block.payload().cloned(); diff --git a/consensus/src/quorum_store/counters.rs b/consensus/src/quorum_store/counters.rs index 0311c89524d..946c97d8e89 100644 --- a/consensus/src/quorum_store/counters.rs +++ b/consensus/src/quorum_store/counters.rs @@ -4,7 +4,10 @@ #![allow(clippy::unwrap_used)] use aptos_consensus_types::{ - block::Block, common::Payload, payload::OptQuorumStorePayload, proof_of_store::TBatchInfo, + block::Block, + common::{Author, Payload}, + payload::OptQuorumStorePayload, + proof_of_store::TBatchInfo, }; use aptos_metrics_core::{ exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_histogram, @@ -195,7 +198,7 @@ pub static TXN_BYTES_PER_BATCH_TYPE_PER_BLOCK: Lazy = Lazy::new(|| .unwrap() }); -pub fn update_batch_stats(block: &Block) { +pub fn update_batch_stats(block: &Block, my_author: Author) { let (proof_num, proof_txn_num, proof_txn_bytes) = block.proof_stats(); BATCH_NUM_PER_BLOCK .with_label_values(&["proof"]) @@ -228,19 +231,20 @@ pub fn update_batch_stats(block: &Block) { .with_label_values(&["opt_batch"]) .observe(opt_batch_txn_bytes as f64); - update_committed_batches_by_author(block); + update_committed_batches_by_author(block, my_author); } -fn update_committed_batches_by_author(block: &Block) { +fn update_committed_batches_by_author(block: &Block, my_author: Author) { let Some(payload) = block.payload() else { return; }; let Payload::OptQuorumStore(opt_qs) = payload else { return; }; + let is_self_proposed = block.author() == Some(my_author); // Helper to record per-author stats for a batch type - fn record_batch_author(author: aptos_types::PeerId, num_txns: u64, batch_type: &str) { + let record_batch_author = |author: aptos_types::PeerId, num_txns: u64, batch_type: &str| { let author_str = author.short_str(); COMMITTED_BATCHES_BY_AUTHOR .with_label_values(&[author_str.as_str(), batch_type]) @@ -248,7 +252,12 @@ fn update_committed_batches_by_author(block: &Block) { COMMITTED_TXNS_BY_AUTHOR .with_label_values(&[author_str.as_str(), batch_type]) .inc_by(num_txns); - } + if is_self_proposed { + SELF_PROPOSED_BATCHES_BY_TYPE_AND_AUTHOR + .with_label_values(&[author_str.as_str(), batch_type]) + .inc(); + } + }; match opt_qs { OptQuorumStorePayload::V1(p) => { @@ -1148,6 +1157,16 @@ pub static COMMITTED_BATCHES_BY_AUTHOR: Lazy = Lazy::new(|| { .unwrap() }); +/// Committed batches in blocks this validator proposed, labeled by batch author + type +pub static SELF_PROPOSED_BATCHES_BY_TYPE_AND_AUTHOR: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "quorum_store_self_proposed_batches_by_type_and_author", + "Committed batches in blocks this validator proposed, by batch author and type", + &["author", "type"] + ) + .unwrap() +}); + /// Counter for committed txns per author and type (proof, opt_batch, inline_batch). pub static COMMITTED_TXNS_BY_AUTHOR: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 082e8f73fa6..cb6ed671046 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -1473,6 +1473,7 @@ impl RoundManager { let sync_info = self.block_store.sync_info(); let proposal_generator = self.proposal_generator.clone(); let proposer_election = self.proposer_election.clone(); + counters::SELF_OPT_PROPOSAL_SPAWNED.inc(); tokio::spawn(async move { if let Err(e) = monitor!( "generate_and_send_opt_proposal",