Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions src/network/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,21 +309,24 @@ impl SnapchainGossip {
return Err(Box::new(e));
}
} else {
// Create a Gossipsub topic
let topic = gossipsub::IdentTopic::new(CONSENSUS_TOPIC);
// subscribes to our topic
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}
}

let topic = gossipsub::IdentTopic::new(MEMPOOL_TOPIC);
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}
// Both validators and read nodes join the mempool mesh: validators
// consume mempool messages for inclusion in blocks; read nodes accept
// client-submitted messages via RPC and need to be useful relays for
// them. A node that publishes to a topic without subscribing falls
// back to fanout (best-effort, TTL'd, not in the mesh).
let topic = gossipsub::IdentTopic::new(MEMPOOL_TOPIC);
let result = swarm.behaviour_mut().gossipsub.subscribe(&topic);
if let Err(e) = result {
warn!("Failed to subscribe to topic: {:?}", e);
return Err(Box::new(e));
}

let topic = gossipsub::IdentTopic::new(CONTACT_INFO);
Expand Down
218 changes: 218 additions & 0 deletions src/network/gossip_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::consensus::consensus::SystemMessage;
use crate::core::types::SnapchainValidatorContext;
use crate::mempool::mempool::{MempoolRequest, MempoolSource};
use crate::network::gossip::{Config, GossipEvent, SnapchainGossip};
use crate::proto::{FarcasterNetwork, Message, MessageData};
Expand All @@ -15,6 +16,60 @@ use tokio::{select, time};
const HOST_FOR_TEST: &str = "127.0.0.1";
const BASE_PORT_FOR_TEST: u32 = 9382;

/// Re-publishes `message` on `publisher_tx` every 200ms, polling `receiver` for
/// a matching `SystemMessage::Mempool(...)` delivery, and returns `true` as
/// soon as one arrives (or `false` once `deadline` elapses). Breaks the
/// publisher loop early if the gossip event channel closes — without that, a
/// dead gossip task would burn the full deadline before failing.
///
/// Repeat sends are safe: gossipsub message-id dedup ensures the receiver sees
/// each unique payload at most once. This is the right shape for tests where
/// the publisher hasn't yet formed a mesh with subscribed peers — the first
/// few `publish` calls can return `InsufficientPeers` and are only warn-logged
/// and dropped by `SnapchainGossip::publish`.
async fn publish_until_received(
publisher_tx: mpsc::Sender<GossipEvent<SnapchainValidatorContext>>,
receiver: &mut mpsc::Receiver<SystemMessage>,
message: Message,
deadline: Duration,
) -> bool {
let expected_hash = message.hash.clone();

let cast_for_publisher = message;
let publisher = tokio::spawn(async move {
loop {
if publisher_tx
.send(GossipEvent::BroadcastMempoolMessage(
MempoolMessage::UserMessage(cast_for_publisher.clone()),
))
.await
.is_err()
{
break;
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
});

let end = tokio::time::Instant::now() + deadline;
let received = loop {
if tokio::time::Instant::now() >= end {
break false;
}
match tokio::time::timeout(Duration::from_millis(200), receiver.recv()).await {
Ok(Some(SystemMessage::Mempool(MempoolRequest::AddMessage(
MempoolMessage::UserMessage(msg),
MempoolSource::Gossip,
_,
)))) if msg.hash == expected_hash => break true,
_ => {}
}
};

publisher.abort();
received
}

async fn wait_for_message(
system_rx: &mut mpsc::Receiver<SystemMessage>,
expected_message: Message,
Expand Down Expand Up @@ -162,6 +217,169 @@ async fn test_gossip_communication() {
assert_eq!(receive_counts, 1);
}

/// Regression test for #865: read nodes must subscribe to MEMPOOL_TOPIC so that
/// they receive mempool gossip from validator peers. Before the fix, a read
/// node never joined the mempool mesh and never delivered inbound mempool
/// messages to its system channel, regardless of how many validator peers
/// were publishing.
#[tokio::test]
#[serial]
async fn test_read_node_receives_mempool_gossip() {
let validator_keypair = Keypair::generate();
let read_node_keypair = Keypair::generate();

let validator_port = BASE_PORT_FOR_TEST + 20;
let validator_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{validator_port}/quic-v1");

let read_node_port = BASE_PORT_FOR_TEST + 21;
let read_node_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{read_node_port}/quic-v1");

let validator_config = Config::new(validator_addr.clone(), read_node_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));
let read_node_config = Config::new(read_node_addr.clone(), validator_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));

let (validator_system_tx, _) = mpsc::channel::<SystemMessage>(100);
let (read_node_system_tx, mut read_node_system_rx) = mpsc::channel::<SystemMessage>(100);

let mut validator_gossip = SnapchainGossip::create(
validator_keypair,
&validator_config,
Some(validator_system_tx),
false,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let mut read_node_gossip = SnapchainGossip::create(
read_node_keypair,
&read_node_config,
Some(read_node_system_tx),
true,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let validator_tx = validator_gossip.tx.clone();

tokio::spawn(async move { validator_gossip.start().await });
tokio::spawn(async move { read_node_gossip.start().await });

let cast_add = messages_factory::casts::create_cast_add(456, "regression", None, None);
let received = publish_until_received(
validator_tx,
&mut read_node_system_rx,
cast_add,
Duration::from_secs(10),
)
.await;
assert!(
received,
"read node did not receive mempool gossip from validator within deadline"
);
}

/// Multi-hop variant of the regression test. Topology:
///
/// ```text
/// read_node_1 <--> read_node_2 <--> validator
/// ```
///
/// `read_node_1` and `validator` only bootstrap to `read_node_2`, never to
/// each other, and autodiscovery is left off (default). For a mempool message
/// published on `read_node_1` to reach `validator`, gossipsub must forward
/// it through `read_node_2`'s mempool-topic mesh. Crucially, this is
/// distinguishable from the fanout-fallback path that makes the simpler 2-node
/// regression pass even when read nodes don't subscribe: in the bug scenario
/// `read_node_2` isn't subscribed to MEMPOOL_TOPIC, `read_node_1`'s only
/// direct peer that knows the topic is no one, fanout has nothing to fall back
/// to, and the publish never leaves `read_node_1`.
#[tokio::test]
#[serial]
async fn test_read_node_relays_mempool_gossip() {
let read_node_1_keypair = Keypair::generate();
let read_node_2_keypair = Keypair::generate();
let validator_keypair = Keypair::generate();

let read_node_1_port = BASE_PORT_FOR_TEST + 30;
let read_node_1_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{read_node_1_port}/quic-v1");

let read_node_2_port = BASE_PORT_FOR_TEST + 31;
let read_node_2_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{read_node_2_port}/quic-v1");

let validator_port = BASE_PORT_FOR_TEST + 32;
let validator_addr = format!("/ip4/{HOST_FOR_TEST}/udp/{validator_port}/quic-v1");

// read_node_1 and validator only bootstrap to read_node_2, never to each
// other. read_node_2 has no bootstrap (it'll be dialled by the others).
let read_node_1_config = Config::new(read_node_1_addr.clone(), read_node_2_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));
let read_node_2_config = Config::new(read_node_2_addr.clone(), "".to_string())
.with_contact_info_interval(Duration::from_millis(100));
let validator_config = Config::new(validator_addr.clone(), read_node_2_addr.clone())
.with_contact_info_interval(Duration::from_millis(100));

let (read_node_1_system_tx, _) = mpsc::channel::<SystemMessage>(100);
let (read_node_2_system_tx, _) = mpsc::channel::<SystemMessage>(100);
let (validator_system_tx, mut validator_system_rx) = mpsc::channel::<SystemMessage>(100);

let mut read_node_1_gossip = SnapchainGossip::create(
read_node_1_keypair,
&read_node_1_config,
Some(read_node_1_system_tx),
true,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let mut read_node_2_gossip = SnapchainGossip::create(
read_node_2_keypair,
&read_node_2_config,
Some(read_node_2_system_tx),
true,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let mut validator_gossip = SnapchainGossip::create(
validator_keypair,
&validator_config,
Some(validator_system_tx),
false,
FarcasterNetwork::Devnet,
statsd_client(),
)
.await
.unwrap();

let read_node_1_tx = read_node_1_gossip.tx.clone();

tokio::spawn(async move { read_node_1_gossip.start().await });
tokio::spawn(async move { read_node_2_gossip.start().await });
tokio::spawn(async move { validator_gossip.start().await });

let cast_add = messages_factory::casts::create_cast_add(789, "multi-hop", None, None);
let received = publish_until_received(
read_node_1_tx,
&mut validator_system_rx,
cast_add,
Duration::from_secs(15),
)
.await;
assert!(
received,
"validator did not receive mempool gossip relayed through read_node_2 within deadline"
);
}

#[tokio::test]
#[serial]
async fn test_bootstrap_peer_reconnection() {
Expand Down
Loading
Loading