diff --git a/crates/anvil/src/cmd.rs b/crates/anvil/src/cmd.rs index 68bd83a1e6e89..fb75529b82908 100644 --- a/crates/anvil/src/cmd.rs +++ b/crates/anvil/src/cmd.rs @@ -227,6 +227,18 @@ impl NodeArgs { let compute_units_per_second = if self.evm.no_rate_limit { Some(u64::MAX) } else { self.evm.compute_units_per_second }; + // Validate that secondary fork URLs don't have conflicting block number suffixes + if self.evm.fork_url.len() > 1 { + for fork in &self.evm.fork_url[1..] { + if fork.block.is_some() { + eyre::bail!( + "Block number suffixes (@block) on secondary --fork-url values are not supported. \ + Use --fork-block-number to set the fork block for all endpoints." + ); + } + } + } + let hardfork = match &self.hardfork { Some(hf) => { if self.evm.networks.is_optimism() { @@ -260,7 +272,7 @@ impl NodeArgs { _ => self .evm .fork_url - .as_ref() + .first() .and_then(|f| f.block) .map(|num| ForkChoice::Block(num as i128)), }) @@ -270,7 +282,7 @@ impl NodeArgs { .fork_request_retries(self.evm.fork_request_retries) .fork_retry_backoff(self.evm.fork_retry_backoff.map(Duration::from_millis)) .fork_compute_units_per_second(compute_units_per_second) - .with_eth_rpc_url(self.evm.fork_url.map(|fork| fork.url)) + .with_fork_urls(self.evm.fork_url.into_iter().map(|f| f.url).collect()) .with_base_fee(self.evm.block_base_fee_per_gas) .disable_min_priority_fee(self.evm.disable_min_priority_fee) .with_no_storage_caching(self.evm.no_storage_caching) @@ -426,6 +438,10 @@ pub struct AnvilEvmArgs { /// Fetch state over a remote endpoint instead of starting from an empty state. /// /// If you want to fetch state from a specific block number, add a block number like `http://localhost:8545@1400000` or use the `--fork-block-number` argument. + /// + /// Multiple `--fork-url` flags can be provided to distribute requests across endpoints + /// using round-robin load balancing. On failure, the retry layer rotates to the next + /// endpoint. #[arg( long, short, @@ -433,7 +449,7 @@ pub struct AnvilEvmArgs { value_name = "URL", help_heading = "Fork config" )] - pub fork_url: Option, + pub fork_url: Vec, /// Headers to use for the rpc client, e.g. "User-Agent: test-agent" /// @@ -630,13 +646,45 @@ pub struct AnvilEvmArgs { /// Resolves an alias passed as fork-url to the matching url defined in the rpc_endpoints section /// of the project configuration file. /// Does nothing if the fork-url is not a configured alias. +/// +/// When an alias maps to an `RpcEndpoint` with multiple `endpoints`, all URLs are expanded +/// into additional `--fork-url` entries for multi-endpoint load balancing. impl AnvilEvmArgs { pub fn resolve_rpc_alias(&mut self) { - if let Some(fork_url) = &self.fork_url - && let Ok(config) = Config::load_with_providers(FigmentProviders::Anvil) - && let Some(Ok(url)) = config.get_rpc_url_with_alias(&fork_url.url) - { - self.fork_url = Some(ForkUrl { url: url.to_string(), block: fork_url.block }); + if let Ok(config) = Config::load_with_providers(FigmentProviders::Anvil) { + let mut resolved_urls = Vec::new(); + for fork_url in &self.fork_url { + let mut endpoints = config.rpc_endpoints.clone().resolved(); + if let Some(endpoint) = endpoints.remove(&fork_url.url) { + // Alias matched — expand all URLs from the endpoint config + match endpoint.all_urls() { + Ok(urls) => { + for (i, url) in urls.into_iter().enumerate() { + resolved_urls.push(ForkUrl { + url, + // Only the first URL inherits the block suffix + block: if i == 0 { fork_url.block } else { None }, + }); + } + } + Err(e) => { + warn!(target: "node", alias=%fork_url.url, %e, "could not resolve all endpoints, using primary endpoint only"); + if let Ok(url) = endpoint.url() { + resolved_urls.push(ForkUrl { url, block: fork_url.block }); + } else { + resolved_urls.push(fork_url.clone()); + } + } + } + } else if let Some(Ok(url)) = config.get_rpc_url_with_alias(&fork_url.url) { + // Try mesc or other resolution + resolved_urls.push(ForkUrl { url: url.to_string(), block: fork_url.block }); + } else { + // Not an alias — keep as-is + resolved_urls.push(fork_url.clone()); + } + } + self.fork_url = resolved_urls; } } } @@ -965,4 +1013,65 @@ mod tests { ["::1", "1.1.1.1", "2.2.2.2"].map(|ip| ip.parse::().unwrap()).to_vec() ); } + + #[test] + fn can_parse_multiple_fork_urls() { + let args: NodeArgs = NodeArgs::parse_from([ + "anvil", + "--fork-url", + "http://localhost:8545", + "--fork-url", + "http://localhost:8546", + "--fork-url", + "http://localhost:8547", + ]); + assert_eq!(args.evm.fork_url.len(), 3); + assert_eq!(args.evm.fork_url[0].url, "http://localhost:8545"); + assert_eq!(args.evm.fork_url[1].url, "http://localhost:8546"); + assert_eq!(args.evm.fork_url[2].url, "http://localhost:8547"); + + // Block suffix on first URL should work + let args: NodeArgs = NodeArgs::parse_from([ + "anvil", + "--fork-url", + "http://localhost:8545@1000000", + "--fork-url", + "http://localhost:8546", + ]); + assert_eq!(args.evm.fork_url[0].block, Some(1000000)); + assert_eq!(args.evm.fork_url[1].block, None); + } + + #[test] + fn rejects_block_suffix_on_secondary_fork_urls() { + let args: NodeArgs = NodeArgs::parse_from([ + "anvil", + "--fork-url", + "http://localhost:8545@1000000", + "--fork-url", + "http://localhost:8546@2000000", + ]); + let result = args.into_node_config(); + assert!(result.is_err()); + assert!( + result.unwrap_err().to_string().contains("Block number suffixes"), + "should reject block suffix on secondary fork URL" + ); + } + + #[test] + fn fork_dependent_args_require_fork_url() { + // All these args have `requires = "fork_url"` — they should fail without --fork-url + let cases = [ + vec!["anvil", "--fork-header", "X-Api-Key: test"], + vec!["anvil", "--timeout", "5000"], + vec!["anvil", "--retries", "3"], + vec!["anvil", "--fork-block-number", "100"], + vec!["anvil", "--fork-retry-backoff", "500"], + ]; + for args in &cases { + let result = NodeArgs::try_parse_from(args); + assert!(result.is_err(), "expected error when using {:?} without --fork-url", args[1]); + } + } } diff --git a/crates/anvil/src/config.rs b/crates/anvil/src/config.rs index c9b63f08cb2f5..23cd6e61bc076 100644 --- a/crates/anvil/src/config.rs +++ b/crates/anvil/src/config.rs @@ -134,11 +134,13 @@ pub struct NodeConfig { pub port: u16, /// maximum number of transactions in a block pub max_transactions: usize, - /// url of the rpc server that should be used for any rpc calls - pub eth_rpc_url: Option, + /// Fork URLs for RPC calls. The first entry is the primary endpoint. + /// When multiple URLs are provided, requests are distributed using + /// round-robin load balancing with retry-based failover. + pub fork_urls: Vec, /// pins the block number or transaction hash for the state fork pub fork_choice: Option, - /// headers to use with `eth_rpc_url` + /// headers to use with fork RPC endpoints pub fork_headers: Vec, /// specifies chain id for cache to skip fetching from remote in offline-start mode pub fork_chain_id: Option, @@ -268,12 +270,19 @@ Block number: {} Block hash: {:?} Chain ID: {} "#, - fork.eth_rpc_url(), + fork.eth_rpc_url().as_deref().unwrap_or("none"), fork.block_number(), fork.block_hash(), fork.chain_id() ); + if self.fork_urls.len() > 1 { + let _ = writeln!(s, "Endpoints: {}", self.fork_urls.len()); + for (i, url) in self.fork_urls.iter().enumerate() { + let _ = writeln!(s, " ({i}) {url}"); + } + } + if let Some(tx_hash) = fork.transaction_hash() { let _ = writeln!(s, "Transaction hash: {tx_hash}"); } @@ -393,7 +402,7 @@ Genesis Number json!({ "available_accounts": available_accounts, "private_keys": private_keys, - "endpoint": fork.eth_rpc_url(), + "endpoint": fork.eth_rpc_url().unwrap_or_default(), "block_number": fork.block_number(), "block_hash": fork.block_hash(), "chain_id": fork.chain_id(), @@ -466,7 +475,7 @@ impl Default for NodeConfig { mixed_mining: false, port: NODE_PORT, max_transactions: 1_000, - eth_rpc_url: None, + fork_urls: vec![], fork_choice: None, account_generator: None, base_fee: None, @@ -855,10 +864,19 @@ impl NodeConfig { self } - /// Sets the `eth_rpc_url` to use when forking + /// Sets the `eth_rpc_url` to use when forking (single endpoint convenience). #[must_use] pub fn with_eth_rpc_url>(mut self, eth_rpc_url: Option) -> Self { - self.eth_rpc_url = eth_rpc_url.map(Into::into); + if let Some(url) = eth_rpc_url { + self.fork_urls = vec![url.into()]; + } + self + } + + /// Sets the fork URLs for load-balanced multi-endpoint forking. + #[must_use] + pub fn with_fork_urls(mut self, fork_urls: Vec) -> Self { + self.fork_urls = fork_urls; self } @@ -891,7 +909,7 @@ impl NodeConfig { self } - /// Sets the `fork_headers` to use with `eth_rpc_url` + /// Sets the `fork_headers` to use with fork RPC endpoints #[must_use] pub fn with_fork_headers(mut self, headers: Vec) -> Self { self.fork_headers = headers; @@ -1017,7 +1035,7 @@ impl NodeConfig { /// /// See also [ Config::foundry_block_cache_file()] pub fn block_cache_path(&self, block: u64) -> Option { - if self.no_storage_caching || self.eth_rpc_url.is_none() { + if self.no_storage_caching || self.fork_urls.is_empty() { return None; } let chain_id = self.get_chain_id(); @@ -1145,7 +1163,7 @@ impl NodeConfig { ); let (db, fork): (Arc>>, Option) = - if let Some(eth_rpc_url) = self.eth_rpc_url.clone() { + if let Some(eth_rpc_url) = self.fork_urls.first().cloned() { self.setup_fork_db(eth_rpc_url, &mut evm_env, &fees).await? } else { (Arc::new(TokioRwLock::new(Box::::default())), None) @@ -1208,7 +1226,7 @@ impl NodeConfig { // Writes the default create2 deployer to the backend, // if the option is not disabled and we are not forking. - if !self.disable_default_create2_deployer && self.eth_rpc_url.is_none() { + if !self.disable_default_create2_deployer && self.fork_urls.is_empty() { backend .set_create2_deployer(DEFAULT_CREATE2_DEPLOYER) .await @@ -1248,6 +1266,10 @@ impl NodeConfig { fees: &FeeManager, ) -> Result<(ForkedDatabase, ClientForkConfig)> { debug!(target: "node", ?eth_rpc_url, "setting up fork db"); + + // Always bootstrap with the primary URL only to avoid race conditions + // where discovery calls (get_chain_id, find_latest_fork_block, get_block) + // hit different endpoints that may be at different chain tips. let provider = Arc::new( ProviderBuilder::new(ð_rpc_url) .timeout(self.fork_request_timeout) @@ -1409,6 +1431,25 @@ latest block number: {latest_block}" BlockchainDb::new(meta, self.block_cache_path(fork_block_number)) }; + // After bootstrap, rebuild the provider with round-robin if multiple URLs are + // configured. This ensures bootstrap used only the primary endpoint for consistency, + // while ongoing requests are distributed across all endpoints. + let provider = if self.fork_urls.len() > 1 { + debug!(target: "node", urls=?self.fork_urls, "using multi-endpoint round-robin provider"); + Arc::new( + ProviderBuilder::new(ð_rpc_url) + .timeout(self.fork_request_timeout) + .initial_backoff(self.fork_retry_backoff.as_millis() as u64) + .compute_units_per_second(self.compute_units_per_second) + .max_retry(self.fork_request_retries) + .headers(self.fork_headers.clone()) + .build_fallback(self.fork_urls.clone()) + .wrap_err("failed to establish round-robin provider to fork urls")?, + ) + } else { + provider + }; + // This will spawn the background thread that will use the provider to fetch // blockchain data from the other client let backend = SharedBackend::spawn_backend( @@ -1419,7 +1460,7 @@ latest block number: {latest_block}" .await; let config = ClientForkConfig { - eth_rpc_url, + fork_urls: self.fork_urls.clone(), block_number: fork_block_number, block_hash, transaction_hash: self.fork_choice.and_then(|fc| fc.transaction_hash()), @@ -1432,6 +1473,7 @@ latest block number: {latest_block}" retries: self.fork_request_retries, backoff: self.fork_retry_backoff, compute_units_per_second: self.compute_units_per_second, + headers: self.fork_headers.clone(), total_difficulty: block.header.total_difficulty.unwrap_or_default(), blob_gas_used: block.header.blob_gas_used().map(|g| g as u128), blob_excess_gas_and_price: evm_env.block_env.blob_excess_gas_and_price, diff --git a/crates/anvil/src/eth/api.rs b/crates/anvil/src/eth/api.rs index f48a2d96cc12c..0a0fd97d06556 100644 --- a/crates/anvil/src/eth/api.rs +++ b/crates/anvil/src/eth/api.rs @@ -416,7 +416,7 @@ impl EthApi { let config = fork.config.read(); NodeForkConfig { - fork_url: Some(config.eth_rpc_url.clone()), + fork_url: config.eth_rpc_url().map(|s| s.to_string()), fork_block_number: Some(config.block_number), fork_retry_backoff: Some(config.backoff.as_millis()), } @@ -527,7 +527,7 @@ impl EthApi { /// Sets the backend rpc url /// /// Handler for ETH RPC call: `anvil_setRpcUrl` - pub fn anvil_set_rpc_url(&self, url: String) -> Result<()> { + pub async fn anvil_set_rpc_url(&self, url: String) -> Result<()> { node_info!("anvil_setRpcUrl"); if let Some(fork) = self.backend.get_fork() { let mut config = fork.config.write(); @@ -543,9 +543,11 @@ impl EthApi { )?, // .interval(interval), ); config.provider = new_provider; - trace!(target: "backend", "Updated fork rpc from \"{}\" to \"{}\"", config.eth_rpc_url, url); - config.eth_rpc_url = url; + trace!(target: "backend", "Updated fork rpc from \"{}\" to \"{}\"", config.eth_rpc_url().unwrap_or("none"), url); + config.fork_urls = vec![url.clone()]; } + // Keep node_config in sync so anvil_reset(None) uses the updated URL + self.backend.node_config.write().await.fork_urls = vec![url]; Ok(()) } @@ -1791,7 +1793,7 @@ impl EthApi { EthRequest::EvmMineDetailed(mine) => { self.evm_mine_detailed(mine.and_then(|p| p.params)).await.to_rpc_result() } - EthRequest::SetRpcUrl(url) => self.anvil_set_rpc_url(url).to_rpc_result(), + EthRequest::SetRpcUrl(url) => self.anvil_set_rpc_url(url).await.to_rpc_result(), EthRequest::EthSendUnsignedTransaction(tx) => { self.eth_send_unsigned_transaction(*tx).await.to_rpc_result() } diff --git a/crates/anvil/src/eth/backend/db.rs b/crates/anvil/src/eth/backend/db.rs index de9ad434252f3..d1c842d12d108 100644 --- a/crates/anvil/src/eth/backend/db.rs +++ b/crates/anvil/src/eth/backend/db.rs @@ -84,7 +84,7 @@ where /// Helper trait to reset the DB if it's forked pub trait MaybeForkedDatabase { - fn maybe_reset(&mut self, _url: Option, block_number: BlockId) -> Result<(), String>; + fn maybe_reset(&mut self, _urls: Vec, block_number: BlockId) -> Result<(), String>; fn maybe_flush_cache(&self) -> Result<(), String>; @@ -375,7 +375,7 @@ impl + Debug> MaybeFullDatabase for CacheD } impl> MaybeForkedDatabase for CacheDB { - fn maybe_reset(&mut self, _url: Option, _block_number: BlockId) -> Result<(), String> { + fn maybe_reset(&mut self, _urls: Vec, _block_number: BlockId) -> Result<(), String> { Err("not supported".to_string()) } diff --git a/crates/anvil/src/eth/backend/fork.rs b/crates/anvil/src/eth/backend/fork.rs index b1bff4e66cd21..bc87fe2fc2052 100644 --- a/crates/anvil/src/eth/backend/fork.rs +++ b/crates/anvil/src/eth/backend/fork.rs @@ -97,8 +97,8 @@ impl ClientFork { self.config.read().block_hash } - pub fn eth_rpc_url(&self) -> String { - self.config.read().eth_rpc_url.clone() + pub fn eth_rpc_url(&self) -> Option { + self.config.read().eth_rpc_url().map(|s| s.to_string()) } pub fn chain_id(&self) -> u64 { @@ -269,7 +269,7 @@ impl ClientFork { /// Reset the fork to a fresh forked state, and optionally update the fork config pub async fn reset( &self, - url: Option, + urls: Vec, block_number: impl Into, ) -> Result<(), BlockchainError> { let block_number = block_number.into(); @@ -277,12 +277,12 @@ impl ClientFork { self.database .write() .await - .maybe_reset(url.clone(), block_number) + .maybe_reset(urls.clone(), block_number) .map_err(BlockchainError::Internal)?; } - if let Some(url) = url { - self.config.write().update_url(url)?; + if !urls.is_empty() { + self.config.write().update_urls(urls)?; let override_chain_id = self.config.read().override_chain_id; let chain_id = if let Some(chain_id) = override_chain_id { chain_id @@ -629,7 +629,10 @@ impl ClientFork { /// Contains all fork metadata #[derive(Clone, Debug)] pub struct ClientForkConfig { - pub eth_rpc_url: String, + /// All fork URLs. The first entry is the primary endpoint. + /// When multiple URLs are present, requests are distributed using + /// round-robin load balancing with retry-based failover. + pub fork_urls: Vec, /// The block number of the forked block pub block_number: u64, /// The hash of the forked block @@ -655,6 +658,8 @@ pub struct ClientForkConfig { pub backoff: Duration, /// available CUPS pub compute_units_per_second: u64, + /// Headers to include with RPC requests + pub headers: Vec, /// total difficulty of the chain until this block pub total_difficulty: U256, /// Transactions to force include in the forked chain @@ -662,27 +667,40 @@ pub struct ClientForkConfig { } impl ClientForkConfig { - /// Updates the provider URL + /// Returns the primary RPC URL (first entry in `fork_urls`). + pub fn eth_rpc_url(&self) -> Option<&str> { + self.fork_urls.first().map(|s| s.as_str()) + } + + /// Updates the provider URLs /// /// # Errors /// /// This will fail if no new provider could be established (erroneous URL) - fn update_url(&mut self, url: String) -> Result<(), BlockchainError> { - // let interval = self.provider.get_interval(); - self.provider = Arc::new( - ProviderBuilder::::new(url.as_str()) - .timeout(self.timeout) - // .timeout_retry(self.retries) - .max_retry(self.retries) - .initial_backoff(self.backoff.as_millis() as u64) - .compute_units_per_second(self.compute_units_per_second) - .build() - .map_err(|e| BlockchainError::InvalidUrl(format!("{url}: {e}")))?, /* .interval(interval), */ - ); - trace!(target: "fork", "Updated rpc url {}", url); - self.eth_rpc_url = url; + fn update_urls(&mut self, urls: Vec) -> Result<(), BlockchainError> { + let primary = urls.first().ok_or_else(|| { + BlockchainError::InvalidUrl("at least one fork URL required".to_string()) + })?; + + let builder = ProviderBuilder::::new(primary.as_str()) + .timeout(self.timeout) + .max_retry(self.retries) + .initial_backoff(self.backoff.as_millis() as u64) + .compute_units_per_second(self.compute_units_per_second) + .headers(self.headers.clone()); + + self.provider = Arc::new(if urls.len() > 1 { + builder + .build_fallback(urls.clone()) + .map_err(|e| BlockchainError::InvalidUrl(format!("{primary}: {e}")))? + } else { + builder.build().map_err(|e| BlockchainError::InvalidUrl(format!("{primary}: {e}")))? + }); + trace!(target: "fork", "Updated fork urls: {:?}", urls); + self.fork_urls = urls; Ok(()) } + /// Updates the block forked off `(block number, block hash, timestamp)` pub fn update_block( &mut self, diff --git a/crates/anvil/src/eth/backend/mem/fork_db.rs b/crates/anvil/src/eth/backend/mem/fork_db.rs index 308cdd2e4334c..497e8ac8e4428 100644 --- a/crates/anvil/src/eth/backend/mem/fork_db.rs +++ b/crates/anvil/src/eth/backend/mem/fork_db.rs @@ -156,8 +156,8 @@ impl MaybeFullDatabase for ForkDbStateSnapshot { } impl MaybeForkedDatabase for ForkedDatabase { - fn maybe_reset(&mut self, url: Option, block_number: BlockId) -> Result<(), String> { - self.reset(url, block_number) + fn maybe_reset(&mut self, urls: Vec, block_number: BlockId) -> Result<(), String> { + self.reset(urls, block_number) } fn maybe_flush_cache(&self) -> Result<(), String> { diff --git a/crates/anvil/src/eth/backend/mem/in_memory_db.rs b/crates/anvil/src/eth/backend/mem/in_memory_db.rs index 766805ee7ae9d..1d06ad03cae2f 100644 --- a/crates/anvil/src/eth/backend/mem/in_memory_db.rs +++ b/crates/anvil/src/eth/backend/mem/in_memory_db.rs @@ -128,7 +128,7 @@ impl MaybeFullDatabase for MemDb { } impl MaybeForkedDatabase for MemDb { - fn maybe_reset(&mut self, _url: Option, _block_number: BlockId) -> Result<(), String> { + fn maybe_reset(&mut self, _urls: Vec, _block_number: BlockId) -> Result<(), String> { Err("not supported".to_string()) } diff --git a/crates/anvil/src/eth/backend/mem/mod.rs b/crates/anvil/src/eth/backend/mem/mod.rs index ed928ef25213d..b538086fe6f10 100644 --- a/crates/anvil/src/eth/backend/mem/mod.rs +++ b/crates/anvil/src/eth/backend/mem/mod.rs @@ -246,7 +246,7 @@ pub struct Backend { prune_state_history_config: PruneStateHistoryConfig, /// max number of blocks with transactions in memory transaction_block_keeper: Option, - node_config: Arc>, + pub(crate) node_config: Arc>, /// Slots in an epoch slots_in_an_epoch: u64, /// Precompiles to inject to the EVM. @@ -2079,6 +2079,7 @@ impl Backend { // we want to force the correct base fee for the next block during // `setup_fork_db_config` node_config.base_fee.take(); + node_config.fork_urls = vec![eth_rpc_url.clone()]; node_config.setup_fork_db_config(eth_rpc_url, &mut evm_env, &self.fees).await? }; @@ -2101,7 +2102,9 @@ impl Backend { let block_number = forking.block_number.map(BlockNumber::from).unwrap_or(BlockNumber::Latest); // reset the fork entirely and reapply the genesis config - fork.reset(forking.json_rpc_url.clone(), block_number).await?; + let reset_urls = + forking.json_rpc_url.as_ref().map(|url| vec![url.clone()]).unwrap_or_default(); + fork.reset(reset_urls, block_number).await?; let fork_block_number = fork.block_number(); let fork_block = fork .block_by_number(fork_block_number) @@ -2115,7 +2118,8 @@ impl Backend { // If rpc url is unspecified, then update the fork with the new block number and // existing rpc url, this updates the cache path { - let maybe_fork_url = { self.node_config.read().await.eth_rpc_url.clone() }; + let maybe_fork_url = + { self.node_config.read().await.fork_urls.first().cloned() }; if let Some(fork_url) = maybe_fork_url { self.reset_block_number(fork_url, fork_block_number).await?; } @@ -2229,6 +2233,8 @@ impl Backend { ) -> Result<(), BlockchainError> { let mut node_config = self.node_config.write().await; node_config.fork_choice = Some(ForkChoice::Block(fork_block_number as i128)); + // Update fork_urls so setup_fork_db_config uses the correct URL set + node_config.fork_urls = vec![fork_url.clone()]; let mut evm_env = self.evm_env.read().clone(); let (forked_db, client_fork_config) = diff --git a/crates/anvil/tests/it/fork.rs b/crates/anvil/tests/it/fork.rs index e802d8a19be7b..37d3f6f4a1dc4 100644 --- a/crates/anvil/tests/it/fork.rs +++ b/crates/anvil/tests/it/fork.rs @@ -2011,3 +2011,61 @@ async fn test_config_with_osaka_hardfork_with_precompile_factory() { &expected_system_contracts, ); } + +// Regression tests: verify that `anvil_setRpcUrl` and `anvil_reset` keep +// `ClientForkConfig.fork_urls` in sync so that subsequent resets don't +// silently revert to stale URLs. + +#[tokio::test(flavor = "multi_thread")] +async fn test_anvil_set_rpc_url_syncs_fork_config() { + // Spawn an origin node and fork off it + let (_origin_api, origin_handle) = spawn(NodeConfig::test()).await; + let origin_url = origin_handle.http_endpoint(); + + let (api, _handle) = spawn(NodeConfig::test().with_eth_rpc_url(Some(origin_url.clone()))).await; + + // Verify initial fork URL + let fork = api.backend.get_fork().unwrap(); + assert_eq!(fork.config.read().fork_urls, vec![origin_url.clone()]); + + // Spawn a second origin to use as the new URL + let (_origin2_api, origin2_handle) = spawn(NodeConfig::test()).await; + let new_url = origin2_handle.http_endpoint(); + + // Set RPC URL via the API + api.anvil_set_rpc_url(new_url.clone()).await.unwrap(); + + // Verify ClientForkConfig is updated + let fork = api.backend.get_fork().unwrap(); + assert_eq!( + fork.config.read().fork_urls, + vec![new_url.clone()], + "ClientForkConfig.fork_urls should be updated after anvil_setRpcUrl" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_anvil_reset_with_url_updates_fork_urls() { + // Spawn an origin node and fork off it + let (_origin_api, origin_handle) = spawn(NodeConfig::test()).await; + let origin_url = origin_handle.http_endpoint(); + + let (api, _handle) = spawn(NodeConfig::test().with_eth_rpc_url(Some(origin_url.clone()))).await; + + // Spawn a second origin + let (_origin2_api, origin2_handle) = spawn(NodeConfig::test()).await; + let new_url = origin2_handle.http_endpoint(); + + // Reset fork with a new URL + api.anvil_reset(Some(Forking { json_rpc_url: Some(new_url.clone()), block_number: None })) + .await + .unwrap(); + + // Verify the fork config uses the new URL, not the old one + let fork = api.backend.get_fork().unwrap(); + assert_eq!( + fork.config.read().fork_urls, + vec![new_url.clone()], + "ClientForkConfig.fork_urls should reflect the new URL after anvil_reset" + ); +} diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index 3b7432cd67c55..1620342989a29 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -9,6 +9,7 @@ use crate::{ provider::{curl_transport::CurlTransport, runtime_transport::RuntimeTransportBuilder}, }; use alloy_chains::NamedChain; +use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_network::{Network, NetworkWallet}; use alloy_provider::{ Identity, ProviderBuilder as AlloyProviderBuilder, RootProvider, @@ -16,7 +17,9 @@ use alloy_provider::{ network::{AnyNetwork, EthereumWallet}, }; use alloy_rpc_client::ClientBuilder; -use alloy_transport::{layers::RetryBackoffLayer, utils::guess_local_url}; +use alloy_transport::{ + TransportError, TransportFut, layers::RetryBackoffLayer, utils::guess_local_url, +}; use eyre::{Result, WrapErr}; use foundry_config::Config; use reqwest::Url; @@ -25,8 +28,14 @@ use std::{ net::SocketAddr, path::{Path, PathBuf}, str::FromStr, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, time::Duration, }; +use tower::Service; use url::ParseError; /// The assumed block time for unknown chains. @@ -75,6 +84,56 @@ pub fn try_get_http_provider(builder: impl AsRef) -> Result ProviderBuilder::new(builder.as_ref()).build() } +/// A round-robin transport that distributes requests across multiple transports. +/// +/// Each request is sent to exactly one transport, rotating through the list. +/// Failover on error is handled by the retry layer above this service. +#[derive(Clone)] +pub struct RoundRobinService { + transports: Arc>, + next: Arc, +} + +impl RoundRobinService { + /// Creates a new round-robin service from a non-empty list of transports. + /// + /// # Panics + /// + /// Panics if `transports` is empty. + pub fn new(transports: Vec) -> Self { + assert!(!transports.is_empty(), "RoundRobinService requires at least one transport"); + Self { transports: Arc::new(transports), next: Arc::new(AtomicUsize::new(0)) } + } +} + +impl Service for RoundRobinService +where + S: Service< + RequestPacket, + Response = ResponsePacket, + Error = TransportError, + Future = TransportFut<'static>, + > + Clone + + Send + + Sync + + 'static, +{ + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + let transports = self.transports.clone(); + let idx = self.next.fetch_add(1, Ordering::Relaxed) % transports.len(); + let mut transport = transports[idx].clone(); + transport.call(req) + } +} + /// Helper type to construct a `RetryProvider` /// /// This builder is generic over the network type `N`, defaulting to `AnyNetwork`. @@ -368,6 +427,73 @@ impl ProviderBuilder { } impl ProviderBuilder { + /// Constructs a `RetryProvider` backed by multiple URLs using round-robin load balancing. + /// + /// Each request is sent to exactly one transport, rotating through the list via + /// [`RoundRobinService`]. There is no health scoring or endpoint deprioritization. + /// On failure, the `RetryBackoffLayer` retries the request, which naturally hits + /// the next transport in the rotation. + pub fn build_fallback(self, urls: Vec) -> Result> { + let Self { + chain, + max_retry, + initial_backoff, + timeout, + compute_units_per_second, + jwt, + headers, + accept_invalid_certs, + no_proxy, + curl_mode, + .. + } = self; + + eyre::ensure!(!urls.is_empty(), "at least one fork URL is required"); + eyre::ensure!(!curl_mode, "curl mode is not supported with multiple fork URLs"); + + // Build a RuntimeTransport for each URL, using the same URL normalization + // as ProviderBuilder::new() (handles localhost:port, raw socket addrs, IPC paths) + let mut parsed_urls = Vec::with_capacity(urls.len()); + let transports: Vec<_> = urls + .iter() + .map(|url_str| { + let builder = Self::new(url_str); + let url = builder.url?; + parsed_urls.push(url.clone()); + Ok(RuntimeTransportBuilder::new(url) + .with_timeout(timeout) + .with_headers(headers.clone()) + .with_jwt(jwt.clone()) + .accept_invalid_certs(accept_invalid_certs) + .no_proxy(no_proxy) + .build()) + }) + .collect::>>()?; + + let round_robin = RoundRobinService::new(transports); + + let retry_layer = + RetryBackoffLayer::new(max_retry, initial_backoff, compute_units_per_second); + // Use normalized/parsed URLs for local detection, consistent with build() + let is_local = parsed_urls.iter().all(|url| guess_local_url(url.as_str())); + let client = ClientBuilder::default().layer(retry_layer).transport(round_robin, is_local); + + if !is_local { + client.set_poll_interval( + chain + .average_blocktime_hint() + .map(|hint| hint.min(DEFAULT_UNKNOWN_CHAIN_BLOCK_TIME)) + .unwrap_or(DEFAULT_UNKNOWN_CHAIN_BLOCK_TIME) + .mul_f32(POLL_INTERVAL_BLOCK_TIME_SCALE_FACTOR), + ); + } + + let provider = + AlloyProviderBuilder::<_, _, N>::default().connect_provider(RootProvider::new(client)); + + Ok(provider) + } + /// Constructs the `RetryProvider` with a wallet. pub fn build_with_wallet + Clone>( self, diff --git a/crates/config/src/endpoints.rs b/crates/config/src/endpoints.rs index 64ee7e05fc75f..60a15206bc51a 100644 --- a/crates/config/src/endpoints.rs +++ b/crates/config/src/endpoints.rs @@ -277,6 +277,11 @@ pub struct RpcEndpoint { /// endpoint url or env pub endpoint: RpcEndpointUrl, + /// Additional fallback endpoints for load-balanced multi-endpoint forking. + /// When set, requests are distributed across all endpoints (primary + extra) + /// with automatic failover. + pub extra_endpoints: Vec, + /// Token to be used as authentication pub auth: Option, @@ -293,6 +298,7 @@ impl RpcEndpoint { pub fn resolve(self) -> ResolvedRpcEndpoint { ResolvedRpcEndpoint { endpoint: self.endpoint.resolve(), + extra_endpoints: self.extra_endpoints.into_iter().map(|e| e.resolve()).collect(), auth: self.auth.map(|auth| auth.resolve()), config: self.config, } @@ -301,7 +307,7 @@ impl RpcEndpoint { impl fmt::Display for RpcEndpoint { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let Self { endpoint, auth, config } = self; + let Self { endpoint, auth, config, .. } = self; write!(f, "{endpoint}")?; write!(f, "{config}")?; if let Some(auth) = auth { @@ -316,16 +322,24 @@ impl Serialize for RpcEndpoint { where S: Serializer, { - if self.config.retries.is_none() - && self.config.retry_backoff.is_none() - && self.config.compute_units_per_second.is_none() - && self.auth.is_none() - { - // serialize as endpoint if there's no additional config + let has_config = self.config.retries.is_some() + || self.config.retry_backoff.is_some() + || self.config.compute_units_per_second.is_some() + || self.auth.is_some(); + + if !has_config && self.extra_endpoints.is_empty() { + // serialize as plain endpoint string if there's no additional config self.endpoint.serialize(serializer) } else { - let mut map = serializer.serialize_map(Some(5))?; - map.serialize_entry("endpoint", &self.endpoint)?; + let mut map = serializer.serialize_map(None)?; + if self.extra_endpoints.is_empty() { + map.serialize_entry("endpoint", &self.endpoint)?; + } else { + // Serialize all endpoints as an array under "endpoints" + let all: Vec<&RpcEndpointUrl> = + std::iter::once(&self.endpoint).chain(&self.extra_endpoints).collect(); + map.serialize_entry("endpoints", &all)?; + } map.serialize_entry("retries", &self.config.retries)?; map.serialize_entry("retry_backoff", &self.config.retry_backoff)?; map.serialize_entry("compute_units_per_second", &self.config.compute_units_per_second)?; @@ -348,10 +362,13 @@ impl<'de> Deserialize<'de> for RpcEndpoint { }); } + // Support both single "endpoint" and array "endpoints" for backwards compatibility #[derive(Deserialize)] struct RpcEndpointConfigInner { #[serde(alias = "url")] - endpoint: RpcEndpointUrl, + endpoint: Option, + /// Array of endpoint URLs for multi-endpoint load balancing + endpoints: Option>, retries: Option, retry_backoff: Option, compute_units_per_second: Option, @@ -360,14 +377,43 @@ impl<'de> Deserialize<'de> for RpcEndpoint { let RpcEndpointConfigInner { endpoint, + endpoints, retries, retry_backoff, compute_units_per_second, auth, } = serde_json::from_value(value).map_err(serde::de::Error::custom)?; + let (primary, extra) = match (endpoint, endpoints) { + // Single endpoint: endpoint = "..." + (Some(ep), None) => (ep, vec![]), + // Array of endpoints: endpoints = ["...", "..."] + (None, Some(mut eps)) => { + if eps.is_empty() { + return Err(serde::de::Error::custom( + "endpoints array must contain at least one URL", + )); + } + let primary = eps.remove(0); + (primary, eps) + } + // Both provided — error + (Some(_), Some(_)) => { + return Err(serde::de::Error::custom( + "cannot specify both `endpoint` and `endpoints`", + )); + } + // Neither provided — error + (None, None) => { + return Err(serde::de::Error::custom( + "must specify either `endpoint` or `endpoints`", + )); + } + }; + Ok(Self { - endpoint, + endpoint: primary, + extra_endpoints: extra, auth, config: RpcEndpointConfig { retries, retry_backoff, compute_units_per_second }, }) @@ -384,6 +430,7 @@ impl Default for RpcEndpoint { fn default() -> Self { Self { endpoint: RpcEndpointUrl::Url("http://localhost:8545".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig::default(), auth: None, } @@ -394,21 +441,38 @@ impl Default for RpcEndpoint { #[derive(Clone, Debug, PartialEq, Eq)] pub struct ResolvedRpcEndpoint { pub endpoint: Result, + /// Additional resolved endpoints for multi-endpoint load balancing. + pub extra_endpoints: Vec>, pub auth: Option>, pub config: RpcEndpointConfig, } impl ResolvedRpcEndpoint { - /// Returns the url this type holds, see [`RpcEndpoint::resolve`] + /// Returns the primary url this type holds, see [`RpcEndpoint::resolve`] pub fn url(&self) -> Result { self.endpoint.clone() } + /// Returns all resolved URLs (primary + extra) for multi-endpoint configurations. + /// Returns an empty vec if no extra endpoints are configured. + pub fn all_urls(&self) -> Result, UnresolvedEnvVarError> { + let primary = self.endpoint.clone()?; + if self.extra_endpoints.is_empty() { + return Ok(vec![primary]); + } + let mut urls = vec![primary]; + for ep in &self.extra_endpoints { + urls.push(ep.clone()?); + } + Ok(urls) + } + // Returns true if all environment variables are resolved successfully pub fn is_unresolved(&self) -> bool { let endpoint_err = self.endpoint.is_err(); + let extra_err = self.extra_endpoints.iter().any(|e| e.is_err()); let auth_err = self.auth.as_ref().map(|auth| auth.is_err()).unwrap_or(false); - endpoint_err || auth_err + endpoint_err || extra_err || auth_err } // Attempts to resolve unresolved environment variables into a new instance @@ -419,6 +483,11 @@ impl ResolvedRpcEndpoint { if let Err(err) = self.endpoint { self.endpoint = err.try_resolve() } + for ep in &mut self.extra_endpoints { + if let Err(err) = std::mem::replace(ep, Ok(String::new())) { + *ep = err.try_resolve(); + } + } if let Some(Err(err)) = self.auth { self.auth = Some(err.try_resolve()) } @@ -483,6 +552,7 @@ mod tests { config, RpcEndpoint { endpoint: RpcEndpointUrl::Url("http://localhost:8545".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: Some(5), retry_backoff: Some(250), @@ -498,6 +568,7 @@ mod tests { config, RpcEndpoint { endpoint: RpcEndpointUrl::Url("http://localhost:8545".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: None, retry_backoff: None, @@ -507,4 +578,62 @@ mod tests { } ); } + + #[test] + fn serde_rpc_config_multi_endpoints() { + // Array of endpoints via "endpoints" key + let s = r#"{ + "endpoints": ["https://rpc1.example.com", "https://rpc2.example.com", "https://rpc3.example.com"], + "retries": 5, + "retry_backoff": 1000 + }"#; + let config: RpcEndpoint = serde_json::from_str(s).unwrap(); + assert_eq!( + config, + RpcEndpoint { + endpoint: RpcEndpointUrl::Url("https://rpc1.example.com".to_string()), + extra_endpoints: vec![ + RpcEndpointUrl::Url("https://rpc2.example.com".to_string()), + RpcEndpointUrl::Url("https://rpc3.example.com".to_string()), + ], + config: RpcEndpointConfig { + retries: Some(5), + retry_backoff: Some(1000), + compute_units_per_second: None, + }, + auth: None, + } + ); + + // Resolved URLs + let resolved = config.resolve(); + let all_urls = resolved.all_urls().unwrap(); + assert_eq!( + all_urls, + vec![ + "https://rpc1.example.com".to_string(), + "https://rpc2.example.com".to_string(), + "https://rpc3.example.com".to_string(), + ] + ); + } + + #[test] + fn serde_rpc_config_rejects_both_endpoint_and_endpoints() { + let s = r#"{ + "endpoint": "https://rpc1.example.com", + "endpoints": ["https://rpc2.example.com"] + }"#; + let result: Result = serde_json::from_str(s); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("cannot specify both")); + } + + #[test] + fn serde_rpc_config_rejects_empty_endpoints() { + let s = r#"{ "endpoints": [] }"#; + let result: Result = serde_json::from_str(s); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("at least one URL")); + } } diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index c0f8249f37dfd..f148432c42bed 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -3681,6 +3681,7 @@ mod tests { "mainnet", RpcEndpointType::Config(RpcEndpoint { endpoint: RpcEndpointUrl::Env("${_CONFIG_MAINNET}".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: Some(3), retry_backoff: Some(1000), @@ -3706,6 +3707,7 @@ mod tests { "mainnet", RpcEndpointType::Config(RpcEndpoint { endpoint: RpcEndpointUrl::Env("${_CONFIG_MAINNET}".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: Some(3), retry_backoff: Some(1000), @@ -3753,6 +3755,7 @@ mod tests { "mainnet", RpcEndpointType::Config(RpcEndpoint { endpoint: RpcEndpointUrl::Env("${_CONFIG_MAINNET}".to_string()), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: Some(3), retry_backoff: Some(1000), @@ -3779,6 +3782,7 @@ mod tests { endpoint: RpcEndpointUrl::Url( "https://eth-mainnet.alchemyapi.io/v2/123455".to_string() ), + extra_endpoints: vec![], config: RpcEndpointConfig { retries: Some(3), retry_backoff: Some(1000), diff --git a/crates/evm/core/src/fork/database.rs b/crates/evm/core/src/fork/database.rs index 6f539d356e5a9..aefa0e2ee9741 100644 --- a/crates/evm/core/src/fork/database.rs +++ b/crates/evm/core/src/fork/database.rs @@ -71,7 +71,7 @@ impl ForkedDatabase { /// Reset the fork to a fresh forked state, and optionally update the fork config pub fn reset( &mut self, - _url: Option, + _urls: Vec, block_number: impl Into, ) -> Result<(), String> { self.backend.set_pinned_block(block_number).map_err(|err| err.to_string())?;