Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,21 +601,7 @@ where C: RaftTypeConfig

// Generate Apply command

if log_progress.submitted().map(|x| x.as_ref_vote()) == log_progress.accepted().map(|x| x.as_ref_vote()) {
// Only apply committed entries when submitted and accepted logs are from the same leader.
//
// This ensures submitted logs won't be overridden by pending commands in the queue.
//
// If leaders differ, queued commands may override submitted logs. Example:
// - submitted: append-entries(leader=L1, entry=E2)
// - queued: truncate(E2), save-vote(L2), append-entries(leader=L2, entry=E2')
// Here E2 will be overridden by E2' when the queue executes.
//
// When both have the same leader:
// - A leader never truncates its own written entries
// - Committed entries are visible to all future leaders
// - The submitted logs are guaranteed to be the actual committed entries

if self.state.io_state.can_safely_apply() {
let apply_submitted = apply_progress.submitted();
let apply_accepted = apply_progress.accepted();

Expand Down
40 changes: 38 additions & 2 deletions openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,27 @@ pub(crate) mod monotonic;
/// Tracks the state of completed I/O operations: log flushing, applying to state machine, and
/// snapshot building.
///
/// These states update only when I/O completes and may lag behind
/// [`RaftState`](`crate::RaftState`).
/// ## RaftState vs IOState
///
/// [`RaftState`] represents the Engine's logical view — what the system *should* look like after
/// all queued I/O completes. [`IOState`] represents the Runtime's physical view — what has
/// actually been persisted and applied. Between them lies an I/O pipeline:
///
/// ```text
/// Engine Command Queue Runtime / Storage
/// ────── ───────────── ─────────────────
/// RaftState ────> AppendEntries, ────> IOState
/// (logical) SaveVote, (physical)
/// PurgeLog, ...
///
/// vote: L2 queued: SaveVote(L2) log_progress.accepted: L2
/// log_ids: [1..5] queued: Append([3..5]) log_progress.flushed: [1..2]
/// (entries 3..5 not yet on disk)
/// ```
///
/// This gap means [`IOState`] may lag behind [`RaftState`]. Commands like
/// [`Command::Respond`] use [`Condition::IOFlushed`] to delay client responses until the
/// relevant I/O completes, ensuring linearizability.
///
/// ## Progress Tracking
///
Expand Down Expand Up @@ -279,6 +298,23 @@ where C: RaftTypeConfig
None
}
}

/// Checks whether committed log entries can be safely applied to the state machine.
///
/// This is the **leader safety invariant**: submitted and accepted I/O must originate from
/// the same leader. When this holds, the submitted entries are guaranteed not to be
/// truncated by any queued command, because a leader never truncates its own entries.
///
/// If leaders differ, queued commands may override submitted logs:
///
/// - submitted: append-entries(leader=L1, entry=E2)
/// - queued: truncate(E2), save-vote(L2), append-entries(leader=L2, entry=E2')
///
/// E2 will be overridden by E2' when the queue executes.
pub(crate) fn can_safely_apply(&self) -> bool {
let log_progress = &self.log_progress;
log_progress.submitted().map(|x| x.as_ref_vote()) == log_progress.accepted().map(|x| x.as_ref_vote())
}
}

/// Creates a new `IOProgress` wrapped in `Valid`.
Expand Down
48 changes: 43 additions & 5 deletions openraft/src/raft_state/io_state/io_progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,52 @@ use validit::less_equal;
///
/// `T`: A totally ordered type representing the I/O operation identifier (e.g., [`LogIOId`]).
///
/// Invariant: `flushed <= submitted <= accepted`
/// # Three-Stage Progress Model
///
/// For a comprehensive explanation of the three-stage tracking and examples, see:
/// [Log I/O Progress](crate::docs::data::log_io_progress).
/// Every I/O operation in Raft progresses through three stages:
///
/// ```text
/// Engine Runtime Storage
/// ─────── ─────── ───────
/// accept() ────> accepted (queued) (queued)
/// │
/// submit() ────> │ submitted (in flight)
/// │ │
/// flush() ────> │ │ flushed
/// │ │ │
/// ▼ ▼ ▼
/// Next IO Storage write Durable on
/// can be submitted to disk;
/// accepted storage safe to apply
/// ```
///
/// - **accepted**: The Engine has logically accepted this I/O operation. The next operation can be
/// accepted immediately; the Engine does not wait for prior operations to complete.
/// - **submitted**: The Runtime has submitted this I/O to the storage layer. Entries at or before
/// this point are readable by [`RaftLogReader`] even if not yet flushed.
/// - **flushed**: The storage layer has confirmed durable persistence. Only after flushing can
/// committed entries be safely applied to the state machine or responded to clients.
///
/// # Monotonicity Invariant
///
/// At all times: `accepted >= submitted >= flushed` (in `T`'s partial order).
///
/// This is enforced by:
/// - Debug-mode assertions in [`accept`](Self::accept), [`submit`](Self::submit), and
/// [`flush`](Self::flush).
/// - The [`Validate`] implementation, which checks this invariant on every validation pass.
///
/// # Leader Safety Invariant
///
/// When `submitted` and `accepted` originate from the **same leader** (same vote),
/// the submitted entries are guaranteed not to be truncated by any queued command,
/// because a leader never truncates its own entries. This is the precondition for
/// safely applying committed logs to the state machine.
///
/// See also: [Log I/O Progress](crate::docs::data::log_io_progress).
///
/// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
/// [`RaftLogReader`]: crate::storage::RaftLogReader
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) struct IOProgress<T>
Expand Down Expand Up @@ -203,8 +243,6 @@ where
self.accepted.as_ref()
}

// Not used until Command reorder is implemented.
#[allow(dead_code)]
pub(crate) fn submitted(&self) -> Option<&T> {
self.submitted.as_ref()
}
Expand Down
22 changes: 22 additions & 0 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,28 @@
use crate::vote::raft_vote::RaftVoteExt;

/// A struct used to represent the raft state which a Raft node needs.
///
/// ## RaftState (Logical) vs IOState (Physical)
///
/// `RaftState` is the Engine's in-memory shadow of what the system *should* look like
/// after all queued I/O completes. The actual on-disk state lives in [`IOState`], which

Check failure on line 62 in openraft/src/raft_state/mod.rs

View workflow job for this annotation

GitHub Actions / lint

public documentation for `RaftState` links to private item `IOState`
/// tracks what has been submitted and flushed to storage.
///
/// This separation enables pipelined I/O: the Engine can accept multiple operations and
/// queue them as [`Command`]s without waiting for each one to persist. Client responses
/// are deferred via [`Condition::IOFlushed`] until the relevant I/O completes.
///
/// [`Command`]: crate::engine::Command

Check failure on line 69 in openraft/src/raft_state/mod.rs

View workflow job for this annotation

GitHub Actions / lint

public documentation for `RaftState` links to private item `crate::engine::Command`
/// [`Condition::IOFlushed`]: crate::engine::Condition::IOFlushed

Check failure on line 70 in openraft/src/raft_state/mod.rs

View workflow job for this annotation

GitHub Actions / lint

public documentation for `RaftState` links to private item `crate::engine::Condition::IOFlushed`
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub struct RaftState<C>
where C: RaftTypeConfig
{
/// The vote state of this node.
///
/// Monotonically increasing in term. When `vote.is_committed() == true`,
/// this node is an established leader (granted by a quorum).
pub(crate) vote: Leased<VoteOf<C>, InstantOf<C>>,

/// All log ids this node has.
Expand All @@ -75,11 +91,16 @@
// --
// -- volatile fields: they are not persisted.
// --
/// Monotonically increasing counter for assigning unique IDs to in-flight client requests.
pub(crate) last_inflight_id: u64,

/// The state of a Raft node, such as Leader or Follower.
pub server_state: ServerState,

/// The physical I/O progress — what has actually been submitted and flushed to storage.
///
/// May lag behind the logical state represented by other fields in this struct.
/// See [`IOState`] for the three-stage progress model and safety invariants.
pub(crate) io_state: Valid<IOState<C>>,

/// The log id up to which the next time it purges.
Expand All @@ -88,6 +109,7 @@
/// field.
pub(crate) purge_upto: Option<LogIdOf<C>>,

/// Shared ID generator for tracking replication and leader progress across subsystems.
pub(crate) progress_id_gen: SharedIdGenerator,
}

Expand Down
Loading