Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use tower::util::BoxService;

use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig;
use crate::{
components::{mempool, tokio::TokioComponent, ChainSync},
components::{
mempool::{self, Mempool},
tokio::{RuntimeRun, TokioComponent},
ChainSync, Inbound,
},
config::ZebradConfig,
prelude::*,
};

Expand Down Expand Up @@ -65,7 +69,8 @@ impl StartCmd {
.await;

info!("initializing mempool");
let mempool = mempool::Mempool::new(config.network.network);
let mempool_service = BoxService::new(Mempool::new(config.network.network));
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
Comment thread
oxarbitrage marked this conversation as resolved.

info!("initializing network");
// The service that our node uses to respond to requests by peers. The
Expand All @@ -80,7 +85,7 @@ impl StartCmd {
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
mempool,
mempool.clone(),
));

let (peer_set, address_book) =
Expand Down
16 changes: 11 additions & 5 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ use zebra_consensus::transaction;
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
use zebra_network::AddressBook;

use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
use super::mempool::{
self as mp,
downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
},
};
// Re-use the syncer timeouts for consistency.
use super::{
Expand All @@ -38,13 +41,14 @@ use downloads::Downloads as BlockDownloads;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type TxVerifier = Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>;
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State, Mempool>;

pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);

Expand Down Expand Up @@ -134,7 +138,7 @@ pub struct Inbound {
state: State,

/// A service that manages transactions in the memory pool.
mempool: mempool::Mempool,
mempool: Mempool,
}

impl Inbound {
Expand All @@ -143,7 +147,7 @@ impl Inbound {
state: State,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
mempool: mempool::Mempool,
mempool: Mempool,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
Expand Down Expand Up @@ -195,6 +199,7 @@ impl Service<zn::Request> for Inbound {
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
self.state.clone(),
self.mempool.clone(),
));
result = Ok(());
Setup::Initialized {
Expand Down Expand Up @@ -350,6 +355,7 @@ impl Service<zn::Request> for Inbound {
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
// https://github.com/ZcashFoundation/zebra/issues/2692
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(transactions) => {
Expand Down
7 changes: 5 additions & 2 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use super::mempool::{unmined_transactions_in_blocks, Mempool};

use tokio::sync::oneshot;
use tower::{builder::ServiceBuilder, ServiceExt};
use tower::{builder::ServiceBuilder, util::BoxService, ServiceExt};

use zebra_chain::{
parameters::Network,
Expand All @@ -26,6 +26,9 @@ async fn mempool_requests_for_transactions() {
let added_transactions = add_some_stuff_to_mempool(&mut mempool_service, network);
let added_transaction_ids: Vec<UnminedTxId> = added_transactions.iter().map(|t| t.id).collect();

let mempool_service = BoxService::new(mempool_service);
let mempool = ServiceBuilder::new().buffer(1).service(mempool_service);

let (block_verifier, transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
Expand All @@ -39,7 +42,7 @@ async fn mempool_requests_for_transactions() {
state_service,
block_verifier.clone(),
transaction_verifier.clone(),
mempool_service,
mempool,
));

// Test `Request::MempoolTransactionIds`
Expand Down
9 changes: 8 additions & 1 deletion zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId},
};

use crate::BoxError;
pub use crate::BoxError;

mod crawler;
pub mod downloads;
Expand All @@ -35,12 +35,14 @@ pub use self::storage::tests::unmined_transactions_in_blocks;
pub enum Request {
TransactionIds,
TransactionsById(HashSet<UnminedTxId>),
RejectedTransactionIds(HashSet<UnminedTxId>),
Comment thread
oxarbitrage marked this conversation as resolved.
}

#[derive(Debug)]
pub enum Response {
Transactions(Vec<UnminedTx>),
TransactionIds(Vec<UnminedTxId>),
RejectedTransactionIds(Vec<UnminedTxId>),
}

/// Mempool async management and query service.
Expand Down Expand Up @@ -93,6 +95,11 @@ impl Service<Request> for Mempool {
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionIds(ids) => {
let rsp = Ok(self.storage.clone().rejected_transactions(ids))
.map(Response::RejectedTransactionIds);
async move { rsp }.boxed()
}
}
}
}
96 changes: 81 additions & 15 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;

use crate::components::mempool as mp;
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -85,14 +86,16 @@ pub enum DownloadAction {
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
pub struct Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
// Services
/// A service that forwards requests to connected peers, and returns their
Expand All @@ -105,6 +108,9 @@ where
/// A service that manages cached blockchain state.
state: ZS,

/// A service that manages the mempool.
mempool: ZM,

// Internal downloads state
/// A list of pending transaction download and verify tasks.
#[pin]
Expand All @@ -115,14 +121,16 @@ where
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
}

impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Stream for Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
type Item = Result<UnminedTxId, BoxError>;

Expand Down Expand Up @@ -158,26 +166,29 @@ where
}
}

impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
/// Initialize a new download stream with the provided `network` and
/// `verifier` services.
///
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
pub fn new(network: ZN, verifier: ZV, state: ZS, mempool: ZM) -> Self {
Self {
network,
verifier,
state,
mempool,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
Expand Down Expand Up @@ -213,19 +224,11 @@ where

let network = self.network.clone();
let verifier = self.verifier.clone();
let state = self.state.clone();
let mut state = self.state.clone();
let mut mempool = self.mempool.clone();

let fut = async move {
// TODO: adapt this for transaction / mempool
// // Check if the block is already in the state.
// // BUG: check if the hash is in any chain (#862).
// // Depth only checks the main chain.
// match state.oneshot(zs::Request::Depth(hash)).await {
// Ok(zs::Response::Depth(None)) => Ok(()),
// Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
// Ok(_) => unreachable!("wrong response"),
// Err(e) => Err(e),
// }?;
Self::should_download(&mut state, &mut mempool, txid).await?;

let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
Expand Down Expand Up @@ -298,4 +301,67 @@ where

DownloadAction::AddedToQueue
}

/// Check if transaction should be downloaded and verified.
///
/// If it is already in the mempool (or in its rejected list)
/// or in state, then it shouldn't be downloaded (and an error is returned).
async fn should_download(
state: &mut ZS,
mempool: &mut ZM,
txid: UnminedTxId,
) -> Result<(), BoxError> {
// Check if the transaction is already in the mempool.
match mempool
.ready_and()
.await?
.call(mp::Request::TransactionsById(
[txid].iter().cloned().collect(),
))
.await
{
Ok(mp::Response::Transactions(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("already present in mempool".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is in the mempool rejected list.
match mempool
.oneshot(mp::Request::RejectedTransactionIds(
[txid].iter().cloned().collect(),
))
.await
Comment thread
dconnolly marked this conversation as resolved.
{
Ok(mp::Response::RejectedTransactionIds(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("in mempool rejected list".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is already in the state.
match state
.ready_and()
.await?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

Ok(())
}
}
8 changes: 8 additions & 0 deletions zebrad/src/components/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,12 @@ impl Storage {
.filter(|tx| tx_ids.contains(&tx.id))
.collect()
}

/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
Comment thread
oxarbitrage marked this conversation as resolved.
tx_ids
.into_iter()
.filter(|tx| self.rejected.contains_key(tx))
.collect()
}
}
13 changes: 13 additions & 0 deletions zebrad/src/components/mempool/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> {
assert!(!storage.clone().contains(&tx.id));
}

// Query all the ids we have for rejected, get back `total - MEMPOOL_SIZE`
let all_ids: HashSet<UnminedTxId> = unmined_transactions.iter().map(|tx| tx.id).collect();
let rejected_ids: HashSet<UnminedTxId> = unmined_transactions
.iter()
.take(total_transactions - MEMPOOL_SIZE)
.map(|tx| tx.id)
.collect();
// Convert response to a `HashSet` as we need a fixed order to compare.
let rejected_response: HashSet<UnminedTxId> =
storage.rejected_transactions(all_ids).into_iter().collect();

assert_eq!(rejected_response, rejected_ids);

Ok(())
}

Expand Down
Loading