Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ tracing-subscriber = { version = "0.3.22", features = ["json"] }
url = "2.5.0"
vergen = "8"
winner-selection = { path = "crates/winner-selection" }
yellowstone-grpc-client = "13.1.0"
yellowstone-grpc-proto = { version = "12.4.0", default-features = false }

[workspace.lints]
Expand Down
3 changes: 3 additions & 0 deletions crates/solana-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ name = "solana-indexer"
path = "src/main.rs"

[dependencies]
dashmap = { workspace = true }
observe = { workspace = true }
prometheus = { workspace = true }
prometheus-metric-storage = { workspace = true }
solana-client = { workspace = true }
solana-sdk = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] }
tracing = { workspace = true }
yellowstone-grpc-client = { workspace = true }
yellowstone-grpc-proto = { workspace = true }

[lints]
Expand Down
70 changes: 70 additions & 0 deletions crates/solana-indexer/src/indexer/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//! The decoder pulls `StreamUpdate`s from the ingester, decodes
//! settlement-program and SolFlow transactions, joins account-update snapshots,
//! and persists typed events.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; the dispatch logic and persist path arrive in a later
// change.

use {
crate::{
traits::store::Store,
types::{
errors::StoreError,
shared::{PartialEvent, PartialEventKey, StreamUpdate},
},
},
dashmap::DashMap,
solana_sdk::pubkey::Pubkey,
std::sync::Arc,
tokio::sync::mpsc::Receiver,
};

/// Decoder component.
///
/// The watchdog holds a clone of the same `partials` map, so the two operate on
/// the same concurrent map without any message passing between them.
pub struct Decoder<St: Store> {
/// Store implementor.
pub store: St,

/// Incoming `StreamUpdate` from the ingester.
pub rx: Receiver<StreamUpdate>,

/// Shared in-memory map of partial events keyed by `PartialEventKey`,
/// holding either-half events waiting for their pair. The watchdog holds a
/// clone of this `Arc`.
pub partials: Arc<DashMap<PartialEventKey, PartialEvent>>,

/// Settlement program id (filter target for the decoder).
pub settlement_program: Pubkey,

/// SolFlow program id (filter target for the decoder).
pub solflow_program: Pubkey,
}

impl<St: Store> Decoder<St> {
/// Construct a new decoder. The caller owns the channel capacity decision.
pub fn new(
store: St,
rx: Receiver<StreamUpdate>,
partials: Arc<DashMap<PartialEventKey, PartialEvent>>,
settlement_program: Pubkey,
solflow_program: Pubkey,
) -> Self {
Self {
store,
rx,
partials,
settlement_program,
solflow_program,
}
}

/// Main loop. Pulls `StreamUpdate` from the receiver, runs the decode
/// pipeline, persists, and records partial events in the shared map for the
/// watchdog to read.
pub async fn run(&mut self) -> Result<(), StoreError> {
unimplemented!()
}
}
63 changes: 63 additions & 0 deletions crates/solana-indexer/src/indexer/finalization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! The finalization worker updates the commitment level of the transactions
//! tracked by the indexer, promoting rows written at `confirmed` to
//! `finalized`.
//!
//! It does so through two flows. Two are needed because the relevant RPC
//! methods trade off differently: `getSignatureStatuses` is batchable but the
//! node only retains statuses for recent slots, while `getTransaction` reaches
//! arbitrarily old transactions on archival nodes but costs one call per
//! signature. The batched pass handles the common case cheaply; the per-row
//! sweep catches rows that age out of it.
//!
//! - **Promotion pass**: batch-polls `getSignatureStatuses` (at most
//! [`PROMOTION_BATCH_LIMIT`] signatures per call) over rows still at
//! `confirmed` that are at least [`FINALIZATION_WINDOW_SLOTS`] behind the
//! chain tip, and promotes rows whose `confirmationStatus` is `"finalized"`.
//!
//! - **Aged-row sweep**: fallback for rows past the signature-status retention
//! horizon ([`SIGNATURE_STATUS_RETENTION_SLOTS`]), which the promotion pass
//! can no longer check. Each row costs one `getTransaction` call; a non-null
//! response promotes to `finalized`, a null response marks `rolled_back`.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; both flows arrive in a later change.

use crate::traits::{solana_client::SolanaClient, store::Store};

/// Typical number of slots for a transaction to finalize (~12.8 s). The
/// promotion pass skips rows fresher than this.
#[allow(dead_code)]
pub const FINALIZATION_WINDOW_SLOTS: u64 = 32;

/// Upper limit for the `getSignatureStatuses` batch RPC call.
#[allow(dead_code)]
pub const PROMOTION_BATCH_LIMIT: usize = 256;

/// Approximate slot horizon past which `getSignatureStatuses` no longer returns
/// a result.
#[allow(dead_code)]
pub const SIGNATURE_STATUS_RETENTION_SLOTS: u64 = 150;

/// Transaction finalization worker. See the module docs for the two flows it
/// runs.
pub struct FinalizationWorker<St: Store, R: SolanaClient> {
/// Store implementor.
pub store: St,

/// RPC implementor.
pub rpc: R,
}

impl<St: Store, R: SolanaClient> FinalizationWorker<St, R> {
/// Construct a new finalization worker.
pub fn new(store: St, rpc: R) -> Self {
Self { store, rpc }
}

/// Outer loop. Runs the promotion pass and the aged-row sweep on a timer.
///
/// Placeholder for now; implemented in a later change.
pub async fn run(&mut self) {
unimplemented!("implemented in PR 11–12")
}
}
59 changes: 59 additions & 0 deletions crates/solana-indexer/src/indexer/ingester.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! The ingester owns the yellowstone gRPC stream. It drains the socket as fast
//! as yellowstone delivers, pushes tagged updates into the channel, and updates
//! `LATEST_CHAIN_SLOT` on every slot-filter message. It performs no decoding.

// TODO: This file only declares the component skeleton. The `run` body is
// `unimplemented!`; the actual drain and reconnect with backoff logic arrives
// in a later change.

use {
crate::{traits::store::Store, types::shared::StreamUpdate},
std::sync::atomic::AtomicU64,
tokio::sync::mpsc::Sender,
yellowstone_grpc_client::GrpcConnector,
};

/// The sole writer is the ingester, on every slot-filter message. Anchors the
/// partial-event watchdog and the finalization worker. Cold start is zero; the
/// watchdog skips its comparison on the first tick.
pub static LATEST_CHAIN_SLOT: AtomicU64 = AtomicU64::new(0);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using a global static AtomicU64 for LATEST_CHAIN_SLOT introduces shared mutable state across the entire process. This causes race conditions and flakiness when running unit/integration tests in parallel (Cargo's default behavior). It also prevents running multiple indexer instances in the same process.

Actionable Suggestion:
Remove the global static and instead pass an Arc<AtomicU64> (or a shared state struct) to the constructors of Ingester, Decoder, PartialEventWatchdog, and FinalizationWorker.

References
  1. Focus exclusively on identifying missing edge cases, potential race conditions, or logic that deviates from the PR's stated goals. (link)


/// Cap on the exponential backoff between reconnect attempts.
#[allow(dead_code)]
pub const RECONNECT_BACKOFF_CAP: std::time::Duration = std::time::Duration::from_secs(30);

/// Capacity of the channel from the ingester to the decoder.
pub const INGEST_TO_DECODER_CAPACITY: usize = 1024;

/// Ingester component.
///
/// Generic over a `GrpcConnector` implementor so the unit tests can drive it
/// with a mock.
pub struct Ingester<C: GrpcConnector, St: Store> {
/// gRPC connector implementor
pub connector: C,

/// Sends `StreamUpdate` to the decoder. Should be bounded to
/// `RECONNECT_BACKOFF_CAP` entries.
pub tx: Sender<StreamUpdate>,

/// Store implementor; used to checkpoint the slot.
pub store: St,
}

impl<C: GrpcConnector, St: Store> Ingester<C, St> {
/// Construct a new ingester. The caller owns the channel capacity decision.
pub fn new(connector: C, tx: Sender<StreamUpdate>, store: St) -> Self {
Self {
connector,
tx,
store,
}
}

/// TODO: Outer loop: open the subscription, drain it, push into the
/// channel, reconnect on failure with exponential backoff.
pub async fn run(&mut self) {
unimplemented!()
}
}
39 changes: 39 additions & 0 deletions crates/solana-indexer/src/indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Consumer components of the Solana settlement indexer.
//!
//! The four components and their roles:
//!
//! - [`Ingester`]: subscribes to the Yellowstone gRPC stream and drains it as
//! fast as updates arrive, forwarding them to the decoder. It does no
//! decoding itself, so the socket never backs up behind slow processing. It
//! is also the single writer of the "latest chain slot" counter that the
//! other components use to know how far the chain has advanced.
//!
//! - [`Decoder`]: receives the raw stream updates, picks out transactions
//! belonging to the settlement and SolFlow programs, matches each transaction
//! with its corresponding account-update snapshot, and persists the resulting
//! typed events to the store.
//!
//! - [`PartialEventWatchdog`]: some events arrive in two halves (a transaction
//! update and an account update) that don't always land together. The decoder
//! parks the half it has in a map shared with the watchdog; the watchdog
//! periodically scans that map and dead-letters any entry whose other half
//! never showed up within the slot window, recording which half went missing.
//!
//! - [`FinalizationWorker`]: rows are first written at the `confirmed`
//! commitment level. This worker re-checks them against the chain and
//! promotes them to `finalized`, or marks them rolled back if the transaction
//! disappeared. It uses a cheap batched RPC call for recent rows and falls
//! back to one-call-per-row lookups for rows old enough that the batched
//! method no longer reports them.

pub mod decoder;
pub mod finalization;
pub mod ingester;
pub mod watchdog;

pub use {
decoder::Decoder,
finalization::FinalizationWorker,
ingester::Ingester,
watchdog::PartialEventWatchdog,
};
Loading
Loading