Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,49 @@ impl Rpc {

self.println(&ctx, result);
}
RpcApiOps::GetUtxosByAddressesV2 => {
let mut args = argv;

if args.len() < 6 {
return Err(Error::custom(
"Usage: rpc get-utxos-by-addresses-v2 <addr...> <from_daa_score|None> <to_daa_score|None> <start_address|None> <start_daa_score|None> <limit|None> (limit is a soft cap at the script public key + DAA boundary)",
));
}

let parse_optional_u64 = |value: String| -> Result<Option<u64>> {
if value.eq_ignore_ascii_case("none") { Ok(None) } else { Ok(Some(value.parse::<u64>()?)) }
};

let parse_optional_address = |value: String| -> Result<Option<Address>> {
if value.eq_ignore_ascii_case("none") { Ok(None) } else { Ok(Some(Address::try_from(value.as_str())?)) }
};

let limit = parse_optional_u64(args.pop().unwrap())?;
let start_daa_score = parse_optional_u64(args.pop().unwrap())?;
let start_address = parse_optional_address(args.pop().unwrap())?;
let to_daa_score = parse_optional_u64(args.pop().unwrap())?;
let from_daa_score = parse_optional_u64(args.pop().unwrap())?;

if args.is_empty() {
return Err(Error::custom("Please specify at least one address"));
}

let addresses = args.iter().map(|s| Address::try_from(s.as_str())).collect::<std::result::Result<Vec<_>, _>>()?;
let result = rpc
.get_utxos_by_addresses_v2_call(
None,
GetUtxosByAddressesV2Request::new(
addresses,
from_daa_score,
to_daa_score,
start_address,
start_daa_score,
limit,
),
)
.await?;
self.println(&ctx, result);
}
_ => {
tprintln!(ctx, "rpc method exists but is not supported by the cli: '{op_str}'\r\n");
return Ok(());
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/model/stores/utxo_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait UtxoSetStoreReader {
pub trait UtxoSetStore: UtxoSetStoreReader {
/// Updates the store according to the UTXO diff -- adding and deleting entries correspondingly.
/// Note we define `self` as `mut` in order to require write access even though the compiler does not require it.
/// This is because concurrent readers can interfere with cache consistency.
/// This is because concurrent readers can interfere with cache consistency.
fn write_diff(&mut self, utxo_diff: &UtxoDiff) -> Result<(), StoreError>;
fn write_many(&mut self, utxos: &[(TransactionOutpoint, UtxoEntry)]) -> Result<(), StoreError>;
}
Expand Down Expand Up @@ -153,7 +153,7 @@ impl UtxoSetStoreReader for DbUtxoSetStore {

fn seek_iterator(&self, from_outpoint: Option<TransactionOutpoint>, limit: usize, skip_first: bool) -> UtxoCollectionIterator<'_> {
let seek_key = from_outpoint.map(UtxoKey::from);
Box::new(self.access.seek_iterator(None, seek_key, limit, skip_first).map(|res| {
Box::new(self.access.seek_iterator(None, seek_key, None, limit, skip_first).map(|res| {
let (key, entry) = res?;
let outpoint: TransactionOutpoint = UtxoKey::try_from(key.as_ref()).unwrap().into();
Ok((outpoint, UtxoEntry::clone(&entry)))
Expand Down
13 changes: 8 additions & 5 deletions database/src/access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ where
&self,
bucket: Option<&[u8]>, // iter self.prefix if None, else append bytes to self.prefix.
seek_from: Option<TKey>, // iter whole range if None
seek_to: Option<TKey>, // iter until the seek_to key (exclusive) if Some, else iter whole range.
limit: usize, // amount to take.
skip_first: bool, // skips the first value, (useful in conjunction with the seek-key, as to not re-retrieve).
) -> impl Iterator<Item = KeyDataResult<TData>> + '_
Expand All @@ -237,13 +238,15 @@ where
let mut read_opts = ReadOptions::default();
read_opts.set_iterate_range(rocksdb::PrefixRange(db_key.as_ref()));

let mut db_iterator = match seek_from {
Some(seek_key) => {
self.db.iterator_opt(IteratorMode::From(DbKey::new(&self.prefix, seek_key).as_ref(), Direction::Forward), read_opts)
}
None => self.db.iterator_opt(IteratorMode::Start, read_opts),
if let Some(seek_from) = &seek_from {
read_opts.set_iterate_lower_bound(DbKey::new(db_key.as_ref(), seek_from).as_ref());
};
if let Some(seek_to) = &seek_to {
read_opts.set_iterate_upper_bound(DbKey::new(db_key.as_ref(), seek_to).as_ref());
};

let mut db_iterator = self.db.iterator_opt(IteratorMode::Start, read_opts);

if skip_first {
db_iterator.next();
}
Expand Down
1 change: 1 addition & 0 deletions database/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum DatabaseStorePrefixes {
UtxoIndex = 192,
UtxoIndexTips = 193,
CirculatingSupply = 194,
UtxoIndexDbVersion = 195,

// ---- Separator ----
/// Reserved as a separator
Expand Down
46 changes: 41 additions & 5 deletions indexes/core/src/indexed_utxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,31 @@ use std::collections::HashMap;

// TODO: explore potential optimization via custom TransactionOutpoint hasher for below,
// One possible implementation: u64 of transaction id xor'd with 4 bytes of transaction index.
pub type CompactUtxoCollection = HashMap<TransactionOutpoint, CompactUtxoEntry>;
pub type CompactUtxoCollection = HashMap<UtxoEntryKeyData, CompactUtxoEntry>;

/// A deterministic ordered list of UTXOs keyed by [`UtxoEntryKeyData`].
pub type OrderedUtxoCollection = Vec<(UtxoEntryKeyData, CompactUtxoEntry)>;

/// A deterministic ordered list of UTXO collections keyed by [`ScriptPublicKey`].
pub type OrderedUtxoSetByScriptPublicKey = Vec<(ScriptPublicKey, OrderedUtxoCollection)>;

/// A page of ordered UTXOs with an optional cursor for the next group.
#[derive(Clone, Debug, Default)]
pub struct OrderedUtxoSetByScriptPublicKeyPage {
pub entries: OrderedUtxoSetByScriptPublicKey,
pub next_script_public_key: Option<ScriptPublicKey>,
pub next_daa_score: Option<u64>,
}

impl OrderedUtxoSetByScriptPublicKeyPage {
pub fn new(
entries: OrderedUtxoSetByScriptPublicKey,
next_script_public_key: Option<ScriptPublicKey>,
next_daa_score: Option<u64>,
) -> Self {
Self { entries, next_script_public_key, next_daa_score }
}
}

/// A collection of utxos indexed via; [`ScriptPublicKey`] => [`TransactionOutpoint`] => [`CompactUtxoEntry`].
pub type UtxoSetByScriptPublicKey = HashMap<ScriptPublicKey, CompactUtxoCollection>;
Expand All @@ -16,26 +40,38 @@ pub type BalanceByScriptPublicKey = HashMap<ScriptPublicKey, u64>;
// Note: memory optimization compared to go-lang kaspad:
// Unlike `consensus_core::tx::UtxoEntry` the utxoindex utilizes a compacted utxo form, where `script_public_key` field is removed.
// This utxo structure can be utilized in the utxoindex, since utxos are implicitly key'd via its script public key (and outpoint) at all times.
// furthermore, the daa_score is also added to the key (for range queries) and thus is removed from the value as well. This results in a more compact representation of the utxo entry, which is more suitable for storage in the utxoindex.
/// A compacted form of [`UtxoEntry`] without reference to [`ScriptPublicKey`] or [`TransactionOutpoint`]
#[derive(Clone, Copy, Deserialize, Serialize, Debug)]
pub struct CompactUtxoEntry {
pub amount: u64,
pub block_daa_score: u64,
pub is_coinbase: bool,
}

impl CompactUtxoEntry {
/// Creates a new [`CompactUtxoEntry`]
pub fn new(amount: u64, block_daa_score: u64, is_coinbase: bool) -> Self {
Self { amount, block_daa_score, is_coinbase }
pub fn new(amount: u64, is_coinbase: bool) -> Self {
Self { amount, is_coinbase }
}
}

impl MemSizeEstimator for CompactUtxoEntry {}

impl From<UtxoEntry> for CompactUtxoEntry {
fn from(utxo_entry: UtxoEntry) -> Self {
Self { amount: utxo_entry.amount, block_daa_score: utxo_entry.block_daa_score, is_coinbase: utxo_entry.is_coinbase }
Self { amount: utxo_entry.amount, is_coinbase: utxo_entry.is_coinbase }
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct UtxoEntryKeyData {
pub daa_score: u64,
pub transaction_outpoint: TransactionOutpoint,
}

impl UtxoEntryKeyData {
pub fn new(daa_score: u64, transaction_outpoint: TransactionOutpoint) -> Self {
Self { daa_score, transaction_outpoint }
}
}

Expand Down
12 changes: 6 additions & 6 deletions indexes/processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,15 @@ mod tests {
Notification::UtxosChanged(utxo_changed_notification) => {
let mut notification_utxo_added_count = 0;
for (script_public_key, compact_utxo_collection) in utxo_changed_notification.added.iter() {
for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() {
for (utxo_entry_key_data, compact_utxo) in compact_utxo_collection.iter() {
let test_utxo = test_notification
.accumulated_utxo_diff
.add
.get(transaction_outpoint)
.get(&utxo_entry_key_data.transaction_outpoint)
.expect("expected transaction outpoint to be in test event");
assert_eq!(test_utxo.script_public_key, *script_public_key);
assert_eq!(test_utxo.amount, compact_utxo.amount);
assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
assert_eq!(test_utxo.block_daa_score, utxo_entry_key_data.daa_score);
assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
notification_utxo_added_count += 1;
}
Expand All @@ -208,15 +208,15 @@ mod tests {

let mut notification_utxo_removed_count = 0;
for (script_public_key, compact_utxo_collection) in utxo_changed_notification.removed.iter() {
for (transaction_outpoint, compact_utxo) in compact_utxo_collection.iter() {
for (utxo_entry_key_data, compact_utxo) in compact_utxo_collection.iter() {
let test_utxo = test_notification
.accumulated_utxo_diff
.remove
.get(transaction_outpoint)
.get(&utxo_entry_key_data.transaction_outpoint)
.expect("expected transaction outpoint to be in test event");
assert_eq!(test_utxo.script_public_key, *script_public_key);
assert_eq!(test_utxo.amount, compact_utxo.amount);
assert_eq!(test_utxo.block_daa_score, compact_utxo.block_daa_score);
assert_eq!(test_utxo.block_daa_score, utxo_entry_key_data.daa_score);
assert_eq!(test_utxo.is_coinbase, compact_utxo.is_coinbase);
notification_utxo_removed_count += 1;
}
Expand Down
40 changes: 37 additions & 3 deletions indexes/utxoindex/src/core/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use kaspa_consensus_core::{
BlockHashSet,
tx::{ScriptPublicKeys, TransactionOutpoint},
tx::{ScriptPublicKey, ScriptPublicKeys, TransactionOutpoint},
utxo::utxo_diff::UtxoDiff,
};
use kaspa_consensusmanager::spawn_blocking;
use kaspa_database::prelude::StoreResult;
use kaspa_hashes::Hash;
use kaspa_index_core::indexed_utxos::BalanceByScriptPublicKey;
use kaspa_index_core::indexed_utxos::{BalanceByScriptPublicKey, OrderedUtxoSetByScriptPublicKeyPage};
use parking_lot::RwLock;
use std::{collections::HashSet, fmt::Debug, sync::Arc};

Expand All @@ -27,6 +27,17 @@ pub trait UtxoIndexApi: Send + Sync + Debug {
/// Note: Use a read lock when accessing this method
fn get_utxos_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult<UtxoSetByScriptPublicKey>;

/// Retrieve ordered UTXOs for multiple script public keys with cursor pagination.
fn get_utxos_by_script_public_keys_by_daa_score_page(
&self,
script_public_keys: ScriptPublicKeys,
from_daa_score: Option<u64>,
to_daa_score: Option<u64>,
start_script_public_key: Option<ScriptPublicKey>,
start_daa_score: Option<u64>,
limit: Option<u64>,
) -> StoreResult<OrderedUtxoSetByScriptPublicKeyPage>;

fn get_balance_by_script_public_keys(&self, script_public_keys: ScriptPublicKeys) -> StoreResult<BalanceByScriptPublicKey>;

// This can have a big memory footprint, so it should be used only for tests.
Expand All @@ -41,7 +52,7 @@ pub trait UtxoIndexApi: Send + Sync + Debug {
///
/// Note:
/// 1) Use a read lock when accessing this method
/// 2) due to potential sync-gaps is_synced is unreliable while consensus is actively resolving virtual states.
/// 2) due to potential sync-gaps is_synced is unreliable while consensus is actively resolving virtual states.
fn is_synced(&self) -> UtxoIndexResult<bool>;

/// Update the utxoindex with the given utxo_diff, and tips.
Expand Down Expand Up @@ -74,6 +85,29 @@ impl UtxoIndexProxy {
spawn_blocking(move || self.inner.read().get_utxos_by_script_public_keys(script_public_keys)).await.unwrap()
}

pub async fn get_utxos_by_script_public_keys_by_daa_score_page(
self,
script_public_keys: ScriptPublicKeys,
from_daa_score: Option<u64>,
to_daa_score: Option<u64>,
start_script_public_key: Option<ScriptPublicKey>,
start_daa_score: Option<u64>,
limit: Option<u64>,
) -> StoreResult<OrderedUtxoSetByScriptPublicKeyPage> {
spawn_blocking(move || {
self.inner.read().get_utxos_by_script_public_keys_by_daa_score_page(
script_public_keys,
from_daa_score,
to_daa_score,
start_script_public_key,
start_daa_score,
limit,
)
})
.await
.unwrap()
}

pub async fn get_balance_by_script_public_keys(
self,
script_public_keys: ScriptPublicKeys,
Expand Down
Loading
Loading