From 13436af706efef2fe472f79eda015a64f36d6b68 Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Tue, 23 Dec 2025 14:32:01 -0500 Subject: [PATCH] WIP fix: disable peer discovery for replication --- src/bootstrap/replication/service.rs | 114 +++++++++++++-------------- 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/src/bootstrap/replication/service.rs b/src/bootstrap/replication/service.rs index 6741227af..063a34876 100644 --- a/src/bootstrap/replication/service.rs +++ b/src/bootstrap/replication/service.rs @@ -1,10 +1,10 @@ use crate::bootstrap::replication::error::BootstrapError; -use crate::bootstrap::replication::peer_discovery::PeerDiscoverer; +// use crate::bootstrap::replication::peer_discovery::PeerDiscoverer; use crate::bootstrap::replication::rpc_client::RpcClientsManager; use crate::cfg::Config; use crate::core::validations; use crate::core::validations::message::validate_message_hash; -use crate::network::gossip; +// use crate::network::gossip; use crate::proto::shard_trie_entry_with_message::TrieMessage; use crate::proto::{self, ReplicationTriePartStatus, ShardSnapshotMetadata}; use crate::storage::store::block_engine::BlockEngine; @@ -40,7 +40,7 @@ use std::{ }; use std::{fs, io}; use tokio::signal::ctrl_c; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{Mutex, RwLock}; use tokio::task::JoinSet; use tracing::{debug, error, info, warn}; @@ -150,7 +150,7 @@ pub struct ReplicatorBootstrap { shutdown: Arc, fc_network: crate::proto::FarcasterNetwork, statsd_client: StatsdClientWrapper, - gossip_config: gossip::Config, + // gossip_config: gossip::Config, data_shard_ids: Vec, shard0_metadata: Arc>, rocksdb_dir: String, @@ -165,7 +165,7 @@ impl ReplicatorBootstrap { shutdown: Arc::new(AtomicBool::new(false)), fc_network: app_config.fc_network, statsd_client, - gossip_config: app_config.gossip.clone(), + // gossip_config: app_config.gossip.clone(), data_shard_ids: app_config.consensus.shard_ids.clone(), shard0_metadata: Arc::new(Mutex::new(ShardSnapshotMetadata::default())), rocksdb_dir: app_config.rocksdb_dir.clone(), @@ -186,7 +186,7 @@ impl ReplicatorBootstrap { shutdown: Arc::new(AtomicBool::new(false)), fc_network: app_config.fc_network, statsd_client, - gossip_config: app_config.gossip.clone(), + // gossip_config: app_config.gossip.clone(), data_shard_ids: app_config.consensus.shard_ids.clone(), shard0_metadata: Arc::new(Mutex::new(ShardSnapshotMetadata::default())), rocksdb_dir: app_config.rocksdb_dir.clone(), @@ -269,39 +269,39 @@ impl ReplicatorBootstrap { } // Determine target height from first manager (all should agree) for discovery validation - let first_manager = rpc_client_managers - .values() - .next() - .ok_or_else(|| BootstrapError::GenericError("No RPC managers initialized".into()))?; - let target_height = first_manager.get_metadata().height; - - let (mut disc_shutdown_tx_opt, mut discovery_thread_handle_opt) = { - let (disc_shutdown_tx, disc_shutdown_rx) = oneshot::channel(); - - // Clone/move all inputs needed inside the new thread. We construct PeerDiscoverer - // inside the thread so we don't have to move a !Send swarm across threads. - let gossip_config = self.gossip_config.clone(); - let rpc_mgr = first_manager.clone(); - let statsd = self.statsd_client.clone(); - let network = self.fc_network; - let handle = tokio::spawn(async move { - match PeerDiscoverer::new( - &gossip_config, - rpc_mgr, - target_height, - network, - statsd, - disc_shutdown_rx, - ) - .await - { - Ok(peer_discoverer) => peer_discoverer.run().await, - Err(e) => warn!("Peer discovery disabled (failed to start): {}", e), - } - }); - - (Some(disc_shutdown_tx), Some(handle)) - }; + // let first_manager = rpc_client_managers + // .values() + // .next() + // .ok_or_else(|| BootstrapError::GenericError("No RPC managers initialized".into()))?; + // let target_height = first_manager.get_metadata().height; + + // let (mut disc_shutdown_tx_opt, mut discovery_thread_handle_opt) = { + // let (disc_shutdown_tx, disc_shutdown_rx) = oneshot::channel(); + + // // Clone/move all inputs needed inside the new thread. We construct PeerDiscoverer + // // inside the thread so we don't have to move a !Send swarm across threads. + // let gossip_config = self.gossip_config.clone(); + // let rpc_mgr = first_manager.clone(); + // let statsd = self.statsd_client.clone(); + // let network = self.fc_network; + // let handle = tokio::spawn(async move { + // match PeerDiscoverer::new( + // &gossip_config, + // rpc_mgr, + // target_height, + // network, + // statsd, + // disc_shutdown_rx, + // ) + // .await + // { + // Ok(peer_discoverer) => peer_discoverer.run().await, + // Err(e) => warn!("Peer discovery disabled (failed to start): {}", e), + // } + // }); + + // (Some(disc_shutdown_tx), Some(handle)) + // }; // Create tasks for each shard to run in parallel let mut shard_tasks = JoinSet::new(); @@ -385,26 +385,26 @@ impl ReplicatorBootstrap { info!("All shard tasks have been shut down."); }; - let shutdown_gossip_discovery_fn = - async |disc_shutdown_tx: &mut Option>, - discovery_thread_handle: &mut Option>| { - if let (Some(tx), Some(handle)) = - (disc_shutdown_tx.take(), discovery_thread_handle.take()) - { - let _ = tx.send(()); - let _ = handle.await; - } - }; + // let shutdown_gossip_discovery_fn = + // async |disc_shutdown_tx: &mut Option>, + // discovery_thread_handle: &mut Option>| { + // if let (Some(tx), Some(handle)) = + // (disc_shutdown_tx.take(), discovery_thread_handle.take()) + // { + // let _ = tx.send(()); + // let _ = handle.await; + // } + // }; loop { if shard_tasks.is_empty() { // All tasks completed successfully // Shut down discovery task - shutdown_gossip_discovery_fn( - &mut disc_shutdown_tx_opt, - &mut discovery_thread_handle_opt, - ) - .await; + // shutdown_gossip_discovery_fn( + // &mut disc_shutdown_tx_opt, + // &mut discovery_thread_handle_opt, + // ) + // .await; // Write the final metadata from the server into the new DB. self.write_final_metadata_to_db( @@ -444,13 +444,13 @@ impl ReplicatorBootstrap { error!("Shard task failed: {}", e); // Shutdown all remaining tasks as well shutdown_and_drain_tasks_fn(&mut shard_tasks).await; - shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; + // shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; return Err(e); } Err(e) => { error!("Shard task join error: {}", e); - shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; + // shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; return Err(BootstrapError::TransactionReplayError(format!( "Shard task join error: {}", e @@ -461,7 +461,7 @@ impl ReplicatorBootstrap { _ = ctrl_c() => { info!("Shutdown signal received, stopping all shard replication tasks"); shutdown_and_drain_tasks_fn(&mut shard_tasks).await; - shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; + // shutdown_gossip_discovery_fn(&mut disc_shutdown_tx_opt, &mut discovery_thread_handle_opt).await; return Ok(WorkUnitResponse::Stopped); }