Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions src/sync/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,59 @@ use tempo_alloy::primitives::transaction::SignatureType;
use crate::tempo::{Block, Log, Receipt, Transaction, TempoTxEnvelope};
use crate::types::{BlockRow, LogRow, ReceiptRow, TxRow};

/// Validate that receipts match the transactions in their respective blocks.
///
/// Checks two invariants:
/// 1. Receipt count for each block matches the block's transaction count
/// 2. Every receipt's `transaction_hash` exists in the block's transaction list
///
/// `blocks` and `receipts_per_block` must be parallel slices (same length, same order).
pub fn validate_receipts(blocks: &[Block], receipts_per_block: &[Vec<Receipt>]) -> anyhow::Result<()> {
use alloy::network::ReceiptResponse;
use std::collections::HashSet;

if blocks.len() != receipts_per_block.len() {
anyhow::bail!(
"Block/receipt batch length mismatch: {} blocks vs {} receipt groups",
blocks.len(),
receipts_per_block.len()
);
}

for (block, block_receipts) in blocks.iter().zip(receipts_per_block.iter()) {
let block_num = block.header.number;
let tx_count = block.transactions.len();
let receipt_count = block_receipts.len();

// Check count match
if receipt_count != tx_count {
anyhow::bail!(
"Block {block_num}: receipt count ({receipt_count}) != transaction count ({tx_count})"
);
}

// Build set of tx hashes from the block
let block_tx_hashes: HashSet<_> = block
.transactions
.txns()
.map(|tx| tx.tx_hash())
.collect();

// Verify every receipt references a transaction in this block
for receipt in block_receipts {
let receipt_tx_hash = &receipt.transaction_hash();
if !block_tx_hashes.contains(receipt_tx_hash) {
anyhow::bail!(
"Block {block_num}: receipt references unknown tx hash 0x{}",
hex::encode(receipt_tx_hash)
);
}
}
}

Ok(())
}

pub fn timestamp_from_secs(secs: u64) -> DateTime<Utc> {
Utc.timestamp_opt(secs as i64, 0)
.single()
Expand Down Expand Up @@ -186,6 +239,159 @@ mod tests {
assert!(txs.is_empty());
}

// --- validate_receipts tests ---

use alloy::primitives::B256;
use crate::tempo::{Block, Receipt};

/// Build a minimal Block with the given tx hashes via JSON deserialization.
fn mock_block(number: u64, tx_hashes: &[B256]) -> Block {
let txs: Vec<serde_json::Value> = tx_hashes
.iter()
.enumerate()
.map(|(i, hash)| {
serde_json::json!({
"hash": format!("0x{}", hex::encode(hash)),
"blockHash": "0x0000000000000000000000000000000000000000000000000000000000000001",
"blockNumber": format!("0x{:x}", number),
"transactionIndex": format!("0x{:x}", i),
"from": "0x0000000000000000000000000000000000000001",
"to": "0x0000000000000000000000000000000000000002",
"value": "0x0",
"input": "0x",
"nonce": "0x0",
"gas": "0x5208",
"gasPrice": "0x3b9aca00",
"maxFeePerGas": "0x3b9aca00",
"maxPriorityFeePerGas": "0x0",
"type": "0x2",
"chainId": "0x1",
"accessList": [],
"v": "0x0",
"r": "0x0000000000000000000000000000000000000000000000000000000000000001",
"s": "0x0000000000000000000000000000000000000000000000000000000000000001",
"yParity": "0x0"
})
})
.collect();

let block_json = serde_json::json!({
"hash": "0x0000000000000000000000000000000000000000000000000000000000000001",
"parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"number": format!("0x{:x}", number),
"timestamp": "0x0",
"gasLimit": "0x0",
"gasUsed": "0x0",
"miner": "0x0000000000000000000000000000000000000000",
"sha3Uncles": "0x0000000000000000000000000000000000000000000000000000000000000000",
"extraData": "0x",
"logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"transactionsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
"stateRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
"receiptsRoot": "0x0000000000000000000000000000000000000000000000000000000000000000",
"mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"nonce": "0x0000000000000000",
"difficulty": "0x0",
"totalDifficulty": "0x0",
"size": "0x0",
"uncles": [],
"transactions": txs
});
serde_json::from_value(block_json).expect("valid block json")
}

/// Build a minimal Receipt referencing the given tx hash at the given block number.
fn mock_receipt(block_num: u64, tx_hash: B256) -> Receipt {
let json = serde_json::json!({
"transactionHash": format!("0x{}", hex::encode(tx_hash)),
"blockHash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"blockNumber": format!("0x{:x}", block_num),
"transactionIndex": "0x0",
"gasUsed": "0x5208",
"effectiveGasPrice": "0x3b9aca00",
"from": "0x0000000000000000000000000000000000000000",
"to": "0x0000000000000000000000000000000000000000",
"contractAddress": null,
"cumulativeGasUsed": "0x5208",
"logs": [],
"logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"status": "0x1",
"type": "0x2",
"feePayer": "0x0000000000000000000000000000000000000000"
});
serde_json::from_value(json).expect("valid receipt json")
}

#[test]
fn validate_receipts_happy_path() {
let tx1 = B256::repeat_byte(0x11);
let tx2 = B256::repeat_byte(0x22);
let block = mock_block(1, &[tx1, tx2]);
let receipts = vec![mock_receipt(1, tx1), mock_receipt(1, tx2)];

assert!(validate_receipts(&[block], &[receipts]).is_ok());
}

#[test]
fn validate_receipts_empty_block_no_receipts() {
let block = mock_block(1, &[]);
assert!(validate_receipts(&[block], &[vec![]]).is_ok());
}

#[test]
fn validate_receipts_orphan_receipt_rejected() {
let tx1 = B256::repeat_byte(0x11);
let orphan = B256::repeat_byte(0xff);
let block = mock_block(1, &[tx1]);
let receipts = vec![mock_receipt(1, orphan)];

let err = validate_receipts(&[block], &[receipts]).unwrap_err();
assert!(
err.to_string().contains("unknown tx hash"),
"expected orphan receipt error, got: {err}"
);
}

#[test]
fn validate_receipts_fewer_receipts_than_txs() {
let tx1 = B256::repeat_byte(0x11);
let tx2 = B256::repeat_byte(0x22);
let block = mock_block(1, &[tx1, tx2]);
// Only one receipt for a block with two txs
let receipts = vec![mock_receipt(1, tx1)];

let err = validate_receipts(&[block], &[receipts]).unwrap_err();
assert!(
err.to_string().contains("receipt count (1) != transaction count (2)"),
"expected count mismatch error, got: {err}"
);
}

#[test]
fn validate_receipts_more_receipts_than_txs() {
let tx1 = B256::repeat_byte(0x11);
let block = mock_block(1, &[tx1]);
// Two receipts for a block with one tx
let receipts = vec![mock_receipt(1, tx1), mock_receipt(1, tx1)];

let err = validate_receipts(&[block], &[receipts]).unwrap_err();
assert!(
err.to_string().contains("receipt count (2) != transaction count (1)"),
"expected count mismatch error, got: {err}"
);
}

#[test]
fn validate_receipts_batch_length_mismatch() {
let block = mock_block(1, &[]);
// Two receipt groups for one block
let err = validate_receipts(&[block], &[vec![], vec![]]).unwrap_err();
assert!(
err.to_string().contains("batch length mismatch"),
"expected batch length mismatch error, got: {err}"
);
}

#[test]
fn enrich_multi_block_batch() {
let mut txs = vec![
Expand Down
13 changes: 11 additions & 2 deletions src/sync/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::types::SyncState;

use super::decoder::{
decode_block, decode_log, decode_receipt, decode_transaction, enrich_txs_from_receipts,
timestamp_from_secs,
timestamp_from_secs, validate_receipts,
};
use super::fetcher::RpcClient;
use super::sink::SinkSet;
Expand Down Expand Up @@ -555,6 +555,9 @@ impl SyncEngine {
self.realtime_rpc.get_receipts_batch_adaptive(from..=to)
)?;

// Validate receipt membership against block transactions
validate_receipts(&blocks, &receipts)?;

// Validate parent hash chain
self.validate_parent_chain(&blocks).await?;

Expand Down Expand Up @@ -628,6 +631,9 @@ impl SyncEngine {
self.realtime_rpc.get_block_receipts(num)
)?;

// Validate receipt membership against block transactions
validate_receipts(&[block.clone()], &[receipts.clone()])?;

let block_row = decode_block(&block);
let block_ts = timestamp_from_secs(block.header.timestamp);
let mut txs: Vec<_> = block
Expand Down Expand Up @@ -1332,7 +1338,7 @@ async fn is_fully_synced(pool: &Pool, tip_num: u64) -> Result<bool> {
async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to: u64) -> Result<()> {
use super::decoder::{
decode_block, decode_log, decode_receipt, decode_transaction, enrich_txs_from_receipts,
timestamp_from_secs,
timestamp_from_secs, validate_receipts,
};
use alloy::network::ReceiptResponse;

Expand All @@ -1341,6 +1347,9 @@ async fn sync_range_standalone(sinks: &SinkSet, rpc: &RpcClient, from: u64, to:
rpc.get_receipts_batch_adaptive(from..=to)
)?;

// Validate receipt membership against block transactions
validate_receipts(&blocks, &receipts)?;

let block_timestamps: HashMap<u64, _> = blocks
.iter()
.map(|b| (b.header.number, timestamp_from_secs(b.header.timestamp)))
Expand Down
Loading