Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use color_eyre::eyre::{eyre, Report};
use futures::{select, FutureExt};
use tokio::sync::oneshot;
use tower::builder::ServiceBuilder;
use tower::util::BoxService;

use crate::components::mempool::Mempool;
Comment thread
oxarbitrage marked this conversation as resolved.
Outdated
use crate::components::{tokio::RuntimeRun, Inbound};
use crate::config::ZebradConfig;
use crate::{
Expand Down Expand Up @@ -65,7 +67,8 @@ impl StartCmd {
.await;

info!("initializing mempool");
let mempool = mempool::Mempool::new(config.network.network);
let mempool_service = BoxService::new(Mempool::new(config.network.network));
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
Comment thread
oxarbitrage marked this conversation as resolved.

info!("initializing network");
// The service that our node uses to respond to requests by peers. The
Expand All @@ -80,7 +83,7 @@ impl StartCmd {
state.clone(),
chain_verifier.clone(),
tx_verifier.clone(),
mempool,
mempool.clone(),
));

let (peer_set, address_book) =
Expand Down
10 changes: 7 additions & 3 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use zebra_consensus::transaction;
use zebra_consensus::{chain::VerifyChainError, error::TransactionError};
use zebra_network::AddressBook;

use super::mempool as mp;
Comment thread
oxarbitrage marked this conversation as resolved.
Outdated
use super::mempool::downloads::{
Downloads as TxDownloads, TRANSACTION_DOWNLOAD_TIMEOUT, TRANSACTION_VERIFY_TIMEOUT,
};
Expand All @@ -38,13 +39,14 @@ use downloads::Downloads as BlockDownloads;

type Outbound = Buffer<BoxService<zn::Request, zn::Response, zn::BoxError>, zn::Request>;
type State = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
type Mempool = Buffer<BoxService<mp::Request, mp::Response, mp::BoxError>, mp::Request>;
type BlockVerifier = Buffer<BoxService<Arc<Block>, block::Hash, VerifyChainError>, Arc<Block>>;
type TxVerifier = Buffer<
BoxService<transaction::Request, transaction::Response, TransactionError>,
transaction::Request,
>;
type InboundBlockDownloads = BlockDownloads<Timeout<Outbound>, Timeout<BlockVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State>;
type InboundTxDownloads = TxDownloads<Timeout<Outbound>, Timeout<TxVerifier>, State, Mempool>;

pub type NetworkSetupData = (Outbound, Arc<std::sync::Mutex<AddressBook>>);

Expand Down Expand Up @@ -134,7 +136,7 @@ pub struct Inbound {
state: State,

/// A service that manages transactions in the memory pool.
mempool: mempool::Mempool,
mempool: Mempool,
}

impl Inbound {
Expand All @@ -143,7 +145,7 @@ impl Inbound {
state: State,
block_verifier: BlockVerifier,
tx_verifier: TxVerifier,
mempool: mempool::Mempool,
mempool: Mempool,
) -> Self {
Self {
network_setup: Setup::AwaitingNetwork {
Expand Down Expand Up @@ -195,6 +197,7 @@ impl Service<zn::Request> for Inbound {
Timeout::new(outbound, TRANSACTION_DOWNLOAD_TIMEOUT),
Timeout::new(tx_verifier, TRANSACTION_VERIFY_TIMEOUT),
self.state.clone(),
self.mempool.clone(),
));
result = Ok(());
Setup::Initialized {
Expand Down Expand Up @@ -352,6 +355,7 @@ impl Service<zn::Request> for Inbound {
zn::Request::PushTransaction(_transaction) => {
debug!("ignoring unimplemented request");
// TODO: send to Tx Download & Verify Stream
// https://github.com/ZcashFoundation/zebra/issues/2692
async { Ok(zn::Response::Nil) }.boxed()
}
zn::Request::AdvertiseTransactionIds(transactions) => {
Expand Down
7 changes: 5 additions & 2 deletions zebrad/src/components/inbound/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use tower::ServiceExt;
use tower::{util::BoxService, ServiceExt};

use super::mempool::{unmined_transactions_in_blocks, Mempool};

Expand Down Expand Up @@ -29,6 +29,9 @@ async fn mempool_requests_for_transaction_ids() {
.map(|t| t.id)
.collect();

let mempool_service = BoxService::new(mempool_service);
let mempool = ServiceBuilder::new().buffer(20).service(mempool_service);
Comment thread
oxarbitrage marked this conversation as resolved.
Outdated

let (block_verifier, transaction_verifier) =
zebra_consensus::chain::init(consensus_config.clone(), network, state_service.clone())
.await;
Expand All @@ -42,7 +45,7 @@ async fn mempool_requests_for_transaction_ids() {
state_service,
block_verifier.clone(),
transaction_verifier.clone(),
mempool_service,
mempool,
));

let request = inbound_service
Expand Down
8 changes: 7 additions & 1 deletion zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use zebra_chain::{
transaction::{UnminedTx, UnminedTxId},
};

use crate::BoxError;
pub use crate::BoxError;

mod crawler;
pub mod downloads;
Expand All @@ -35,6 +35,7 @@ pub use self::storage::tests::unmined_transactions_in_blocks;
pub enum Request {
TransactionIds,
TransactionsById(HashSet<UnminedTxId>),
RejectedTransactionsIds(HashSet<UnminedTxId>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -93,6 +94,11 @@ impl Service<Request> for Mempool {
let rsp = Ok(self.storage.clone().transactions(ids)).map(Response::Transactions);
async move { rsp }.boxed()
}
Request::RejectedTransactionsIds(ids) => {
let rsp = Ok(self.storage.clone().rejected_transactions(ids))
.map(Response::TransactionIds);
Comment thread
dconnolly marked this conversation as resolved.
Outdated
async move { rsp }.boxed()
}
}
}
}
96 changes: 81 additions & 15 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use zebra_consensus::transaction as tx;
use zebra_network as zn;
use zebra_state as zs;

use crate::components::mempool as mp;
use crate::components::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down Expand Up @@ -85,14 +86,16 @@ pub enum DownloadAction {
/// Represents a [`Stream`] of download and verification tasks.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
pub struct Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
// Services
/// A service that forwards requests to connected peers, and returns their
Expand All @@ -105,6 +108,9 @@ where
/// A service that manages cached blockchain state.
state: ZS,

/// A service that manages the mempool.
mempool: ZM,

// Internal downloads state
/// A list of pending transaction download and verify tasks.
#[pin]
Expand All @@ -115,14 +121,16 @@ where
cancel_handles: HashMap<UnminedTxId, oneshot::Sender<()>>,
}

impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Stream for Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
type Item = Result<UnminedTxId, BoxError>;

Expand Down Expand Up @@ -158,26 +166,29 @@ where
}
}

impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
impl<ZN, ZV, ZS, ZM> Downloads<ZN, ZV, ZS, ZM>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
ZM: Service<mp::Request, Response = mp::Response, Error = BoxError> + Send + Clone + 'static,
ZM::Future: Send,
{
/// Initialize a new download stream with the provided `network` and
/// `verifier` services.
///
/// The [`Downloads`] stream is agnostic to the network policy, so retry and
/// timeout limits should be applied to the `network` service passed into
/// this constructor.
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
pub fn new(network: ZN, verifier: ZV, state: ZS, mempool: ZM) -> Self {
Self {
network,
verifier,
state,
mempool,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
Expand Down Expand Up @@ -213,19 +224,11 @@ where

let network = self.network.clone();
let verifier = self.verifier.clone();
let state = self.state.clone();
let mut state = self.state.clone();
let mut mempool = self.mempool.clone();

let fut = async move {
// TODO: adapt this for transaction / mempool
// // Check if the block is already in the state.
// // BUG: check if the hash is in any chain (#862).
// // Depth only checks the main chain.
// match state.oneshot(zs::Request::Depth(hash)).await {
// Ok(zs::Response::Depth(None)) => Ok(()),
// Ok(zs::Response::Depth(Some(_))) => Err("already present".into()),
// Ok(_) => unreachable!("wrong response"),
// Err(e) => Err(e),
// }?;
Self::should_download(&mut state, &mut mempool, txid).await?;

let height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Err("no block at the tip".into()),
Expand Down Expand Up @@ -298,4 +301,67 @@ where

DownloadAction::AddedToQueue
}

/// Check if transaction should be downloaded and verified.
///
/// If it is already in the mempool (or in its rejected list)
/// or in state, then it shouldn't be downloaded (and an error is returned).
async fn should_download(
state: &mut ZS,
mempool: &mut ZM,
txid: UnminedTxId,
) -> Result<(), BoxError> {
// Check if the transaction is already in the mempool.
match mempool
.ready_and()
.await?
.call(mp::Request::TransactionsById(
[txid].iter().cloned().collect(),
))
.await
{
Ok(mp::Response::Transactions(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("already present in mempool".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is in the mempool rejected list.
match mempool
.oneshot(mp::Request::RejectedTransactionsIds(
[txid].iter().cloned().collect(),
))
.await
Comment thread
dconnolly marked this conversation as resolved.
{
Ok(mp::Response::TransactionIds(txs)) => {
if txs.is_empty() {
Ok(())
} else {
Err("in mempool rejected list".into())
}
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

// Check if the transaction is already in the state.
match state
.ready_and()
.await?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err("already present in state".into()),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(e),
}?;

Ok(())
}
}
8 changes: 8 additions & 0 deletions zebrad/src/components/mempool/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,12 @@ impl Storage {
.filter(|tx| tx_ids.contains(&tx.id))
.collect()
}

/// Returns the set of [`UnminedTxId`]s matching ids in the rejected list.
pub fn rejected_transactions(self, tx_ids: HashSet<UnminedTxId>) -> Vec<UnminedTxId> {
Comment thread
oxarbitrage marked this conversation as resolved.
tx_ids
.into_iter()
.filter(|tx| self.rejected.contains_key(tx))
.collect()
}
}