diff --git a/cli/src/modules/rpc.rs b/cli/src/modules/rpc.rs index 6e42922b14..38826dc151 100644 --- a/cli/src/modules/rpc.rs +++ b/cli/src/modules/rpc.rs @@ -313,6 +313,49 @@ impl Rpc { self.println(&ctx, result); } + RpcApiOps::GetUtxosByAddressesV2 => { + let mut args = argv; + + if args.len() < 6 { + return Err(Error::custom( + "Usage: rpc get-utxos-by-addresses-v2 (limit is a soft cap at the script public key + DAA boundary)", + )); + } + + let parse_optional_u64 = |value: String| -> Result> { + if value.eq_ignore_ascii_case("none") { Ok(None) } else { Ok(Some(value.parse::()?)) } + }; + + let parse_optional_address = |value: String| -> Result> { + if value.eq_ignore_ascii_case("none") { Ok(None) } else { Ok(Some(Address::try_from(value.as_str())?)) } + }; + + let limit = parse_optional_u64(args.pop().unwrap())?; + let start_daa_score = parse_optional_u64(args.pop().unwrap())?; + let start_address = parse_optional_address(args.pop().unwrap())?; + let to_daa_score = parse_optional_u64(args.pop().unwrap())?; + let from_daa_score = parse_optional_u64(args.pop().unwrap())?; + + if args.is_empty() { + return Err(Error::custom("Please specify at least one address")); + } + + let addresses = args.iter().map(|s| Address::try_from(s.as_str())).collect::, _>>()?; + let result = rpc + .get_utxos_by_addresses_v2_call( + None, + GetUtxosByAddressesV2Request::new( + addresses, + from_daa_score, + to_daa_score, + start_address, + start_daa_score, + limit, + ), + ) + .await?; + self.println(&ctx, result); + } _ => { tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n"); return Ok(()); diff --git a/consensus/src/model/stores/utxo_set.rs b/consensus/src/model/stores/utxo_set.rs index 4aeb9f43ba..af494a28ec 100644 --- a/consensus/src/model/stores/utxo_set.rs +++ b/consensus/src/model/stores/utxo_set.rs @@ -23,7 +23,7 @@ pub trait UtxoSetStoreReader { pub trait UtxoSetStore: UtxoSetStoreReader { /// Updates the store according to the UTXO diff -- adding and deleting entries correspondingly. /// Note we define `self` as `mut` in order to require write access even though the compiler does not require it. - /// This is because concurrent readers can interfere with cache consistency. + /// This is because concurrent readers can interfere with cache consistency. fn write_diff(&mut self, utxo_diff: &UtxoDiff) -> Result<(), StoreError>; fn write_many(&mut self, utxos: &[(TransactionOutpoint, UtxoEntry)]) -> Result<(), StoreError>; } @@ -153,7 +153,7 @@ impl UtxoSetStoreReader for DbUtxoSetStore { fn seek_iterator(&self, from_outpoint: Option, limit: usize, skip_first: bool) -> UtxoCollectionIterator<'_> { let seek_key = from_outpoint.map(UtxoKey::from); - Box::new(self.access.seek_iterator(None, seek_key, limit, skip_first).map(|res| { + Box::new(self.access.seek_iterator(None, seek_key, None, limit, skip_first).map(|res| { let (key, entry) = res?; let outpoint: TransactionOutpoint = UtxoKey::try_from(key.as_ref()).unwrap().into(); Ok((outpoint, UtxoEntry::clone(&entry))) diff --git a/database/src/access.rs b/database/src/access.rs index c624b18624..74c27683fc 100644 --- a/database/src/access.rs +++ b/database/src/access.rs @@ -218,6 +218,7 @@ where &self, bucket: Option<&[u8]>, // iter self.prefix if None, else append bytes to self.prefix. seek_from: Option, // iter whole range if None + seek_to: Option, // iter until the seek_to key (exclusive) if Some, else iter whole range. limit: usize, // amount to take. skip_first: bool, // skips the first value, (useful in conjunction with the seek-key, as to not re-retrieve). ) -> impl Iterator> + '_ @@ -237,13 +238,15 @@ where let mut read_opts = ReadOptions::default(); read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref())); - let mut db_iterator = match seek_from { - Some(seek_key) => { - self.db.iterator_opt(IteratorMode::From(DbKey::new(&self.prefix, seek_key).as_ref(), Direction::Forward), read_opts) - } - None => self.db.iterator_opt(IteratorMode::Start, read_opts), + if let Some(seek_from) = &seek_from { + read_opts.set_iterate_lower_bound(DbKey::new(db_key.as_ref(), seek_from).as_ref()); + }; + if let Some(seek_to) = &seek_to { + read_opts.set_iterate_upper_bound(DbKey::new(db_key.as_ref(), seek_to).as_ref()); }; + let mut db_iterator = self.db.iterator_opt(IteratorMode::Start, read_opts); + if skip_first { db_iterator.next(); } diff --git a/database/src/registry.rs b/database/src/registry.rs index cd59165f5f..b8d8053ee8 100644 --- a/database/src/registry.rs +++ b/database/src/registry.rs @@ -74,6 +74,7 @@ pub enum DatabaseStorePrefixes { UtxoIndex = 192, UtxoIndexTips = 193, CirculatingSupply = 194, + UtxoIndexDbVersion = 195, // ---- Separator ---- /// Reserved as a separator diff --git a/indexes/core/src/indexed_utxos.rs b/indexes/core/src/indexed_utxos.rs index 69a20d37fa..b92007c7a8 100644 --- a/indexes/core/src/indexed_utxos.rs +++ b/indexes/core/src/indexed_utxos.rs @@ -5,7 +5,31 @@ use std::collections::HashMap; // TODO: explore potential optimization via custom TransactionOutpoint hasher for below, // One possible implementation: u64 of transaction id xor'd with 4 bytes of transaction index. -pub type CompactUtxoCollection = HashMap; +pub type CompactUtxoCollection = HashMap; + +/// A deterministic ordered list of UTXOs keyed by [`UtxoEntryKeyData`]. +pub type OrderedUtxoCollection = Vec<(UtxoEntryKeyData, CompactUtxoEntry)>; + +/// A deterministic ordered list of UTXO collections keyed by [`ScriptPublicKey`]. +pub type OrderedUtxoSetByScriptPublicKey = Vec<(ScriptPublicKey, OrderedUtxoCollection)>; + +/// A page of ordered UTXOs with an optional cursor for the next group. +#[derive(Clone, Debug, Default)] +pub struct OrderedUtxoSetByScriptPublicKeyPage { + pub entries: OrderedUtxoSetByScriptPublicKey, + pub next_script_public_key: Option, + pub next_daa_score: Option, +} + +impl OrderedUtxoSetByScriptPublicKeyPage { + pub fn new( + entries: OrderedUtxoSetByScriptPublicKey, + next_script_public_key: Option, + next_daa_score: Option, + ) -> Self { + Self { entries, next_script_public_key, next_daa_score } + } +} /// A collection of utxos indexed via; [`ScriptPublicKey`] => [`TransactionOutpoint`] => [`CompactUtxoEntry`]. pub type UtxoSetByScriptPublicKey = HashMap; @@ -16,18 +40,18 @@ pub type BalanceByScriptPublicKey = HashMap; // Note: memory optimization compared to go-lang kaspad: // Unlike `consensus_core::tx::UtxoEntry` the utxoindex utilizes a compacted utxo form, where `script_public_key` field is removed. // This utxo structure can be utilized in the utxoindex, since utxos are implicitly key'd via its script public key (and outpoint) at all times. +// furthermore, the daa_score is also added to the key (for range queries) and thus is removed from the value as well. This results in a more compact representation of the utxo entry, which is more suitable for storage in the utxoindex. /// A compacted form of [`UtxoEntry`] without reference to [`ScriptPublicKey`] or [`TransactionOutpoint`] #[derive(Clone, Copy, Deserialize, Serialize, Debug)] pub struct CompactUtxoEntry { pub amount: u64, - pub block_daa_score: u64, pub is_coinbase: bool, } impl CompactUtxoEntry { /// Creates a new [`CompactUtxoEntry`] - pub fn new(amount: u64, block_daa_score: u64, is_coinbase: bool) -> Self { - Self { amount, block_daa_score, is_coinbase } + pub fn new(amount: u64, is_coinbase: bool) -> Self { + Self { amount, is_coinbase } } } @@ -35,7 +59,19 @@ impl MemSizeEstimator for CompactUtxoEntry {} impl From for CompactUtxoEntry { fn from(utxo_entry: UtxoEntry) -> Self { - Self { amount: utxo_entry.amount, block_daa_score: utxo_entry.block_daa_score, is_coinbase: utxo_entry.is_coinbase } + Self { amount: utxo_entry.amount, is_coinbase: utxo_entry.is_coinbase } + } +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct UtxoEntryKeyData { + pub daa_score: u64, + pub transaction_outpoint: TransactionOutpoint, +} + +impl UtxoEntryKeyData { + pub fn new(daa_score: u64, transaction_outpoint: TransactionOutpoint) -> Self { + Self { daa_score, transaction_outpoint } } } diff --git a/indexes/processor/src/processor.rs b/indexes/processor/src/processor.rs index 1b9586c85a..36a96dd3e3 100644 --- a/indexes/processor/src/processor.rs +++ b/indexes/processor/src/processor.rs @@ -191,15 +191,15 @@ mod tests { Notification::UtxosChanged(utxo_changed_notification) => { let mut notification_utxo_added_count = 0; for (script_public_key, compact_utxo_collection) in utxo_changed_notification.added.iter() { - for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() { + for (utxo_entry_key_data, compact_utxo) in compact_utxo_collection.iter() { let test_utxo = test_notification .accumulated_utxo_diff .add - .get(transaction_outpoint) + .get(&utxo_entry_key_data.transaction_outpoint) .expect("expected transaction outpoint to be in test event"); assert_eq!(test_utxo.script_public_key, *script_public_key); assert_eq!(test_utxo.amount, compact_utxo.amount); - assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score); + assert_eq!(test_utxo.block_daa_score, utxo_entry_key_data.daa_score); assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase); notification_utxo_added_count += 1; } @@ -208,15 +208,15 @@ mod tests { let mut notification_utxo_removed_count = 0; for (script_public_key, compact_utxo_collection) in utxo_changed_notification.removed.iter() { - for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() { + for (utxo_entry_key_data, compact_utxo) in compact_utxo_collection.iter() { let test_utxo = test_notification .accumulated_utxo_diff .remove - .get(transaction_outpoint) + .get(&utxo_entry_key_data.transaction_outpoint) .expect("expected transaction outpoint to be in test event"); assert_eq!(test_utxo.script_public_key, *script_public_key); assert_eq!(test_utxo.amount, compact_utxo.amount); - assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score); + assert_eq!(test_utxo.block_daa_score, utxo_entry_key_data.daa_score); assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase); notification_utxo_removed_count += 1; } diff --git a/indexes/utxoindex/src/core/api/mod.rs b/indexes/utxoindex/src/core/api/mod.rs index 4d2215e15f..2c1eadb4cd 100644 --- a/indexes/utxoindex/src/core/api/mod.rs +++ b/indexes/utxoindex/src/core/api/mod.rs @@ -1,12 +1,12 @@ use kaspa_consensus_core::{ BlockHashSet, - tx::{ScriptPublicKeys, TransactionOutpoint}, + tx::{ScriptPublicKey, ScriptPublicKeys, TransactionOutpoint}, utxo::utxo_diff::UtxoDiff, }; use kaspa_consensusmanager::spawn_blocking; use kaspa_database::prelude::StoreResult; use kaspa_hashes::Hash; -use kaspa_index_core::indexed_utxos::BalanceByScriptPublicKey; +use kaspa_index_core::indexed_utxos::{BalanceByScriptPublicKey, OrderedUtxoSetByScriptPublicKeyPage}; use parking_lot::RwLock; use std::{collections::HashSet, fmt::Debug, sync::Arc}; @@ -27,6 +27,17 @@ pub trait UtxoIndexApi: Send + Sync + Debug { /// Note: Use a read lock when accessing this method fn get_utxos_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult; + /// Retrieve ordered UTXOs for multiple script public keys with cursor pagination. + fn get_utxos_by_script_public_keys_by_daa_score_page( + &self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult; + fn get_balance_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult; // This can have a big memory footprint, so it should be used only for tests. @@ -41,7 +52,7 @@ pub trait UtxoIndexApi: Send + Sync + Debug { /// /// Note: /// 1) Use a read lock when accessing this method - /// 2) due to potential sync-gaps is_synced is unreliable while consensus is actively resolving virtual states. + /// 2) due to potential sync-gaps is_synced is unreliable while consensus is actively resolving virtual states. fn is_synced(&self) -> UtxoIndexResult; /// Update the utxoindex with the given utxo_diff, and tips. @@ -74,6 +85,29 @@ impl UtxoIndexProxy { spawn_blocking(move || self.inner.read().get_utxos_by_script_public_keys(script_public_keys)).await.unwrap() } + pub async fn get_utxos_by_script_public_keys_by_daa_score_page( + self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult { + spawn_blocking(move || { + self.inner.read().get_utxos_by_script_public_keys_by_daa_score_page( + script_public_keys, + from_daa_score, + to_daa_score, + start_script_public_key, + start_daa_score, + limit, + ) + }) + .await + .unwrap() + } + pub async fn get_balance_by_script_public_keys( self, script_public_keys: ScriptPublicKeys, diff --git a/indexes/utxoindex/src/index.rs b/indexes/utxoindex/src/index.rs index 200d115562..ac8d27f0be 100644 --- a/indexes/utxoindex/src/index.rs +++ b/indexes/utxoindex/src/index.rs @@ -6,12 +6,16 @@ use crate::{ stores::store_manager::Store, update_container::UtxoIndexChanges, }; -use kaspa_consensus_core::{BlockHashSet, tx::ScriptPublicKeys, utxo::utxo_diff::UtxoDiff}; +use kaspa_consensus_core::{ + BlockHashSet, + tx::{ScriptPublicKey, ScriptPublicKeys}, + utxo::utxo_diff::UtxoDiff, +}; use kaspa_consensusmanager::{ConsensusManager, ConsensusResetHandler}; use kaspa_core::{info, trace}; use kaspa_database::prelude::{DB, StoreError, StoreResult}; use kaspa_hashes::Hash; -use kaspa_index_core::indexed_utxos::BalanceByScriptPublicKey; +use kaspa_index_core::indexed_utxos::{BalanceByScriptPublicKey, OrderedUtxoSetByScriptPublicKeyPage}; use kaspa_utils::arc::ArcExtensions; use parking_lot::RwLock; use std::{ @@ -34,6 +38,10 @@ pub struct UtxoIndex { } impl UtxoIndex { + pub fn has_legacy_db_version(db: Arc) -> UtxoIndexResult { + Store::new(db).has_legacy_db_version() + } + /// Creates a new [`UtxoIndex`] within a [`RwLock`] pub fn new(consensus_manager: Arc, db: Arc) -> UtxoIndexResult>> { let mut utxoindex = @@ -61,7 +69,27 @@ impl UtxoIndexApi for UtxoIndex { fn get_utxos_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { trace!("[{0}] retrieving utxos from {1} script public keys", IDENT, script_public_keys.len()); - self.store.get_utxos_by_script_public_key(script_public_keys) + self.store.get_utxos_by_script_public_keys(script_public_keys) + } + + fn get_utxos_by_script_public_keys_by_daa_score_page( + &self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult { + trace!("[{0}] retrieving utxos by daa-score range for {1} script public keys (paged)", IDENT, script_public_keys.len()); + self.store.get_utxos_by_script_public_keys_by_daa_score_page( + script_public_keys, + from_daa_score, + to_daa_score, + start_script_public_key, + start_daa_score, + limit, + ) } /// Retrieve utxos by script public keys from the utxoindex db. @@ -186,6 +214,7 @@ impl UtxoIndexApi for UtxoIndex { trace!("[{0}] committing consensus tips {consensus_tips:?} from consensus db", IDENT); self.store.set_tips(consensus_tips, true)?; + self.store.ensure_db_version_current()?; Ok(()) } @@ -222,7 +251,12 @@ impl ConsensusResetHandler for UtxoIndexConsensusResetHandler { #[cfg(test)] mod tests { - use crate::{UtxoIndex, api::UtxoIndexApi, model::CirculatingSupply, testutils::virtual_change_emulator::VirtualChangeEmulator}; + use crate::{ + UtxoIndex, + api::UtxoIndexApi, + model::{CirculatingSupply, UtxoEntryKeyData}, + testutils::virtual_change_emulator::VirtualChangeEmulator, + }; use kaspa_consensus::{ config::Config, consensus::test_consensus::TestConsensus, @@ -296,10 +330,12 @@ mod tests { .get_utxos_by_script_public_keys(HashSet::from_iter(vec![utxo_entry.script_public_key.clone()])) .expect("expected script public key to be in database"); for (indexed_script_public_key, indexed_compact_utxo_collection) in indexed_utxos.into_iter() { - let compact_utxo = indexed_compact_utxo_collection.get(&tx_outpoint).expect("expected outpoint as key"); + let utxo_entry_key_data = UtxoEntryKeyData::new(utxo_entry.block_daa_score, tx_outpoint); + let compact_utxo = + indexed_compact_utxo_collection.get(&utxo_entry_key_data).expect("expected utxo entry key data as key"); assert_eq!(indexed_script_public_key, utxo_entry.script_public_key); assert_eq!(utxo_entry.amount, compact_utxo.amount); - assert_eq!(utxo_entry.block_daa_score, compact_utxo.block_daa_score); + assert_eq!(utxo_entry.block_daa_score, utxo_entry_key_data.daa_score); assert_eq!(utxo_entry.is_coinbase, compact_utxo.is_coinbase); i += 1; } @@ -327,11 +363,15 @@ mod tests { let mut i = 0; for (script_public_key, compact_utxo_collection) in utxo_changes.added.iter() { - for (tx_outpoint, compact_utxo_entry) in compact_utxo_collection.iter() { - let utxo_entry = virtual_change_emulator.accumulated_utxo_diff.add.get(tx_outpoint).expect("expected utxo_entry"); + for (utxo_entry_key_data, compact_utxo_entry) in compact_utxo_collection.iter() { + let utxo_entry = virtual_change_emulator + .accumulated_utxo_diff + .add + .get(&utxo_entry_key_data.transaction_outpoint) + .expect("expected utxo_entry"); assert_eq!(*script_public_key, utxo_entry.script_public_key); assert_eq!(compact_utxo_entry.amount, utxo_entry.amount); - assert_eq!(compact_utxo_entry.block_daa_score, utxo_entry.block_daa_score); + assert_eq!(utxo_entry_key_data.daa_score, utxo_entry.block_daa_score); assert_eq!(compact_utxo_entry.is_coinbase, utxo_entry.is_coinbase); i += 1; } @@ -341,12 +381,16 @@ mod tests { i = 0; for (script_public_key, compact_utxo_collection) in utxo_changes.removed.iter() { - for (tx_outpoint, compact_utxo_entry) in compact_utxo_collection.iter() { - assert!(virtual_change_emulator.accumulated_utxo_diff.remove.contains_key(tx_outpoint)); - let utxo_entry = virtual_change_emulator.accumulated_utxo_diff.remove.get(tx_outpoint).expect("expected utxo_entry"); + for (utxo_entry_key_data, compact_utxo_entry) in compact_utxo_collection.iter() { + assert!(virtual_change_emulator.accumulated_utxo_diff.remove.contains_key(&utxo_entry_key_data.transaction_outpoint)); + let utxo_entry = virtual_change_emulator + .accumulated_utxo_diff + .remove + .get(&utxo_entry_key_data.transaction_outpoint) + .expect("expected utxo_entry"); assert_eq!(*script_public_key, utxo_entry.script_public_key); assert_eq!(compact_utxo_entry.amount, utxo_entry.amount); - assert_eq!(compact_utxo_entry.block_daa_score, utxo_entry.block_daa_score); + assert_eq!(utxo_entry_key_data.daa_score, utxo_entry.block_daa_score); assert_eq!(compact_utxo_entry.is_coinbase, utxo_entry.is_coinbase); i += 1; } @@ -375,10 +419,12 @@ mod tests { .get_utxos_by_script_public_keys(HashSet::from_iter(vec![utxo_entry.script_public_key.clone()])) .expect("expected script public key to be in database"); for (indexed_script_public_key, indexed_compact_utxo_collection) in indexed_utxos.into_iter() { - let compact_utxo = indexed_compact_utxo_collection.get(&tx_outpoint).expect("expected outpoint as key"); + let utxo_entry_key_data = UtxoEntryKeyData::new(utxo_entry.block_daa_score, tx_outpoint); + let compact_utxo = + indexed_compact_utxo_collection.get(&utxo_entry_key_data).expect("expected utxo entry key data as key"); assert_eq!(indexed_script_public_key, utxo_entry.script_public_key); assert_eq!(utxo_entry.amount, compact_utxo.amount); - assert_eq!(utxo_entry.block_daa_score, compact_utxo.block_daa_score); + assert_eq!(utxo_entry.block_daa_score, utxo_entry_key_data.daa_score); assert_eq!(utxo_entry.is_coinbase, compact_utxo.is_coinbase); i += 1; } diff --git a/indexes/utxoindex/src/stores/db_version.rs b/indexes/utxoindex/src/stores/db_version.rs new file mode 100644 index 0000000000..c45c84c1ef --- /dev/null +++ b/indexes/utxoindex/src/stores/db_version.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; + +use kaspa_database::{ + prelude::{CachedDbItem, DB, DirectDbWriter, StoreResult}, + registry::DatabaseStorePrefixes, +}; + +pub trait UtxoIndexDbVersionStoreReader { + fn get(&self) -> StoreResult; +} + +pub trait UtxoIndexDbVersionStore: UtxoIndexDbVersionStoreReader { + fn set(&mut self, version: u16) -> StoreResult<()>; + fn remove(&mut self) -> StoreResult<()>; +} + +#[derive(Clone)] +pub struct DbUtxoIndexDbVersionStore { + db: Arc, + access: CachedDbItem, +} + +impl DbUtxoIndexDbVersionStore { + pub fn new(db: Arc) -> Self { + Self { db: Arc::clone(&db), access: CachedDbItem::new(db, DatabaseStorePrefixes::UtxoIndexDbVersion.into()) } + } +} + +impl UtxoIndexDbVersionStoreReader for DbUtxoIndexDbVersionStore { + fn get(&self) -> StoreResult { + self.access.read() + } +} + +impl UtxoIndexDbVersionStore for DbUtxoIndexDbVersionStore { + fn set(&mut self, version: u16) -> StoreResult<()> { + self.access.write(DirectDbWriter::new(&self.db), &version) + } + + fn remove(&mut self) -> StoreResult<()> { + self.access.remove(DirectDbWriter::new(&self.db)) + } +} diff --git a/indexes/utxoindex/src/stores/indexed_utxos.rs b/indexes/utxoindex/src/stores/indexed_utxos.rs index c736caf6bb..9fb4422c46 100644 --- a/indexes/utxoindex/src/stores/indexed_utxos.rs +++ b/indexes/utxoindex/src/stores/indexed_utxos.rs @@ -1,4 +1,4 @@ -use crate::core::model::{CompactUtxoCollection, CompactUtxoEntry, UtxoSetByScriptPublicKey}; +use crate::core::model::{CompactUtxoCollection, CompactUtxoEntry, OrderedUtxoSetByScriptPublicKeyPage, UtxoSetByScriptPublicKey}; use kaspa_consensus_core::tx::{ ScriptPublicKey, ScriptPublicKeyVersion, ScriptPublicKeys, ScriptVec, TransactionIndexType, TransactionOutpoint, @@ -7,7 +7,7 @@ use kaspa_core::debug; use kaspa_database::prelude::{CachePolicy, CachedDbAccess, DB, DirectDbWriter, StoreResult}; use kaspa_database::registry::DatabaseStorePrefixes; use kaspa_hashes::Hash; -use kaspa_index_core::indexed_utxos::BalanceByScriptPublicKey; +use kaspa_index_core::indexed_utxos::{BalanceByScriptPublicKey, UtxoEntryKeyData}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::fmt::Display; @@ -63,6 +63,10 @@ pub const TRANSACTION_OUTPOINT_KEY_SIZE: usize = kaspa_hashes::HASH_SIZE + size_ #[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)] struct TransactionOutpointKey([u8; TRANSACTION_OUTPOINT_KEY_SIZE]); +impl TransactionOutpointKey { + pub const EMPTY: Self = TransactionOutpointKey([0; TRANSACTION_OUTPOINT_KEY_SIZE]); +} + impl From for TransactionOutpoint { fn from(key: TransactionOutpointKey) -> Self { let transaction_id = Hash::from_slice(&key.0[..kaspa_hashes::HASH_SIZE]); @@ -88,8 +92,58 @@ impl AsRef<[u8]> for TransactionOutpointKey { } } +pub const DAA_SCORE_KEY_SIZE: usize = size_of::(); + +struct DaaScoreKey([u8; DAA_SCORE_KEY_SIZE]); + +impl From for DaaScoreKey { + fn from(daa_score: u64) -> Self { + DaaScoreKey(daa_score.to_be_bytes()) + } +} + +impl AsRef<[u8]> for DaaScoreKey { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +#[derive(Eq, Hash, PartialEq, Debug, Copy, Clone)] +struct UtxoEntryInnerKey([u8; DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE]); + +impl From<(u64, TransactionOutpoint)> for UtxoEntryInnerKey { + fn from(value: (u64, TransactionOutpoint)) -> Self { + let mut bytes = [0; DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE]; + bytes[..DAA_SCORE_KEY_SIZE].copy_from_slice(&value.0.to_be_bytes()); + bytes[DAA_SCORE_KEY_SIZE..].copy_from_slice(&TransactionOutpointKey::from(&value.1).0); + UtxoEntryInnerKey(bytes) + } +} + +impl From for UtxoEntryKeyData { + fn from(key: UtxoEntryInnerKey) -> Self { + let daa_score = u64::from_be_bytes(key.0[..DAA_SCORE_KEY_SIZE].try_into().unwrap()); + let transaction_outpoint = TransactionOutpoint::from(TransactionOutpointKey(key.0[DAA_SCORE_KEY_SIZE..].try_into().unwrap())); + UtxoEntryKeyData { daa_score, transaction_outpoint } + } +} + +impl From for (u64, TransactionOutpoint) { + fn from(key: UtxoEntryInnerKey) -> Self { + let daa_score = u64::from_be_bytes(key.0[..DAA_SCORE_KEY_SIZE].try_into().unwrap()); + let transaction_outpoint = TransactionOutpoint::from(TransactionOutpointKey(key.0[DAA_SCORE_KEY_SIZE..].try_into().unwrap())); + (daa_score, transaction_outpoint) + } +} + +impl AsRef<[u8]> for UtxoEntryInnerKey { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + /// Full [CompactUtxoEntry] access key. -/// Consists of variable amount of bytes of [ScriptPublicKeyBucket], and 36 bytes of [TransactionOutpointKey] +/// Consists of variable amount of bytes of [ScriptPublicKeyBucket], followed by [DaaScoreKey], and [TransactionOutpointKey]. #[derive(Eq, Hash, PartialEq, Debug, Clone, Serialize, Deserialize)] struct UtxoEntryFullAccessKey(Arc>); @@ -101,9 +155,23 @@ impl Display for UtxoEntryFullAccessKey { impl UtxoEntryFullAccessKey { /// Creates a new [UtxoEntryFullAccessKey] from a [ScriptPublicKeyBucket] and [TransactionOutpointKey]. - pub fn new(script_public_key_bucket: ScriptPublicKeyBucket, transaction_outpoint_key: TransactionOutpointKey) -> Self { - let mut bytes = Vec::with_capacity(script_public_key_bucket.as_ref().len() + TRANSACTION_OUTPOINT_KEY_SIZE); + pub fn new( + script_public_key_bucket: ScriptPublicKeyBucket, + daa_score_key: DaaScoreKey, + transaction_outpoint_key: TransactionOutpointKey, + ) -> Self { + let mut bytes = + Vec::with_capacity(DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE + script_public_key_bucket.as_ref().len()); bytes.extend_from_slice(script_public_key_bucket.as_ref()); + bytes.extend_from_slice(daa_score_key.as_ref()); + bytes.extend_from_slice(transaction_outpoint_key.as_ref()); + Self(Arc::new(bytes)) + } + + /// Creates a new key from inner components (DAA score + outpoint) for bucket-scoped seeks. + pub fn new_inner(daa_score_key: DaaScoreKey, transaction_outpoint_key: TransactionOutpointKey) -> Self { + let mut bytes = Vec::with_capacity(DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE); + bytes.extend_from_slice(daa_score_key.as_ref()); bytes.extend_from_slice(transaction_outpoint_key.as_ref()); Self(Arc::new(bytes)) } @@ -124,6 +192,16 @@ impl AsRef<[u8]> for UtxoEntryFullAccessKey { pub trait UtxoSetByScriptPublicKeyStoreReader { /// Get [UtxoSetByScriptPublicKey] set by queried [ScriptPublicKeys], fn get_utxos_from_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult; + /// Get ordered UTXOs for multiple script public keys, bounded by an inclusive DAA-score range, with cursor pagination. + fn get_utxos_from_script_public_keys_by_daa_score_page( + &self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult; fn get_balance_from_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult; fn get_all_outpoints(&self) -> StoreResult>; // This can have a big memory footprint, so it should be used only for tests. } @@ -159,14 +237,18 @@ impl UtxoSetByScriptPublicKeyStoreReader for DbUtxoSetByScriptPublicKeyStore { // to the rpc via pagination, this would alleviate the memory footprint of script public keys with large amount of utxos. fn get_utxos_from_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { let script_count = script_public_keys.len(); - let mut entries_count: usize = 0; let mut utxos_by_script_public_keys = UtxoSetByScriptPublicKey::new(); + let mut entries_count: usize = 0; for script_public_key in script_public_keys.into_iter() { let script_public_key_bucket = ScriptPublicKeyBucket::from(&script_public_key); let utxos_by_script_public_keys_inner = CompactUtxoCollection::from_iter( - self.access.seek_iterator(Some(script_public_key_bucket.as_ref()), None, usize::MAX, false).map(|res| { - let (key, entry) = res.unwrap(); - (TransactionOutpointKey(<[u8; TRANSACTION_OUTPOINT_KEY_SIZE]>::try_from(&key[..]).unwrap()).into(), entry) + self.access.seek_iterator(Some(script_public_key_bucket.as_ref()), None, None, usize::MAX, false).map(|res| { + let (key, value) = res.unwrap(); + ( + UtxoEntryInnerKey(<[u8; DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE]>::try_from(&key[..]).unwrap()) + .into(), + value, + ) }), ); entries_count += utxos_by_script_public_keys_inner.len(); @@ -176,6 +258,113 @@ impl UtxoSetByScriptPublicKeyStoreReader for DbUtxoSetByScriptPublicKeyStore { Ok(utxos_by_script_public_keys) } + fn get_utxos_from_script_public_keys_by_daa_score_page( + &self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult { + let from_daa_score = from_daa_score.unwrap_or(0); + let to_daa_score = to_daa_score.unwrap_or(u64::MAX); + if from_daa_score > to_daa_score { + return Ok(OrderedUtxoSetByScriptPublicKeyPage::default()); + } + + let script_count = script_public_keys.len(); + + let mut script_public_keys = script_public_keys.into_iter().collect::>(); + script_public_keys.sort_by(|a, b| a.version().cmp(&b.version()).then_with(|| a.script().cmp(b.script()))); + + if script_public_keys.is_empty() { + return Ok(OrderedUtxoSetByScriptPublicKeyPage::default()); + } + + let start_index = if let Some(start_script_public_key) = &start_script_public_key { + script_public_keys + .iter() + .position(|script_public_key| script_public_key == start_script_public_key) + .unwrap_or(script_public_keys.len()) + } else { + 0 + }; + + let mut entries_count: u64 = 0; + let mut stop_after_group = false; + let mut should_stop = false; + let mut current_group: Option<(usize, u64)> = None; + let mut next_script_public_key: Option = None; + let mut next_daa_score: Option = None; + let mut utxos_by_script_public_keys = Vec::with_capacity(script_public_keys.len()); + + for (index, script_public_key) in script_public_keys.into_iter().enumerate().skip(start_index) { + // Required for the first address only, resume from the cursor DAA score (if provided), + // but never below the global from_daa_score; later addresses use from_daa_score. + let effective_from_daa_score = if index == start_index { + start_daa_score.map(|start| start.max(from_daa_score)).unwrap_or(from_daa_score) + } else { + from_daa_score + }; + + if effective_from_daa_score > to_daa_score { + continue; + } + + let script_public_key_bucket = ScriptPublicKeyBucket::from(&script_public_key); + let seek_from = UtxoEntryFullAccessKey::new_inner(effective_from_daa_score.into(), TransactionOutpointKey::EMPTY); + + let seek_to = (to_daa_score < u64::MAX) + .then(|| UtxoEntryFullAccessKey::new_inner((to_daa_score + 1).into(), TransactionOutpointKey::EMPTY)); + + let mut ordered_entries = Vec::new(); + for res in self.access.seek_iterator(Some(script_public_key_bucket.as_ref()), Some(seek_from), seek_to, usize::MAX, false) + { + let (key, value) = res.unwrap(); + let key_data: UtxoEntryKeyData = + UtxoEntryInnerKey(<[u8; DAA_SCORE_KEY_SIZE + TRANSACTION_OUTPOINT_KEY_SIZE]>::try_from(&key[..]).unwrap()).into(); + let daa_score = key_data.daa_score; + + if let Some((current_index, current_daa_score)) = current_group { + if current_index != index || current_daa_score != daa_score { + if stop_after_group { + next_script_public_key = Some(script_public_key.clone()); + next_daa_score = Some(daa_score); + should_stop = true; + break; + } + current_group = Some((index, daa_score)); + } + } else { + current_group = Some((index, daa_score)); + } + + ordered_entries.push((key_data, value)); + entries_count += 1; + + if matches!(limit, Some(limit) if entries_count >= limit) { + stop_after_group = true; + } + } + + if !ordered_entries.is_empty() { + utxos_by_script_public_keys.push((script_public_key, ordered_entries)); + } + + if should_stop { + break; + } + } + + debug!( + "IDXPRC, Executed a DAA-range paged query for the utxo set of {} script public keys yielding {} entries", + script_count, entries_count + ); + + Ok(OrderedUtxoSetByScriptPublicKeyPage::new(utxos_by_script_public_keys, next_script_public_key, next_daa_score)) + } + fn get_balance_from_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { let script_count = script_public_keys.len(); let mut entries_count: usize = 0; @@ -184,7 +373,7 @@ impl UtxoSetByScriptPublicKeyStoreReader for DbUtxoSetByScriptPublicKeyStore { let script_public_key_bucket = ScriptPublicKeyBucket::from(&script_public_key); let balance: u64 = self .access - .seek_iterator(Some(script_public_key_bucket.as_ref()), None, usize::MAX, false) + .seek_iterator(Some(script_public_key_bucket.as_ref()), None, None, usize::MAX, false) .map(|res| { entries_count += 1; let (_, entry) = res.unwrap(); @@ -214,10 +403,11 @@ impl UtxoSetByScriptPublicKeyStore for DbUtxoSetByScriptPublicKeyStore { let mut writer = DirectDbWriter::new(&self.db); let mut to_remove = utxo_entries.iter().flat_map(move |(script_public_key, compact_utxo_collection)| { - compact_utxo_collection.keys().map(move |transaction_outpoint| { + compact_utxo_collection.keys().map(move |utxo_entry_key_data| { UtxoEntryFullAccessKey::new( ScriptPublicKeyBucket::from(script_public_key), - TransactionOutpointKey::from(transaction_outpoint), + DaaScoreKey::from(utxo_entry_key_data.daa_score), + TransactionOutpointKey::from(&utxo_entry_key_data.transaction_outpoint), ) }) }); @@ -235,11 +425,12 @@ impl UtxoSetByScriptPublicKeyStore for DbUtxoSetByScriptPublicKeyStore { let mut writer = DirectDbWriter::new(&self.db); let mut to_add = utxo_entries.iter().flat_map(move |(script_public_key, compact_utxo_collection)| { - compact_utxo_collection.iter().map(move |(transaction_outpoint, compact_utxo)| { + compact_utxo_collection.iter().map(move |(utxo_entry_key_data, compact_utxo)| { ( UtxoEntryFullAccessKey::new( ScriptPublicKeyBucket::from(script_public_key), - TransactionOutpointKey::from(transaction_outpoint), + DaaScoreKey::from(utxo_entry_key_data.daa_score), + TransactionOutpointKey::from(&utxo_entry_key_data.transaction_outpoint), ), *compact_utxo, ) @@ -256,3 +447,168 @@ impl UtxoSetByScriptPublicKeyStore for DbUtxoSetByScriptPublicKeyStore { self.access.delete_all(DirectDbWriter::new(&self.db)) } } + +#[cfg(test)] +mod tests { + use super::*; + use kaspa_database::{create_temp_db, prelude::ConnBuilder}; + + fn create_outpoint(word: u64, index: u32) -> TransactionOutpoint { + TransactionOutpoint::new(Hash::from_u64_word(word), index) + } + + #[test] + fn test_page_ordered_and_range_filtered() { + let (_db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let mut store = DbUtxoSetByScriptPublicKeyStore::new(db, CachePolicy::Empty); + + // Intentionally unsorted by script bytes so output ordering can be asserted. + let script_public_key_a = ScriptPublicKey::from_vec(0, vec![0x02]); + let script_public_key_b = ScriptPublicKey::from_vec(0, vec![0x01]); + + let mut to_add = UtxoSetByScriptPublicKey::new(); + to_add.insert( + script_public_key_a.clone(), + CompactUtxoCollection::from_iter([ + (UtxoEntryKeyData::new(10, create_outpoint(10, 0)), CompactUtxoEntry::new(100, false)), + (UtxoEntryKeyData::new(20, create_outpoint(20, 0)), CompactUtxoEntry::new(200, false)), + ]), + ); + to_add.insert( + script_public_key_b.clone(), + CompactUtxoCollection::from_iter([ + (UtxoEntryKeyData::new(15, create_outpoint(15, 0)), CompactUtxoEntry::new(150, false)), + (UtxoEntryKeyData::new(25, create_outpoint(25, 0)), CompactUtxoEntry::new(250, false)), + ]), + ); + + store.add_utxo_entries(&to_add).unwrap(); + + let page = store + .get_utxos_from_script_public_keys_by_daa_score_page( + ScriptPublicKeys::from_iter([script_public_key_a.clone(), script_public_key_b.clone()]), + Some(12), + Some(22), + None, + None, + None, + ) + .unwrap(); + + let ordered = page.entries; + assert!(page.next_script_public_key.is_none()); + assert!(page.next_daa_score.is_none()); + + assert_eq!(ordered.len(), 2); + assert_eq!(ordered[0].0, script_public_key_b); + assert_eq!(ordered[1].0, script_public_key_a); + + assert_eq!(ordered[0].1.len(), 1); + assert_eq!(ordered[0].1[0].0.daa_score, 15); + assert_eq!(ordered[1].1.len(), 1); + assert_eq!(ordered[1].1[0].0.daa_score, 20); + + // Sanity check for the non-range API. + let all = store + .get_utxos_from_script_public_keys(ScriptPublicKeys::from_iter([script_public_key_a.clone(), script_public_key_b.clone()])) + .unwrap(); + assert_eq!(all.get(&script_public_key_a).unwrap().len(), 2); + assert_eq!(all.get(&script_public_key_b).unwrap().len(), 2); + } + + #[test] + fn test_page_soft_limit_finishes_group() { + let (_db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let mut store = DbUtxoSetByScriptPublicKeyStore::new(db, CachePolicy::Empty); + + let script_public_key_a = ScriptPublicKey::from_vec(0, vec![0x01]); + let script_public_key_b = ScriptPublicKey::from_vec(0, vec![0x02]); + + let mut to_add = UtxoSetByScriptPublicKey::new(); + to_add.insert( + script_public_key_a.clone(), + CompactUtxoCollection::from_iter([ + (UtxoEntryKeyData::new(10, create_outpoint(10, 0)), CompactUtxoEntry::new(100, false)), + (UtxoEntryKeyData::new(10, create_outpoint(10, 1)), CompactUtxoEntry::new(110, false)), + (UtxoEntryKeyData::new(11, create_outpoint(11, 0)), CompactUtxoEntry::new(120, false)), + ]), + ); + to_add.insert( + script_public_key_b.clone(), + CompactUtxoCollection::from_iter([(UtxoEntryKeyData::new(12, create_outpoint(12, 0)), CompactUtxoEntry::new(130, false))]), + ); + + store.add_utxo_entries(&to_add).unwrap(); + + let page = store + .get_utxos_from_script_public_keys_by_daa_score_page( + ScriptPublicKeys::from_iter([script_public_key_b.clone(), script_public_key_a.clone()]), + None, + None, + None, + None, + Some(1), + ) + .unwrap(); + + assert_eq!(page.next_script_public_key, Some(script_public_key_a.clone())); + assert_eq!(page.next_daa_score, Some(11)); + + let ordered = page.entries; + assert_eq!(ordered.len(), 1); + assert_eq!(ordered[0].0, script_public_key_a); + assert_eq!(ordered[0].1.len(), 2); + assert!(ordered[0].1.iter().all(|(key, _)| key.daa_score == 10)); + } + + #[test] + fn test_page_resumes_from_start_daa_score() { + let (_db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10)); + let mut store = DbUtxoSetByScriptPublicKeyStore::new(db, CachePolicy::Empty); + + let script_public_key_a = ScriptPublicKey::from_vec(0, vec![0x01]); + let script_public_key_b = ScriptPublicKey::from_vec(0, vec![0x02]); + + let mut to_add = UtxoSetByScriptPublicKey::new(); + to_add.insert( + script_public_key_a.clone(), + CompactUtxoCollection::from_iter([ + (UtxoEntryKeyData::new(10, create_outpoint(10, 0)), CompactUtxoEntry::new(100, false)), + (UtxoEntryKeyData::new(20, create_outpoint(20, 0)), CompactUtxoEntry::new(200, false)), + ]), + ); + to_add.insert( + script_public_key_b.clone(), + CompactUtxoCollection::from_iter([ + (UtxoEntryKeyData::new(15, create_outpoint(15, 0)), CompactUtxoEntry::new(150, false)), + (UtxoEntryKeyData::new(25, create_outpoint(25, 0)), CompactUtxoEntry::new(250, false)), + ]), + ); + + store.add_utxo_entries(&to_add).unwrap(); + + let page = store + .get_utxos_from_script_public_keys_by_daa_score_page( + ScriptPublicKeys::from_iter([script_public_key_b.clone(), script_public_key_a.clone()]), + Some(10), + Some(30), + Some(script_public_key_a.clone()), + Some(18), + None, + ) + .unwrap(); + + assert!(page.next_script_public_key.is_none()); + assert!(page.next_daa_score.is_none()); + + let ordered = page.entries; + assert_eq!(ordered.len(), 2); + assert_eq!(ordered[0].0, script_public_key_a); + assert_eq!(ordered[0].1.len(), 1); + assert_eq!(ordered[0].1[0].0.daa_score, 20); + assert_eq!(ordered[1].0, script_public_key_b); + assert_eq!(ordered[1].1.len(), 2); + assert_eq!(ordered[1].1[0].0.daa_score, 15); + assert_eq!(ordered[1].1[1].0.daa_score, 25); + } +} diff --git a/indexes/utxoindex/src/stores/mod.rs b/indexes/utxoindex/src/stores/mod.rs index f7d782ef37..e76735c138 100644 --- a/indexes/utxoindex/src/stores/mod.rs +++ b/indexes/utxoindex/src/stores/mod.rs @@ -1,3 +1,4 @@ +mod db_version; mod indexed_utxos; pub mod store_manager; mod supply; diff --git a/indexes/utxoindex/src/stores/store_manager.rs b/indexes/utxoindex/src/stores/store_manager.rs index da7e070e87..254fc90128 100644 --- a/indexes/utxoindex/src/stores/store_manager.rs +++ b/indexes/utxoindex/src/stores/store_manager.rs @@ -2,26 +2,31 @@ use std::{collections::HashSet, sync::Arc}; use kaspa_consensus_core::{ BlockHashSet, - tx::{ScriptPublicKeys, TransactionOutpoint}, + tx::{ScriptPublicKey, ScriptPublicKeys, TransactionOutpoint}, }; use kaspa_core::trace; use kaspa_database::prelude::{CachePolicy, DB, StoreResult}; -use kaspa_index_core::indexed_utxos::BalanceByScriptPublicKey; +use kaspa_index_core::indexed_utxos::{BalanceByScriptPublicKey, OrderedUtxoSetByScriptPublicKeyPage}; use crate::{ IDENT, + errors::{UtxoIndexError, UtxoIndexResult}, model::UtxoSetByScriptPublicKey, stores::{ + db_version::{DbUtxoIndexDbVersionStore, UtxoIndexDbVersionStore, UtxoIndexDbVersionStoreReader}, indexed_utxos::{DbUtxoSetByScriptPublicKeyStore, UtxoSetByScriptPublicKeyStore, UtxoSetByScriptPublicKeyStoreReader}, supply::{CirculatingSupplyStore, CirculatingSupplyStoreReader, DbCirculatingSupplyStore}, tips::{DbUtxoIndexTipsStore, UtxoIndexTipsStore, UtxoIndexTipsStoreReader}, }, }; +pub const UTXOINDEX_DB_VERSION: u16 = 1; + #[derive(Clone)] pub struct Store { utxoindex_tips_store: DbUtxoIndexTipsStore, circulating_supply_store: DbCirculatingSupplyStore, + db_version_store: DbUtxoIndexDbVersionStore, utxos_by_script_public_key_store: DbUtxoSetByScriptPublicKeyStore, } @@ -30,14 +35,71 @@ impl Store { Self { utxoindex_tips_store: DbUtxoIndexTipsStore::new(db.clone()), circulating_supply_store: DbCirculatingSupplyStore::new(db.clone()), + db_version_store: DbUtxoIndexDbVersionStore::new(db.clone()), utxos_by_script_public_key_store: DbUtxoSetByScriptPublicKeyStore::new(db, CachePolicy::Empty), } } - pub fn get_utxos_by_script_public_key(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { + pub fn has_legacy_db_version(&self) -> UtxoIndexResult { + // Older UTXO index databases may not have a db-version marker at all. + // In that case we need to distinguish between: + // 1) a fresh/empty DB (no rebuild prompt needed), and + // 2) an existing pre-versioned DB (must rebuild before continuing). + // Tips and circulating-supply are cheap sentinel stores that indicate + // whether the UTXO index DB has already been populated. + let tips_exist = match self.utxoindex_tips_store.get() { + Ok(_) => true, + Err(kaspa_database::prelude::StoreError::KeyNotFound(_)) => false, + Err(err) => return Err(UtxoIndexError::StoreAccessError(err)), + }; + + let supply_exists = match self.circulating_supply_store.get() { + Ok(_) => true, + Err(kaspa_database::prelude::StoreError::KeyNotFound(_)) => false, + Err(err) => return Err(UtxoIndexError::StoreAccessError(err)), + }; + + match self.db_version_store.get() { + Ok(version) => Ok(version != UTXOINDEX_DB_VERSION), + Err(kaspa_database::prelude::StoreError::KeyNotFound(_)) => Ok(tips_exist || supply_exists), + Err(err) => Err(UtxoIndexError::StoreAccessError(err)), + } + } + + pub fn ensure_db_version_current(&mut self) -> UtxoIndexResult<()> { + match self.db_version_store.get() { + Ok(version) if version == UTXOINDEX_DB_VERSION => Ok(()), + Ok(_) | Err(kaspa_database::prelude::StoreError::KeyNotFound(_)) => { + self.db_version_store.set(UTXOINDEX_DB_VERSION)?; + Ok(()) + } + Err(err) => Err(UtxoIndexError::StoreAccessError(err)), + } + } + + pub fn get_utxos_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { self.utxos_by_script_public_key_store.get_utxos_from_script_public_keys(script_public_keys) } + pub fn get_utxos_by_script_public_keys_by_daa_score_page( + &self, + script_public_keys: ScriptPublicKeys, + from_daa_score: Option, + to_daa_score: Option, + start_script_public_key: Option, + start_daa_score: Option, + limit: Option, + ) -> StoreResult { + self.utxos_by_script_public_key_store.get_utxos_from_script_public_keys_by_daa_score_page( + script_public_keys, + from_daa_score, + to_daa_score, + start_script_public_key, + start_daa_score, + limit, + ) + } + pub fn get_balance_by_script_public_key(&self, script_public_keys: ScriptPublicKeys) -> StoreResult { self.utxos_by_script_public_key_store.get_balance_from_script_public_keys(script_public_keys) } @@ -114,6 +176,7 @@ impl Store { // Clear all self.utxoindex_tips_store.remove()?; self.circulating_supply_store.remove()?; + self.db_version_store.remove()?; self.utxos_by_script_public_key_store.delete_all()?; trace!("[{0}] clearing utxoindex database - success!", IDENT); diff --git a/indexes/utxoindex/src/update_container.rs b/indexes/utxoindex/src/update_container.rs index 67bfe697e7..e5fed2f3cb 100644 --- a/indexes/utxoindex/src/update_container.rs +++ b/indexes/utxoindex/src/update_container.rs @@ -4,6 +4,7 @@ use kaspa_consensus_core::{ utxo::utxo_diff::UtxoDiff, }; use kaspa_hashes::Hash; +use kaspa_index_core::indexed_utxos::UtxoEntryKeyData; use kaspa_utils::hashmap::NestedHashMapExtensions; use crate::model::{CirculatingSupplyDiff, CompactUtxoEntry, UtxoChanges, UtxoSetByScriptPublicKey}; @@ -34,8 +35,8 @@ impl UtxoIndexChanges { self.utxo_changes.added.insert_into_nested( utxo_entry.script_public_key, - transaction_outpoint, - CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.block_daa_score, utxo_entry.is_coinbase), + UtxoEntryKeyData::new(utxo_entry.block_daa_score, transaction_outpoint), + CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.is_coinbase), ); } @@ -44,8 +45,8 @@ impl UtxoIndexChanges { self.utxo_changes.removed.insert_into_nested( utxo_entry.script_public_key, - transaction_outpoint, - CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.block_daa_score, utxo_entry.is_coinbase), + UtxoEntryKeyData::new(utxo_entry.block_daa_score, transaction_outpoint), + CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.is_coinbase), ); } } @@ -59,8 +60,8 @@ impl UtxoIndexChanges { self.utxo_changes.added.insert_into_nested( utxo_entry.script_public_key, - transaction_outpoint, - CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.block_daa_score, utxo_entry.is_coinbase), + UtxoEntryKeyData::new(utxo_entry.block_daa_score, transaction_outpoint), + CompactUtxoEntry::new(utxo_entry.amount, utxo_entry.is_coinbase), ); } } diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index a31c34e01d..f683cceb58 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -607,14 +607,32 @@ Do you confirm? (y/n)"; let notify_service = Arc::new(NotifyService::new(notification_root.clone(), notification_recv, subscription_context.clone())); let index_service: Option> = if args.utxoindex { // Use only a single thread for none-consensus databases - let utxoindex_db = kaspa_database::prelude::ConnBuilder::default() - .with_db_path(utxoindex_db_dir) + let mut utxoindex_db = kaspa_database::prelude::ConnBuilder::default() + .with_db_path(utxoindex_db_dir.clone()) .with_files_limit(utxo_files_limit) .with_preset(rocksdb_preset) .with_wal_dir(wal_dir.clone()) .with_cache_budget(cache_budget) .build() .unwrap(); + + if UtxoIndex::has_legacy_db_version(utxoindex_db.clone()).expect("failed checking UTXO index db version") { + let msg = "UTXO index database has an older db version and must be rebuilt to continue. Do you confirm rebuilding the UTXO index DB? (y/n)"; + get_user_approval_or_exit(msg, args.yes); + drop(utxoindex_db); + fs::remove_dir_all(utxoindex_db_dir.as_path()).unwrap(); + fs::create_dir_all(utxoindex_db_dir.as_path()).unwrap(); + + utxoindex_db = kaspa_database::prelude::ConnBuilder::default() + .with_db_path(utxoindex_db_dir.clone()) + .with_files_limit(utxo_files_limit) + .with_preset(rocksdb_preset) + .with_wal_dir(wal_dir.clone()) + .with_cache_budget(cache_budget) + .build() + .unwrap(); + } + let utxoindex = UtxoIndexProxy::new(UtxoIndex::new(consensus_manager.clone(), utxoindex_db).unwrap()); let index_service = Arc::new(IndexService::new(¬ify_service.notifier(), subscription_context.clone(), Some(utxoindex))); Some(index_service) diff --git a/rpc/core/src/api/ops.rs b/rpc/core/src/api/ops.rs index 1ad1344185..324c1e5a75 100644 --- a/rpc/core/src/api/ops.rs +++ b/rpc/core/src/api/ops.rs @@ -12,7 +12,7 @@ use workflow_core::enums::Describe; pub const RPC_API_VERSION: u16 = 1; /// API revision. Change in this value denotes /// backwards-compatible changes. -pub const RPC_API_REVISION: u16 = 0; +pub const RPC_API_REVISION: u16 = 1; #[derive(Describe, Clone, Copy, Debug, PartialEq, Eq, Hash, BorshSerialize, BorshDeserialize, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -140,6 +140,10 @@ pub enum RpcApiOps { GetUtxoReturnAddress = 150, /// Get Virtual Chain from Block V2 GetVirtualChainFromBlockV2 = 151, + /// Get a list of UTXOs for multiple addresses with optional DAA-score range filtering. + /// Supports cursor pagination via start_address/start_daa_score and a soft limit that completes + /// the current script public key + DAA-score group before returning next_address/next_daa_score. + GetUtxosByAddressesV2 = 152, } impl RpcApiOps { diff --git a/rpc/core/src/api/rpc.rs b/rpc/core/src/api/rpc.rs index 649e883250..64afd8244b 100644 --- a/rpc/core/src/api/rpc.rs +++ b/rpc/core/src/api/rpc.rs @@ -356,6 +356,30 @@ pub trait RpcApi: Sync + Send + AnySync { request: GetUtxosByAddressesRequest, ) -> RpcResult; + /// Requests UTXOs for multiple addresses, optionally filtered by an inclusive DAA-score range. + /// + /// This call is only available when this node was started with `--utxoindex`. + async fn get_utxos_by_addresses_v2( + &self, + addresses: Vec, + from_daa_score: Option, + to_daa_score: Option, + start_address: Option, + start_daa_score: Option, + limit: Option, + ) -> RpcResult { + self.get_utxos_by_addresses_v2_call( + None, + GetUtxosByAddressesV2Request::new(addresses, from_daa_score, to_daa_score, start_address, start_daa_score, limit), + ) + .await + } + async fn get_utxos_by_addresses_v2_call( + &self, + connection: Option<&DynRpcConnection>, + request: GetUtxosByAddressesV2Request, + ) -> RpcResult; + /// Requests the blue score of the current selected parent of the virtual block. async fn get_sink_blue_score(&self) -> RpcResult { Ok(self.get_sink_blue_score_call(None, GetSinkBlueScoreRequest {}).await?.blue_score) diff --git a/rpc/core/src/convert/utxo.rs b/rpc/core/src/convert/utxo.rs index 5fc09f6902..492d497a3a 100644 --- a/rpc/core/src/convert/utxo.rs +++ b/rpc/core/src/convert/utxo.rs @@ -3,6 +3,7 @@ use crate::RpcUtxoEntry; use crate::RpcUtxosByAddressesEntry; use kaspa_addresses::Prefix; +use kaspa_index_core::indexed_utxos::OrderedUtxoSetByScriptPublicKey; use kaspa_index_core::indexed_utxos::UtxoSetByScriptPublicKey; use kaspa_txscript::extract_script_pub_key_address; @@ -16,10 +17,36 @@ pub fn utxo_set_into_rpc(item: &UtxoSetByScriptPublicKey, prefix: Option let address = prefix.and_then(|x| extract_script_pub_key_address(script_public_key, x).ok()); utxo_collection .iter() - .map(|(outpoint, entry)| RpcUtxosByAddressesEntry { + .map(|(utxo_entry_key_data, entry)| RpcUtxosByAddressesEntry { address: address.clone(), - outpoint: (*outpoint).into(), - utxo_entry: RpcUtxoEntry::new(entry.amount, script_public_key.clone(), entry.block_daa_score, entry.is_coinbase), + outpoint: utxo_entry_key_data.transaction_outpoint.into(), + utxo_entry: RpcUtxoEntry::new( + entry.amount, + script_public_key.clone(), + utxo_entry_key_data.daa_score, + entry.is_coinbase, + ), + }) + .collect::>() + }) + .collect::>() +} + +pub fn ordered_utxo_set_into_rpc(item: &OrderedUtxoSetByScriptPublicKey, prefix: Option) -> Vec { + item.iter() + .flat_map(|(script_public_key, utxo_collection)| { + let address = prefix.and_then(|x| extract_script_pub_key_address(script_public_key, x).ok()); + utxo_collection + .iter() + .map(|(utxo_entry_key_data, entry)| RpcUtxosByAddressesEntry { + address: address.clone(), + outpoint: utxo_entry_key_data.transaction_outpoint.into(), + utxo_entry: RpcUtxoEntry::new( + entry.amount, + script_public_key.clone(), + utxo_entry_key_data.daa_score, + entry.is_coinbase, + ), }) .collect::>() }) diff --git a/rpc/core/src/error.rs b/rpc/core/src/error.rs index 42b033a902..b6cc714732 100644 --- a/rpc/core/src/error.rs +++ b/rpc/core/src/error.rs @@ -60,6 +60,9 @@ pub enum RpcError { #[error("If includeTransactions is set, then includeBlockVerboseData must be set as well.")] InvalidGetBlocksRequest, + #[error("Invalid GetUtxosByAddressesV2 request: {0}")] + InvalidGetUtxosByAddressesV2Request(String), + #[error("Transaction {0} not found")] TransactionNotFound(TransactionId), diff --git a/rpc/core/src/model/message.rs b/rpc/core/src/model/message.rs index ed3f9b43ce..d6ac6a7a32 100644 --- a/rpc/core/src/model/message.rs +++ b/rpc/core/src/model/message.rs @@ -1506,6 +1506,99 @@ impl Deserializer for GetUtxosByAddressesResponse { } } +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetUtxosByAddressesV2Request { + /// Addresses to query UTXOs for. + pub addresses: Vec, + /// Inclusive DAA-score range start; None defaults to 0. + pub from_daa_score: Option, + /// Inclusive DAA-score range end; None defaults to u64::MAX. + pub to_daa_score: Option, + /// Cursor start address; None starts from the first address. + pub start_address: Option, + /// Cursor start DAA score for start_address; None starts from from_daa_score. + pub start_daa_score: Option, + /// Soft cap on entries; None means no limit, and a page may exceed this to finish the current script public key + DAA-score group. + pub limit: Option, +} + +impl GetUtxosByAddressesV2Request { + pub fn new( + addresses: Vec, + from_daa_score: Option, + to_daa_score: Option, + start_address: Option, + start_daa_score: Option, + limit: Option, + ) -> Self { + Self { addresses, from_daa_score, to_daa_score, start_address, start_daa_score, limit } + } +} + +impl Serializer for GetUtxosByAddressesV2Request { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + store!(Vec, &self.addresses, writer)?; + store!(Option, &self.from_daa_score, writer)?; + store!(Option, &self.to_daa_score, writer)?; + store!(Option, &self.start_address, writer)?; + store!(Option, &self.start_daa_score, writer)?; + store!(Option, &self.limit, writer)?; + Ok(()) + } +} + +impl Deserializer for GetUtxosByAddressesV2Request { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + let addresses = load!(Vec, reader)?; + let from_daa_score = load!(Option, reader)?; + let to_daa_score = load!(Option, reader)?; + let start_address = load!(Option, reader)?; + let start_daa_score = load!(Option, reader)?; + let limit = load!(Option, reader)?; + Ok(Self { addresses, from_daa_score, to_daa_score, start_address, start_daa_score, limit }) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct GetUtxosByAddressesV2Response { + /// UTXO entries for the requested addresses and DAA-score range. + pub entries: Vec, + /// Cursor address for the next page; None means there is no next page. + pub next_address: Option, + /// Cursor DAA score for the next page; None means there is no next page. + pub next_daa_score: Option, +} + +impl GetUtxosByAddressesV2Response { + pub fn new(entries: Vec, next_address: Option, next_daa_score: Option) -> Self { + Self { entries, next_address, next_daa_score } + } +} + +impl Serializer for GetUtxosByAddressesV2Response { + fn serialize(&self, writer: &mut W) -> std::io::Result<()> { + store!(u16, &1, writer)?; + serialize!(Vec, &self.entries, writer)?; + store!(Option, &self.next_address, writer)?; + store!(Option, &self.next_daa_score, writer)?; + Ok(()) + } +} + +impl Deserializer for GetUtxosByAddressesV2Response { + fn deserialize(reader: &mut R) -> std::io::Result { + let _version = load!(u16, reader)?; + let entries = deserialize!(Vec, reader)?; + let next_address = load!(Option, reader)?; + let next_daa_score = load!(Option, reader)?; + Ok(Self { entries, next_address, next_daa_score }) + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BanRequest { diff --git a/rpc/core/src/wasm/message.rs b/rpc/core/src/wasm/message.rs index 9afa292bee..d1b9a3a4a4 100644 --- a/rpc/core/src/wasm/message.rs +++ b/rpc/core/src/wasm/message.rs @@ -1244,6 +1244,74 @@ try_from! ( args: GetUtxosByAddressesResponse, IGetUtxosByAddressesResponse, { Ok(response) }); +declare! { + IGetUtxosByAddressesV2Request, + "IGetUtxosByAddressesV2Request | Address[] | string[]", + r#" + /** + * Requests UTXOs for multiple addresses with optional inclusive DAA-score bounds. + * + * @category Node RPC + */ + export interface IGetUtxosByAddressesV2Request { + addresses : Address[] | string[]; + fromDaaScore? : bigint; + toDaaScore? : bigint; + startAddress? : Address | string; + startDaaScore? : bigint; + limit? : bigint; + } + "#, +} + +try_from! ( args: IGetUtxosByAddressesV2Request, GetUtxosByAddressesV2Request, { + let js_value = JsValue::from(args); + let request = if let Ok(addresses) = Vec::
::try_from(AddressOrStringArrayT::from(js_value.clone())) { + GetUtxosByAddressesV2Request { + addresses, + from_daa_score: None, + to_daa_score: None, + start_address: None, + start_daa_score: None, + limit: None, + } + } else { + from_value::(js_value)? + }; + Ok(request) +}); + +declare! { + IGetUtxosByAddressesV2Response, + r#" + /** + * + * + * @category Node RPC + */ + export interface IGetUtxosByAddressesV2Response { + entries : UtxoEntryReference[]; + nextAddress? : Address | string; + nextDaaScore? : bigint; + } + "#, +} + +try_from! ( args: GetUtxosByAddressesV2Response, IGetUtxosByAddressesV2Response, { + let GetUtxosByAddressesV2Response { entries, next_address, next_daa_score } = args; + let entries = entries.into_iter().map(UtxoEntryReference::from).collect::>(); + let entries = js_sys::Array::from_iter(entries.into_iter().map(JsValue::from)); + let response = IGetUtxosByAddressesV2Response::default(); + response.set("entries", entries.as_ref())?; + if let Some(address) = next_address { + response.set("nextAddress", &JsValue::from(address.to_string()))?; + } + if let Some(daa_score) = next_daa_score { + response.set("nextDaaScore", &js_sys::BigInt::from(daa_score).into())?; + } + Ok(response) +}); + // --- declare! { diff --git a/rpc/grpc/client/src/lib.rs b/rpc/grpc/client/src/lib.rs index a5033e0f99..7e7d0537a7 100644 --- a/rpc/grpc/client/src/lib.rs +++ b/rpc/grpc/client/src/lib.rs @@ -264,6 +264,7 @@ impl RpcApi for GrpcClient { route!(shutdown_call, Shutdown); route!(get_headers_call, GetHeaders); route!(get_utxos_by_addresses_call, GetUtxosByAddresses); + route!(get_utxos_by_addresses_v2_call, GetUtxosByAddressesV2); route!(get_balance_by_address_call, GetBalanceByAddress); route!(get_balances_by_addresses_call, GetBalancesByAddresses); route!(get_sink_blue_score_call, GetSinkBlueScore); diff --git a/rpc/grpc/core/proto/messages.proto b/rpc/grpc/core/proto/messages.proto index af0a91614d..5312e64917 100644 --- a/rpc/grpc/core/proto/messages.proto +++ b/rpc/grpc/core/proto/messages.proto @@ -67,6 +67,7 @@ message KaspadRequest { GetCurrentBlockColorRequestMessage getCurrentBlockColorRequest = 1110; GetUtxoReturnAddressRequestMessage getUtxoReturnAddressRequest = 1112; GetVirtualChainFromBlockV2RequestMessage getVirtualChainFromBlockV2Request = 1114; + GetUtxosByAddressesV2RequestMessage getUtxosByAddressesV2Request = 1116; } } @@ -134,6 +135,7 @@ message KaspadResponse { GetCurrentBlockColorResponseMessage getCurrentBlockColorResponse = 1111; GetUtxoReturnAddressResponseMessage getUtxoReturnAddressResponse = 1113; GetVirtualChainFromBlockV2ResponseMessage getVirtualChainFromBlockV2Response = 1115; + GetUtxosByAddressesV2ResponseMessage getUtxosByAddressesV2Response = 1117; } } diff --git a/rpc/grpc/core/proto/rpc.proto b/rpc/grpc/core/proto/rpc.proto index b9e283959e..2b744f9837 100644 --- a/rpc/grpc/core/proto/rpc.proto +++ b/rpc/grpc/core/proto/rpc.proto @@ -563,6 +563,36 @@ message GetUtxosByAddressesResponseMessage { RPCError error = 1000; } +// GetUtxosByAddressesV2RequestMessage requests UTXOs for multiple addresses, +// optionally bounded by an inclusive DAA-score range. +// +// This call is only available when this kaspad was started with `--utxoindex`. +message GetUtxosByAddressesV2RequestMessage { + // Addresses to query UTXOs for. + repeated string addresses = 1; + // Inclusive DAA-score range start; omitted defaults to 0. + optional uint64 fromDaaScore = 2; + // Inclusive DAA-score range end; omitted defaults to u64::MAX. + optional uint64 toDaaScore = 3; + // Cursor start address; omitted starts from the first address. + optional string startAddress = 4; + // Cursor start DAA score for startAddress; omitted starts from fromDaaScore. + optional uint64 startDaaScore = 5; + // Soft cap on entries; omitted means no limit and a page may exceed this to finish the current address + DAA-score group. + optional uint64 limit = 6; +} + +message GetUtxosByAddressesV2ResponseMessage { + // UTXO entries for the requested addresses and DAA-score range. + repeated RpcUtxosByAddressesEntry entries = 1; + // Cursor address for the next page; omitted means there is no next page. + optional string nextAddress = 2; + // Cursor DAA score for the next page; omitted means there is no next page. + optional uint64 nextDaaScore = 3; + + RPCError error = 1000; +} + // GetBalanceByAddressRequest returns the total balance in unspent transactions towards a given address // // This call is only available when this kaspad was started with `--utxoindex` diff --git a/rpc/grpc/core/src/convert/kaspad.rs b/rpc/grpc/core/src/convert/kaspad.rs index a2505c6d07..b2f18b52d0 100644 --- a/rpc/grpc/core/src/convert/kaspad.rs +++ b/rpc/grpc/core/src/convert/kaspad.rs @@ -65,6 +65,7 @@ pub mod kaspad_request_convert { impl_into_kaspad_request!(GetCurrentBlockColor); impl_into_kaspad_request!(GetUtxoReturnAddress); impl_into_kaspad_request!(GetVirtualChainFromBlockV2); + impl_into_kaspad_request!(GetUtxosByAddressesV2); impl_into_kaspad_request!(NotifyBlockAdded); impl_into_kaspad_request!(NotifyNewBlockTemplate); @@ -204,6 +205,7 @@ pub mod kaspad_response_convert { impl_into_kaspad_response!(GetCurrentBlockColor); impl_into_kaspad_response!(GetUtxoReturnAddress); impl_into_kaspad_response!(GetVirtualChainFromBlockV2); + impl_into_kaspad_response!(GetUtxosByAddressesV2); impl_into_kaspad_notify_response!(NotifyBlockAdded); impl_into_kaspad_notify_response!(NotifyNewBlockTemplate); diff --git a/rpc/grpc/core/src/convert/message.rs b/rpc/grpc/core/src/convert/message.rs index d60acb3c56..e84068a97b 100644 --- a/rpc/grpc/core/src/convert/message.rs +++ b/rpc/grpc/core/src/convert/message.rs @@ -338,6 +338,26 @@ from!(item: RpcResult<&kaspa_rpc_core::GetUtxosByAddressesResponse>, protowire:: Self { entries: item.entries.iter().map(|x| x.into()).collect(), error: None } }); +from!(item: &kaspa_rpc_core::GetUtxosByAddressesV2Request, protowire::GetUtxosByAddressesV2RequestMessage, { + Self { + addresses: item.addresses.iter().map(|x| x.into()).collect(), + from_daa_score: item.from_daa_score, + to_daa_score: item.to_daa_score, + start_address: item.start_address.as_ref().map(|address| address.to_string()), + start_daa_score: item.start_daa_score, + limit: item.limit, + } +}); +from!(item: RpcResult<&kaspa_rpc_core::GetUtxosByAddressesV2Response>, protowire::GetUtxosByAddressesV2ResponseMessage, { + debug!("GRPC, Creating GetUtxosByAddressesV2 message with {} entries", item.entries.len()); + Self { + entries: item.entries.iter().map(|x| x.into()).collect(), + next_address: item.next_address.as_ref().map(|address| address.to_string()), + next_daa_score: item.next_daa_score, + error: None, + } +}); + from!(item: &kaspa_rpc_core::GetBalanceByAddressRequest, protowire::GetBalanceByAddressRequestMessage, { Self { address: (&item.address).into() } }); @@ -859,6 +879,24 @@ try_from!(item: &protowire::GetUtxosByAddressesResponseMessage, RpcResult, _>>()? } }); +try_from!(item: &protowire::GetUtxosByAddressesV2RequestMessage, kaspa_rpc_core::GetUtxosByAddressesV2Request, { + Self { + addresses: item.addresses.iter().map(|x| x.as_str().try_into()).collect::, _>>()?, + from_daa_score: item.from_daa_score, + to_daa_score: item.to_daa_score, + start_address: item.start_address.as_deref().map(|address| address.try_into()).transpose()?, + start_daa_score: item.start_daa_score, + limit: item.limit, + } +}); +try_from!(item: &protowire::GetUtxosByAddressesV2ResponseMessage, RpcResult, { + Self { + entries: item.entries.iter().map(|x| x.try_into()).collect::, _>>()?, + next_address: item.next_address.as_deref().map(|address| address.try_into()).transpose()?, + next_daa_score: item.next_daa_score, + } +}); + try_from!(item: &protowire::GetBalanceByAddressRequestMessage, kaspa_rpc_core::GetBalanceByAddressRequest, { Self { address: item.address.as_str().try_into()? } }); diff --git a/rpc/grpc/core/src/ops.rs b/rpc/grpc/core/src/ops.rs index 923d9511a1..94b08764f5 100644 --- a/rpc/grpc/core/src/ops.rs +++ b/rpc/grpc/core/src/ops.rs @@ -89,6 +89,7 @@ pub enum KaspadPayloadOps { GetCurrentBlockColor, GetUtxoReturnAddress, GetVirtualChainFromBlockV2, + GetUtxosByAddressesV2, // Subscription commands for starting/stopping notifications NotifyBlockAdded, diff --git a/rpc/grpc/server/src/request_handler/factory.rs b/rpc/grpc/server/src/request_handler/factory.rs index 002b6bc603..74d036ae0c 100644 --- a/rpc/grpc/server/src/request_handler/factory.rs +++ b/rpc/grpc/server/src/request_handler/factory.rs @@ -63,6 +63,7 @@ impl Factory { ResolveFinalityConflict, GetHeaders, GetUtxosByAddresses, + GetUtxosByAddressesV2, GetBalanceByAddress, GetBalancesByAddresses, GetSinkBlueScore, diff --git a/rpc/grpc/server/src/tests/rpc_core_mock.rs b/rpc/grpc/server/src/tests/rpc_core_mock.rs index 3b4a0c6288..f36160f84e 100644 --- a/rpc/grpc/server/src/tests/rpc_core_mock.rs +++ b/rpc/grpc/server/src/tests/rpc_core_mock.rs @@ -298,6 +298,14 @@ impl RpcApi for RpcCoreMock { Err(RpcError::NotImplemented) } + async fn get_utxos_by_addresses_v2_call( + &self, + _connection: Option<&DynRpcConnection>, + _request: GetUtxosByAddressesV2Request, + ) -> RpcResult { + Err(RpcError::NotImplemented) + } + async fn get_sink_blue_score_call( &self, _connection: Option<&DynRpcConnection>, diff --git a/rpc/service/src/converter/index.rs b/rpc/service/src/converter/index.rs index 16dceaeac3..3014a83cf9 100644 --- a/rpc/service/src/converter/index.rs +++ b/rpc/service/src/converter/index.rs @@ -3,7 +3,8 @@ use kaspa_consensus_core::config::Config; use kaspa_index_core::indexed_utxos::UtxoSetByScriptPublicKey; use kaspa_index_core::notification::{self as index_notify, Notification as IndexNotification}; use kaspa_notify::converter::Converter; -use kaspa_rpc_core::{Notification, RpcUtxosByAddressesEntry, UtxosChangedNotification, utxo_set_into_rpc}; +use kaspa_rpc_core::{Notification, RpcUtxosByAddressesEntry, UtxosChangedNotification, ordered_utxo_set_into_rpc, utxo_set_into_rpc}; +use kaspa_utxoindex::model::OrderedUtxoSetByScriptPublicKey; use std::sync::Arc; /// Conversion of consensus_core to rpc_core structures @@ -27,6 +28,10 @@ impl IndexConverter { pub fn get_utxos_by_addresses_entries(&self, item: &UtxoSetByScriptPublicKey) -> Vec { utxo_set_into_rpc(item, Some(self.config.prefix())) } + + pub fn get_ordered_utxos_by_addresses_entries(&self, item: &OrderedUtxoSetByScriptPublicKey) -> Vec { + ordered_utxo_set_into_rpc(item, Some(self.config.prefix())) + } } #[async_trait] diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index ee64be96c1..5389d1043a 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -72,6 +72,7 @@ use kaspa_utils::sysinfo::SystemInfo; use kaspa_utils::{channel::Channel, triggers::SingleTrigger}; use kaspa_utils_tower::counters::TowerConnectionCounters; use kaspa_utxoindex::api::UtxoIndexProxy; +use kaspa_utxoindex::model::OrderedUtxoSetByScriptPublicKeyPage; use std::time::Duration; use std::{ collections::HashMap, @@ -275,6 +276,30 @@ impl RpcCoreService { .unwrap_or_default() } + async fn get_ordered_utxo_set_by_script_public_key_page<'a>( + &self, + addresses: impl Iterator, + from_daa_score: Option, + to_daa_score: Option, + start_address: Option<&'a RpcAddress>, + start_daa_score: Option, + limit: Option, + ) -> OrderedUtxoSetByScriptPublicKeyPage { + self.utxoindex + .clone() + .unwrap() + .get_utxos_by_script_public_keys_by_daa_score_page( + addresses.map(pay_to_address_script).collect(), + from_daa_score, + to_daa_score, + start_address.map(pay_to_address_script), + start_daa_score, + limit, + ) + .await + .unwrap_or_default() + } + fn extract_tx_query(&self, filter_transaction_pool: bool, include_orphan_pool: bool) -> RpcResult { match (filter_transaction_pool, include_orphan_pool) { (true, true) => Ok(TransactionQuery::OrphansOnly), @@ -708,6 +733,74 @@ NOTE: This error usually indicates an RPC conversion error between the node and Ok(GetUtxosByAddressesResponse::new(self.index_converter.get_utxos_by_addresses_entries(&entry_map))) } + async fn get_utxos_by_addresses_v2_call( + &self, + _connection: Option<&DynRpcConnection>, + request: GetUtxosByAddressesV2Request, + ) -> RpcResult { + if !self.config.utxoindex { + return Err(RpcError::NoUtxoIndex); + } + if request.start_daa_score.is_some() && request.start_address.is_none() { + return Err(RpcError::InvalidGetUtxosByAddressesV2Request("start_daa_score requires start_address".to_string())); + } + + if request.limit == Some(0) { + return Err(RpcError::InvalidGetUtxosByAddressesV2Request("limit must be greater than zero".to_string())); + } + + if matches!( + request.start_address.as_ref(), + Some(start_address) if !request.addresses.iter().any(|address| address == start_address) + ) { + return Err(RpcError::InvalidGetUtxosByAddressesV2Request("start_address must be included in addresses".to_string())); + } + + let from_daa_score = request.from_daa_score.unwrap_or(0); + let to_daa_score = request.to_daa_score.unwrap_or(u64::MAX); + if from_daa_score > to_daa_score { + return Err(RpcError::InvalidGetUtxosByAddressesV2Request("from_daa_score must be <= to_daa_score".to_string())); + } + + if matches!( + request.start_daa_score, + Some(start_daa_score) if start_daa_score < from_daa_score || start_daa_score > to_daa_score + ) { + return Err(RpcError::InvalidGetUtxosByAddressesV2Request( + "start_daa_score must be within from_daa_score and to_daa_score".to_string(), + )); + } + + let session = self.consensus_manager.consensus().unguarded_session(); + // do not retrieve utxos while in unstable ibd state. + if session.async_is_consensus_in_transitional_ibd_state().await { + return Err(RpcError::ConsensusInTransitionalIbdState); + } + + let page = self + .get_ordered_utxo_set_by_script_public_key_page( + request.addresses.iter(), + request.from_daa_score, + request.to_daa_score, + request.start_address.as_ref(), + request.start_daa_score, + request.limit, + ) + .await; + + let OrderedUtxoSetByScriptPublicKeyPage { entries, next_script_public_key, next_daa_score } = page; + let entries = self.index_converter.get_ordered_utxos_by_addresses_entries(&entries); + let next_address = match next_script_public_key.as_ref() { + Some(script_public_key) => Some( + extract_script_pub_key_address(script_public_key, self.config.prefix()) + .map_err(|err| RpcError::General(err.to_string()))?, + ), + None => None, + }; + + Ok(GetUtxosByAddressesV2Response::new(entries, next_address, next_daa_score)) + } + async fn get_balance_by_address_call( &self, _connection: Option<&DynRpcConnection>, diff --git a/rpc/wrpc/client/src/client.rs b/rpc/wrpc/client/src/client.rs index 4270c05459..928080348f 100644 --- a/rpc/wrpc/client/src/client.rs +++ b/rpc/wrpc/client/src/client.rs @@ -655,6 +655,7 @@ impl RpcApi for KaspaRpcClient { GetSystemInfo, GetUtxoReturnAddress, GetUtxosByAddresses, + GetUtxosByAddressesV2, GetVirtualChainFromBlock, GetVirtualChainFromBlockV2, ResolveFinalityConflict, diff --git a/rpc/wrpc/server/src/router.rs b/rpc/wrpc/server/src/router.rs index 7dc3d6b9bd..0bffe326a2 100644 --- a/rpc/wrpc/server/src/router.rs +++ b/rpc/wrpc/server/src/router.rs @@ -68,6 +68,7 @@ impl Router { GetSyncStatus, GetSystemInfo, GetUtxosByAddresses, + GetUtxosByAddressesV2, GetVirtualChainFromBlock, GetVirtualChainFromBlockV2, ResolveFinalityConflict, diff --git a/rpc/wrpc/wasm/src/client.rs b/rpc/wrpc/wasm/src/client.rs index 4254d2d156..5ea8b07423 100644 --- a/rpc/wrpc/wasm/src/client.rs +++ b/rpc/wrpc/wasm/src/client.rs @@ -1037,6 +1037,10 @@ build_wrpc_wasm_bindgen_interface!( /// specific addresses. /// Returned information: List of UTXOs. GetUtxosByAddresses, + /// Retrieves unspent transaction outputs (UTXOs) for multiple addresses + /// with optional inclusive DAA-score range bounds. + /// Returned information: A list of UTXOs. + GetUtxosByAddressesV2, /// Retrieves the virtual chain corresponding to a specified block hash. /// Returned information: Virtual chain information. GetVirtualChainFromBlock, diff --git a/testing/integration/src/rpc_tests.rs b/testing/integration/src/rpc_tests.rs index d3cc0fdc1f..23917baf31 100644 --- a/testing/integration/src/rpc_tests.rs +++ b/testing/integration/src/rpc_tests.rs @@ -427,6 +427,21 @@ async fn sanity_test() { }) } + KaspadPayloadOps::GetUtxosByAddressesV2 => { + let rpc_client = client.clone(); + tst!(op, { + let addresses = vec![Address::new(Prefix::Simnet, Version::PubKey, &[0u8; 32])]; + let response = rpc_client + .get_utxos_by_addresses_v2_call( + None, + GetUtxosByAddressesV2Request::new(addresses, None, None, None, None, None), + ) + .await + .unwrap(); + assert!(response.entries.is_empty()); + }) + } + KaspadPayloadOps::GetBalanceByAddress => { let rpc_client = client.clone(); tst!(op, { diff --git a/wallet/core/src/tests/rpc_core_mock.rs b/wallet/core/src/tests/rpc_core_mock.rs index aac407d099..ab039b1daf 100644 --- a/wallet/core/src/tests/rpc_core_mock.rs +++ b/wallet/core/src/tests/rpc_core_mock.rs @@ -307,6 +307,14 @@ impl RpcApi for RpcCoreMock { Err(RpcError::NotImplemented) } + async fn get_utxos_by_addresses_v2_call( + &self, + _connection: Option<&DynRpcConnection>, + _request: GetUtxosByAddressesV2Request, + ) -> RpcResult { + Err(RpcError::NotImplemented) + } + async fn get_sink_blue_score_call( &self, _connection: Option<&DynRpcConnection>,