Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,599 changes: 1,490 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ shared = { path = "crates/shared" }
signature-validator = { path = "crates/signature-validator" }
simulator = { path = "crates/simulator" }
solana-indexer = { path = "crates/solana-indexer" }
solana-sdk = "4"
yellowstone-grpc-proto = { version = "12.4.0", default-features = false }
solver = { path = "crates/solver" }
solvers = { path = "crates/solvers" }
solvers-dto = { path = "crates/solvers-dto" }
Expand Down
9 changes: 9 additions & 0 deletions crates/solana-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,14 @@ path = "src/lib.rs"
name = "solana-indexer"
path = "src/main.rs"

[dependencies]
observe = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

[lints]
workspace = true
2 changes: 2 additions & 0 deletions crates/solana-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
//! `solana-indexer` — Solana settlement indexer.

pub mod types;
62 changes: 62 additions & 0 deletions crates/solana-indexer/src/types/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Message types passed over the internal channels.
//!
//! The ingester pushes [`StreamUpdate`] into the channel to the decoder; the
//! decoder pushes [`PartialEvent`] / [`PartialHalf`] to the partial-event
//! watchdog.

use crate::types::{
Signature,
wire::{SubscribeUpdateAccountInfo, SubscribeUpdateTransactionInfo},
};

/// From `Ingester` → `Decoder`.
///
/// One multiplexed wire message, tagged with the slot the message was observed
/// at. The org file names the channel payload "Event"; the spec defines that
/// type as `StreamUpdate`, and that is what this crate uses.
#[derive(Debug, Clone)]
pub enum StreamUpdate {
/// A transaction-update slot message.
Tx {
/// Slot the message was observed at.
slot: u64,
/// Transaction signature.
signature: Signature,
/// Wire message body.
inner: Box<SubscribeUpdateTransactionInfo>,
},
/// An account-update slot message.
Account {
/// Slot the message was observed at.
slot: u64,
/// Optional signature linking the write back to its originating
/// transaction.
txn_signature: Option<Signature>,
/// Wire message body.
inner: Box<SubscribeUpdateAccountInfo>,
},
}

/// From `Decoder` → `PartialEventWatchdog`.
///
/// The watchdog holds incomplete `(slot, signature)` pairs until both halves
/// arrive; each delivery carries the half that just landed.
#[derive(Debug, Clone, Copy)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
}
Comment on lines +44 to +50

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The doc comment for PartialEvent states that "each delivery carries the half that just landed," but the struct definition is missing the half: PartialHalf field. Without this field, the watchdog cannot receive or hold the actual data halves to match and reconstruct the full event. Additionally, since PartialHalf contains heap-allocated Box types, PartialEvent cannot derive Copy and should only derive Debug, Clone.

Suggested change
#[derive(Debug, Clone, Copy)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
}
#[derive(Debug, Clone)]
pub struct PartialEvent {
/// Slot the partial was observed at.
pub slot: u64,
/// Transaction signature the partial corresponds to.
pub signature: Signature,
/// The half that just landed.
pub half: PartialHalf,
}


/// One of the two halves a [`StreamUpdate`] can produce.
///
/// The decoder pushes one `PartialEvent` per `StreamUpdate` it processes; the
/// watchdog uses the `(slot, signature)` key to match pairs.
#[derive(Debug, Clone)]
pub enum PartialHalf {
/// Transaction-update half.
Tx(Box<SubscribeUpdateTransactionInfo>),
/// Account-update half.
Account(Box<SubscribeUpdateAccountInfo>),
}
60 changes: 60 additions & 0 deletions crates/solana-indexer/src/types/commitment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Commitment-tracking types: confirmation state, signature status, and the row
//! shapes consumed by the finalization worker.

use {crate::types::Signature, solana_sdk::pubkey::Pubkey};

/// On-chain commitment of a transaction or row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Commitment {
/// The row is at `confirmed` commitment; the finalization worker still has
/// work to do.
Confirmed,
/// The row is at `finalized` commitment.
Finalized,
/// The row's transaction never landed (or was rolled back).
RolledBack,
}

impl Commitment {
/// String label used in `solana.*` `commitment` columns.
pub fn as_str(self) -> &'static str {
match self {
Self::Confirmed => "confirmed",
Self::Finalized => "finalized",
Self::RolledBack => "rolled_back",
}
}
}

/// Result of an RPC `getSignatureStatuses` poll.
#[derive(Debug, Clone, Copy)]
pub struct SignatureStatus {
/// Slot the transaction landed at, if known.
pub slot: u64,
/// Confirmation status reported by the RPC.
pub confirmation_status: Commitment,
}

/// Snapshot of an account at a given slot (from `getAccountInfo`).
#[derive(Debug, Clone)]
pub struct AccountInfo {
/// Slot the snapshot was read at.
pub slot: u64,
/// Account data (serialized).
pub data: Vec<u8>,
/// Account owner program.
pub owner: Pubkey,
}

/// A `solana.*` row that has not yet reached `finalized` commitment — the kind
/// picked up by the aged-row sweep, where `commitment = 'confirmed'` and the
/// row's slot is at least one finalization window behind `LATEST_CHAIN_SLOT`.
#[derive(Debug, Clone)]
pub struct UnfinalizedRow {
/// Table the row lives in.
pub table: &'static str,
/// Transaction signature.
pub signature: Signature,
/// Slot the row was inserted at.
pub slot: u64,
}
43 changes: 43 additions & 0 deletions crates/solana-indexer/src/types/dead_letter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
//! Dead-letter types: events that failed to persist and were diverted to
//! `solana.dead_letter` for operator follow-up.

use crate::types::Signature;

/// A decoded event whose write to `solana.*` failed and was diverted to
/// `solana.dead_letter`.
#[derive(Debug, Clone)]
pub struct DeadLetterEntry {
/// Slot the event was observed at.
pub slot: u64,
/// Transaction signature, if the failure was per-transaction.
pub signature: Option<Signature>,
/// Why the event landed in the dead-letter table.
pub reason: DeadLetterReason,
/// Original raw bytes for replay.
pub raw_bytes: Vec<u8>,
}

/// Why a row landed in the dead-letter table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadLetterReason {
/// Decoder received both halves but couldn't parse them.
DecoderError,
/// Watchdog gave up: account-update half never arrived.
AccountUpdateMissing,
/// Watchdog gave up: transaction-update half never arrived.
TxUpdateMissing,
/// Settlement landed but no `proposed_solutions` row matched.
SolutionUidUnmatchable,
}

impl DeadLetterReason {
/// String label used in `solana.dead_letter.reason`.
pub fn as_str(self) -> &'static str {
match self {
Self::DecoderError => "decoder_error",
Self::AccountUpdateMissing => "account_update_missing",
Self::TxUpdateMissing => "tx_update_missing",
Self::SolutionUidUnmatchable => "solution_uid_unmatchable",
}
}
}
54 changes: 54 additions & 0 deletions crates/solana-indexer/src/types/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Error types used across the indexer's domain.

use thiserror::Error;

/// Failures surfaced from the decoder.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum DecodeError {
/// The discriminator byte(s) at the start of the instruction data did not
/// match any known instruction on either program.
#[error("unknown instruction discriminator")]
UnknownDiscriminator,
/// The ALT (Address Lookup Table) loaded-address list could not be resolved
/// against the full account list.
#[error("alt resolution failed")]
AltResolutionFailed,
/// The instruction was recognised but its schema did not match the on-chain
/// layout.
#[error("schema mismatch")]
SchemaMismatch,
}

/// Failures surfaced from the persistence boundary.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum StoreError {
/// The SQL `ON CONFLICT` clause rejected the write (e.g. watermark
/// regression).
#[error("store conflict")]
Conflict,
/// The store is temporarily unavailable (e.g. connection lost, pool
/// exhausted). The caller is expected to retry.
#[error("store unavailable")]
Unavailable,
}

/// Failures surfaced from the stream boundary.
#[derive(Debug, Error)]
pub enum StreamError {
/// The stream has been disconnected by the server.
#[error("stream disconnected")]
Disconnected,
/// The internal mpsc send timed out (backpressure on the decoder).
#[error("stream send timeout")]
SendTimeout,
/// The resume slot is outside the provider's replay window. The caller
/// should reset `from_slot` to `LATEST_CHAIN_SLOT − replay_window`,
/// record the lost range, and retry the subscription.
#[error("replay window exceeded")]
ReplayWindowExceeded {
/// The slot the subscriber attempted to resume from.
attempted_slot: u64,
/// The earliest slot the provider can still serve.
earliest_replayable_slot: u64,
},
}
143 changes: 143 additions & 0 deletions crates/solana-indexer/src/types/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
//! Domain event taxonomy.
//!
//! The settlement program and SolFlow each have their own enum
//! (`SettlementEvent`, `SolFlowEvent`); the decoder's handoff to the
//! persistence step is the sum [`DecodedEvent`]. Per-order accounting
//! is reconstructed from [`TradeDelta`] snapshots.

use {crate::types::Signature, solana_sdk::pubkey::Pubkey};

/// Change in a single order's `amount_withdrawn` and `amount_received`
/// between two consecutive account snapshots.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TradeDelta {
/// Order UID this delta applies to.
pub order_uid: [u8; 32],
/// Change in `amount_withdrawn` since the previous snapshot.
pub amount_withdrawn_delta: u64,
/// Change in `amount_received` since the previous snapshot.
pub amount_received_delta: u64,
/// `true` when post-trade `amount_withdrawn` equals the order's
/// full sell amount, or `amount_received` equals the full buy
/// amount.
pub order_fulfilled: bool,
}

/// Settlement-program events decoded from on-chain instructions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SettlementEvent {
/// A new order was created on-chain.
OrderCreated {
/// Intent hash of the order.
intent_hash: [u8; 32],
/// Owner of the order.
owner: Pubkey,
/// Address that created the order (relayer / solver).
created_by: Pubkey,
},
/// An order was closed.
OrderClosed {
/// Intent hash of the order.
intent_hash: [u8; 32],
},
/// An order was cancelled.
OrderCancelled {
/// Intent hash of the order.
intent_hash: [u8; 32],
},
/// A settlement was finalized on-chain.
SettlementFinalized {
/// Auction id this settlement belongs to.
auction_id: i64,
/// Solver that won the auction.
solver: Pubkey,
/// Transaction signature.
tx_signature: Signature,
/// Slot the settlement was observed at.
slot: u64,
/// Per-order accounting deltas.
trades: Vec<TradeDelta>,
},
/// A new buffer PDA was created.
BufferCreated {
/// Token the buffer is denominated in.
token: Pubkey,
},
/// A buffer PDA was used by a transaction.
BufferUsed {
/// Token the buffer is denominated in.
token: Pubkey,
/// Transaction signature that consumed the buffer.
tx_signature: Signature,
},
/// A manager was updated (e.g. ownership rotation).
ManagerUpdated {
/// Previous manager.
from: Pubkey,
/// New manager.
to: Pubkey,
},
/// A solver was added to the allow-list.
SolverAdded {
/// Solver that was added.
solver: Pubkey,
},
/// A solver was removed from the allow-list.
SolverRemoved {
/// Solver that was removed.
solver: Pubkey,
},
/// Generic solver interaction (instruction observed but not decoded into
/// one of the structured events above).
SolverInteraction {
/// Transaction signature.
tx_signature: Signature,
/// Index of the instruction within the transaction.
ix_index: u8,
},
}

/// SolFlow-side events, populates the `solana.sol_flow` table.
///
/// Note: the paired `solana.orders` row for `OrderEnabled` is written by the
/// settlement-program decode path, not here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SolFlowEvent {
/// A new order was created on SolFlow.
OrderCreated {
/// Custodial PDA that holds the wSOL for this order.
custodial_pda: Pubkey,
/// Real owner of the order.
real_owner: Pubkey,
/// Order UID.
order_uid: [u8; 32],
/// From `meta.post_token_balances` on the custodial wSOL
/// account.
sol_amount: u64,
},
/// An order was enabled (custody transferred to settlement program).
OrderEnabled {
/// Custodial PDA.
custodial_pda: Pubkey,
/// Address that enabled the order.
enabler: Pubkey,
/// Order UID.
order_uid: [u8; 32],
},
/// An order was recovered (e.g. after a stuck-state cleanup).
OrderRecovered {
/// Custodial PDA.
custodial_pda: Pubkey,
/// Slot the recovery was observed at.
slot: u64,
},
}

/// Sum of the two program-side event enums for the persistence step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DecodedEvent {
/// A settlement-program event.
Settlement(SettlementEvent),
/// A SolFlow event.
SolFlow(SolFlowEvent),
}
Loading
Loading