diff --git a/testing/integration/src/common/client.rs b/testing/integration/src/common/client.rs index 5ae0d8ede4..a88dc843d1 100644 --- a/testing/integration/src/common/client.rs +++ b/testing/integration/src/common/client.rs @@ -1,10 +1,11 @@ use super::{daemon::Daemon, listener::Listener}; use kaspa_grpc_client::GrpcClient; -use kaspa_notify::{events::EventType, scope::Scope, subscription::Command}; -use kaspa_rpc_core::RpcResult; +use kaspa_notify::{events::EventType, notification::Notification as _, scope::Scope, subscription::Command}; +use kaspa_rpc_core::{Notification, RpcResult}; use std::{ collections::{HashMap, hash_map::Entry}, ops::Deref, + time::Duration, }; /// A multi-listener gRPC client with event type dedicated listeners @@ -45,6 +46,24 @@ impl ListeningClient { self.listeners.get(&event).cloned() } + pub async fn wait_for_notification( + &self, + event: EventType, + timeout_per_notification: Duration, + predicate: impl Fn(&Notification) -> bool, + ) -> Notification { + loop { + let notification = tokio::time::timeout(timeout_per_notification, self.listener(event).unwrap().receiver.recv()) + .await + .unwrap_or_else(|_| panic!("timed out waiting for {event:?} notification")) + .unwrap(); + assert_eq!(notification.event_type(), event); + if predicate(¬ification) { + return notification; + } + } + } + pub fn block_added_listener(&self) -> Option { self.listener(EventType::BlockAdded) } diff --git a/testing/integration/src/common/utils.rs b/testing/integration/src/common/utils.rs index 66bb2df099..b5083146bb 100644 --- a/testing/integration/src/common/utils.rs +++ b/testing/integration/src/common/utils.rs @@ -273,8 +273,11 @@ pub fn is_utxo_spendable(entry: &RpcUtxoEntry, virtual_daa_score: u64, coinbase_ } pub async fn mine_block(pay_address: Address, submitting_client: &GrpcClient, listening_clients: &[ListeningClient]) { - // Discard all unreceived block added notifications in each listening client - listening_clients.iter().for_each(|x| x.block_added_listener().unwrap().drain()); + // Discard notifications from previous blocks so this helper waits only for the block it submits. + listening_clients.iter().for_each(|x| { + x.block_added_listener().unwrap().drain(); + x.virtual_daa_score_changed_listener().unwrap().drain(); + }); // Mine a block let template = submitting_client.get_block_template(pay_address.clone(), vec![]).await.unwrap(); diff --git a/testing/integration/src/daemon_integration_tests.rs b/testing/integration/src/daemon_integration_tests.rs index 02c40f9a59..10b29e3256 100644 --- a/testing/integration/src/daemon_integration_tests.rs +++ b/testing/integration/src/daemon_integration_tests.rs @@ -20,7 +20,10 @@ use kaspa_consensusmanager::ConsensusManager; use kaspa_core::{task::runtime::AsyncRuntime, trace}; use kaspa_grpc_client::GrpcClient; use kaspa_hashes::Hash; -use kaspa_notify::scope::{BlockAddedScope, UtxosChangedScope, VirtualDaaScoreChangedScope}; +use kaspa_notify::{ + events::EventType, + scope::{BlockAddedScope, UtxosChangedScope, VirtualDaaScoreChangedScope}, +}; use kaspa_rpc_core::{Notification, RpcTransaction, RpcTransactionId, api::rpc::RpcApi}; use kaspa_txscript::{ opcodes::codes, pay_to_address_script, pay_to_script_hash_script, pay_to_script_hash_signature_script, @@ -226,6 +229,15 @@ async fn daemon_utxos_propagation_test() { // Some dummy non-monitored address let blank_address = Address::new(kaspad1.network.into(), kaspa_addresses::Version::PubKey, &[0; 32]); + // Create a multi-listener RPC client on each node. Multi-listener subscriptions are propagated + // upstream asynchronously, so subscribe to the streams used by mine_block before the long initial + // mining run and later verify that notifications actually flowed through both listeners. + let mut clients = vec![ListeningClient::connect(&kaspad2).await, ListeningClient::connect(&kaspad1).await]; + for x in clients.iter_mut() { + x.start_notify(BlockAddedScope {}.into()).await.unwrap(); + x.start_notify(VirtualDaaScoreChangedScope {}.into()).await.unwrap(); + } + // Mine 1000 blocks to daemon #1 let initial_blocks = coinbase_maturity; let mut last_block_hash = None; @@ -282,14 +294,28 @@ async fn daemon_utxos_propagation_test() { assert_eq!(accepted_txs_pair.accepted_transaction_ids.len(), 1); } - // Create a multi-listener RPC client on each node... - let mut clients = vec![ListeningClient::connect(&kaspad2).await, ListeningClient::connect(&kaspad1).await]; + // Use the initial mining run as a readiness barrier for the multi-listener notification stack, + // then consume the warm-up history through the final block and virtual DAA notifications so + // the following mine_block calls observe only fresh notifications. + let last_block_hash = last_block_hash.unwrap(); + let timeout_per_notification = Duration::from_secs(10); + for x in clients.iter() { + x.wait_for_notification(EventType::BlockAdded, timeout_per_notification, |notification| { + matches!(notification, Notification::BlockAdded(notification) if notification.block.header.hash == last_block_hash) + }) + .await; + x.wait_for_notification(EventType::VirtualDaaScoreChanged, timeout_per_notification, |notification| { + matches!(notification, Notification::VirtualDaaScoreChanged(notification) if notification.virtual_daa_score == initial_blocks) + }) + .await; + x.block_added_listener().unwrap().drain(); + x.virtual_daa_score_changed_listener().unwrap().drain(); + } - // ...and subscribe each to some notifications + // Subscribe to address-filtered UTXO notifications only after the initial maturity mining, so + // the UTXO listener does not accumulate the 1000 coinbase notifications above. for x in clients.iter_mut() { - x.start_notify(BlockAddedScope {}.into()).await.unwrap(); x.start_notify(UtxosChangedScope::new(vec![miner_address.clone(), user_address.clone()]).into()).await.unwrap(); - x.start_notify(VirtualDaaScoreChangedScope {}.into()).await.unwrap(); } // Mine some extra blocks so the latest miner reward is added to its balance and some UTXOs reach maturity