From 1fd8fe2b9e81ac2895ea616eee3b9ed97134eb41 Mon Sep 17 00:00:00 2001 From: MauroFab Date: Fri, 21 Mar 2025 13:34:38 -0300 Subject: [PATCH] Refactor --- batcher/aligned-batcher/src/lib.rs | 519 +++++++++++++++++++++-------- 1 file changed, 377 insertions(+), 142 deletions(-) diff --git a/batcher/aligned-batcher/src/lib.rs b/batcher/aligned-batcher/src/lib.rs index 9c25d8f69d..54edb6c328 100644 --- a/batcher/aligned-batcher/src/lib.rs +++ b/batcher/aligned-batcher/src/lib.rs @@ -99,6 +99,16 @@ pub struct Batcher { pub telemetry: TelemetrySender, } +/// Enum representing the possible outcomes of nonce validation +pub enum NoncedValidationResult { + /// Nonce is valid and matches the expected value + Valid, + /// Message could be a valid replacement (nonce is lower than expected) + Replacement, + /// Nonce is invalid (higher than expected or user not found) + Invalid, +} + impl Batcher { pub async fn new(config_file: String) -> Self { dotenv().ok(); @@ -559,11 +569,129 @@ impl Batcher { debug!("Received message with nonce: {msg_nonce:?}"); self.metrics.received_proofs.inc(); - // * ---------------------------------------------------* - // * Perform validations over the message * - // * ---------------------------------------------------* + // Validation step 1: Check chain ID + if !self.validate_chain_id(&client_msg, &ws_conn_sink).await { + return Ok(()); + } + + // Validation step 2: Check payment service address + if !self.validate_payment_service_address(&client_msg, &ws_conn_sink).await { + return Ok(()); + } + + // Validation step 3: Verify signature + let Ok(addr) = client_msg.verify_signature() else { + error!("Signature verification error"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidSignature, + ) + .await; + self.metrics.user_error(&["invalid_signature", ""]); + return Ok(()); + }; + info!("Message signature verified"); + + // Validation step 4: Check proof size + if !self.validate_proof_size(&client_msg, &ws_conn_sink).await { + return Ok(()); + } + + let nonced_verification_data = client_msg.verification_data.clone(); + + // Validation step 5 & 6: Verify proof if pre-verification is enabled + if !self.validate_and_verify_proof(&nonced_verification_data, &ws_conn_sink).await { + return Ok(()); + } + + // Handle different paths for non-paying vs paying addresses + if self.is_nonpaying(&addr) { + return self + .handle_nonpaying_msg(ws_conn_sink.clone(), &client_msg) + .await; + } + + info!("Handling paying message"); + + // Validation step 7: Check if user balance is unlocked + if !self.validate_user_balance_unlocked(&addr, &ws_conn_sink).await { + return Ok(()); + } + + // Validation step 8: Check if user is in state, get nonce from Ethereum if not + let batch_state_lock = self.prepare_user_state(&addr, &ws_conn_sink).await?; + + // Validation step 9: Validate user state and balance + if !self.validate_user_state_and_balance(&addr, &nonced_verification_data, &batch_state_lock, &ws_conn_sink).await { + return Ok(()); + } + + // Validation step 10: Check nonce validity + let validation_result = self.validate_nonce( + &addr, + &nonced_verification_data, + &batch_state_lock, + &ws_conn_sink, + ).await; + + match validation_result { + NoncedValidationResult::Valid => {}, + NoncedValidationResult::Replacement => { + // Handle replacement message case + self.handle_replacement_message( + batch_state_lock, + nonced_verification_data, + ws_conn_sink.clone(), + client_msg.signature, + addr, + ) + .await; + return Ok(()); + }, + NoncedValidationResult::Invalid => return Ok(()), + } + + // Validation step 11: Validate max fee + if !self.validate_max_fee(&addr, &nonced_verification_data, &batch_state_lock, &ws_conn_sink).await { + return Ok(()); + } + + // Final step: Add to batch + if let Err(e) = self + .add_to_batch( + batch_state_lock, + nonced_verification_data, + ws_conn_sink.clone(), + client_msg.signature, + addr, + ) + .await + { + error!("Error while adding entry to batch: {e:?}"); + send_message(ws_conn_sink, SubmitProofResponseMessage::AddToBatchError).await; + self.metrics.user_error(&["add_to_batch_error", ""]); + return Ok(()); + }; + + info!("Verification data message handled"); + Ok(()) + } - // This check does not save against "Holesky" and "HoleskyStage", since both are chain_id 17000 + /// Validates the chain ID in the client message. + /// + /// Ensures that the chain ID in the message matches the batcher's configured chain ID. + /// If validation fails, sends an InvalidChainId error response to the client and logs a warning. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes, `false` otherwise. + async fn validate_chain_id( + &self, + client_msg: &SubmitProofMessage, + ws_conn_sink: &WsMessageSink, + ) -> bool { let msg_chain_id = client_msg.verification_data.chain_id; if msg_chain_id != self.chain_id { warn!("Received message with incorrect chain id: {msg_chain_id}"); @@ -573,10 +701,26 @@ impl Batcher { ) .await; self.metrics.user_error(&["invalid_chain_id", ""]); - return Ok(()); + return false; } + true + } - // This checks saves against "Holesky" and "HoleskyStage", since each one has a different payment service address + /// Validates the payment service address in the client message. + /// + /// Ensures that the payment service address in the message matches the batcher's configured payment service address. + /// If validation fails, sends an InvalidPaymentServiceAddress error response to the client and logs a warning. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes, `false` otherwise. + async fn validate_payment_service_address( + &self, + client_msg: &SubmitProofMessage, + ws_conn_sink: &WsMessageSink, + ) -> bool { let msg_payment_service_addr = client_msg.verification_data.payment_service_addr; if msg_payment_service_addr != self.payment_service.address() { warn!("Received message with incorrect payment service address: {msg_payment_service_addr}"); @@ -590,22 +734,26 @@ impl Batcher { .await; self.metrics .user_error(&["invalid_payment_service_address", ""]); - return Ok(()); + return false; } + true + } - info!("Verifying message signature..."); - let Ok(addr) = client_msg.verify_signature() else { - error!("Signature verification error"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidSignature, - ) - .await; - self.metrics.user_error(&["invalid_signature", ""]); - return Ok(()); - }; - info!("Message signature verified"); - + /// Validates the proof size in the client message. + /// + /// Ensures that the proof size does not exceed the maximum allowed size. + /// If validation fails, sends a ProofTooLarge error response to the client and logs an error. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes, `false` otherwise. + async fn validate_proof_size( + &self, + client_msg: &SubmitProofMessage, + ws_conn_sink: &WsMessageSink, + ) -> bool { let proof_size = client_msg.verification_data.verification_data.proof.len(); if proof_size > self.max_proof_size { error!("Proof size exceeds the maximum allowed size."); @@ -615,84 +763,127 @@ impl Batcher { ) .await; self.metrics.user_error(&["proof_too_large", ""]); - return Ok(()); + return false; } + true + } - let nonced_verification_data = client_msg.verification_data.clone(); - - // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients - if self.pre_verification_is_enabled { - let verification_data = &nonced_verification_data.verification_data; - if self - .is_verifier_disabled(verification_data.proving_system) - .await - { - warn!( - "Verifier for proving system {} is disabled, skipping verification", - verification_data.proving_system - ); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( - verification_data.proving_system, - )), - ) - .await; - self.metrics.user_error(&[ - "disabled_verifier", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - - if !zk_utils::verify(verification_data).await { - error!("Invalid proof detected. Verification failed"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), - ) - .await; - self.metrics.user_error(&[ - "rejected_proof", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } + /// Validates and verifies the proof in the verification data if pre-verification is enabled. + /// + /// First checks if the verifier for the proof's proving system is disabled. + /// If pre-verification is enabled and the verifier is not disabled, verifies the proof. + /// If any validation fails, sends an appropriate error response to the client. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes or if pre-verification is disabled, `false` otherwise. + async fn validate_and_verify_proof( + &self, + nonced_verification_data: &NoncedVerificationData, + ws_conn_sink: &WsMessageSink, + ) -> bool { + // Skip verification if pre-verification is disabled + if !self.pre_verification_is_enabled { + return true; } - if self.is_nonpaying(&addr) { - return self - .handle_nonpaying_msg(ws_conn_sink.clone(), &client_msg) - .await; + let verification_data = &nonced_verification_data.verification_data; + + // Check if verifier is disabled + if self.is_verifier_disabled(verification_data.proving_system).await { + warn!( + "Verifier for proving system {} is disabled, skipping verification", + verification_data.proving_system + ); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( + verification_data.proving_system, + )), + ) + .await; + self.metrics.user_error(&[ + "disabled_verifier", + &format!("{}", verification_data.proving_system), + ]); + return false; } - info!("Handling paying message"); + // Verify the proof + if !zk_utils::verify(verification_data).await { + error!("Invalid proof detected. Verification failed"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), + ) + .await; + self.metrics.user_error(&[ + "rejected_proof", + &format!("{}", verification_data.proving_system), + ]); + return false; + } + + true + } - // We don't need a batch state lock here, since if the user locks its funds - // after the check, some blocks should pass until he can withdraw. - // It is safe to do just do this here. - if self.user_balance_is_unlocked(&addr).await { + /// Validates that the user's balance is not unlocked (available for withdrawal). + /// + /// If the user's balance is unlocked, it means the user is not eligible for submitting proofs + /// as they can withdraw their funds at any time. Sends an InsufficientBalance error response + /// to the client if validation fails. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes (balance is not unlocked), `false` otherwise. + async fn validate_user_balance_unlocked( + &self, + addr: &Address, + ws_conn_sink: &WsMessageSink, + ) -> bool { + if self.user_balance_is_unlocked(addr).await { send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::InsufficientBalance(addr), + SubmitProofResponseMessage::InsufficientBalance(*addr), ) .await; self.metrics.user_error(&["insufficient_balance", ""]); - return Ok(()); + return false; } + true + } - // We aquire the lock first only to query if the user is already present and the lock is dropped. - // If it was not present, then the user nonce is queried to the Aligned contract. - // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. - + /// Prepares the user state for processing the proof submission. + /// + /// Checks if the user is already in the batch state. If not, fetches the user's nonce from Ethereum + /// and creates a new user state. Returns a lock on the batch state for further processing. + /// + /// # Telemetry + /// Records user error metrics if fetching the nonce from Ethereum fails, but does not interact + /// with external telemetry services. + /// + /// # Returns + /// A Result containing a MutexGuard to the BatchState if successful, or an Error if fetching the + /// user nonce from Ethereum fails. + async fn prepare_user_state( + &self, + addr: &Address, + ws_conn_sink: &WsMessageSink, + ) -> Result, Error> { + // Check if the user is already in state let is_user_in_state: bool; { let batch_state_lock = self.batch_state.lock().await; - is_user_in_state = batch_state_lock.user_states.contains_key(&addr); + is_user_in_state = batch_state_lock.user_states.contains_key(addr); } + // If not in state, get nonce from Ethereum and create user state if !is_user_in_state { - let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await { + let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(*addr).await { Ok(ethereum_user_nonce) => ethereum_user_nonce, Err(e) => { error!( @@ -704,22 +895,44 @@ impl Batcher { ) .await; self.metrics.user_error(&["eth_rpc_error", ""]); - return Ok(()); + return Err(Error::AlreadyClosed); } }; + let user_state = UserState::new(ethereum_user_nonce); let mut batch_state_lock = self.batch_state.lock().await; batch_state_lock .user_states - .entry(addr) + .entry(*addr) .or_insert(user_state); } + + // Return with lock acquired + Ok(self.batch_state.lock().await) + } - // * ---------------------------------------------------* - // * Perform validations over user state * - // * ---------------------------------------------------* - - let Some(user_balance) = self.get_user_balance(&addr).await else { + /// Validates the user's state and balance for processing the proof submission. + /// + /// Performs several checks: + /// 1. Fetches the user's balance from Ethereum + /// 2. Gets the user's last max fee limit from the batch state + /// 3. Gets the user's accumulated fee in the queue from the batch state + /// 4. Verifies the user has enough balance to cover the new proof's max fee plus accumulated fees + /// + /// # Telemetry + /// Records user error metrics if any validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if all validations pass, `false` otherwise. + async fn validate_user_state_and_balance( + &self, + addr: &Address, + nonced_verification_data: &NoncedVerificationData, + batch_state_lock: &MutexGuard<'_, BatchState>, + ws_conn_sink: &WsMessageSink, + ) -> bool { + // Get user balance + let Some(user_balance) = self.get_user_balance(addr).await else { error!("Could not get balance for address {addr:?}"); send_message( ws_conn_sink.clone(), @@ -727,67 +940,86 @@ impl Batcher { ) .await; self.metrics.user_error(&["eth_rpc_error", ""]); - return Ok(()); + return false; }; - // For now on until the message is fully processed, the batch state is locked - // This is needed because we need to query the user state to make validations and - // finally add the proof to the batch queue. - - let batch_state_lock = self.batch_state.lock().await; - - let msg_max_fee = nonced_verification_data.max_fee; + // Get user's last max fee limit let Some(user_last_max_fee_limit) = - batch_state_lock.get_user_last_max_fee_limit(&addr).await + batch_state_lock.get_user_last_max_fee_limit(addr).await else { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, ) .await; self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); + return false; }; - let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await + // Get user's accumulated fee + let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(addr).await else { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, ) .await; self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); + return false; }; + // Verify user has enough balance + let msg_max_fee = nonced_verification_data.max_fee; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), - SubmitProofResponseMessage::InsufficientBalance(addr), + SubmitProofResponseMessage::InsufficientBalance(*addr), ) .await; self.metrics.user_error(&["insufficient_balance", ""]); - return Ok(()); + return false; } - let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await; + true + } + + /// Validates the nonce in the verification data against the user's expected nonce. + /// + /// Performs three checks: + /// 1. Confirms the user has a valid cached nonce in the batch state + /// 2. If the message nonce is higher than expected, rejects it as invalid + /// 3. If the message nonce is lower than expected, marks it as a potential replacement message + /// 4. If the message nonce matches the expected nonce, validates it as correct + /// + /// # Telemetry + /// Records user error metrics if nonce validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// A `NoncedValidationResult` indicating whether the nonce is valid, invalid, or represents a replacement message. + async fn validate_nonce( + &self, + addr: &Address, + nonced_verification_data: &NoncedVerificationData, + batch_state_lock: &MutexGuard<'_, BatchState>, + ws_conn_sink: &WsMessageSink, + ) -> NoncedValidationResult { + let msg_nonce = nonced_verification_data.nonce; + + // Get cached user nonce + let cached_user_nonce = batch_state_lock.get_user_nonce(addr).await; let Some(expected_nonce) = cached_user_nonce else { error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted"); - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::AddToBatchError, ) .await; self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); + return NoncedValidationResult::Invalid; }; + // Nonce is too high - invalid message if expected_nonce < msg_nonce { - std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}"); send_message( ws_conn_sink.clone(), @@ -795,27 +1027,51 @@ impl Batcher { ) .await; self.metrics.user_error(&["invalid_nonce", ""]); - return Ok(()); + return NoncedValidationResult::Invalid; } - // In this case, the message might be a replacement one. If it is valid, - // we replace the old entry with the new from the replacement message. + // Nonce is too low - could be a replacement message if expected_nonce > msg_nonce { info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}"); - self.handle_replacement_message( - batch_state_lock, - nonced_verification_data, + return NoncedValidationResult::Replacement; + } + + // Nonce is correct + NoncedValidationResult::Valid + } + + /// Validates the max fee in the verification data against the user's last max fee limit. + /// + /// Ensures that the max fee in the message does not exceed the user's last max fee limit. + /// If the max fee is too high, sends an InvalidMaxFee error response to the client. + /// + /// # Telemetry + /// Records user error metrics if validation fails, but does not interact with external telemetry services. + /// + /// # Returns + /// `true` if validation passes, `false` otherwise. + async fn validate_max_fee( + &self, + addr: &Address, + nonced_verification_data: &NoncedVerificationData, + batch_state_lock: &MutexGuard<'_, BatchState>, + ws_conn_sink: &WsMessageSink, + ) -> bool { + let msg_max_fee = nonced_verification_data.max_fee; + + let Some(user_last_max_fee_limit) = + batch_state_lock.get_user_last_max_fee_limit(addr).await + else { + send_message( ws_conn_sink.clone(), - client_msg.signature, - addr, + SubmitProofResponseMessage::AddToBatchError, ) .await; - - return Ok(()); - } + self.metrics.user_error(&["batcher_state_error", ""]); + return false; + }; if msg_max_fee > user_last_max_fee_limit { - std::mem::drop(batch_state_lock); warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -823,31 +1079,10 @@ impl Batcher { ) .await; self.metrics.user_error(&["invalid_max_fee", ""]); - return Ok(()); + return false; } - - // * ---------------------------------------------------------------------* - // * Add message data into the queue and update user state * - // * ---------------------------------------------------------------------* - - if let Err(e) = self - .add_to_batch( - batch_state_lock, - nonced_verification_data, - ws_conn_sink.clone(), - client_msg.signature, - addr, - ) - .await - { - error!("Error while adding entry to batch: {e:?}"); - send_message(ws_conn_sink, SubmitProofResponseMessage::AddToBatchError).await; - self.metrics.user_error(&["add_to_batch_error", ""]); - return Ok(()); - }; - - info!("Verification data message handled"); - Ok(()) + + true } async fn is_verifier_disabled(&self, verifier: ProvingSystemId) -> bool {