Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,599 changes: 1,490 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ shared = { path = "crates/shared" }
signature-validator = { path = "crates/signature-validator" }
simulator = { path = "crates/simulator" }
solana-indexer = { path = "crates/solana-indexer" }
solana-sdk = "4"
solver = { path = "crates/solver" }
solvers = { path = "crates/solvers" }
solvers-dto = { path = "crates/solvers-dto" }
Expand Down Expand Up @@ -146,6 +147,7 @@ tracing-subscriber = { version = "0.3.22", features = ["json"] }
url = "2.5.0"
vergen = "8"
winner-selection = { path = "crates/winner-selection" }
yellowstone-grpc-proto = { version = "12.4.0", default-features = false }

[workspace.lints]
clippy.cast_possible_wrap = "deny"
9 changes: 9 additions & 0 deletions crates/solana-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ path = "src/lib.rs"
name = "solana-indexer"
path = "src/main.rs"

[dependencies]
observe = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

[lints]
workspace = true
2 changes: 2 additions & 0 deletions crates/solana-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
//! `solana-indexer` — Solana settlement indexer.

pub mod types;
62 changes: 62 additions & 0 deletions crates/solana-indexer/src/types/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Message types passed over the internal channels.
//!
//! The ingester pushes [`StreamUpdate`] into the channel to the decoder; the
//! decoder pushes [`PartialEvent`] / [`PartialHalf`] to the partial-event
//! watchdog.

use crate::types::{
Signature,
wire::{SubscribeUpdateAccountInfo, SubscribeUpdateTransactionInfo},
};

/// From `Ingester` → `Decoder`.
///
/// One multiplexed wire message, tagged with the slot the message was observed
/// at. The org file names the channel payload "Event"; the spec defines that
/// type as `StreamUpdate`, and that is what this crate uses.
#[derive(Debug, Clone)]
pub enum StreamUpdate {
/// A transaction-update slot message.
Tx {
/// Slot the message was observed at.
slot: u64,
/// Transaction signature.
signature: Signature,
/// Wire message body.
inner: Box<SubscribeUpdateTransactionInfo>,
},
/// An account-update slot message.
Account {
/// Slot the message was observed at.
slot: u64,
/// Optional signature linking the write back to its originating
/// transaction.
txn_signature: Option<Signature>,
/// Wire message body.
inner: Box<SubscribeUpdateAccountInfo>,
},
}

/// From `Decoder` → `PartialEventWatchdog`.
///
/// The watchdog holds incomplete `(slot, signature)` pairs until both halves
/// arrive; each delivery carries the half that just landed.
#[derive(Debug, Clone, Copy)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
}
Comment on lines +44 to +50

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The doc comment for PartialEvent states that "each delivery carries the half that just landed," but the struct definition is missing the half: PartialHalf field. Without this field, the watchdog cannot receive or hold the actual data halves to match and reconstruct the full event. Additionally, since PartialHalf contains heap-allocated Box types, PartialEvent cannot derive Copy and should only derive Debug, Clone.

Suggested change
#[derive(Debug, Clone, Copy)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
}
#[derive(Debug, Clone)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
/// The half that just landed.
pub half: PartialHalf,
}


/// One of the two halves a [`StreamUpdate`] can produce.
///
/// The decoder pushes one `PartialEvent` per `StreamUpdate` it processes; the
/// watchdog uses the `(slot, signature)` key to match pairs.
#[derive(Debug, Clone)]
pub enum PartialHalf {
/// Transaction-update half.
Tx(Box<SubscribeUpdateTransactionInfo>),
/// Account-update half.
Account(Box<SubscribeUpdateAccountInfo>),
}
60 changes: 60 additions & 0 deletions crates/solana-indexer/src/types/commitment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Commitment-tracking types: confirmation state, signature status, and the row
//! shapes consumed by the finalization worker.

use {crate::types::Signature, solana_sdk::pubkey::Pubkey};

/// On-chain commitment of a transaction or row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Commitment {
/// The row is at `confirmed` commitment; the finalization worker still has
/// work to do.
Confirmed,
/// The row is at `finalized` commitment.
Finalized,
/// The row's transaction never landed (or was rolled back).
RolledBack,
}

impl Commitment {
/// String label used in `solana.*` `commitment` columns.
pub fn as_str(self) -> &'static str {
match self {
Self::Confirmed => "confirmed",
Self::Finalized => "finalized",
Self::RolledBack => "rolled_back",
}
}
}

/// Result of an RPC `getSignatureStatuses` poll.
#[derive(Debug, Clone, Copy)]
pub struct SignatureStatus {
/// Slot the transaction landed at, if known.
pub slot: u64,
/// Confirmation status reported by the RPC.
pub confirmation_status: Commitment,
}

/// Snapshot of an account at a given slot (from `getAccountInfo`).
#[derive(Debug, Clone)]
pub struct AccountInfo {
/// Slot the snapshot was read at.
pub slot: u64,
/// Account data (serialized).
pub data: Vec<u8>,
/// Account owner program.
pub owner: Pubkey,
}

/// A `solana.*` row that has not yet reached `finalized` commitment — the kind
/// picked up by the aged-row sweep, where `commitment = 'confirmed'` and the
/// row's slot is at least one finalization window behind `LATEST_CHAIN_SLOT`.
#[derive(Debug, Clone)]
pub struct UnfinalizedRow {
/// Table the row lives in.
pub table: &'static str,
/// Transaction signature.
pub signature: Signature,
/// Slot the row was inserted at.
pub slot: u64,
}
43 changes: 43 additions & 0 deletions crates/solana-indexer/src/types/dead_letter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Dead-letter types: events that failed to persist and were diverted to
//! `solana.dead_letter` for operator follow-up.

use crate::types::Signature;

/// A decoded event whose write to `solana.*` failed and was diverted to
/// `solana.dead_letter`.
#[derive(Debug, Clone)]
pub struct DeadLetterEntry {
/// Slot the event was observed at.
pub slot: u64,
/// Transaction signature, if the failure was per-transaction.
pub signature: Option<Signature>,
/// Why the event landed in the dead-letter table.
pub reason: DeadLetterReason,
/// Original raw bytes for replay.
pub raw_bytes: Vec<u8>,
}

/// Why a row landed in the dead-letter table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadLetterReason {
/// Decoder received both halves but couldn't parse them.
DecoderError,
/// Watchdog gave up: account-update half never arrived.
AccountUpdateMissing,
/// Watchdog gave up: transaction-update half never arrived.
TxUpdateMissing,
/// Settlement landed but no `proposed_solutions` row matched.
SolutionUidUnmatchable,
}

impl DeadLetterReason {
/// String label used in `solana.dead_letter.reason`.
pub fn as_str(self) -> &'static str {
match self {
Self::DecoderError => "decoder_error",
Self::AccountUpdateMissing => "account_update_missing",
Self::TxUpdateMissing => "tx_update_missing",
Self::SolutionUidUnmatchable => "solution_uid_unmatchable",
}
}
}
54 changes: 54 additions & 0 deletions crates/solana-indexer/src/types/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Error types used across the indexer's domain.

use thiserror::Error;

/// Failures surfaced from the decoder.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum DecodeError {
/// The discriminator byte(s) at the start of the instruction data did not
/// match any known instruction on either program.
#[error("unknown instruction discriminator")]
UnknownDiscriminator,
/// The ALT (Address Lookup Table) loaded-address list could not be resolved
/// against the full account list.
#[error("alt resolution failed")]
AltResolutionFailed,
/// The instruction was recognised but its schema did not match the on-chain
/// layout.
#[error("schema mismatch")]
SchemaMismatch,
}

/// Failures surfaced from the persistence boundary.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum StoreError {
/// The SQL `ON CONFLICT` clause rejected the write (e.g. watermark
/// regression).
#[error("store conflict")]
Conflict,
/// The store is temporarily unavailable (e.g. connection lost, pool
/// exhausted). The caller is expected to retry.
#[error("store unavailable")]
Unavailable,
}

/// Failures surfaced from the stream boundary.
#[derive(Debug, Error)]
pub enum StreamError {
/// The stream has been disconnected by the server.
#[error("stream disconnected")]
Disconnected,
/// The internal mpsc send timed out (backpressure on the decoder).
#[error("stream send timeout")]
SendTimeout,
/// The resume slot is outside the provider's replay window. The caller
/// should reset `from_slot` to `LATEST_CHAIN_SLOT − replay_window`,
/// record the lost range, and retry the subscription.
#[error("replay window exceeded")]
ReplayWindowExceeded {
/// The slot the subscriber attempted to resume from.
attempted_slot: u64,
/// The earliest slot the provider can still serve.
earliest_replayable_slot: u64,
},
}
Loading
Loading