diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index c7f548ba1..7fe8ae75c 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -601,21 +601,7 @@ where C: RaftTypeConfig // Generate Apply command - if log_progress.submitted().map(|x| x.as_ref_vote()) == log_progress.accepted().map(|x| x.as_ref_vote()) { - // Only apply committed entries when submitted and accepted logs are from the same leader. - // - // This ensures submitted logs won't be overridden by pending commands in the queue. - // - // If leaders differ, queued commands may override submitted logs. Example: - // - submitted: append-entries(leader=L1, entry=E2) - // - queued: truncate(E2), save-vote(L2), append-entries(leader=L2, entry=E2') - // Here E2 will be overridden by E2' when the queue executes. - // - // When both have the same leader: - // - A leader never truncates its own written entries - // - Committed entries are visible to all future leaders - // - The submitted logs are guaranteed to be the actual committed entries - + if self.state.io_state.can_safely_apply() { let apply_submitted = apply_progress.submitted(); let apply_accepted = apply_progress.accepted(); diff --git a/openraft/src/raft_state/io_state.rs b/openraft/src/raft_state/io_state.rs index 16a156a94..34921a25e 100644 --- a/openraft/src/raft_state/io_state.rs +++ b/openraft/src/raft_state/io_state.rs @@ -22,8 +22,27 @@ pub(crate) mod monotonic; /// Tracks the state of completed I/O operations: log flushing, applying to state machine, and /// snapshot building. /// -/// These states update only when I/O completes and may lag behind -/// [`RaftState`](`crate::RaftState`). +/// ## RaftState vs IOState +/// +/// [`RaftState`] represents the Engine's logical view — what the system *should* look like after +/// all queued I/O completes. [`IOState`] represents the Runtime's physical view — what has +/// actually been persisted and applied. Between them lies an I/O pipeline: +/// +/// ```text +/// Engine Command Queue Runtime / Storage +/// ────── ───────────── ───────────────── +/// RaftState ────> AppendEntries, ────> IOState +/// (logical) SaveVote, (physical) +/// PurgeLog, ... +/// +/// vote: L2 queued: SaveVote(L2) log_progress.accepted: L2 +/// log_ids: [1..5] queued: Append([3..5]) log_progress.flushed: [1..2] +/// (entries 3..5 not yet on disk) +/// ``` +/// +/// This gap means [`IOState`] may lag behind [`RaftState`]. Commands like +/// [`Command::Respond`] use [`Condition::IOFlushed`] to delay client responses until the +/// relevant I/O completes, ensuring linearizability. /// /// ## Progress Tracking /// @@ -279,6 +298,23 @@ where C: RaftTypeConfig None } } + + /// Checks whether committed log entries can be safely applied to the state machine. + /// + /// This is the **leader safety invariant**: submitted and accepted I/O must originate from + /// the same leader. When this holds, the submitted entries are guaranteed not to be + /// truncated by any queued command, because a leader never truncates its own entries. + /// + /// If leaders differ, queued commands may override submitted logs: + /// + /// - submitted: append-entries(leader=L1, entry=E2) + /// - queued: truncate(E2), save-vote(L2), append-entries(leader=L2, entry=E2') + /// + /// E2 will be overridden by E2' when the queue executes. + pub(crate) fn can_safely_apply(&self) -> bool { + let log_progress = &self.log_progress; + log_progress.submitted().map(|x| x.as_ref_vote()) == log_progress.accepted().map(|x| x.as_ref_vote()) + } } /// Creates a new `IOProgress` wrapped in `Valid`. diff --git a/openraft/src/raft_state/io_state/io_progress.rs b/openraft/src/raft_state/io_state/io_progress.rs index 05e3972f4..f31c9442d 100644 --- a/openraft/src/raft_state/io_state/io_progress.rs +++ b/openraft/src/raft_state/io_state/io_progress.rs @@ -9,12 +9,52 @@ use validit::less_equal; /// /// `T`: A totally ordered type representing the I/O operation identifier (e.g., [`LogIOId`]). /// -/// Invariant: `flushed <= submitted <= accepted` +/// # Three-Stage Progress Model /// -/// For a comprehensive explanation of the three-stage tracking and examples, see: -/// [Log I/O Progress](crate::docs::data::log_io_progress). +/// Every I/O operation in Raft progresses through three stages: +/// +/// ```text +/// Engine Runtime Storage +/// ─────── ─────── ─────── +/// accept() ────> accepted (queued) (queued) +/// │ +/// submit() ────> │ submitted (in flight) +/// │ │ +/// flush() ────> │ │ flushed +/// │ │ │ +/// ▼ ▼ ▼ +/// Next IO Storage write Durable on +/// can be submitted to disk; +/// accepted storage safe to apply +/// ``` +/// +/// - **accepted**: The Engine has logically accepted this I/O operation. The next operation can be +/// accepted immediately; the Engine does not wait for prior operations to complete. +/// - **submitted**: The Runtime has submitted this I/O to the storage layer. Entries at or before +/// this point are readable by [`RaftLogReader`] even if not yet flushed. +/// - **flushed**: The storage layer has confirmed durable persistence. Only after flushing can +/// committed entries be safely applied to the state machine or responded to clients. +/// +/// # Monotonicity Invariant +/// +/// At all times: `accepted >= submitted >= flushed` (in `T`'s partial order). +/// +/// This is enforced by: +/// - Debug-mode assertions in [`accept`](Self::accept), [`submit`](Self::submit), and +/// [`flush`](Self::flush). +/// - The [`Validate`] implementation, which checks this invariant on every validation pass. +/// +/// # Leader Safety Invariant +/// +/// When `submitted` and `accepted` originate from the **same leader** (same vote), +/// the submitted entries are guaranteed not to be truncated by any queued command, +/// because a leader never truncates its own entries. This is the precondition for +/// safely applying committed logs to the state machine. +/// +/// See also: [Log I/O Progress](crate::docs::data::log_io_progress). /// /// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId +/// [`RaftLogReader`]: crate::storage::RaftLogReader #[derive(Debug, Clone)] #[derive(PartialEq, Eq)] pub(crate) struct IOProgress @@ -203,8 +243,6 @@ where self.accepted.as_ref() } - // Not used until Command reorder is implemented. - #[allow(dead_code)] pub(crate) fn submitted(&self) -> Option<&T> { self.submitted.as_ref() } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index 7d5142b9e..3e86775b2 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -55,12 +55,28 @@ use crate::vote::RaftVote; use crate::vote::raft_vote::RaftVoteExt; /// A struct used to represent the raft state which a Raft node needs. +/// +/// ## RaftState (Logical) vs IOState (Physical) +/// +/// `RaftState` is the Engine's in-memory shadow of what the system *should* look like +/// after all queued I/O completes. The actual on-disk state lives in [`IOState`], which +/// tracks what has been submitted and flushed to storage. +/// +/// This separation enables pipelined I/O: the Engine can accept multiple operations and +/// queue them as [`Command`]s without waiting for each one to persist. Client responses +/// are deferred via [`Condition::IOFlushed`] until the relevant I/O completes. +/// +/// [`Command`]: crate::engine::Command +/// [`Condition::IOFlushed`]: crate::engine::Condition::IOFlushed #[derive(Clone, Debug)] #[derive(PartialEq, Eq)] pub struct RaftState where C: RaftTypeConfig { /// The vote state of this node. + /// + /// Monotonically increasing in term. When `vote.is_committed() == true`, + /// this node is an established leader (granted by a quorum). pub(crate) vote: Leased, InstantOf>, /// All log ids this node has. @@ -75,11 +91,16 @@ where C: RaftTypeConfig // -- // -- volatile fields: they are not persisted. // -- + /// Monotonically increasing counter for assigning unique IDs to in-flight client requests. pub(crate) last_inflight_id: u64, /// The state of a Raft node, such as Leader or Follower. pub server_state: ServerState, + /// The physical I/O progress — what has actually been submitted and flushed to storage. + /// + /// May lag behind the logical state represented by other fields in this struct. + /// See [`IOState`] for the three-stage progress model and safety invariants. pub(crate) io_state: Valid>, /// The log id up to which the next time it purges. @@ -88,6 +109,7 @@ where C: RaftTypeConfig /// field. pub(crate) purge_upto: Option>, + /// Shared ID generator for tracking replication and leader progress across subsystems. pub(crate) progress_id_gen: SharedIdGenerator, }