Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,136 changes: 1,951 additions & 185 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ signature-validator = { path = "crates/signature-validator" }
simulator = { path = "crates/simulator" }
solana-indexer = { path = "crates/solana-indexer" }
solana-sdk = "4"
solana-client = "4"
yellowstone-grpc-proto = { version = "12.4.0", default-features = false }
solver = { path = "crates/solver" }
solvers = { path = "crates/solvers" }
Expand Down
1 change: 1 addition & 0 deletions crates/solana-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ observe = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
solana-sdk = { workspace = true }
solana-client = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
yellowstone-grpc-proto = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions crates/solana-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! `solana-indexer` — Solana settlement indexer.

#![allow(async_fn_in_trait)]
#![warn(missing_docs)]

pub mod traits;
pub mod types;
4 changes: 4 additions & 0 deletions crates/solana-indexer/src/traits/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
//! Traits for external dependencies.

pub mod solana_client;
pub mod store;
41 changes: 41 additions & 0 deletions crates/solana-indexer/src/traits/solana_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! Solana RPC interface for the finalization worker.

use {
crate::types::{
commitment::{AccountInfo, SignatureStatus},
recovery::GetSignaturesOpts,
wire::SubscribeUpdateTransactionInfo,
},
solana_client::client_error::ClientError,
solana_sdk::{pubkey::Pubkey, signature::Signature},
};

/// Interface for RPC calls the finalization worker needs:
/// promoting confirmed transactions to finalized, sweeping aged rows,
/// and reading account state for recovery.
pub trait SolanaClient {
/// Fetch status for multiple transaction signatures (up to 256).
/// `None` = transaction signature not found.
async fn get_signature_statuses(
&self,
signatures: &[Signature],
) -> Result<Vec<Option<SignatureStatus>>, ClientError>;

/// Fetch a transaction by its signature. `Ok(None)` = never landed.
async fn get_transaction(
&self,
signature: &Signature,
) -> Result<Option<SubscribeUpdateTransactionInfo>, ClientError>;

/// List all transaction signatures for a program address (used for
/// backfill).
async fn get_signatures_for_address(
&self,
address: &Pubkey,
opts: GetSignaturesOpts,
) -> Result<Vec<Signature>, ClientError>;

/// Read account data. `Ok(None)` = account does not exist (deleted or not
/// initialized).
async fn get_account_info(&self, address: &Pubkey) -> Result<Option<AccountInfo>, ClientError>;
}
73 changes: 73 additions & 0 deletions crates/solana-indexer/src/traits/store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! PostgreSQL persistence layer for decoded events and slot state.

use {
crate::types::{
commitment::{Commitment, UnfinalizedRow},
dead_letter::DeadLetterEntry,
errors::StoreError,
events::DecodedEvent,
recovery::PdaSnapshot,
},
std::ops::Range,
};

/// PostgreSQL persistence. Used by Decoder, Watchdog, and FinalizationWorker.
pub trait Store {
/// Save decoded events and advance the slot watermark atomically.
async fn persist_events(
&self,
events: Vec<DecodedEvent>,
new_watermark: u64,
) -> Result<(), StoreError>;

/// Record a slot checkpoint. Rejects downward writes.
async fn write_watermark(&self, slot: u64) -> Result<(), StoreError>;

/// Read persisted watermark for resuming after reconnect.
async fn read_watermark(&self) -> Result<Option<u64>, StoreError>;

/// Move stale partials (>32 slots behind) to dead letter table.
async fn write_dead_letter(&self, entry: DeadLetterEntry);

/// Record gaps that fell outside the replay window (write-only in v0.1).
async fn record_lost_slot_range(&self, range: Range<u64>);
Comment thread
tilacog marked this conversation as resolved.
Outdated

/// Primary promotion pass: fetch `confirmed` rows whose `slot` is at or
/// above the finalization-window threshold (`slot >= newer_than_slot`).
/// `limit` caps the batch at 256 (RPC batch size). Returns `Err` on
/// backend failure so the caller can back off rather than
/// silently stall on a dead store.
async fn get_confirmed_rows(
&self,
newer_than_slot: u64,
limit: usize,
) -> Result<Vec<UnfinalizedRow>, StoreError>;

/// Safety-net sweep for `confirmed` rows the primary promotion pass missed
/// (i.e. rows that aged past the signature-status retention horizon,
/// ~150 slots behind the chain tip). Returns `Err` on backend failure
/// (see `get_confirmed_rows`).
async fn get_aged_rows(
&self,
retention_horizon_slot: u64,
) -> Result<Vec<UnfinalizedRow>, StoreError>;

/// Flip the `commitment` label on a specific row.
///
/// The row's `table` field tells the implementer which `solana.*` table to
/// UPDATE.
async fn update_commitment(
&self,
row: &UnfinalizedRow,
new_commitment: Commitment,
) -> Result<(), StoreError>;

/// Persist a single event during recovery/backfills, not the live ingestion
/// path.
///
/// Unlike `persist_events`, this does not advance the watermark.
async fn backfill_event(&self, event: DecodedEvent) -> Result<(), StoreError>;

/// Upsert on-chain PDA state for reconciliation.
async fn upsert_pda_snapshot(&self, snapshot: PdaSnapshot) -> Result<(), StoreError>;
}
4 changes: 2 additions & 2 deletions crates/solana-indexer/src/types/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub enum StreamUpdate {
Account {
/// Slot the message was observed at.
slot: u64,
/// Optional signature linking the write back to its originating
/// transaction.
/// Optional transaction signature linking the write back to its
/// originating transaction.
txn_signature: Option<Signature>,
/// Wire message body.
inner: Box<SubscribeUpdateAccountInfo>,
Expand Down
7 changes: 1 addition & 6 deletions crates/solana-indexer/src/types/recovery.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! Recovery-flow types: PDA snapshots and the options struct for
//! `getSignaturesForAddress` backfills.

use solana_sdk::pubkey::Pubkey;

/// Current on-chain snapshot of an order PDA, read by `getAccountInfo` for
/// reconciliation.
#[derive(Debug, Clone)]
Expand All @@ -26,9 +24,6 @@ pub struct GetSignaturesOpts {
pub from_slot: Option<u64>,
/// End slot (inclusive). `None` means "to the tip".
pub to_slot: Option<u64>,
/// Cap on the number of signatures returned.
/// Cap on the number of transaction signatures returned.
pub limit: Option<usize>,
/// Optional address filter (used when back-filling both programs
/// in a single pass).
pub address: Option<Pubkey>,
}
Loading