Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ use crate::imports::*;
use convert_case::{Case, Casing};
use kaspa_rpc_core::api::ops::RpcApiOps;

fn parse_get_headers_direction(direction: Option<&str>) -> Result<bool> {
match direction.map(str::to_ascii_lowercase).as_deref() {
None => Ok(true),
Some("ascending" | "asc" | "true" | "1") => Ok(true),
Some("descending" | "desc" | "false" | "0") => Ok(false),
Some(_) => Err(Error::custom("Direction must be one of: ascending, asc, true, 1, descending, desc, false, 0")),
}
}

#[derive(Default, Handler)]
#[help("Execute RPC commands against the connected Kaspa node")]
pub struct Rpc;
Expand Down Expand Up @@ -162,10 +171,16 @@ impl Rpc {
let result = rpc.shutdown_call(None, ShutdownRequest {}).await?;
self.println(&ctx, result);
}
// RpcApiOps::GetHeaders => {
// let result = rpc.get_headers_call(GetHeadersRequest { }).await?;
// self.println(&ctx, result);
// }
RpcApiOps::GetHeaders => {
if argv.len() < 2 {
return Err(Error::custom("Usage: rpc get-headers <start_hash> <limit> [ascending|descending|true|false]"));
}
let start_hash = RpcHash::from_hex(argv.remove(0).as_str())?;
let limit = argv.remove(0).parse::<u64>()?;
let is_ascending = parse_get_headers_direction(argv.first().map(String::as_str))?;
let result = rpc.get_headers_call(None, GetHeadersRequest { start_hash, limit, is_ascending }).await?;
self.println(&ctx, result);
}
RpcApiOps::GetUtxosByAddresses => {
if argv.is_empty() {
return Err(Error::custom("Please specify at least one address"));
Expand Down Expand Up @@ -340,3 +355,29 @@ impl Rpc {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn get_headers_direction_defaults_to_ascending() {
assert!(parse_get_headers_direction(None).unwrap());
}

#[test]
fn get_headers_direction_accepts_aliases() {
for direction in ["ascending", "asc", "true", "1"] {
assert!(parse_get_headers_direction(Some(direction)).unwrap());
}

for direction in ["descending", "desc", "false", "0"] {
assert!(!parse_get_headers_direction(Some(direction)).unwrap());
}
}

#[test]
fn get_headers_direction_rejects_unknown_values() {
assert!(parse_get_headers_direction(Some("sideways")).is_err());
}
}
7 changes: 6 additions & 1 deletion components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl ConsensusSessionOwned {
.await
}

/// Returns the antipast of block `hash` from the POV of `context`, i.e. `antipast(hash)past(context)`.
/// Returns the antipast of block `hash` from the POV of `context`, i.e. the intersection of `antipast(hash)` and `past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
pub async fn async_get_antipast_from_pov(
Expand Down Expand Up @@ -367,6 +367,11 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(move |c| c.create_virtual_selected_chain_block_locator(low, high)).await
}

/// Returns up to `limit` hashes from `start` toward the virtual selected-chain tip, including `start`.
pub async fn async_get_virtual_selected_chain_from(&self, start: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
self.clone().spawn_blocking(move |c| c.get_virtual_selected_chain_from(start, limit)).await
}

pub async fn async_create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
self.clone().spawn_blocking(move |c| c.create_block_locator_from_pruning_point(high, limit)).await
}
Expand Down
7 changes: 6 additions & 1 deletion consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// Returns the antipast of block `hash` from the POV of `context`, i.e. `antipast(hash)past(context)`.
/// Returns the antipast of block `hash` from the POV of `context`, i.e. the intersection of `antipast(hash)` and `past(context)`.
/// Since this might be an expensive operation for deep blocks, we allow the caller to specify a limit
/// `max_traversal_allowed` on the maximum amount of blocks to traverse for obtaining the answer
fn get_antipast_from_pov(&self, hash: Hash, context: Hash, max_traversal_allowed: Option<u64>) -> ConsensusResult<Vec<Hash>> {
Expand All @@ -295,6 +295,11 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

/// Returns up to `limit` hashes from `start` toward the virtual selected-chain tip, including `start`.
fn get_virtual_selected_chain_from(&self, start: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
unimplemented!()
}

fn create_block_locator_from_pruning_point(&self, high: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
unimplemented!()
}
Expand Down
27 changes: 27 additions & 0 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use kaspa_consensus_core::{
consensus::{ConsensusError, ConsensusResult},
difficulty::DifficultyError,
pruning::PruningImportError,
sync::SyncManagerError,
tx::TxResult,
},
header::Header,
Expand Down Expand Up @@ -1159,6 +1160,32 @@ impl ConsensusApi for Consensus {
Ok(self.services.sync_manager.create_virtual_selected_chain_block_locator(low, high)?)
}

fn get_virtual_selected_chain_from(&self, start: Hash, limit: usize) -> ConsensusResult<Vec<Hash>> {
let _guard = self.pruning_lock.blocking_read();
self.validate_block_exists(start)?;

if limit == 0 {
return Ok(Vec::new());
}

let selected_chain = self.storage.selected_chain_store.read();
let start_index =
selected_chain.get_by_hash(start).optional().unwrap().ok_or(SyncManagerError::BlockNotInSelectedParentChain(start))?;
let (tip_index, _) =
selected_chain.get_tip().map_err(|err| ConsensusError::GeneralOwned(format!("selected chain tip read failed: {err}")))?;
let limit_end_index = start_index.saturating_add(limit.saturating_sub(1) as u64);
let end_index = cmp::min(tip_index, limit_end_index);
let mut hashes = Vec::with_capacity((end_index - start_index + 1) as usize);
for index in start_index..=end_index {
hashes.push(
selected_chain
.get_by_index(index)
.map_err(|err| ConsensusError::GeneralOwned(format!("selected chain hash read failed: {err}")))?,
);
}
Ok(hashes)
}

fn pruning_point_headers(&self) -> Vec<Arc<Header>> {
// PRUNE SAFETY: index is monotonic and past pruning point headers are expected permanently
let (pruning_point, pruning_index) = self.pruning_point_store.read().pruning_point_and_index().unwrap();
Expand Down
4 changes: 3 additions & 1 deletion rpc/core/src/api/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ pub trait RpcApi: Sync + Send + AnySync {
}
async fn shutdown_call(&self, connection: Option<&DynRpcConnection>, request: ShutdownRequest) -> RpcResult<ShutdownResponse>;

/// Requests headers between the given `start_hash` and the current virtual, up to the given limit.
/// Requests selected-parent-chain headers from `start_hash`, up to the given inclusive limit.
///
/// Ascending requests walk toward the sink. Descending requests walk toward genesis.
async fn get_headers(&self, start_hash: RpcHash, limit: u64, is_ascending: bool) -> RpcResult<Vec<RpcHeader>> {
Ok(self.get_headers_call(None, GetHeadersRequest::new(start_hash, limit, is_ascending)).await?.headers)
}
Expand Down
3 changes: 3 additions & 0 deletions rpc/core/src/model/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,8 +1215,11 @@ impl Deserializer for ShutdownResponse {
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetHeadersRequest {
/// The selected-parent-chain header where traversal starts.
pub start_hash: RpcHash,
/// The maximum number of headers to return, including `start_hash`.
pub limit: u64,
/// When true, walk toward the sink; when false, walk toward genesis.
pub is_ascending: bool,
}

Expand Down
4 changes: 4 additions & 0 deletions rpc/core/src/wasm/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,11 @@ declare! {
IGetHeadersRequest,
r#"
/**
* Selected-parent-chain header request.
*
* If `isAscending` is true, traversal walks from `startHash` toward the
* sink. Otherwise, traversal walks from `startHash` toward genesis.
* `limit` includes `startHash` when greater than zero.
*
* @category Node RPC
*/
Expand Down
6 changes: 4 additions & 2 deletions rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,9 @@ message ShutdownResponseMessage {
RPCError error = 1000;
}

// GetHeadersRequestMessage requests headers between the given startHash and the
// current virtual, up to the given limit.
// GetHeadersRequestMessage requests selected-parent-chain headers from
// startHash, up to the inclusive limit. Ascending requests walk toward the
// sink; descending requests walk toward genesis.
message GetHeadersRequestMessage {
string startHash = 1;
uint64 limit = 2;
Expand All @@ -499,6 +500,7 @@ message GetHeadersRequestMessage {

message GetHeadersResponseMessage {
repeated string headers = 1;
repeated RpcBlockHeader blockHeaders = 2;
RPCError error = 1000;
}

Expand Down
128 changes: 123 additions & 5 deletions rpc/grpc/core/src/convert/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ from!(item: &kaspa_rpc_core::GetHeadersRequest, protowire::GetHeadersRequestMess
Self { start_hash: item.start_hash.to_string(), limit: item.limit, is_ascending: item.is_ascending }
});
from!(item: RpcResult<&kaspa_rpc_core::GetHeadersResponse>, protowire::GetHeadersResponseMessage, {
Self { headers: item.headers.iter().map(|x| x.hash.to_string()).collect(), error: None }
Self {
headers: item.headers.iter().map(|x| x.hash.to_string()).collect(),
block_headers: item.headers.iter().map(protowire::RpcBlockHeader::from).collect(),
error: None,
}
});

from!(item: &kaspa_rpc_core::GetUtxosByAddressesRequest, protowire::GetUtxosByAddressesRequestMessage, {
Expand Down Expand Up @@ -848,8 +852,19 @@ try_from!(item: &protowire::GetHeadersRequestMessage, kaspa_rpc_core::GetHeaders
Self { start_hash: RpcHash::from_str(&item.start_hash)?, limit: item.limit, is_ascending: item.is_ascending }
});
try_from!(item: &protowire::GetHeadersResponseMessage, RpcResult<kaspa_rpc_core::GetHeadersResponse>, {
// TODO
Self { headers: vec![] }
if !item.headers.is_empty() && item.block_headers.is_empty() {
return Err(RpcError::General("get headers response contains only header hashes without full headers".to_string()));
}
let headers = item.block_headers.iter().map(kaspa_rpc_core::RpcHeader::try_from).collect::<RpcResult<Vec<_>>>()?;
if item.headers.len() != headers.len() {
return Err(RpcError::General("get headers response has inconsistent legacy hashes and full headers".to_string()));
}
for (legacy_hash, header) in item.headers.iter().zip(headers.iter()) {
if RpcHash::from_str(legacy_hash)? != header.hash {
return Err(RpcError::General("get headers response has inconsistent legacy hashes and full headers".to_string()));
}
}
Self { headers }
});

try_from!(item: &protowire::GetUtxosByAddressesRequestMessage, kaspa_rpc_core::GetUtxosByAddressesRequest, {
Expand Down Expand Up @@ -1099,9 +1114,112 @@ try_from!(&protowire::NotifySinkBlueScoreChangedResponseMessage, RpcResult<kaspa

#[cfg(test)]
mod tests {
use kaspa_rpc_core::{RpcError, RpcResult, SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse};
use kaspa_consensus_core::header::Header;
use kaspa_rpc_core::{
GetHeadersResponse, RpcError, RpcHash, RpcHeader, RpcResult, SubmitBlockRejectReason, SubmitBlockReport, SubmitBlockResponse,
};

use crate::protowire::{self, GetHeadersResponseMessage, SubmitBlockResponseMessage, submit_block_response_message::RejectReason};

fn new_unique() -> RpcHash {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(1);
let c = COUNTER.fetch_add(1, Ordering::Relaxed);
RpcHash::from_u64_word(c)
}

fn rpc_header() -> RpcHeader {
Header::new_finalized(
0,
vec![vec![new_unique()]].try_into().unwrap(),
new_unique(),
new_unique(),
new_unique(),
123,
456,
789,
101_112,
131_415.into(),
161_718,
new_unique(),
)
.into()
}

fn legacy_header_hash() -> String {
new_unique().to_string()
}

#[test]
fn get_headers_response_accepts_empty_success() {
let protowire = GetHeadersResponseMessage { headers: Vec::new(), block_headers: Vec::new(), error: None };

let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

assert!(matches!(rpc_core, Ok(GetHeadersResponse { headers }) if headers.is_empty()));
}

#[test]
fn get_headers_response_rejects_hashes_without_full_headers() {
let protowire = GetHeadersResponseMessage { headers: vec![legacy_header_hash()], block_headers: Vec::new(), error: None };

use crate::protowire::{self, SubmitBlockResponseMessage, submit_block_response_message::RejectReason};
let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

assert!(
matches!(rpc_core, Err(RpcError::General(message)) if message == "get headers response contains only header hashes without full headers")
);
}

#[test]
fn get_headers_response_accepts_matching_legacy_hashes_and_full_headers() {
let header = rpc_header();
let protowire =
GetHeadersResponseMessage { headers: vec![header.hash.to_string()], block_headers: vec![(&header).into()], error: None };

let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

let headers = rpc_core.unwrap().headers;
assert_eq!(headers.len(), 1);
assert_eq!(headers[0].hash, header.hash);
}

#[test]
fn get_headers_response_rejects_legacy_hash_count_mismatch() {
let header = rpc_header();
let protowire = GetHeadersResponseMessage { headers: Vec::new(), block_headers: vec![(&header).into()], error: None };

let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

assert!(
matches!(rpc_core, Err(RpcError::General(message)) if message == "get headers response has inconsistent legacy hashes and full headers")
);
}

#[test]
fn get_headers_response_rejects_legacy_hash_mismatch() {
let header = rpc_header();
let protowire =
GetHeadersResponseMessage { headers: vec![legacy_header_hash()], block_headers: vec![(&header).into()], error: None };

let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

assert!(
matches!(rpc_core, Err(RpcError::General(message)) if message == "get headers response has inconsistent legacy hashes and full headers")
);
}

#[test]
fn get_headers_response_error_takes_precedence_over_payload() {
let protowire = GetHeadersResponseMessage {
headers: vec![legacy_header_hash()],
block_headers: Vec::new(),
error: Some(protowire::RpcError { message: "upstream error".to_string() }),
};

let rpc_core: RpcResult<GetHeadersResponse> = (&protowire).try_into();

assert!(matches!(rpc_core, Err(RpcError::General(message)) if message == "upstream error"));
}

#[test]
fn test_submit_block_response() {
Expand Down
Loading
Loading