diff --git a/config.toml b/config.toml index 0159f0a..b5d494f 100644 --- a/config.toml +++ b/config.toml @@ -14,6 +14,7 @@ name = "moderato" chain_id = 42431 rpc_url = "https://eng:aphex-twin-jeff-mills@rpc.testnet.tempo.xyz" pg_url = "postgres://tidx:tidx@postgres:5432/tidx_moderato" +# start_block = 0 # Starting block for indexing (default: 0 = genesis) backfill = true batch_size = 500 concurrency = 8 diff --git a/db/sync_state.sql b/db/sync_state.sql index 826fc94..dc9e621 100644 --- a/db/sync_state.sql +++ b/db/sync_state.sql @@ -13,7 +13,10 @@ CREATE TABLE IF NOT EXISTS sync_state ( -- Persisted in PG so it survives restarts and isn't fooled by realtime sync writing ahead. ALTER TABLE sync_state ADD COLUMN IF NOT EXISTS ch_backfill_block INT8 NOT NULL DEFAULT 0; -COMMENT ON COLUMN sync_state.synced_num IS 'Highest contiguous block synced from genesis (no gaps up to here)'; +ALTER TABLE sync_state ADD COLUMN IF NOT EXISTS start_block INT8 NOT NULL DEFAULT 0; + +COMMENT ON COLUMN sync_state.start_block IS 'Configured starting block for indexing (0=genesis)'; +COMMENT ON COLUMN sync_state.synced_num IS 'Highest contiguous block synced from start_block (no gaps up to here)'; COMMENT ON COLUMN sync_state.tip_num IS 'Highest block synced near chain head (may have gaps below)'; COMMENT ON COLUMN sync_state.backfill_num IS 'Lowest block synced going backwards (NULL=not started, 0=complete)'; COMMENT ON COLUMN sync_state.sync_rate IS 'Current sync rate in blocks/second (rolling 5s window)'; diff --git a/src/cli/status.rs b/src/cli/status.rs index a7def26..0180e56 100644 --- a/src/cli/status.rs +++ b/src/cli/status.rs @@ -113,7 +113,7 @@ fn print_http_status(resp: &serde_json::Value) -> Result<()> { let name = chain["name"].as_str().unwrap_or("unknown"); let chain_id = chain["chain_id"].as_i64().unwrap_or(0); let head_num = chain["head_num"].as_i64().unwrap_or(0); - let synced_num = chain["synced_num"].as_i64().unwrap_or(0); + let _synced_num = chain["synced_num"].as_i64().unwrap_or(0); let lag = chain["lag"].as_i64().unwrap_or(0); println!("┌─ {} (chain_id: {}) ─────────────────────", name, chain_id); @@ -257,7 +257,7 @@ async fn print_json_status(config: &Config) -> Result<()> { Ok(pool) => { let state = load_sync_state(&pool, chain.chain_id).await.ok().flatten(); let tip = state.as_ref().map(|s| s.tip_num).unwrap_or(0); - let gaps = detect_all_gaps(&pool, tip).await.unwrap_or_default(); + let gaps = detect_all_gaps(&pool, tip, chain.start_block).await.unwrap_or_default(); (state, gaps) } Err(_) => (None, vec![]), diff --git a/src/cli/up.rs b/src/cli/up.rs index 2f3a8ca..e473e81 100644 --- a/src/cli/up.rs +++ b/src/cli/up.rs @@ -269,6 +269,7 @@ fn spawn_sync_engine( chain = %chain.name, chain_id = chain.chain_id, rpc = %chain.rpc_url, + start_block = chain.start_block, backfill_limit = throttled_pool.backfill_semaphore.available_permits(), "Starting sync for chain (throttled pool: 16 connections, backfill limited)" ); @@ -374,7 +375,8 @@ fn spawn_sync_engine( .with_batch_size(chain.batch_size) .with_concurrency(chain.concurrency) .with_backfill_first(backfill_first) - .with_trust_rpc(trust_rpc); + .with_trust_rpc(trust_rpc) + .with_start_block(chain.start_block); } Err(e) => { warn!(error = %e, chain = %chain.name, "Failed to create sync engine, retrying in 10s"); diff --git a/src/config.rs b/src/config.rs index cb469d9..c67724d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -107,6 +107,11 @@ pub struct ChainConfig { #[serde(default)] pub pg_password_env: Option, + /// Starting block number for indexing (default: 0 = genesis). + /// When set, backfill will stop at this block instead of going to genesis. + #[serde(default)] + pub start_block: u64, + /// Enable backfill to genesis (default: true) #[serde(default = "default_backfill")] pub backfill: bool, @@ -314,6 +319,22 @@ mod tests { assert!(config.backfill); assert_eq!(config.batch_size, 100); assert_eq!(config.concurrency, 4); + assert_eq!(config.start_block, 0); + } + + #[test] + fn test_chain_config_with_start_block() { + let toml_str = r#" + name = "test" + chain_id = 1 + rpc_url = "http://localhost:8545" + pg_url = "postgres://localhost/test" + start_block = 1000 + "#; + + let config: ChainConfig = toml::from_str(toml_str).unwrap(); + + assert_eq!(config.start_block, 1000); } #[test] @@ -403,6 +424,7 @@ mod tests { rpc_url: "http://localhost:8545".to_string(), pg_url: "postgres://user:pass@localhost/db".to_string(), pg_password_env: None, + start_block: 0, backfill: true, batch_size: 100, concurrency: 4, @@ -428,6 +450,7 @@ mod tests { rpc_url: "http://localhost:8545".to_string(), pg_url: "postgres://user:placeholder@localhost/db".to_string(), pg_password_env: Some("PATH".to_string()), + start_block: 0, backfill: true, batch_size: 100, concurrency: 4, @@ -452,6 +475,7 @@ mod tests { rpc_url: "http://localhost:8545".to_string(), pg_url: "postgres://user:placeholder@localhost/db".to_string(), pg_password_env: Some("NONEXISTENT_VAR_XYZ_999".to_string()), + start_block: 0, backfill: true, batch_size: 100, concurrency: 4, diff --git a/src/service/mod.rs b/src/service/mod.rs index a32d20f..44c9d71 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -60,7 +60,7 @@ pub async fn get_all_status(pool: &Pool) -> Result> { let rows = conn .query( - "SELECT chain_id, head_num, synced_num, tip_num, backfill_num, started_at, updated_at FROM sync_state ORDER BY chain_id", + "SELECT chain_id, head_num, synced_num, tip_num, backfill_num, started_at, updated_at, start_block FROM sync_state ORDER BY chain_id", &[], ) .await?; @@ -77,11 +77,12 @@ pub async fn get_all_status(pool: &Pool) -> Result> { let tip_num: i64 = row.get(3); let backfill_num: Option = row.get(4); let started_at: Option> = row.get(5); + let start_block: i64 = row.get(7); let backfill_remaining = match backfill_num { - None => synced_num.saturating_sub(1), - Some(0) => 0, - Some(n) => n, + None => synced_num.saturating_sub(start_block.max(1)), + Some(n) if n <= start_block => 0, + Some(n) => n.saturating_sub(start_block), }; let sync_rate = started_at.and_then(|started| { diff --git a/src/sync/engine.rs b/src/sync/engine.rs index d9d649e..d0cd3cd 100644 --- a/src/sync/engine.rs +++ b/src/sync/engine.rs @@ -42,6 +42,8 @@ pub struct SyncEngine { backfill_first: bool, /// Skip parent hash validation (trust RPC for reorg handling) trust_rpc: bool, + /// Starting block for indexing (0 = genesis) + start_block: u64, } impl SyncEngine { @@ -70,6 +72,7 @@ impl SyncEngine { concurrency: 4, backfill_first: false, trust_rpc: false, + start_block: 0, }) } @@ -98,6 +101,11 @@ impl SyncEngine { self } + pub fn with_start_block(mut self, start_block: u64) -> Self { + self.start_block = start_block; + self + } + /// Returns the underlying pool (for realtime/API operations). fn pool(&self) -> &Pool { self.throttled_pool.inner() @@ -149,7 +157,7 @@ impl SyncEngine { update_tip_num(self.pool(), self.chain_id, remote_head, remote_head).await?; // Check for gaps - let gaps = detect_all_gaps(self.pool(), remote_head).await?; + let gaps = detect_all_gaps(self.pool(), remote_head, self.start_block).await?; if gaps.is_empty() { info!( chain_id = self.chain_id, @@ -175,6 +183,7 @@ impl SyncEngine { self.chain_id, self.batch_size, self.concurrency, + self.start_block, &mut progress, ) .await @@ -232,6 +241,7 @@ impl SyncEngine { let gapfill_chain_id = self.chain_id; let gapfill_batch_size = self.batch_size; let gapfill_concurrency = self.concurrency; + let gapfill_start_block = self.start_block; let gapfill_handle = tokio::spawn(async move { run_gapfill_loop( gapfill_sinks, @@ -240,6 +250,7 @@ impl SyncEngine { gapfill_chain_id, gapfill_batch_size, gapfill_concurrency, + gapfill_start_block, gapfill_shutdown, ) .await @@ -526,7 +537,7 @@ impl SyncEngine { let state = load_sync_state(self.pool(), self.chain_id) .await? .unwrap_or_default(); - let gaps = detect_all_gaps(self.pool(), state.tip_num).await?; + let gaps = detect_all_gaps(self.pool(), state.tip_num, self.start_block).await?; let mut filled = 0; for (start, end) in gaps { @@ -668,6 +679,7 @@ impl SyncEngine { synced_num: num, tip_num: num, backfill_num: state.backfill_num, + start_block: self.start_block, sync_rate: state.sync_rate, started_at: state.started_at, }; @@ -752,9 +764,13 @@ impl SyncEngine { current_end = current_start.saturating_sub(1); } - // Mark complete if we reached genesis - if state.backfill_num == Some(to) && to == 0 { - info!("Backfill complete to genesis"); + // Mark complete if we reached the target + if state.backfill_num == Some(to) { + if to == 0 { + info!("Backfill complete to genesis"); + } else { + info!(start_block = to, "Backfill complete to start_block"); + } } Ok(synced) @@ -789,6 +805,7 @@ async fn run_gapfill_loop( chain_id: u64, batch_size: u64, concurrency: usize, + start_block: u64, mut shutdown: broadcast::Receiver<()>, ) -> Result<()> { let state = load_sync_state(sinks.pool(), chain_id) @@ -800,6 +817,7 @@ async fn run_gapfill_loop( chain_id = chain_id, batch_size = batch_size, concurrency = concurrency, + start_block = start_block, backfill_limit = backfill_semaphore.available_permits(), "Gap-fill: starting with parallel workers (throttled)" ); @@ -812,7 +830,7 @@ async fn run_gapfill_loop( info!("Gap-fill: shutting down"); break; } - result = tick_gapfill_parallel(&sinks, &backfill_semaphore, &rpc, chain_id, batch_size, concurrency, &mut progress) => { + result = tick_gapfill_parallel(&sinks, &backfill_semaphore, &rpc, chain_id, batch_size, concurrency, start_block, &mut progress) => { if let Err(e) = result { error!(error = %e, "Gap-fill sync tick failed"); tokio::time::sleep(Duration::from_secs(1)).await; @@ -834,6 +852,7 @@ async fn tick_gapfill_parallel( chain_id: u64, batch_size: u64, concurrency: usize, + start_block: u64, progress: &mut SyncProgress, ) -> Result<()> { let pool = sinks.pool(); @@ -859,9 +878,8 @@ async fn tick_gapfill_parallel( // Fast path: use COUNT-based check (btree index scan) to see if there // are any gaps at all. Only fall back to the expensive LAG() window // function when gaps actually exist and we need their exact ranges. - // With 0.5s block time, tip_num races ahead of synced_num constantly, - // so we check the range [1, tip_num] cheaply via COUNT vs expected. - if state.tip_num > 0 && !has_gaps(pool, 1, state.tip_num).await? { + let lower_bound = start_block.max(1); + if state.tip_num >= lower_bound && !has_gaps(pool, lower_bound, state.tip_num).await? { metrics::set_gap_blocks(chain_id, "postgres", 0); metrics::set_gap_count(chain_id, "postgres", 0); metrics::set_synced(chain_id, realtime_lag == 0); @@ -873,10 +891,10 @@ async fn tick_gapfill_parallel( } // Gaps exist — run the expensive window function to find exact ranges - let gaps = detect_all_gaps(pool, state.tip_num).await?; + let gaps = detect_all_gaps(pool, state.tip_num, start_block).await?; if gaps.is_empty() { - // No gaps - fully synced from genesis to tip + // No gaps - fully synced from start_block to tip metrics::set_gap_blocks(chain_id, "postgres", 0); metrics::set_gap_count(chain_id, "postgres", 0); metrics::set_synced(chain_id, realtime_lag == 0); @@ -1143,13 +1161,14 @@ async fn tick_gapfill_parallel_no_throttle( chain_id: u64, batch_size: u64, concurrency: usize, + start_block: u64, progress: &mut SyncProgress, ) -> Result<()> { let pool = sinks.pool(); let state = load_sync_state(pool, chain_id).await?.unwrap_or_default(); - // Detect ALL gaps including from genesis, sorted by end DESC (most recent first) - let gaps = detect_all_gaps(pool, state.tip_num).await?; + // Detect ALL gaps including from start_block, sorted by end DESC (most recent first) + let gaps = detect_all_gaps(pool, state.tip_num, start_block).await?; if gaps.is_empty() { if state.synced_num < state.tip_num { @@ -1321,10 +1340,10 @@ async fn tick_gapfill_parallel_no_throttle( Ok(()) } -/// Check if fully synced (no gaps from genesis to tip) +/// Check if fully synced (no gaps from start_block to tip) #[allow(dead_code)] -async fn is_fully_synced(pool: &Pool, tip_num: u64) -> Result { - let gaps = detect_all_gaps(pool, tip_num).await?; +async fn is_fully_synced(pool: &Pool, tip_num: u64, start_block: u64) -> Result { + let gaps = detect_all_gaps(pool, tip_num, start_block).await?; Ok(gaps.is_empty()) } diff --git a/src/sync/writer.rs b/src/sync/writer.rs index a8d2f78..4bc1dc8 100644 --- a/src/sync/writer.rs +++ b/src/sync/writer.rs @@ -689,7 +689,7 @@ pub async fn load_sync_state(pool: &Pool, chain_id: u64) -> Result Result>(4).map(|n| n as u64), sync_rate: r.get(5), started_at: r.get(6), + start_block: r.get::<_, i64>(7) as u64, })) } @@ -711,7 +712,7 @@ pub async fn load_all_sync_states(pool: &Pool) -> Result> { let rows = conn .query( - "SELECT chain_id, head_num, synced_num, tip_num, backfill_num, sync_rate, started_at FROM sync_state ORDER BY chain_id", + "SELECT chain_id, head_num, synced_num, tip_num, backfill_num, sync_rate, started_at, start_block FROM sync_state ORDER BY chain_id", &[], ) .await?; @@ -726,6 +727,7 @@ pub async fn load_all_sync_states(pool: &Pool) -> Result> { backfill_num: r.get::<_, Option>(4).map(|n| n as u64), sync_rate: r.get(5), started_at: r.get(6), + start_block: r.get::<_, i64>(7) as u64, }) .collect()) } @@ -735,13 +737,14 @@ pub async fn save_sync_state(pool: &Pool, state: &SyncState) -> Result<()> { conn.execute( r#" - INSERT INTO sync_state (chain_id, head_num, synced_num, tip_num, backfill_num, started_at, updated_at) - VALUES ($1, $2, $3, $4, $5, COALESCE($6, NOW()), NOW()) + INSERT INTO sync_state (chain_id, head_num, synced_num, tip_num, backfill_num, start_block, started_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, COALESCE($7, NOW()), NOW()) ON CONFLICT (chain_id) DO UPDATE SET head_num = GREATEST(sync_state.head_num, EXCLUDED.head_num), synced_num = GREATEST(sync_state.synced_num, EXCLUDED.synced_num), tip_num = GREATEST(sync_state.tip_num, EXCLUDED.tip_num), backfill_num = COALESCE(EXCLUDED.backfill_num, sync_state.backfill_num), + start_block = EXCLUDED.start_block, started_at = COALESCE(sync_state.started_at, EXCLUDED.started_at), updated_at = NOW() "#, @@ -751,6 +754,7 @@ pub async fn save_sync_state(pool: &Pool, state: &SyncState) -> Result<()> { &(state.synced_num as i64), &(state.tip_num as i64), &state.backfill_num.map(|n| n as i64), + &(state.start_block as i64), &state.started_at, ], ) @@ -837,10 +841,12 @@ pub async fn has_gaps(pool: &Pool, from: u64, to: u64) -> Result { return Ok(false); } let conn = pool.get().await?; + let from_i64 = i64::try_from(from).unwrap_or(i64::MAX); + let to_i64 = i64::try_from(to).unwrap_or(i64::MAX); let row = conn .query_one( "SELECT COUNT(*) FROM blocks WHERE num >= $1 AND num <= $2", - &[&(from as i64), &(to as i64)], + &[&from_i64, &to_i64], ) .await?; let count: i64 = row.get(0); @@ -854,6 +860,7 @@ pub async fn has_gaps(pool: &Pool, from: u64, to: u64) -> Result { pub async fn detect_gaps(pool: &Pool, below: u64) -> Result> { let conn = pool.get().await?; + let below_i64 = i64::try_from(below).unwrap_or(i64::MAX); let rows = conn .query( r#" @@ -866,7 +873,7 @@ pub async fn detect_gaps(pool: &Pool, below: u64) -> Result> { FROM numbered WHERE num - prev_num > 1 "#, - &[&(below as i64)], + &[&below_i64], ) .await?; @@ -904,11 +911,15 @@ pub async fn detect_blocks_missing_receipts(pool: &Pool, limit: i64) -> Result(0) as u64).collect()) } -/// Detect ALL gaps including from genesis to first block +/// Detect ALL gaps including from start_block to first block /// Returns gaps sorted by end block descending (most recent first) -pub async fn detect_all_gaps(pool: &Pool, tip_num: u64) -> Result> { +pub async fn detect_all_gaps(pool: &Pool, tip_num: u64, start_block: u64) -> Result> { let conn = pool.get().await?; + // The effective lower bound: at minimum block 1 (block 0 is genesis/empty), + // or start_block if configured higher + let lower_bound = start_block.max(1); + // Get the lowest block number we have let min_block: Option = conn .query_one("SELECT MIN(num) FROM blocks", &[]) @@ -917,19 +928,23 @@ pub async fn detect_all_gaps(pool: &Pool, tip_num: u64) -> Result 1) - // Block 0 is typically empty/genesis, so we start from block 1 + // Add gap from lower_bound to first block (if we have any blocks and min > lower_bound) if let Some(min) = min_block { - if min > 1 { - gaps.push((1, min as u64 - 1)); + if (min as u64) > lower_bound { + gaps.push((lower_bound, min as u64 - 1)); } - } else if tip_num > 0 { - // No blocks at all - entire range is a gap (starting from 1) - gaps.push((1, tip_num)); + } else if tip_num >= lower_bound { + // No blocks at all - entire range is a gap + gaps.push((lower_bound, tip_num)); } - // Filter to only gaps up to tip_num - gaps.retain(|(_, end)| *end <= tip_num); + // Filter out gaps entirely below start_block and clamp partial overlaps + gaps.retain(|(_, end)| *end >= lower_bound && *end <= tip_num); + for gap in &mut gaps { + if gap.0 < lower_bound { + gap.0 = lower_bound; + } + } // Sort by end block descending (most recent gaps first) gaps.sort_by(|a, b| b.1.cmp(&a.1)); diff --git a/src/types.rs b/src/types.rs index bd5cfb5..255a50c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -83,6 +83,9 @@ pub struct SyncState { pub tip_num: u64, /// Lowest block synced going backwards (None = not started, Some(0) = complete) pub backfill_num: Option, + /// Configured starting block for indexing (0 = genesis) + #[serde(default)] + pub start_block: u64, /// Current sync rate in blocks/second (rolling window) #[serde(default)] pub sync_rate: Option, @@ -92,9 +95,12 @@ pub struct SyncState { } impl SyncState { - /// Returns true if backfill is complete (reached genesis) + /// Returns true if backfill is complete (reached start_block) pub fn backfill_complete(&self) -> bool { - self.backfill_num == Some(0) + match self.backfill_num { + Some(n) => n <= self.start_block, + None => false, + } } /// Returns true if backfill has started @@ -105,9 +111,9 @@ impl SyncState { /// Returns the number of blocks remaining to backfill pub fn backfill_remaining(&self) -> u64 { match self.backfill_num { - None => self.tip_num, // Haven't started, need to fill 0..tip_num - Some(0) => 0, // Complete - Some(n) => n, // Blocks 0..n remain + None => self.tip_num.saturating_sub(self.start_block), // Haven't started + Some(n) if n <= self.start_block => 0, // Complete + Some(n) => n.saturating_sub(self.start_block), // Blocks start_block..n remain } } @@ -162,3 +168,78 @@ impl SyncState { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn state_with(start_block: u64, backfill_num: Option, tip_num: u64) -> SyncState { + SyncState { + chain_id: 1, + head_num: tip_num, + synced_num: tip_num, + tip_num, + backfill_num, + start_block, + sync_rate: None, + started_at: None, + } + } + + #[test] + fn test_backfill_complete_with_start_block() { + // backfill_num at start_block => complete + let s = state_with(1000, Some(1000), 5000); + assert!(s.backfill_complete()); + + // backfill_num below start_block => also complete + let s = state_with(1000, Some(500), 5000); + assert!(s.backfill_complete()); + + // backfill_num above start_block => not complete + let s = state_with(1000, Some(1500), 5000); + assert!(!s.backfill_complete()); + + // backfill not started => not complete + let s = state_with(1000, None, 5000); + assert!(!s.backfill_complete()); + } + + #[test] + fn test_backfill_remaining_with_start_block() { + // In progress: 2000 blocks above start_block remain + let s = state_with(1000, Some(3000), 5000); + assert_eq!(s.backfill_remaining(), 2000); + + // Complete: at start_block + let s = state_with(1000, Some(1000), 5000); + assert_eq!(s.backfill_remaining(), 0); + + // Not started: tip_num - start_block + let s = state_with(1000, None, 5000); + assert_eq!(s.backfill_remaining(), 4000); + } + + #[test] + fn test_backfill_remaining_genesis() { + // start_block=0 preserves original behavior + let s = state_with(0, Some(100), 5000); + assert_eq!(s.backfill_remaining(), 100); + + let s = state_with(0, Some(0), 5000); + assert_eq!(s.backfill_remaining(), 0); + + let s = state_with(0, None, 5000); + assert_eq!(s.backfill_remaining(), 5000); + } + + #[test] + fn test_backfill_complete_genesis() { + // start_block=0 preserves original behavior (backfill_num=0 => complete) + let s = state_with(0, Some(0), 5000); + assert!(s.backfill_complete()); + + let s = state_with(0, Some(1), 5000); + assert!(!s.backfill_complete()); + } +} diff --git a/tests/gap_sync_test.rs b/tests/gap_sync_test.rs index 149ca47..e289089 100644 --- a/tests/gap_sync_test.rs +++ b/tests/gap_sync_test.rs @@ -16,7 +16,7 @@ async fn test_detect_all_gaps_empty_table() { db.truncate_all().await; // Empty table with tip_num > 0 should report entire range as gap (starting from block 1) - let gaps = detect_all_gaps(&db.pool, 100).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 100, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one gap for entire range"); assert_eq!(gaps[0], (1, 100), "Gap should be 1 -> 100 (block 0 is genesis)"); @@ -29,7 +29,7 @@ async fn test_detect_all_gaps_empty_table_tip_zero() { db.truncate_all().await; // Empty table with tip_num = 0 has no gaps - let gaps = detect_all_gaps(&db.pool, 0).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 0, 0).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "No gaps when tip_num is 0 and table is empty"); } @@ -59,7 +59,7 @@ async fn test_detect_all_gaps_genesis_missing() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 10).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one gap from block 1"); assert_eq!(gaps[0], (1, 4), "Gap should be 1 -> 4 (block 0 is genesis)"); @@ -90,7 +90,7 @@ async fn test_detect_all_gaps_genesis_present() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 10).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "Should have no gaps when fully synced from genesis"); } @@ -121,7 +121,7 @@ async fn test_detect_all_gaps_multiple_gaps_sorted_by_recency() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 16).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 16, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 3, "Should detect three gaps"); @@ -156,7 +156,7 @@ async fn test_detect_all_gaps_single_block_gap() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 5).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 5, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one gap"); assert_eq!(gaps[0], (3, 3), "Gap should be single block 3"); @@ -184,7 +184,7 @@ async fn test_detect_all_gaps_only_genesis() { .expect("Failed to insert block"); // With tip = 0, no gaps - let gaps = detect_all_gaps(&db.pool, 0).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 0, 0).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "No gaps when only genesis exists and tip is 0"); // With tip = 10 and only block 0, detect_all_gaps uses detect_gaps between existing blocks @@ -194,7 +194,7 @@ async fn test_detect_all_gaps_only_genesis() { // there's no gap from genesis. The gap from 1-10 would be detected as a gap between // existing blocks if we had block 11+. This is correct behavior - gap detection finds // discontinuities, not "how far we've synced." - let gaps = detect_all_gaps(&db.pool, 10).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed to detect gaps"); // No gaps detected because there are no discontinuities from block 0 onward that are // bounded by existing blocks assert!(gaps.is_empty(), "No gaps when only block 0 exists (no upper bound block)"); @@ -226,7 +226,7 @@ async fn test_detect_all_gaps_filters_beyond_tip() { } // With tip = 10, should only show gaps up to block 10 - let gaps = detect_all_gaps(&db.pool, 10).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed to detect gaps"); // Gap 7-19 extends beyond tip, but we filter to gaps where end <= tip // Since the gap 7-19 has end=19 > tip=10, it should be filtered out @@ -260,7 +260,7 @@ async fn test_detect_all_gaps_large_gap_range() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 1000).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 1000, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one large gap"); assert_eq!(gaps[0], (1, 999), "Gap should be 1 -> 999"); @@ -296,12 +296,12 @@ async fn test_detect_gaps_vs_detect_all_gaps() { } // detect_gaps only finds gaps between existing blocks - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed"); assert_eq!(gaps.len(), 1, "detect_gaps only finds middle gaps"); assert_eq!(gaps[0], (7, 9), "detect_gaps should find 7-9"); // detect_all_gaps also includes gap from block 1 - let all_gaps = detect_all_gaps(&db.pool, 11).await.expect("Failed"); + let all_gaps = detect_all_gaps(&db.pool, 11, 0).await.expect("Failed"); assert_eq!(all_gaps.len(), 2, "detect_all_gaps finds block 1 gap + middle gaps"); // Sorted by end descending assert_eq!(all_gaps[0], (7, 9), "Most recent gap first"); @@ -337,7 +337,7 @@ async fn test_gap_order_prioritizes_recent() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 100).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 100, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 2, "Should detect two gaps"); @@ -372,7 +372,7 @@ async fn test_gap_detection_with_realtime_jump() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 1000).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 1000, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one large gap from jump"); assert_eq!(gaps[0], (6, 994), "Gap should be the jumped-over region"); @@ -407,7 +407,7 @@ async fn test_gap_detection_consecutive_single_block_gaps() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 10).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 5, "Should detect 5 single-block gaps"); @@ -444,7 +444,7 @@ async fn test_gap_size_calculation() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 30).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 30, 0).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 3, "Should detect three gaps"); @@ -483,7 +483,7 @@ async fn test_fully_synced_returns_empty_gaps() { .expect("Failed to insert block"); } - let gaps = detect_all_gaps(&db.pool, 100).await.expect("Failed to detect gaps"); + let gaps = detect_all_gaps(&db.pool, 100, 0).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "Fully synced chain should have no gaps"); } @@ -641,7 +641,7 @@ async fn test_has_gaps_agrees_with_detect_all_gaps() { } // has_gaps and detect_all_gaps should agree - let gaps = detect_all_gaps(&db.pool, 12).await.unwrap(); + let gaps = detect_all_gaps(&db.pool, 12, 0).await.unwrap(); let has = has_gaps(&db.pool, 1, 12).await.unwrap(); assert!(!gaps.is_empty(), "detect_all_gaps should find gaps"); @@ -664,7 +664,7 @@ async fn test_has_gaps_agrees_with_detect_all_gaps() { .unwrap(); } - let gaps = detect_all_gaps(&db.pool, 12).await.unwrap(); + let gaps = detect_all_gaps(&db.pool, 12, 0).await.unwrap(); let has = has_gaps(&db.pool, 1, 12).await.unwrap(); assert!(gaps.is_empty(), "detect_all_gaps should find no gaps"); @@ -702,3 +702,161 @@ async fn test_has_gaps_beyond_indexed_range() { // Asking about exactly the indexed range — no gaps assert!(!has_gaps(&db.pool, 1, 10).await.unwrap()); } + +// ============================================================================ +// start_block tests - gap detection respects configured starting block +// ============================================================================ + +#[tokio::test] +#[serial(db)] +async fn test_start_block_ignores_gaps_below() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let conn = db.pool.get().await.expect("Failed to get connection"); + + // Insert blocks 5-10 (missing 1-4) + for num in 5i64..=10 { + conn.execute( + r#"INSERT INTO blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner) + VALUES ($1, $2, $3, NOW(), $4, 1000000, 100000, $5)"#, + &[ + &num, + &vec![num as u8; 32], + &vec![(num - 1) as u8; 32], + &(num * 1000), + &vec![0u8; 20], + ], + ) + .await + .expect("Failed to insert block"); + } + + // With start_block=0, gap 1-4 is detected + let gaps = detect_all_gaps(&db.pool, 10, 0).await.expect("Failed"); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0], (1, 4)); + + // With start_block=5, no gaps (blocks 5-10 are contiguous) + let gaps = detect_all_gaps(&db.pool, 10, 5).await.expect("Failed"); + assert!(gaps.is_empty(), "No gaps when start_block matches first indexed block"); +} + +#[tokio::test] +#[serial(db)] +async fn test_start_block_clamps_partial_gap() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let conn = db.pool.get().await.expect("Failed to get connection"); + + // Insert blocks 10-20 (missing 1-9) + for num in 10i64..=20 { + conn.execute( + r#"INSERT INTO blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner) + VALUES ($1, $2, $3, NOW(), $4, 1000000, 100000, $5)"#, + &[ + &num, + &vec![num as u8; 32], + &vec![(num - 1) as u8; 32], + &(num * 1000), + &vec![0u8; 20], + ], + ) + .await + .expect("Failed to insert block"); + } + + // With start_block=5, gap is clamped to 5-9 (not 1-9) + let gaps = detect_all_gaps(&db.pool, 20, 5).await.expect("Failed"); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0], (5, 9), "Gap should start at start_block, not block 1"); +} + +#[tokio::test] +#[serial(db)] +async fn test_start_block_empty_table() { + let db = TestDb::empty().await; + db.truncate_all().await; + + // Empty table with start_block=100, tip=200: gap is 100-200 + let gaps = detect_all_gaps(&db.pool, 200, 100).await.expect("Failed"); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0], (100, 200)); + + // Empty table with start_block > tip: no gap + let gaps = detect_all_gaps(&db.pool, 50, 100).await.expect("Failed"); + assert!(gaps.is_empty(), "No gaps when start_block > tip_num"); +} + +#[tokio::test] +#[serial(db)] +async fn test_start_block_with_middle_gap() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let conn = db.pool.get().await.expect("Failed to get connection"); + + // Insert blocks 100-104 and 108-110 (gap at 105-107, missing 1-99) + for num in [100i64, 101, 102, 103, 104, 108, 109, 110] { + conn.execute( + r#"INSERT INTO blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner) + VALUES ($1, $2, $3, NOW(), $4, 1000000, 100000, $5)"#, + &[ + &num, + &vec![num as u8; 32], + &vec![(num - 1) as u8; 32], + &(num * 1000), + &vec![0u8; 20], + ], + ) + .await + .expect("Failed to insert block"); + } + + // With start_block=100, only the middle gap 105-107 should be detected + let gaps = detect_all_gaps(&db.pool, 110, 100).await.expect("Failed"); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0], (105, 107), "Only middle gap should be reported"); + + // With start_block=0, both the genesis gap and middle gap should appear + let gaps = detect_all_gaps(&db.pool, 110, 0).await.expect("Failed"); + assert_eq!(gaps.len(), 2); + assert_eq!(gaps[0], (105, 107), "Middle gap first (most recent)"); + assert_eq!(gaps[1], (1, 99), "Genesis gap second"); +} + +#[tokio::test] +#[serial(db)] +async fn test_start_block_fully_synced() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let conn = db.pool.get().await.expect("Failed to get connection"); + + // Insert contiguous blocks 50-60 + for num in 50i64..=60 { + conn.execute( + r#"INSERT INTO blocks (num, hash, parent_hash, timestamp, timestamp_ms, gas_limit, gas_used, miner) + VALUES ($1, $2, $3, NOW(), $4, 1000000, 100000, $5)"#, + &[ + &num, + &vec![num as u8; 32], + &vec![(num - 1) as u8; 32], + &(num * 1000), + &vec![0u8; 20], + ], + ) + .await + .expect("Failed to insert block"); + } + + // Fully synced from start_block=50 to tip=60 + let gaps = detect_all_gaps(&db.pool, 60, 50).await.expect("Failed"); + assert!(gaps.is_empty(), "No gaps when fully synced from start_block to tip"); + + // But not fully synced if start_block=0 + let gaps = detect_all_gaps(&db.pool, 60, 0).await.expect("Failed"); + assert_eq!(gaps.len(), 1); + assert_eq!(gaps[0], (1, 49)); +} diff --git a/tests/smoke_test.rs b/tests/smoke_test.rs index 58a7724..be63066 100644 --- a/tests/smoke_test.rs +++ b/tests/smoke_test.rs @@ -397,7 +397,7 @@ async fn test_gap_detection() { .expect("Failed to insert block"); } - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 1, "Should detect one gap"); assert_eq!(gaps[0], (4, 4), "Gap should be block 4"); @@ -428,7 +428,7 @@ async fn test_gap_detection_multiple_gaps() { .expect("Failed to insert block"); } - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 2, "Should detect two gaps"); assert_eq!(gaps[0], (3, 4), "First gap should be blocks 3-4"); @@ -442,7 +442,7 @@ async fn test_gap_detection_empty_table() { db.truncate_all().await; // Empty table should have no gaps - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "Empty table should have no gaps"); } @@ -529,6 +529,7 @@ async fn test_sync_state_save_and_load() { synced_num: 800, tip_num: 950, backfill_num: Some(100), + start_block: 0, sync_rate: None, started_at: Some(chrono::Utc::now()), }; @@ -552,6 +553,7 @@ async fn test_sync_state_methods() { synced_num: 800, tip_num: 950, backfill_num: Some(100), + start_block: 0, sync_rate: None, started_at: None, }; @@ -578,6 +580,80 @@ async fn test_sync_state_methods() { assert_eq!(complete.backfill_remaining(), 0); } +#[tokio::test] +#[serial(db)] +async fn test_sync_state_start_block_persistence() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let chain_id = 88888u64; + + let state = SyncState { + chain_id, + head_num: 5000, + synced_num: 3000, + tip_num: 4500, + backfill_num: Some(1500), + start_block: 1000, + sync_rate: None, + started_at: Some(chrono::Utc::now()), + }; + + save_sync_state(&db.pool, &state).await.expect("Failed to save"); + + let loaded = load_sync_state(&db.pool, chain_id).await.expect("Failed to load").unwrap(); + assert_eq!(loaded.start_block, 1000, "start_block should round-trip through DB"); + assert_eq!(loaded.backfill_num, Some(1500)); + assert!(!loaded.backfill_complete(), "Not complete: backfill_num(1500) > start_block(1000)"); + assert_eq!(loaded.backfill_remaining(), 500, "1500 - 1000 = 500 blocks remaining"); + + // Update: backfill reaches start_block + let done = SyncState { + backfill_num: Some(1000), + ..state.clone() + }; + save_sync_state(&db.pool, &done).await.expect("Failed to save"); + + let loaded = load_sync_state(&db.pool, chain_id).await.expect("Failed to load").unwrap(); + assert!(loaded.backfill_complete(), "Complete: backfill_num == start_block"); + assert_eq!(loaded.backfill_remaining(), 0); +} + +#[tokio::test] +#[serial(db)] +async fn test_sync_state_start_block_update() { + let db = TestDb::empty().await; + db.truncate_all().await; + + let chain_id = 99999u64; + + // First save with start_block=0 + let state = SyncState { + chain_id, + head_num: 100, + synced_num: 100, + tip_num: 100, + backfill_num: None, + start_block: 0, + sync_rate: None, + started_at: None, + }; + save_sync_state(&db.pool, &state).await.expect("Failed to save"); + + let loaded = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(loaded.start_block, 0); + + // Update with start_block=500 (user changed config) + let updated = SyncState { + start_block: 500, + ..state.clone() + }; + save_sync_state(&db.pool, &updated).await.expect("Failed to save"); + + let loaded = load_sync_state(&db.pool, chain_id).await.expect("Failed").unwrap(); + assert_eq!(loaded.start_block, 500, "start_block should update on conflict"); +} + #[tokio::test] #[serial(db)] async fn test_gap_fill_scenario_multiple_restarts() { @@ -643,7 +719,7 @@ async fn test_gap_fill_scenario_multiple_restarts() { } // Detect all gaps - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); // Should have 2 gaps: // Gap 1: 101-289 (between run 1 and run 2) @@ -682,7 +758,7 @@ async fn test_gap_detection_single_block_gaps() { .expect("Failed to insert block"); } - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); assert_eq!(gaps.len(), 4, "Should detect four single-block gaps"); assert_eq!(gaps[0], (2, 2), "Gap at block 2"); @@ -716,7 +792,7 @@ async fn test_gap_detection_contiguous_blocks() { .expect("Failed to insert block"); } - let gaps = detect_gaps(&db.pool, u64::MAX).await.expect("Failed to detect gaps"); + let gaps = detect_gaps(&db.pool, i64::MAX as u64).await.expect("Failed to detect gaps"); assert!(gaps.is_empty(), "Contiguous blocks should have no gaps"); }