Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ name = "moderato"
chain_id = 42431
rpc_url = "https://eng:aphex-twin-jeff-mills@rpc.testnet.tempo.xyz"
pg_url = "postgres://tidx:tidx@postgres:5432/tidx_moderato"
# start_block = 0 # Starting block for indexing (default: 0 = genesis)
backfill = true
batch_size = 500
concurrency = 8
Expand Down
5 changes: 4 additions & 1 deletion db/sync_state.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ CREATE TABLE IF NOT EXISTS sync_state (
-- Persisted in PG so it survives restarts and isn't fooled by realtime sync writing ahead.
ALTER TABLE sync_state ADD COLUMN IF NOT EXISTS ch_backfill_block INT8 NOT NULL DEFAULT 0;

COMMENT ON COLUMN sync_state.synced_num IS 'Highest contiguous block synced from genesis (no gaps up to here)';
ALTER TABLE sync_state ADD COLUMN IF NOT EXISTS start_block INT8 NOT NULL DEFAULT 0;

COMMENT ON COLUMN sync_state.start_block IS 'Configured starting block for indexing (0=genesis)';
COMMENT ON COLUMN sync_state.synced_num IS 'Highest contiguous block synced from start_block (no gaps up to here)';
COMMENT ON COLUMN sync_state.tip_num IS 'Highest block synced near chain head (may have gaps below)';
COMMENT ON COLUMN sync_state.backfill_num IS 'Lowest block synced going backwards (NULL=not started, 0=complete)';
COMMENT ON COLUMN sync_state.sync_rate IS 'Current sync rate in blocks/second (rolling 5s window)';
Expand Down
4 changes: 2 additions & 2 deletions src/cli/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ fn print_http_status(resp: &serde_json::Value) -> Result<()> {
let name = chain["name"].as_str().unwrap_or("unknown");
let chain_id = chain["chain_id"].as_i64().unwrap_or(0);
let head_num = chain["head_num"].as_i64().unwrap_or(0);
let synced_num = chain["synced_num"].as_i64().unwrap_or(0);
let _synced_num = chain["synced_num"].as_i64().unwrap_or(0);
let lag = chain["lag"].as_i64().unwrap_or(0);

println!("┌─ {} (chain_id: {}) ─────────────────────", name, chain_id);
Expand Down Expand Up @@ -257,7 +257,7 @@ async fn print_json_status(config: &Config) -> Result<()> {
Ok(pool) => {
let state = load_sync_state(&pool, chain.chain_id).await.ok().flatten();
let tip = state.as_ref().map(|s| s.tip_num).unwrap_or(0);
let gaps = detect_all_gaps(&pool, tip).await.unwrap_or_default();
let gaps = detect_all_gaps(&pool, tip, chain.start_block).await.unwrap_or_default();
(state, gaps)
}
Err(_) => (None, vec![]),
Expand Down
4 changes: 3 additions & 1 deletion src/cli/up.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ fn spawn_sync_engine(
chain = %chain.name,
chain_id = chain.chain_id,
rpc = %chain.rpc_url,
start_block = chain.start_block,
backfill_limit = throttled_pool.backfill_semaphore.available_permits(),
"Starting sync for chain (throttled pool: 16 connections, backfill limited)"
);
Expand Down Expand Up @@ -374,7 +375,8 @@ fn spawn_sync_engine(
.with_batch_size(chain.batch_size)
.with_concurrency(chain.concurrency)
.with_backfill_first(backfill_first)
.with_trust_rpc(trust_rpc);
.with_trust_rpc(trust_rpc)
.with_start_block(chain.start_block);
}
Err(e) => {
warn!(error = %e, chain = %chain.name, "Failed to create sync engine, retrying in 10s");
Expand Down
24 changes: 24 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ pub struct ChainConfig {
#[serde(default)]
pub pg_password_env: Option<String>,

/// Starting block number for indexing (default: 0 = genesis).
/// When set, backfill will stop at this block instead of going to genesis.
#[serde(default)]
pub start_block: u64,

/// Enable backfill to genesis (default: true)
#[serde(default = "default_backfill")]
pub backfill: bool,
Expand Down Expand Up @@ -314,6 +319,22 @@ mod tests {
assert!(config.backfill);
assert_eq!(config.batch_size, 100);
assert_eq!(config.concurrency, 4);
assert_eq!(config.start_block, 0);
}

#[test]
fn test_chain_config_with_start_block() {
    // A minimal chain config that explicitly overrides the default start block.
    let raw = r#"
name = "test"
chain_id = 1
rpc_url = "http://localhost:8545"
pg_url = "postgres://localhost/test"
start_block = 1000
"#;

    let parsed: ChainConfig = toml::from_str(raw).unwrap();

    // The explicit value must win over the serde default of 0.
    assert_eq!(parsed.start_block, 1000);
}

#[test]
Expand Down Expand Up @@ -403,6 +424,7 @@ mod tests {
rpc_url: "http://localhost:8545".to_string(),
pg_url: "postgres://user:pass@localhost/db".to_string(),
pg_password_env: None,
start_block: 0,
backfill: true,
batch_size: 100,
concurrency: 4,
Expand All @@ -428,6 +450,7 @@ mod tests {
rpc_url: "http://localhost:8545".to_string(),
pg_url: "postgres://user:placeholder@localhost/db".to_string(),
pg_password_env: Some("PATH".to_string()),
start_block: 0,
backfill: true,
batch_size: 100,
concurrency: 4,
Expand All @@ -452,6 +475,7 @@ mod tests {
rpc_url: "http://localhost:8545".to_string(),
pg_url: "postgres://user:placeholder@localhost/db".to_string(),
pg_password_env: Some("NONEXISTENT_VAR_XYZ_999".to_string()),
start_block: 0,
backfill: true,
batch_size: 100,
concurrency: 4,
Expand Down
9 changes: 5 additions & 4 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn get_all_status(pool: &Pool) -> Result<Vec<SyncStatus>> {

let rows = conn
.query(
"SELECT chain_id, head_num, synced_num, tip_num, backfill_num, started_at, updated_at FROM sync_state ORDER BY chain_id",
"SELECT chain_id, head_num, synced_num, tip_num, backfill_num, started_at, updated_at, start_block FROM sync_state ORDER BY chain_id",
&[],
)
.await?;
Expand All @@ -77,11 +77,12 @@ pub async fn get_all_status(pool: &Pool) -> Result<Vec<SyncStatus>> {
let tip_num: i64 = row.get(3);
let backfill_num: Option<i64> = row.get(4);
let started_at: Option<DateTime<Utc>> = row.get(5);
let start_block: i64 = row.get(7);

let backfill_remaining = match backfill_num {
None => synced_num.saturating_sub(1),
Some(0) => 0,
Some(n) => n,
None => synced_num.saturating_sub(start_block.max(1)),
Some(n) if n <= start_block => 0,
Some(n) => n.saturating_sub(start_block),
};

let sync_rate = started_at.and_then(|started| {
Expand Down
51 changes: 35 additions & 16 deletions src/sync/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct SyncEngine {
backfill_first: bool,
/// Skip parent hash validation (trust RPC for reorg handling)
trust_rpc: bool,
/// Starting block for indexing (0 = genesis)
start_block: u64,
}

impl SyncEngine {
Expand Down Expand Up @@ -70,6 +72,7 @@ impl SyncEngine {
concurrency: 4,
backfill_first: false,
trust_rpc: false,
start_block: 0,
})
}

Expand Down Expand Up @@ -98,6 +101,11 @@ impl SyncEngine {
self
}

pub fn with_start_block(mut self, start_block: u64) -> Self {
self.start_block = start_block;
self
}

/// Returns the underlying pool (for realtime/API operations).
fn pool(&self) -> &Pool {
self.throttled_pool.inner()
Expand Down Expand Up @@ -149,7 +157,7 @@ impl SyncEngine {
update_tip_num(self.pool(), self.chain_id, remote_head, remote_head).await?;

// Check for gaps
let gaps = detect_all_gaps(self.pool(), remote_head).await?;
let gaps = detect_all_gaps(self.pool(), remote_head, self.start_block).await?;
if gaps.is_empty() {
info!(
chain_id = self.chain_id,
Expand All @@ -175,6 +183,7 @@ impl SyncEngine {
self.chain_id,
self.batch_size,
self.concurrency,
self.start_block,
&mut progress,
)
.await
Expand Down Expand Up @@ -232,6 +241,7 @@ impl SyncEngine {
let gapfill_chain_id = self.chain_id;
let gapfill_batch_size = self.batch_size;
let gapfill_concurrency = self.concurrency;
let gapfill_start_block = self.start_block;
let gapfill_handle = tokio::spawn(async move {
run_gapfill_loop(
gapfill_sinks,
Expand All @@ -240,6 +250,7 @@ impl SyncEngine {
gapfill_chain_id,
gapfill_batch_size,
gapfill_concurrency,
gapfill_start_block,
gapfill_shutdown,
)
.await
Expand Down Expand Up @@ -526,7 +537,7 @@ impl SyncEngine {
let state = load_sync_state(self.pool(), self.chain_id)
.await?
.unwrap_or_default();
let gaps = detect_all_gaps(self.pool(), state.tip_num).await?;
let gaps = detect_all_gaps(self.pool(), state.tip_num, self.start_block).await?;
let mut filled = 0;

for (start, end) in gaps {
Expand Down Expand Up @@ -668,6 +679,7 @@ impl SyncEngine {
synced_num: num,
tip_num: num,
backfill_num: state.backfill_num,
start_block: self.start_block,
sync_rate: state.sync_rate,
started_at: state.started_at,
};
Expand Down Expand Up @@ -752,9 +764,13 @@ impl SyncEngine {
current_end = current_start.saturating_sub(1);
}

// Mark complete if we reached genesis
if state.backfill_num == Some(to) && to == 0 {
info!("Backfill complete to genesis");
// Mark complete if we reached the target
if state.backfill_num == Some(to) {
if to == 0 {
info!("Backfill complete to genesis");
} else {
info!(start_block = to, "Backfill complete to start_block");
}
}

Ok(synced)
Expand Down Expand Up @@ -789,6 +805,7 @@ async fn run_gapfill_loop(
chain_id: u64,
batch_size: u64,
concurrency: usize,
start_block: u64,
mut shutdown: broadcast::Receiver<()>,
) -> Result<()> {
let state = load_sync_state(sinks.pool(), chain_id)
Expand All @@ -800,6 +817,7 @@ async fn run_gapfill_loop(
chain_id = chain_id,
batch_size = batch_size,
concurrency = concurrency,
start_block = start_block,
backfill_limit = backfill_semaphore.available_permits(),
"Gap-fill: starting with parallel workers (throttled)"
);
Expand All @@ -812,7 +830,7 @@ async fn run_gapfill_loop(
info!("Gap-fill: shutting down");
break;
}
result = tick_gapfill_parallel(&sinks, &backfill_semaphore, &rpc, chain_id, batch_size, concurrency, &mut progress) => {
result = tick_gapfill_parallel(&sinks, &backfill_semaphore, &rpc, chain_id, batch_size, concurrency, start_block, &mut progress) => {
if let Err(e) = result {
error!(error = %e, "Gap-fill sync tick failed");
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -834,6 +852,7 @@ async fn tick_gapfill_parallel(
chain_id: u64,
batch_size: u64,
concurrency: usize,
start_block: u64,
progress: &mut SyncProgress,
) -> Result<()> {
let pool = sinks.pool();
Expand All @@ -859,9 +878,8 @@ async fn tick_gapfill_parallel(
// Fast path: use COUNT-based check (btree index scan) to see if there
// are any gaps at all. Only fall back to the expensive LAG() window
// function when gaps actually exist and we need their exact ranges.
// With 0.5s block time, tip_num races ahead of synced_num constantly,
// so we check the range [1, tip_num] cheaply via COUNT vs expected.
if state.tip_num > 0 && !has_gaps(pool, 1, state.tip_num).await? {
let lower_bound = start_block.max(1);
if state.tip_num >= lower_bound && !has_gaps(pool, lower_bound, state.tip_num).await? {
metrics::set_gap_blocks(chain_id, "postgres", 0);
metrics::set_gap_count(chain_id, "postgres", 0);
metrics::set_synced(chain_id, realtime_lag == 0);
Expand All @@ -873,10 +891,10 @@ async fn tick_gapfill_parallel(
}

// Gaps exist — run the expensive window function to find exact ranges
let gaps = detect_all_gaps(pool, state.tip_num).await?;
let gaps = detect_all_gaps(pool, state.tip_num, start_block).await?;

if gaps.is_empty() {
// No gaps - fully synced from genesis to tip
// No gaps - fully synced from start_block to tip
metrics::set_gap_blocks(chain_id, "postgres", 0);
metrics::set_gap_count(chain_id, "postgres", 0);
metrics::set_synced(chain_id, realtime_lag == 0);
Expand Down Expand Up @@ -1143,13 +1161,14 @@ async fn tick_gapfill_parallel_no_throttle(
chain_id: u64,
batch_size: u64,
concurrency: usize,
start_block: u64,
progress: &mut SyncProgress,
) -> Result<()> {
let pool = sinks.pool();
let state = load_sync_state(pool, chain_id).await?.unwrap_or_default();

// Detect ALL gaps including from genesis, sorted by end DESC (most recent first)
let gaps = detect_all_gaps(pool, state.tip_num).await?;
// Detect ALL gaps including from start_block, sorted by end DESC (most recent first)
let gaps = detect_all_gaps(pool, state.tip_num, start_block).await?;

if gaps.is_empty() {
if state.synced_num < state.tip_num {
Expand Down Expand Up @@ -1321,10 +1340,10 @@ async fn tick_gapfill_parallel_no_throttle(
Ok(())
}

/// Check if fully synced (no gaps from genesis to tip)
/// Check if fully synced (no gaps from start_block to tip)
#[allow(dead_code)]
async fn is_fully_synced(pool: &Pool, tip_num: u64) -> Result<bool> {
let gaps = detect_all_gaps(pool, tip_num).await?;
async fn is_fully_synced(pool: &Pool, tip_num: u64, start_block: u64) -> Result<bool> {
let gaps = detect_all_gaps(pool, tip_num, start_block).await?;
Ok(gaps.is_empty())
}

Expand Down
Loading