Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
block_storage::tracing::{observe_block, BlockStage},
quorum_store,
};
use aptos_consensus_types::{block::Block, pipelined_block::PipelinedBlock};
use aptos_consensus_types::{block::Block, common::Author, pipelined_block::PipelinedBlock};
use aptos_crypto::HashValue;
use aptos_executor_types::{state_compute_result::StateComputeResult, ExecutorError};
use aptos_logger::prelude::warn;
Expand Down Expand Up @@ -129,6 +129,24 @@ pub static COMMITTED_OPT_BLOCKS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

/// Number of opt proposals this validator spawned as leader
pub static SELF_OPT_PROPOSAL_SPAWNED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"aptos_consensus_self_opt_proposal_spawned",
"Number of opt proposals this validator spawned as leader"
)
.unwrap()
});

/// Number of this validator's opt proposals that committed successfully
pub static SELF_OPT_PROPOSAL_COMMITTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"aptos_consensus_self_opt_proposal_committed",
"Number of opt proposals this validator spawned that committed"
)
.unwrap()
});

/// Count of the committed transactions since last restart.
pub static COMMITTED_TXNS_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down Expand Up @@ -1353,7 +1371,7 @@ pub static FETCH_COMMIT_HISTORY_DURATION: Lazy<DurationHistogram> = Lazy::new(||
)
});

pub fn update_counters_for_block(block: &Block) {
pub fn update_counters_for_block(block: &Block, my_author: Author) {
observe_block(block.timestamp_usecs(), BlockStage::COMMITTED);
NUM_BYTES_PER_BLOCK.observe(block.payload().map_or(0, |payload| payload.size()) as f64);
COMMITTED_BLOCKS_COUNT.inc();
Expand All @@ -1362,6 +1380,9 @@ pub fn update_counters_for_block(block: &Block) {
observe_block(block.timestamp_usecs(), BlockStage::COMMITTED_OPT_BLOCK);
COMMITTED_OPT_BLOCKS_COUNT.inc();
LAST_COMMITTED_OPT_BLOCK_ROUND.set(block.round() as i64);
if block.author() == Some(my_author) {
SELF_OPT_PROPOSAL_COMMITTED.inc();
}
}
let failed_rounds = block
.block_data()
Expand All @@ -1371,7 +1392,7 @@ pub fn update_counters_for_block(block: &Block) {
if failed_rounds > 0 {
COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64);
}
quorum_store::counters::update_batch_stats(block);
quorum_store::counters::update_batch_stats(block, my_author);
}

pub fn update_counters_for_compute_result(compute_result: &StateComputeResult) {
Expand Down Expand Up @@ -1405,9 +1426,12 @@ pub fn update_counters_for_compute_result(compute_result: &StateComputeResult) {
}

/// Update various counters for committed blocks
pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<PipelinedBlock>]) {
pub fn update_counters_for_committed_blocks(
blocks_to_commit: &[Arc<PipelinedBlock>],
my_author: Author,
) {
for block in blocks_to_commit {
update_counters_for_block(block.block());
update_counters_for_block(block.block(), my_author);
update_counters_for_compute_result(&block.compute_result());
}
}
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ impl PipelineBuilder {
self.payload_manager.clone(),
block_store_callback,
block.clone(),
self.signer.author(),
),
None,
);
Expand Down Expand Up @@ -1302,6 +1303,7 @@ impl PipelineBuilder {
dyn FnOnce(WrappedLedgerInfo, LedgerInfoWithSignatures) + Send + Sync,
>,
block: Arc<Block>,
my_author: Author,
) -> TaskResult<PostCommitResult> {
let mut tracker = Tracker::start_waiting("post_commit_ledger", &block);
parent_post_commit.await?;
Expand All @@ -1310,7 +1312,7 @@ impl PipelineBuilder {
notify_state_sync_fut.await?;

tracker.start_working();
update_counters_for_block(&block);
update_counters_for_block(&block, my_author);
update_counters_for_compute_result(&compute_result);

let payload = block.payload().cloned();
Expand Down
31 changes: 25 additions & 6 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
#![allow(clippy::unwrap_used)]

use aptos_consensus_types::{
block::Block, common::Payload, payload::OptQuorumStorePayload, proof_of_store::TBatchInfo,
block::Block,
common::{Author, Payload},
payload::OptQuorumStorePayload,
proof_of_store::TBatchInfo,
};
use aptos_metrics_core::{
exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_histogram,
Expand Down Expand Up @@ -195,7 +198,7 @@ pub static TXN_BYTES_PER_BATCH_TYPE_PER_BLOCK: Lazy<HistogramVec> = Lazy::new(||
.unwrap()
});

pub fn update_batch_stats(block: &Block) {
pub fn update_batch_stats(block: &Block, my_author: Author) {
let (proof_num, proof_txn_num, proof_txn_bytes) = block.proof_stats();
BATCH_NUM_PER_BLOCK
.with_label_values(&["proof"])
Expand Down Expand Up @@ -228,27 +231,33 @@ pub fn update_batch_stats(block: &Block) {
.with_label_values(&["opt_batch"])
.observe(opt_batch_txn_bytes as f64);

update_committed_batches_by_author(block);
update_committed_batches_by_author(block, my_author);
}

fn update_committed_batches_by_author(block: &Block) {
fn update_committed_batches_by_author(block: &Block, my_author: Author) {
let Some(payload) = block.payload() else {
return;
};
let Payload::OptQuorumStore(opt_qs) = payload else {
return;
};
let is_self_proposed = block.author() == Some(my_author);

// Helper to record per-author stats for a batch type
fn record_batch_author(author: aptos_types::PeerId, num_txns: u64, batch_type: &str) {
let record_batch_author = |author: aptos_types::PeerId, num_txns: u64, batch_type: &str| {
let author_str = author.short_str();
COMMITTED_BATCHES_BY_AUTHOR
.with_label_values(&[author_str.as_str(), batch_type])
.inc();
COMMITTED_TXNS_BY_AUTHOR
.with_label_values(&[author_str.as_str(), batch_type])
.inc_by(num_txns);
}
if is_self_proposed {
SELF_PROPOSED_BATCHES_BY_TYPE_AND_AUTHOR
.with_label_values(&[author_str.as_str(), batch_type])
.inc();
}
};

match opt_qs {
OptQuorumStorePayload::V1(p) => {
Expand Down Expand Up @@ -1148,6 +1157,16 @@ pub static COMMITTED_BATCHES_BY_AUTHOR: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Committed batches in blocks this validator proposed, labeled by batch author + type
pub static SELF_PROPOSED_BATCHES_BY_TYPE_AND_AUTHOR: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"quorum_store_self_proposed_batches_by_type_and_author",
"Committed batches in blocks this validator proposed, by batch author and type",
&["author", "type"]
)
.unwrap()
});

/// Counter for committed txns per author and type (proof, opt_batch, inline_batch).
pub static COMMITTED_TXNS_BY_AUTHOR: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,7 @@ impl RoundManager {
let sync_info = self.block_store.sync_info();
let proposal_generator = self.proposal_generator.clone();
let proposer_election = self.proposer_election.clone();
counters::SELF_OPT_PROPOSAL_SPAWNED.inc();
tokio::spawn(async move {
if let Err(e) = monitor!(
"generate_and_send_opt_proposal",
Expand Down
Loading