Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions testing/integration/src/common/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use super::{daemon::Daemon, listener::Listener};
use kaspa_grpc_client::GrpcClient;
use kaspa_notify::{events::EventType, scope::Scope, subscription::Command};
use kaspa_rpc_core::RpcResult;
use kaspa_notify::{events::EventType, notification::Notification as _, scope::Scope, subscription::Command};
use kaspa_rpc_core::{Notification, RpcResult};
use std::{
collections::{HashMap, hash_map::Entry},
ops::Deref,
time::Duration,
};

/// A multi-listener gRPC client with event type dedicated listeners
Expand Down Expand Up @@ -45,6 +46,24 @@ impl ListeningClient {
self.listeners.get(&event).cloned()
}

pub async fn wait_for_notification(
&self,
event: EventType,
timeout_per_notification: Duration,
predicate: impl Fn(&Notification) -> bool,
) -> Notification {
loop {
let notification = tokio::time::timeout(timeout_per_notification, self.listener(event).unwrap().receiver.recv())
.await
.unwrap_or_else(|_| panic!("timed out waiting for {event:?} notification"))
.unwrap();
assert_eq!(notification.event_type(), event);
if predicate(&notification) {
return notification;
}
}
}

pub fn block_added_listener(&self) -> Option<Listener> {
self.listener(EventType::BlockAdded)
}
Expand Down
7 changes: 5 additions & 2 deletions testing/integration/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,11 @@ pub fn is_utxo_spendable(entry: &RpcUtxoEntry, virtual_daa_score: u64, coinbase_
}

pub async fn mine_block(pay_address: Address, submitting_client: &GrpcClient, listening_clients: &[ListeningClient]) {
// Discard all unreceived block added notifications in each listening client
listening_clients.iter().for_each(|x| x.block_added_listener().unwrap().drain());
// Discard notifications from previous blocks so this helper waits only for the block it submits.
listening_clients.iter().for_each(|x| {
x.block_added_listener().unwrap().drain();
x.virtual_daa_score_changed_listener().unwrap().drain();
});

// Mine a block
let template = submitting_client.get_block_template(pay_address.clone(), vec![]).await.unwrap();
Expand Down
38 changes: 32 additions & 6 deletions testing/integration/src/daemon_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use kaspa_consensusmanager::ConsensusManager;
use kaspa_core::{task::runtime::AsyncRuntime, trace};
use kaspa_grpc_client::GrpcClient;
use kaspa_hashes::Hash;
use kaspa_notify::scope::{BlockAddedScope, UtxosChangedScope, VirtualDaaScoreChangedScope};
use kaspa_notify::{
events::EventType,
scope::{BlockAddedScope, UtxosChangedScope, VirtualDaaScoreChangedScope},
};
use kaspa_rpc_core::{Notification, RpcTransaction, RpcTransactionId, api::rpc::RpcApi};
use kaspa_txscript::{
opcodes::codes, pay_to_address_script, pay_to_script_hash_script, pay_to_script_hash_signature_script,
Expand Down Expand Up @@ -226,6 +229,15 @@ async fn daemon_utxos_propagation_test() {
// Some dummy non-monitored address
let blank_address = Address::new(kaspad1.network.into(), kaspa_addresses::Version::PubKey, &[0; 32]);

// Create a multi-listener RPC client on each node. Multi-listener subscriptions are propagated
// upstream asynchronously, so subscribe to the streams used by mine_block before the long initial
// mining run and later verify that notifications actually flowed through both listeners.
let mut clients = vec![ListeningClient::connect(&kaspad2).await, ListeningClient::connect(&kaspad1).await];
for x in clients.iter_mut() {
x.start_notify(BlockAddedScope {}.into()).await.unwrap();
x.start_notify(VirtualDaaScoreChangedScope {}.into()).await.unwrap();
}

// Mine 1000 blocks to daemon #1
let initial_blocks = coinbase_maturity;
let mut last_block_hash = None;
Expand Down Expand Up @@ -282,14 +294,28 @@ async fn daemon_utxos_propagation_test() {
assert_eq!(accepted_txs_pair.accepted_transaction_ids.len(), 1);
}

// Create a multi-listener RPC client on each node...
let mut clients = vec![ListeningClient::connect(&kaspad2).await, ListeningClient::connect(&kaspad1).await];
// Use the initial mining run as a readiness barrier for the multi-listener notification stack,
// then consume the warm-up history through the final block and virtual DAA notifications so
// the following mine_block calls observe only fresh notifications.
let last_block_hash = last_block_hash.unwrap();
let timeout_per_notification = Duration::from_secs(10);
for x in clients.iter() {
x.wait_for_notification(EventType::BlockAdded, timeout_per_notification, |notification| {
matches!(notification, Notification::BlockAdded(notification) if notification.block.header.hash == last_block_hash)
})
.await;
x.wait_for_notification(EventType::VirtualDaaScoreChanged, timeout_per_notification, |notification| {
matches!(notification, Notification::VirtualDaaScoreChanged(notification) if notification.virtual_daa_score == initial_blocks)
})
.await;
x.block_added_listener().unwrap().drain();
x.virtual_daa_score_changed_listener().unwrap().drain();
}

// ...and subscribe each to some notifications
// Subscribe to address-filtered UTXO notifications only after the initial maturity mining, so
// the UTXO listener does not accumulate the 1000 coinbase notifications above.
for x in clients.iter_mut() {
x.start_notify(BlockAddedScope {}.into()).await.unwrap();
x.start_notify(UtxosChangedScope::new(vec![miner_address.clone(), user_address.clone()]).into()).await.unwrap();
x.start_notify(VirtualDaaScoreChangedScope {}.into()).await.unwrap();
}

// Mine some extra blocks so the latest miner reward is added to its balance and some UTXOs reach maturity
Expand Down
Loading