diff --git a/src/sync/decoder.rs b/src/sync/decoder.rs
index 2c8ecc0..d712f45 100644
--- a/src/sync/decoder.rs
+++ b/src/sync/decoder.rs
@@ -7,6 +7,76 @@ use tempo_alloy::primitives::transaction::SignatureType;
 use crate::tempo::{Block, Log, Receipt, Transaction, TempoTxEnvelope};
 use crate::types::{BlockRow, LogRow, ReceiptRow, TxRow};
 
+/// Validate that receipts match the transactions in their respective blocks.
+///
+/// Checks three invariants:
+/// 1. Receipt count for each block matches the block's transaction count
+/// 2. Every receipt's `transaction_hash` exists in the block's transaction list
+/// 3. No two receipts in a block share the same `transaction_hash`
+///
+/// Invariants 1 and 2 alone would accept a block whose receipts repeat one transaction
+/// while omitting another; invariant 3 makes the pairing one-to-one.
+///
+/// `blocks` and `receipts_per_block` must be parallel slices (same length, same order).
+///
+/// # Errors
+///
+/// Returns an error describing the first violated invariant, or a length mismatch
+/// between the two slices.
+pub fn validate_receipts(blocks: &[Block], receipts_per_block: &[Vec<Receipt>]) -> anyhow::Result<()> {
+    use alloy::network::ReceiptResponse;
+    use std::collections::HashSet;
+
+    if blocks.len() != receipts_per_block.len() {
+        anyhow::bail!(
+            "Block/receipt batch length mismatch: {} blocks vs {} receipt groups",
+            blocks.len(),
+            receipts_per_block.len()
+        );
+    }
+
+    for (block, block_receipts) in blocks.iter().zip(receipts_per_block.iter()) {
+        let block_num = block.header.number;
+        let tx_count = block.transactions.len();
+        let receipt_count = block_receipts.len();
+
+        // Check count match
+        if receipt_count != tx_count {
+            anyhow::bail!(
+                "Block {block_num}: receipt count ({receipt_count}) != transaction count ({tx_count})"
+            );
+        }
+
+        // Build set of tx hashes from the block
+        let block_tx_hashes: HashSet<_> = block
+            .transactions
+            .txns()
+            .map(|tx| tx.tx_hash())
+            .collect();
+
+        // Verify every receipt references a distinct transaction in this block
+        let mut seen_tx_hashes = HashSet::with_capacity(receipt_count);
+        for receipt in block_receipts {
+            let receipt_tx_hash = receipt.transaction_hash();
+            if !block_tx_hashes.contains(&receipt_tx_hash) {
+                anyhow::bail!(
+                    "Block {block_num}: receipt references unknown tx hash 0x{}",
+                    hex::encode(receipt_tx_hash)
+                );
+            }
+            // `insert` returns false if an earlier receipt already claimed this hash
+            if !seen_tx_hashes.insert(receipt_tx_hash) {
+                anyhow::bail!(
+                    "Block {block_num}: duplicate receipt for tx hash 0x{}",
+                    hex::encode(receipt_tx_hash)
+                );
+            }
+        }
+    }
+
+    Ok(())
+}
+
 pub fn timestamp_from_secs(secs: u64) -> DateTime<Utc> {
     Utc.timestamp_opt(secs as i64, 0)
         .single()
@@ -186,6 +256,179 @@ mod tests {
         assert!(txs.is_empty());
     }
 
+    // --- validate_receipts tests ---
+
+    use alloy::primitives::B256;
+    use crate::tempo::{Block, Receipt};
+
+    /// Hex encoding of an all-zero 256-byte logs bloom (`0x` followed by 512 zeros).
+    fn zero_bloom_hex() -> String {
+        format!("0x{}", "0".repeat(512))
+    }
+
+    /// Build a minimal Block with the given tx hashes via JSON deserialization.
+    fn mock_block(number: u64, tx_hashes: &[B256]) -> Block {
+        let txs: Vec<serde_json::Value> = tx_hashes
+            .iter()
+            .enumerate()
+            .map(|(i, hash)| {
+                serde_json::json!({
+                    "hash": format!("0x{}", hex::encode(hash)),
+                    "blockHash": "0x0000000000000000000000000000000000000000000000000000000000000001",
+                    "blockNumber": format!("0x{:x}", number),
+                    "transactionIndex": format!("0x{:x}", i),
+                    "from": "0x0000000000000000000000000000000000000001",
+                    "to": "0x0000000000000000000000000000000000000002",
+                    "value": "0x0",
+                    "input": "0x",
+                    "nonce": "0x0",
+                    "gas": "0x5208",
+                    "gasPrice": "0x3b9aca00",
+                    "maxFeePerGas": "0x3b9aca00",
+                    "maxPriorityFeePerGas": "0x0",
+                    "type": "0x2",
+                    "chainId": "0x1",
+                    "accessList": [],
+                    "v": "0x0",
+                    "r": "0x0000000000000000000000000000000000000000000000000000000000000001",
+                    "s": "0x0000000000000000000000000000000000000000000000000000000000000001",
+                    "yParity": "0x0"
+                })
+            })
+            .collect();
+
+        let block_json = serde_json::json!({
+            "hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
+            "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "number": format!("0x{:x}", number),
+            "timestamp": "0x0",
+            "gasLimit": "0x0",
+            "gasUsed": "0x0",
+            "miner": "0x0000000000000000000000000000000000000000",
+            "sha3Uncles": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "extraData": "0x",
+            "logsBloom": zero_bloom_hex(),
+            "transactionsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "receiptsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "nonce": "0x0000000000000000",
+            "difficulty": "0x0",
+            "totalDifficulty": "0x0",
+            "size": "0x0",
+            "uncles": [],
+            "transactions": txs
+        });
+        serde_json::from_value(block_json).expect("valid block json")
+    }
+
+    /// Build a minimal Receipt referencing the given tx hash at the given block number.
+    fn mock_receipt(block_num: u64, tx_hash: B256) -> Receipt {
+        let json = serde_json::json!({
+            "transactionHash": format!("0x{}", hex::encode(tx_hash)),
+            "blockHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
+            "blockNumber": format!("0x{:x}", block_num),
+            "transactionIndex": "0x0",
+            "gasUsed": "0x5208",
+            "effectiveGasPrice": "0x3b9aca00",
+            "from": "0x0000000000000000000000000000000000000000",
+            "to": "0x0000000000000000000000000000000000000000",
+            "contractAddress": null,
+            "cumulativeGasUsed": "0x5208",
+            "logs": [],
+            "logsBloom": zero_bloom_hex(),
+            "status": "0x1",
+            "type": "0x2",
+            "feePayer": "0x0000000000000000000000000000000000000000"
+        });
+        serde_json::from_value(json).expect("valid receipt json")
+    }
+
+    #[test]
+    fn validate_receipts_happy_path() {
+        let tx1 = B256::repeat_byte(0x11);
+        let tx2 = B256::repeat_byte(0x22);
+        let block = mock_block(1, &[tx1, tx2]);
+        let receipts = vec![mock_receipt(1, tx1), mock_receipt(1, tx2)];
+
+        assert!(validate_receipts(&[block], &[receipts]).is_ok());
+    }
+
+    #[test]
+    fn validate_receipts_empty_block_no_receipts() {
+        let block = mock_block(1, &[]);
+        assert!(validate_receipts(&[block], &[vec![]]).is_ok());
+    }
+
+    #[test]
+    fn validate_receipts_orphan_receipt_rejected() {
+        let tx1 = B256::repeat_byte(0x11);
+        let orphan = B256::repeat_byte(0xff);
+        let block = mock_block(1, &[tx1]);
+        let receipts = vec![mock_receipt(1, orphan)];
+
+        let err = validate_receipts(&[block], &[receipts]).unwrap_err();
+        assert!(
+            err.to_string().contains("unknown tx hash"),
+            "expected orphan receipt error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn validate_receipts_duplicate_receipt_rejected() {
+        let tx1 = B256::repeat_byte(0x11);
+        let tx2 = B256::repeat_byte(0x22);
+        let block = mock_block(1, &[tx1, tx2]);
+        // Counts match and every hash is known, but tx1 is claimed twice and tx2 never
+        let receipts = vec![mock_receipt(1, tx1), mock_receipt(1, tx1)];
+
+        let err = validate_receipts(&[block], &[receipts]).unwrap_err();
+        assert!(
+            err.to_string().contains("duplicate receipt"),
+            "expected duplicate receipt error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn validate_receipts_fewer_receipts_than_txs() {
+        let tx1 = B256::repeat_byte(0x11);
+        let tx2 = B256::repeat_byte(0x22);
+        let block = mock_block(1, &[tx1, tx2]);
+        // Only one receipt for a block with two txs
+        let receipts = vec![mock_receipt(1, tx1)];
+
+        let err = validate_receipts(&[block], &[receipts]).unwrap_err();
+        assert!(
+            err.to_string().contains("receipt count (1) != transaction count (2)"),
+            "expected count mismatch error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn validate_receipts_more_receipts_than_txs() {
+        let tx1 = B256::repeat_byte(0x11);
+        let block = mock_block(1, &[tx1]);
+        // Two receipts for a block with one tx
+        let receipts = vec![mock_receipt(1, tx1), mock_receipt(1, tx1)];
+
+        let err = validate_receipts(&[block], &[receipts]).unwrap_err();
+        assert!(
+            err.to_string().contains("receipt count (2) != transaction count (1)"),
+            "expected count mismatch error, got: {err}"
+        );
+    }
+
+    #[test]
+    fn validate_receipts_batch_length_mismatch() {
+        let block = mock_block(1, &[]);
+        // Two receipt groups for one block
+        let err = validate_receipts(&[block], &[vec![], vec![]]).unwrap_err();
+        assert!(
+            err.to_string().contains("batch length mismatch"),
+            "expected batch length mismatch error, got: {err}"
+        );
+    }
+
     #[test]
     fn enrich_multi_block_batch() {
         let mut txs = vec![
diff --git a/src/sync/engine.rs b/src/sync/engine.rs
index d9d649e..9248781 100644
--- a/src/sync/engine.rs
+++ b/src/sync/engine.rs
@@ -13,7 +13,7 @@ use crate::types::SyncState;
 
 use super::decoder::{
     decode_block, decode_log, decode_receipt, decode_transaction, enrich_txs_from_receipts,
-    timestamp_from_secs,
+    timestamp_from_secs, validate_receipts,
 };
 use super::fetcher::RpcClient;
 use super::sink::SinkSet;
@@ -555,6 +555,9 @@ impl SyncEngine {
             self.realtime_rpc.get_receipts_batch_adaptive(from..=to)
         )?;
 
+        // Validate receipt membership against block transactions
+        validate_receipts(&blocks, &receipts)?;
+
         // Validate parent hash chain
         self.validate_parent_chain(&blocks).await?;
 
@@ -628,6 +631,9 @@ impl SyncEngine {
             self.realtime_rpc.get_block_receipts(num)
         )?;
 
+        // Validate receipt membership (borrowed as one-element slices, no clones)
+        validate_receipts(std::slice::from_ref(&block), std::slice::from_ref(&receipts))?;
+
         let block_row = decode_block(&block);
         let block_ts = timestamp_from_secs(block.header.timestamp);
         let mut txs: Vec<_> = block
@@ -1332,7 +1338,7 @@ async fn is_fully_synced(pool: &Pool, tip_num: u64) -> Result {
 async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: u64) -> Result<()> {
     use super::decoder::{
         decode_block, decode_log, decode_receipt, decode_transaction, enrich_txs_from_receipts,
-        timestamp_from_secs,
+        timestamp_from_secs, validate_receipts,
     };
     use alloy::network::ReceiptResponse;
 
@@ -1341,6 +1347,9 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to:
         rpc.get_receipts_batch_adaptive(from..=to)
     )?;
 
+    // Validate receipt membership against block transactions
+    validate_receipts(&blocks, &receipts)?;
+
     let block_timestamps: HashMap<u64, DateTime<Utc>> = blocks
         .iter()
         .map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp)))