diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 9144146e3..3632f2b02 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -373,6 +373,7 @@ jobs: - 'raft-kv-memstore-network-v2' - 'raft-kv-memstore-opendal-snapshot-data' - 'raft-kv-memstore-single-threaded' + - 'raft-kv-fjall' - 'raft-kv-rocksdb' - 'multi-raft-kv' diff --git a/Cargo.toml b/Cargo.toml index 1094117be..4b52eca37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ exclude = [ "examples/raft-kv-memstore-opendal-snapshot-data", "examples/raft-kv-rocksdb", "examples/multi-raft-kv", + "examples/raft-kv-fjall", "rt-monoio", "rt-compio", diff --git a/Makefile b/Makefile index 4cb8627d8..836ae59ff 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ test-examples: cargo test --manifest-path examples/raft-kv-memstore-network-v2/Cargo.toml cargo test --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml cargo test --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml + cargo test --manifest-path examples/raft-kv-fjall/Cargo.toml cargo test --manifest-path examples/raft-kv-rocksdb/Cargo.toml cargo test --manifest-path examples/rocksstore/Cargo.toml cargo test --manifest-path examples/multi-raft-kv/Cargo.toml @@ -103,6 +104,7 @@ lint: cargo fmt --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml cargo fmt --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml cargo fmt --manifest-path examples/raft-kv-memstore/Cargo.toml + cargo fmt --manifest-path examples/raft-kv-fjall/Cargo.toml cargo fmt --manifest-path examples/raft-kv-rocksdb/Cargo.toml cargo fmt --manifest-path examples/multi-raft-kv/Cargo.toml cargo clippy --no-deps --all-targets -- -D warnings @@ -118,6 +120,7 @@ lint: cargo clippy --no-deps --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml --all-targets -- -D warnings cargo clippy --no-deps --manifest-path 
examples/raft-kv-memstore-single-threaded/Cargo.toml --all-targets -- -D warnings cargo clippy --no-deps --manifest-path examples/raft-kv-memstore/Cargo.toml --all-targets -- -D warnings + cargo clippy --no-deps --manifest-path examples/raft-kv-fjall/Cargo.toml --all-targets -- -D warnings cargo clippy --no-deps --manifest-path examples/raft-kv-rocksdb/Cargo.toml --all-targets -- -D warnings cargo clippy --no-deps --manifest-path examples/multi-raft-kv/Cargo.toml --all-targets -- -D warnings # Bug: clippy --all-targets reports false warning about unused dep in @@ -166,6 +169,7 @@ check: RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-memstore/Cargo.toml + RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-fjall/Cargo.toml RUSTFLAGS="-D warnings" cargo check --manifest-path examples/raft-kv-rocksdb/Cargo.toml RUSTFLAGS="-D warnings" cargo check --manifest-path examples/rocksstore/Cargo.toml RUSTFLAGS="-D warnings" cargo check --manifest-path examples/multi-raft-kv/Cargo.toml @@ -188,6 +192,7 @@ clean: cargo clean --manifest-path examples/raft-kv-memstore-opendal-snapshot-data/Cargo.toml cargo clean --manifest-path examples/raft-kv-memstore-single-threaded/Cargo.toml cargo clean --manifest-path examples/raft-kv-memstore/Cargo.toml + cargo clean --manifest-path examples/raft-kv-fjall/Cargo.toml cargo clean --manifest-path examples/raft-kv-rocksdb/Cargo.toml cargo clean --manifest-path examples/rocksstore/Cargo.toml cargo clean --manifest-path examples/multi-raft-kv/Cargo.toml diff --git a/examples/README.md b/examples/README.md index a3a52e24f..0a5586ad5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -17,6 +17,7 @@ This directory contains example applications demonstrating different implementat | Example 
| Log | State Machine | RaftNetwork Impl | RaftNetwork | Client | Server | Special Features | |---------|-----|---------------|------------------|-------------|--------|--------|------------------| | [raft-kv-memstore] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetwork | reqwest | actix-web | Basic example | +| [raft-kv-fjall] | [fjall] | [fjall] | HTTP/reqwest([network-v1]) | RaftNetwork | reqwest | actix-web | Persistent storage | | [raft-kv-rocksdb] | [rocksstore] | [rocksstore] | HTTP/reqwest([network-v1]) | RaftNetwork | reqwest | actix-web | Persistent storage | | [raft-kv-memstore-network-v2] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetworkV2 | reqwest | actix-web | Network V2 interface | | [multi-raft-kv] | [log-mem] | [sm-mem] | HTTP/channel | GroupRouter | channel | in-memory | Multi-Raft groups | @@ -48,6 +49,7 @@ The following symbolic links are provided for backward compatibility: [raft-kv-memstore]: raft-kv-memstore/ +[raft-kv-fjall]: raft-kv-fjall/ [raft-kv-rocksdb]: raft-kv-rocksdb/ [raft-kv-memstore-network-v2]: raft-kv-memstore-network-v2/ [raft-kv-memstore-grpc]: raft-kv-memstore-grpc/ diff --git a/examples/raft-kv-fjall/Cargo.toml b/examples/raft-kv-fjall/Cargo.toml new file mode 100644 index 000000000..bbc6bc604 --- /dev/null +++ b/examples/raft-kv-fjall/Cargo.toml @@ -0,0 +1,46 @@ +[package] +name = "raft-kv-fjall" +version = "0.1.0" +readme = "README.md" + +edition = "2024" +authors = [ + "ariesdevil ", +] +categories = ["algorithms", "asynchronous", "data-structures"] +description = "An example distributed key-value store built upon `openraft`." 
+homepage = "https://github.com/databendlabs/openraft" +keywords = ["raft", "consensus"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/databendlabs/openraft" + +[[bin]] +name = "raft-key-value-fjall" +path = "src/bin/main.rs" + +[dependencies] +client-http = { path = "../client-http" } +network-v1-http = { path = "../network-v1-http" } +openraft = { path = "../../openraft", features = ["serde", "type-alias"] } +openraft-legacy = { path = "../../legacy" } +types-kv = { path = "../types-kv" } + +actix-web = { version = "4.0.0-rc.2" } +clap = { version = "4.1.11", features = ["derive", "env"] } +fjall = {version = "3.0.1"} +futures = { version = "0.3" } +serde = { version = "1.0.114", features = ["derive"] } +serde_json = { version = "1.0.57" } +tracing = { version = "0.1.40" } +tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } +byteorder = "1.5.0" + +[dev-dependencies] +maplit = { version = "1.0.2" } +tempfile = { version = "3.4.0" } + + +[features] + +[package.metadata.docs.rs] +all-features = true diff --git a/examples/raft-kv-fjall/README.md b/examples/raft-kv-fjall/README.md new file mode 100644 index 000000000..e37c42d7e --- /dev/null +++ b/examples/raft-kv-fjall/README.md @@ -0,0 +1,47 @@ +# openraft-fjall-kv-example + +A fjall-backed persistent storage implementation for Openraft, demonstrating production-ready log storage and state machine patterns. 
+ +## Key Features Demonstrated + +- **Persistent storage**: [`RaftLogStorage`] and [`RaftStateMachine`] with fjall +- **Column families**: Separate storage for logs, state machine, and metadata +- **Durability**: On-disk persistence for cluster recovery +- **Performance**: Efficient batch operations and compaction + +## Overview + +This example implements: +- **[`RaftLogStorage`](https://docs.rs/openraft/latest/openraft/storage/trait.RaftLogStorage.html)** - Persistent Raft log storage +- **[`RaftStateMachine`](https://docs.rs/openraft/latest/openraft/storage/trait.RaftStateMachine.html)** - Persistent application state machine + +Built with [fjall v3](https://fjall-rs.github.io/) for production-grade durability and performance. + +## Architecture + +**Storage structure**: +- Logs stored in dedicated fjall key space +- State machine data in separate key space +- Vote and metadata persisted independently + +**Asynchronous I/O operations**: +- WAL flush operations run in spawned tasks to avoid blocking the async runtime +- `save_vote()` and `append_to_log()` spawn async tasks for disk persistence +- Callbacks receive actual flush results for proper error propagation +- Log truncation (`purge()`) doesn't require immediate persistence + +**Key Code Locations**: +- Storage implementation: `src/store.rs` +- Log storage with async WAL flush: `src/log_store.rs` +- Type definitions: See parent example for network and client implementations + +## Comparison + +| Feature | kv-fjall | memstore | +|---------|--------------|----------| +| Storage | fjall (disk) | Memory | +| Persistence | Yes | No | +| Recovery | Full | None | +| Complexity | Higher | Lower | + +Built for testing and demonstration purposes. 
diff --git a/examples/raft-kv-fjall/src/app.rs b/examples/raft-kv-fjall/src/app.rs new file mode 100644 index 000000000..5ed774f76 --- /dev/null +++ b/examples/raft-kv-fjall/src/app.rs @@ -0,0 +1,18 @@ +use std::collections::BTreeMap; +use std::sync::Arc; + +use futures::lock::Mutex; +use openraft::Config; + +use crate::NodeId; +use crate::Raft; + +// Representation of an application state. This struct can be shared around to share +// instances of raft, store and more. +pub struct App { + pub id: NodeId, + pub addr: String, + pub raft: Raft, + pub key_values: Arc>>, + pub config: Arc, +} diff --git a/examples/raft-kv-fjall/src/bin/main.rs b/examples/raft-kv-fjall/src/bin/main.rs new file mode 100644 index 000000000..1fee1ba41 --- /dev/null +++ b/examples/raft-kv-fjall/src/bin/main.rs @@ -0,0 +1,30 @@ +use clap::Parser; +use raft_kv_fjall::start_example_raft_node; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Clone, Debug)] +#[clap(author, version, about, long_about = None)] +pub struct Opt { + #[clap(long)] + pub id: u64, + + #[clap(long)] + pub addr: String, +} + +#[actix_web::main] +async fn main() -> std::io::Result<()> { + // Setup the logger + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + // Parse the parameters passed by arguments. 
+ let options = Opt::parse(); + + start_example_raft_node(options.id, format!("{}.db", options.addr), options.addr).await +} diff --git a/examples/raft-kv-fjall/src/lib.rs b/examples/raft-kv-fjall/src/lib.rs new file mode 100644 index 000000000..263b04b06 --- /dev/null +++ b/examples/raft-kv-fjall/src/lib.rs @@ -0,0 +1,92 @@ +use std::path::Path; +use std::sync::Arc; + +use actix_web::HttpServer; +use actix_web::middleware; +use actix_web::middleware::Logger; +use actix_web::web::Data; +use openraft::Config; + +use crate::app::App; +use crate::network::api; +use crate::network::management; +use crate::network::raft; +use crate::store::new_storage; + +pub mod app; +pub mod log_store; +pub mod network; +pub mod store; + +pub type NodeId = u64; + +openraft::declare_raft_types!( + pub TypeConfig: + D = types_kv::Request, + R = types_kv::Response, +); + +pub type LogStore = log_store::FjallLogStore; +pub type StateMachineStore = store::StateMachineStore; +pub type Raft = openraft::Raft; + +#[path = "../../utils/declare_types.rs"] +pub mod typ; + +pub async fn start_example_raft_node

(node_id: NodeId, dir: P, addr: String) -> std::io::Result<()> +where P: AsRef { + // Create a configuration for the raft instance. + let config = Config { + heartbeat_interval: 250, + election_timeout_min: 299, + ..Default::default() + }; + + let config = Arc::new(config.validate().unwrap()); + + let (log_store, state_machine_store) = new_storage(&dir).await; + + let kvs = state_machine_store.data.kvs.clone(); + + // Create the network layer using network-v1 crate + let network = network_v1_http::NetworkFactory {}; + + // Create a local raft instance. + let raft = openraft::Raft::new(node_id, config.clone(), network, log_store, state_machine_store).await.unwrap(); + + // Create an application that will store all the instances created above, this will + // later be used on the actix-web services. + let app_data = Data::new(App { + id: node_id, + addr: addr.clone(), + raft, + key_values: kvs, + config, + }); + + // Start the actix-web server. + let server = HttpServer::new(move || { + actix_web::App::new() + .wrap(Logger::default()) + .wrap(Logger::new("%a %{User-Agent}i")) + .wrap(middleware::Compress::default()) + .app_data(app_data.clone()) + // raft internal RPC + .service(raft::append) + .service(raft::snapshot) + .service(raft::vote) + // admin API + .service(management::init) + .service(management::add_learner) + .service(management::change_membership) + .service(management::metrics) + // application API + .service(api::write) + .service(api::read) + .service(api::linearizable_read) + }); + + let x = server.bind(addr)?; + + x.run().await +} diff --git a/examples/raft-kv-fjall/src/log_store.rs b/examples/raft-kv-fjall/src/log_store.rs new file mode 100644 index 000000000..c1e578802 --- /dev/null +++ b/examples/raft-kv-fjall/src/log_store.rs @@ -0,0 +1,259 @@ +use std::fmt::Debug; +use std::io; +use std::marker::PhantomData; +use std::ops::RangeBounds; + +use byteorder::BigEndian; +use byteorder::ReadBytesExt; +use byteorder::WriteBytesExt; +use 
fjall::Database; +use fjall::Keyspace; +use fjall::KeyspaceCreateOptions; +use fjall::PersistMode; +use openraft::LogState; +use openraft::OptionalSend; +use openraft::RaftLogReader; +use openraft::RaftTypeConfig; +use openraft::alias::EntryOf; +use openraft::alias::LogIdOf; +use openraft::alias::VoteOf; +use openraft::entry::RaftEntry; +use openraft::storage::IOFlushed; +use openraft::storage::RaftLogStorage; +use openraft::type_config::TypeConfigExt; + +use crate::log_store::meta::StoreMeta; + +#[derive(Clone)] +pub struct FjallLogStore +where C: RaftTypeConfig +{ + db: Database, + _p: PhantomData, +} + +impl FjallLogStore +where C: RaftTypeConfig +{ + pub fn new(db: Database) -> Self { + db.keyspace("meta", KeyspaceCreateOptions::default).expect("keyspace `meta` not found"); + db.keyspace("logs", KeyspaceCreateOptions::default).expect("keyspace `logs` not found"); + + Self { db, _p: PhantomData } + } + + fn keyspace_meta(&self) -> Keyspace { + self.db.keyspace("meta", KeyspaceCreateOptions::default).unwrap() + } + + fn keyspace_logs(&self) -> Keyspace { + self.db.keyspace("logs", KeyspaceCreateOptions::default).unwrap() + } + + fn get_meta>(&self) -> Result, io::Error> { + let val = self.keyspace_meta().get(M::KEY).map_err(|e| io::Error::other(e.to_string()))?; + let Some(val) = val else { + return Ok(None); + }; + + let t = serde_json::from_slice(val.as_ref()).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + Ok(Some(t)) + } + + fn put_meta>(&self, val: &M::Value) -> Result<(), io::Error> { + let val = serde_json::to_vec(val).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + self.keyspace_meta().insert(M::KEY, val).map_err(|e| io::Error::other(e.to_string())) + } +} + +impl RaftLogReader for FjallLogStore +where C: RaftTypeConfig +{ + async fn try_get_log_entries + Clone + Debug + OptionalSend>( + &mut self, + range: RB, + ) -> Result, io::Error> { + let start = match range.start_bound() { + std::ops::Bound::Included(x) => 
id_to_bin(*x), + std::ops::Bound::Excluded(x) => id_to_bin(*x + 1), + std::ops::Bound::Unbounded => id_to_bin(0), + }; + + let mut res = Vec::new(); + + for item_res in self.keyspace_logs().range(start..) { + let (id, val) = item_res.into_inner().map_err(read_logs_err)?; + + let id = bin_to_id(id.as_ref()); + if !range.contains(&id) { + break; + } + + let entry: EntryOf = serde_json::from_slice(val.as_ref()).map_err(read_logs_err)?; + + assert_eq!(id, entry.index()); + + res.push(entry); + } + Ok(res) + } + + async fn read_vote(&mut self) -> Result>, io::Error> { + self.get_meta::() + } +} + +impl RaftLogStorage for FjallLogStore +where C: RaftTypeConfig +{ + type LogReader = Self; + + async fn get_log_state(&mut self) -> Result, io::Error> { + let last = self.keyspace_logs().iter().last(); + + let last_log_id = match last { + None => None, + Some(res) => { + let (_log_index, entry) = res.into_inner().map_err(read_logs_err)?; + let ent = serde_json::from_slice::>(entry.as_ref()).map_err(read_logs_err)?; + Some(ent.log_id()) + } + }; + + let last_purged_log_id = self.get_meta::()?; + + let last_log_id = match last_log_id { + None => last_purged_log_id.clone(), + Some(x) => Some(x), + }; + + Ok(LogState { + last_purged_log_id, + last_log_id, + }) + } + + async fn get_log_reader(&mut self) -> Self::LogReader { + self.clone() + } + + async fn save_vote(&mut self, vote: &VoteOf) -> Result<(), io::Error> { + self.put_meta::(vote)?; + + // Vote must be persisted to disk before returning. 
+ let db = self.db.clone(); + C::spawn_blocking(move || db.persist(PersistMode::SyncAll).map_err(|e| io::Error::other(e.to_string()))) + .await??; + + Ok(()) + } + + async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), io::Error> + where I: IntoIterator> + Send { + for entry in entries { + let id = id_to_bin(entry.index()); + self.keyspace_logs() + .insert( + id, + serde_json::to_vec(&entry).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?, + ) + .map_err(|e| io::Error::other(e.to_string()))?; + } + + let db = self.db.clone(); + std::thread::spawn(move || { + let res = db.persist(PersistMode::SyncAll).map_err(|e| io::Error::other(e.to_string())); + callback.io_completed(res) + }); + + Ok(()) + } + + async fn truncate_after(&mut self, last_log_id: Option>) -> Result<(), io::Error> { + tracing::debug!("truncate_after: ({:?}, +∞)", last_log_id); + + let start_index = match last_log_id { + Some(log_id) => log_id.index() + 1, + None => 0, + }; + + // TODO(ariesdevil): using remove_range instead when + // this pr merged https://github.com/fjall-rs/lsm-tree/pull/242 + for k in start_index..10_000 { + self.keyspace_logs().remove(id_to_bin(k)).map_err(|e| io::Error::other(e.to_string()))?; + } + + Ok(()) + } + + async fn purge(&mut self, log_id: LogIdOf) -> Result<(), io::Error> { + tracing::debug!("delete_log: [0, {:?}]", log_id); + + // Write the last-purged log id before purging the logs. + // The logs at and before last-purged log id will be ignored by openraft. + // Therefore, there is no need to do it in a transaction. + self.put_meta::(&log_id)?; + + // TODO(ariesdevil): using remove_range instead when + // this pr merged https://github.com/fjall-rs/lsm-tree/pull/242 + for k in 0..log_id.index() + 1 { + self.keyspace_logs().remove(id_to_bin(k)).map_err(|e| io::Error::other(e.to_string()))?; + } + + Ok(()) + } +} + +/// Metadata of a raft-store. 
+/// +/// In raft, except logs and state machine, the store also has to store several pieces of metadata. +/// This sub mod defines the key-value pairs of these metadata. +mod meta { + use openraft::RaftTypeConfig; + use openraft::alias::LogIdOf; + use openraft::alias::VoteOf; + + /// Defines metadata key and value + pub(crate) trait StoreMeta + where C: RaftTypeConfig + { + /// The key used to store in fjall + const KEY: &'static str; + + /// The type of the value to store + type Value: serde::Serialize + serde::de::DeserializeOwned; + } + + pub(crate) struct LastPurged {} + pub(crate) struct Vote {} + + impl StoreMeta for LastPurged + where C: RaftTypeConfig + { + const KEY: &'static str = "last_purged_log_id"; + type Value = LogIdOf; + } + impl StoreMeta for Vote + where C: RaftTypeConfig + { + const KEY: &'static str = "vote"; + type Value = VoteOf; + } +} + +/// Converts an id to a byte vector for storing in the database. +/// Note that we're using big endian encoding to ensure correct sorting of keys +fn id_to_bin(id: u64) -> Vec { + let mut buf = Vec::with_capacity(8); + buf.write_u64::(id).unwrap(); + buf +} + +fn bin_to_id(buf: &[u8]) -> u64 { + (&buf[0..8]).read_u64::().unwrap() +} + +fn read_logs_err(e: impl std::error::Error + 'static) -> io::Error { + io::Error::other(e.to_string()) +} diff --git a/examples/raft-kv-fjall/src/network/api.rs b/examples/raft-kv-fjall/src/network/api.rs new file mode 100644 index 000000000..51bd9674f --- /dev/null +++ b/examples/raft-kv-fjall/src/network/api.rs @@ -0,0 +1,47 @@ +use actix_web::Responder; +use actix_web::post; +use actix_web::web; +use actix_web::web::Data; +use openraft::ReadPolicy; +use openraft::error::Infallible; +use openraft::error::LinearizableReadError; +use openraft::error::decompose::DecomposeResult; +use web::Json; + +use crate::TypeConfig; +use crate::app::App; + +#[post("/write")] +pub async fn write(app: Data, req: Json) -> actix_web::Result { + let response = 
app.raft.client_write(req.0).await.decompose().unwrap(); + Ok(Json(response)) +} + +#[post("/read")] +pub async fn read(app: Data, req: Json) -> actix_web::Result { + let key = req.0; + let kvs = app.key_values.lock().await; + let value = kvs.get(&key); + + let res: Result = Ok(value.cloned().unwrap_or_default()); + Ok(Json(res)) +} + +#[post("/linearizable_read")] +pub async fn linearizable_read(app: Data, req: Json) -> actix_web::Result { + let ret = app.raft.get_read_linearizer(ReadPolicy::ReadIndex).await.decompose().unwrap(); + + match ret { + Ok(linearizer) => { + linearizer.await_ready(&app.raft).await.unwrap(); + + let key = req.0; + let kvs = app.key_values.lock().await; + let value = kvs.get(&key); + + let res: Result> = Ok(value.cloned().unwrap_or_default()); + Ok(Json(res)) + } + Err(e) => Ok(Json(Err(e))), + } +} diff --git a/examples/raft-kv-fjall/src/network/management.rs b/examples/raft-kv-fjall/src/network/management.rs new file mode 100644 index 000000000..b3c95abf3 --- /dev/null +++ b/examples/raft-kv-fjall/src/network/management.rs @@ -0,0 +1,63 @@ +use std::collections::BTreeMap; +use std::collections::BTreeSet; + +use actix_web::Responder; +use actix_web::get; +use actix_web::post; +use actix_web::web::Data; +use actix_web::web::Json; +use openraft::BasicNode; +use openraft::async_runtime::WatchReceiver; +use openraft::error::Infallible; +use openraft::error::decompose::DecomposeResult; + +use crate::NodeId; +use crate::app::App; +use crate::typ::*; + +// --- Cluster management + +/// Add a node as **Learner**. +/// +/// A Learner receives log replication from the leader but does not vote. 
+/// This should be done before adding a node as a member into the cluster +/// (by calling `change-membership`) +#[post("/add-learner")] +pub async fn add_learner(app: Data, req: Json<(NodeId, String)>) -> actix_web::Result { + let (node_id, api_addr) = req.0; + let node = Node { addr: api_addr }; + let res = app.raft.add_learner(node_id, node, true).await.decompose().unwrap(); + Ok(Json(res)) +} + +/// Changes specified learners to members, or remove members. +#[post("/change-membership")] +pub async fn change_membership(app: Data, req: Json>) -> actix_web::Result { + let body = req.0; + let res = app.raft.change_membership(body, false).await.decompose().unwrap(); + Ok(Json(res)) +} + +/// Initialize a cluster. +#[post("/init")] +pub async fn init(app: Data, req: Json>) -> actix_web::Result { + let mut nodes = BTreeMap::new(); + if req.0.is_empty() { + nodes.insert(app.id, BasicNode { addr: app.addr.clone() }); + } else { + for (id, addr) in req.0.into_iter() { + nodes.insert(id, BasicNode { addr }); + } + }; + let res = app.raft.initialize(nodes).await.decompose().unwrap(); + Ok(Json(res)) +} + +/// Get the latest metrics of the cluster +#[get("/metrics")] +pub async fn metrics(app: Data) -> actix_web::Result { + let metrics = app.raft.metrics().borrow_watched().clone(); + + let res: Result = Ok(metrics); + Ok(Json(res)) +} diff --git a/examples/raft-kv-fjall/src/network/mod.rs b/examples/raft-kv-fjall/src/network/mod.rs new file mode 100644 index 000000000..0985670a2 --- /dev/null +++ b/examples/raft-kv-fjall/src/network/mod.rs @@ -0,0 +1,3 @@ +pub mod api; +pub mod management; +pub mod raft; diff --git a/examples/raft-kv-fjall/src/network/raft.rs b/examples/raft-kv-fjall/src/network/raft.rs new file mode 100644 index 000000000..d259d5857 --- /dev/null +++ b/examples/raft-kv-fjall/src/network/raft.rs @@ -0,0 +1,29 @@ +use actix_web::Responder; +use actix_web::post; +use actix_web::web::Data; +use actix_web::web::Json; +use 
openraft::error::decompose::DecomposeResult; +use openraft_legacy::prelude::*; + +use crate::app::App; +use crate::typ::*; + +// --- Raft communication + +#[post("/vote")] +pub async fn vote(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.vote(req.0).await.decompose().unwrap(); + Ok(Json(res)) +} + +#[post("/append")] +pub async fn append(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.append_entries(req.0).await.decompose().unwrap(); + Ok(Json(res)) +} + +#[post("/snapshot")] +pub async fn snapshot(app: Data, req: Json) -> actix_web::Result { + let res = app.raft.install_snapshot(req.0).await; + Ok(Json(res)) +} diff --git a/examples/raft-kv-fjall/src/store.rs b/examples/raft-kv-fjall/src/store.rs new file mode 100644 index 000000000..04a8c6575 --- /dev/null +++ b/examples/raft-kv-fjall/src/store.rs @@ -0,0 +1,217 @@ +use std::collections::BTreeMap; +use std::io; +use std::io::Cursor; +use std::path::Path; +use std::sync::Arc; + +use fjall::Database; +use fjall::Keyspace; +use fjall::KeyspaceCreateOptions; +use fjall::PersistMode; +use futures::Stream; +use futures::TryStreamExt; +use futures::lock::Mutex; +use openraft::EntryPayload; +use openraft::OptionalSend; +use openraft::RaftSnapshotBuilder; +use openraft::storage::EntryResponder; +use openraft::storage::RaftStateMachine; +use serde::Deserialize; +use serde::Serialize; + +use crate::TypeConfig; +use crate::log_store::FjallLogStore; +use crate::typ::*; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct StoredSnapshot { + pub meta: SnapshotMeta, + + /// The data of the state machine at the time of this snapshot. 
+ pub data: Vec, +} + +#[derive(Debug, Clone)] +pub struct StateMachineData { + pub last_applied_log_id: Option, + + pub last_membership: StoredMembership, + + /// State built from applying the raft logs + pub kvs: Arc>>, +} + +#[derive(Clone)] +pub struct StateMachineStore { + pub data: StateMachineData, + /// snapshot index is not persisted in this example. + /// + /// It is only used as a suffix of snapshot id, and should be globally unique. + /// In practice, using a timestamp in micro-second would be good enough. + snapshot_idx: u64, + + /// State machine stores snapshot in db. + db: Database, +} + +impl RaftSnapshotBuilder for StateMachineStore { + async fn build_snapshot(&mut self) -> Result { + let last_applied_log = self.data.last_applied_log_id; + let last_membership = self.data.last_membership.clone(); + + let kv_json = { + let kvs = self.data.kvs.lock().await; + serde_json::to_vec(&*kvs).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + }; + + let snapshot_id = if let Some(last) = last_applied_log { + format!("{}-{}-{}", last.committed_leader_id(), last.index(), self.snapshot_idx) + } else { + format!("--{}", self.snapshot_idx) + }; + + let meta = SnapshotMeta { + last_log_id: last_applied_log, + last_membership, + snapshot_id, + }; + + let snapshot = StoredSnapshot { + meta: meta.clone(), + data: kv_json.clone(), + }; + + self.set_current_snapshot_(snapshot)?; + + Ok(Snapshot { + meta, + snapshot: Cursor::new(kv_json), + }) + } +} + +impl StateMachineStore { + async fn new(db: Database) -> Result { + let mut sm = Self { + data: StateMachineData { + last_applied_log_id: None, + last_membership: Default::default(), + kvs: Arc::new(Mutex::new(BTreeMap::new())), + }, + snapshot_idx: 0, + db, + }; + + let snapshot = sm.get_current_snapshot_()?; + if let Some(snap) = snapshot { + sm.update_state_machine_(snap).await?; + } + + Ok(sm) + } + + fn get_current_snapshot_(&self) -> Result, io::Error> { + Ok(self + .store() + .get(b"snapshot") + 
.map_err(io::Error::other)? + .and_then(|v| serde_json::from_slice(v.as_ref()).ok())) + } + + async fn update_state_machine_(&mut self, snapshot: StoredSnapshot) -> Result<(), io::Error> { + let kvs: BTreeMap = + serde_json::from_slice(&snapshot.data).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + self.data.last_applied_log_id = snapshot.meta.last_log_id; + self.data.last_membership = snapshot.meta.last_membership.clone(); + let mut x = self.data.kvs.lock().await; + *x = kvs; + + Ok(()) + } + + fn set_current_snapshot_(&self, snap: StoredSnapshot) -> Result<(), io::Error> { + self.store() + .insert(b"snapshot", serde_json::to_vec(&snap).unwrap().as_slice()) + .map_err(io::Error::other)?; + self.db.persist(PersistMode::SyncAll).map_err(io::Error::other)?; + Ok(()) + } + + fn store(&self) -> Keyspace { + self.db.keyspace("store", KeyspaceCreateOptions::default).unwrap() + } +} + +impl RaftStateMachine for StateMachineStore { + type SnapshotBuilder = Self; + + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), io::Error> { + Ok((self.data.last_applied_log_id, self.data.last_membership.clone())) + } + + async fn apply(&mut self, mut entries: Strm) -> Result<(), io::Error> + where Strm: Stream, io::Error>> + Unpin + OptionalSend { + while let Some((entry, responder)) = entries.try_next().await? 
{ + self.data.last_applied_log_id = Some(entry.log_id); + + let response = match entry.payload { + EntryPayload::Blank => types_kv::Response::none(), + EntryPayload::Normal(req) => match req { + types_kv::Request::Set { key, value } => { + let mut x = self.data.kvs.lock().await; + x.insert(key, value); + types_kv::Response::none() + } + }, + EntryPayload::Membership(mem) => { + self.data.last_membership = StoredMembership::new(Some(entry.log_id), mem); + types_kv::Response::none() + } + }; + + if let Some(responder) = responder { + responder.send(response); + } + } + Ok(()) + } + + async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { + self.snapshot_idx += 1; + self.clone() + } + + async fn begin_receiving_snapshot(&mut self) -> Result>, io::Error> { + Ok(Cursor::new(Vec::new())) + } + + async fn install_snapshot(&mut self, meta: &SnapshotMeta, snapshot: SnapshotData) -> Result<(), io::Error> { + let new_snapshot = StoredSnapshot { + meta: meta.clone(), + data: snapshot.into_inner(), + }; + + self.update_state_machine_(new_snapshot.clone()).await?; + self.set_current_snapshot_(new_snapshot)?; + + Ok(()) + } + + async fn get_current_snapshot(&mut self) -> Result, io::Error> { + let x = self.get_current_snapshot_()?; + Ok(x.map(|s| Snapshot { + meta: s.meta.clone(), + snapshot: Cursor::new(s.data.clone()), + })) + } +} + +pub(crate) async fn new_storage>(db_path: P) -> (FjallLogStore, StateMachineStore) { + let db = Database::builder(db_path).open().unwrap(); + + let log_store = FjallLogStore::new(db.clone()); + let sm_store = StateMachineStore::new(db).await.unwrap(); + + (log_store, sm_store) +} diff --git a/examples/raft-kv-fjall/test-cluster.sh b/examples/raft-kv-fjall/test-cluster.sh new file mode 100755 index 000000000..b738d92c5 --- /dev/null +++ b/examples/raft-kv-fjall/test-cluster.sh @@ -0,0 +1,214 @@ +#!/bin/sh + +set -o errexit + +cargo build + +kill_all() { + SERVICE='raft-key-value-fjall' + if [ "$(uname)" = "Darwin" ]; then + if pgrep 
-xq -- "${SERVICE}"; then
        pkill -f "${SERVICE}"
    fi
    rm -r 127.0.0.1:*.db || echo "no db to clean"
else
    set +e # killall will error if finds no process to kill
    killall "${SERVICE}"
    set -e
fi
}

# rpc PORT/PATH [JSON_BODY]
# Issues an HTTP request to a local node; pretty-prints the response with jq
# when available, otherwise dumps it raw.
rpc() {
    local uri=$1
    local body="$2"

    echo '---'" rpc(:$uri, $body)"

    {
        if [ ".$body" = "." ]; then
            time curl --silent "127.0.0.1:$uri"
        else
            time curl --silent "127.0.0.1:$uri" -H "Content-Type: application/json" -d "$body"
        fi
    } | {
        if type jq > /dev/null 2>&1; then
            jq
        else
            cat
        fi
    }

    echo
    echo
}

export RUST_LOG=trace
export RUST_BACKTRACE=full
bin=./target/debug/raft-key-value-fjall

echo "Killing all running raft-key-value-fjall and cleaning up old data"

kill_all
sleep 1

if ls 127.0.0.1:*.db
then
    rm -r 127.0.0.1:*.db || echo "no db to clean"
fi

echo "Start 3 uninitialized raft-key-value-fjall servers..."

# NOTE: redirections are processed left to right, so the log redirect must
# come BEFORE `2>&1`; the original `2>&1 > n1.log` left stderr on the
# terminal instead of in the log file.
${bin} --id 1 --addr 127.0.0.1:21001 > n1.log 2>&1 &
PID1=$!
sleep 1
echo "Server 1 started"

# Capture stderr for nodes 2 and 3 as well, matching node 1.
nohup ${bin} --id 2 --addr 127.0.0.1:21002 > n2.log 2>&1 &
sleep 1
echo "Server 2 started"

nohup ${bin} --id 3 --addr 127.0.0.1:21003 > n3.log 2>&1 &
sleep 1
echo "Server 3 started"
sleep 1

echo "Initialize server 1 as a single-node cluster"
sleep 2
echo
rpc 21001/init '[]'

echo "Server 1 is a leader now"

sleep 2

echo "Get metrics from the leader"
sleep 2
echo
rpc 21001/metrics
sleep 1


echo "Adding node 2 and node 3 as learners, to receive log from leader node 1"

sleep 1
echo
rpc 21001/add-learner '[2, "127.0.0.1:21002"]'
echo "Node 2 added as learner"
sleep 1
echo
rpc 21001/add-learner '[3, "127.0.0.1:21003"]'
echo "Node 3 added as learner"
sleep 1

echo "Get metrics from the leader, after adding 2 learners"
sleep 2
echo
rpc 21001/metrics
sleep 1

echo "Changing membership from [1] to 3 nodes cluster: [1, 2, 3]"
echo
rpc 21001/change-membership '[1, 2, 3]'
sleep 1
echo "Membership changed"
sleep 1

echo "Get metrics from the leader again"
sleep 1
echo
rpc 21001/metrics
sleep 1

echo "Write data on leader"
sleep 1
echo
rpc 21001/write '{"Set":{"key":"foo","value":"bar"}}'
sleep 1
echo "Data written"
sleep 1

echo "Read on every node, including the leader"
sleep 1
echo "Read from node 1"
echo
rpc 21001/read '"foo"'
echo "Read from node 2"
echo
rpc 21002/read '"foo"'
echo "Read from node 3"
echo
rpc 21003/read '"foo"'

echo "Kill Node 1"
kill -9 $PID1
sleep 1

echo "Read from node 3"
echo
rpc 21003/read '"foo"'
sleep 1


echo "Get metrics from node 2"
sleep 1
echo
rpc 21002/metrics
sleep 1

echo "Write data on node 2"
sleep 1
echo
rpc 21002/write '{"Set":{"key":"foo","value":"badger"}}'
sleep 1
echo "Data written"
sleep 1

echo "Write data on node 3"
sleep 1
echo
rpc 21003/write '{"Set":{"key":"foo","value":"badger"}}'
sleep 1
echo "Data written"
sleep 1


echo "Get metrics from node 2"
sleep 1
echo
rpc 21002/metrics
sleep 1


echo "Read from node 3"
echo
rpc 21003/read '"foo"'
sleep 1


echo "Restart node 1"
echo

# Same redirection-order fix as above; append to the existing node-1 log.
${bin} --id 1 --addr 127.0.0.1:21001 >> n1.log 2>&1 &
sleep 1
echo "Server 1 started"

echo "Read from node 1"
echo
rpc 21001/read '"foo"'
sleep 1

echo "Get metrics from node 1"
sleep 1
echo
rpc 21001/metrics
sleep 1



echo "Killing all nodes in 3s..."
sleep 1
echo "Killing all nodes in 2s..."
sleep 1
echo "Killing all nodes in 1s..."
+sleep 1 +kill_all diff --git a/examples/raft-kv-fjall/tests/cluster/main.rs b/examples/raft-kv-fjall/tests/cluster/main.rs new file mode 100644 index 000000000..5148911f9 --- /dev/null +++ b/examples/raft-kv-fjall/tests/cluster/main.rs @@ -0,0 +1,3 @@ +#![allow(clippy::uninlined_format_args)] + +mod test_cluster; diff --git a/examples/raft-kv-fjall/tests/cluster/test_cluster.rs b/examples/raft-kv-fjall/tests/cluster/test_cluster.rs new file mode 100644 index 000000000..14f595d3b --- /dev/null +++ b/examples/raft-kv-fjall/tests/cluster/test_cluster.rs @@ -0,0 +1,251 @@ +use std::backtrace::Backtrace; +use std::collections::BTreeMap; +use std::panic::PanicHookInfo; +use std::thread; +use std::time::Duration; + +use client_http::ExampleClient; +use maplit::btreemap; +use maplit::btreeset; +use openraft::BasicNode; +use openraft::async_runtime::AsyncRuntime; +use openraft::type_config::TypeConfigExt; +use openraft::type_config::alias::AsyncRuntimeOf; +use raft_kv_fjall::TypeConfig; +use raft_kv_fjall::start_example_raft_node; +use tracing_subscriber::EnvFilter; + +pub fn log_panic(panic: &PanicHookInfo) { + let backtrace = { format!("{:?}", Backtrace::force_capture()) }; + + eprintln!("{}", panic); + + if let Some(location) = panic.location() { + tracing::error!( + message = %panic, + backtrace = %backtrace, + panic.file = location.file(), + panic.line = location.line(), + panic.column = location.column(), + ); + eprintln!("{}:{}:{}", location.file(), location.line(), location.column()); + } else { + tracing::error!(message = %panic, backtrace = %backtrace); + } + + eprintln!("{}", backtrace); +} + +/// Setup a cluster of 3 nodes. +/// Write to it and read from it. +#[test] +fn test_cluster() { + TypeConfig::run(test_cluster_inner()).unwrap(); +} + +async fn test_cluster_inner() -> Result<(), Box> { + // --- The client itself does not store addresses for all nodes, but just node id. 
+ // Thus we need a supporting component to provide mapping from node id to node address. + // This is only used by the client. A raft node in this example stores node addresses in its + // store. + + std::panic::set_hook(Box::new(|panic| { + log_panic(panic); + })); + + tracing_subscriber::fmt() + .with_target(true) + .with_thread_ids(true) + .with_level(true) + .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + fn get_addr(node_id: u32) -> String { + match node_id { + 1 => "127.0.0.1:31001".to_string(), + 2 => "127.0.0.1:31002".to_string(), + 3 => "127.0.0.1:31003".to_string(), + _ => panic!("node not found"), + } + } + + // --- Start 3 raft node in 3 threads. + let d1 = tempfile::TempDir::new()?; + let d2 = tempfile::TempDir::new()?; + let d3 = tempfile::TempDir::new()?; + + let _h1 = thread::spawn(move || { + let mut rt = AsyncRuntimeOf::::new(1); + let x = rt.block_on(start_example_raft_node(1, d1.path(), get_addr(1))); + println!("x: {:?}", x); + }); + + let _h2 = thread::spawn(move || { + let mut rt = AsyncRuntimeOf::::new(1); + let x = rt.block_on(start_example_raft_node(2, d2.path(), get_addr(2))); + println!("x: {:?}", x); + }); + + let _h3 = thread::spawn(move || { + let mut rt = AsyncRuntimeOf::::new(1); + let x = rt.block_on(start_example_raft_node(3, d3.path(), get_addr(3))); + println!("x: {:?}", x); + }); + + // Wait for server to start up. + TypeConfig::sleep(Duration::from_millis(3_000)).await; + + // --- Create a client to the first node, as a control handle to the cluster. + + let leader = ExampleClient::::new(1, get_addr(1)); + + // --- 1. Initialize the target node as a cluster of only one node. + // After init(), the single node cluster will be fully functional. 
+ + println!("=== init single node cluster"); + leader.init().await??; + + println!("=== get metrics after init, wait until leader is elected"); + loop { + let metrics = leader.metrics().await?; + if metrics.current_leader == Some(1) { + break; + } + TypeConfig::sleep(Duration::from_millis(500)).await; + } + + // --- 2. Add node 2 and 3 to the cluster as `Learner`, to let them start to receive log replication + // from the leader. + + println!("=== add-learner 2"); + leader.add_learner((2, get_addr(2))).await??; + + println!("=== add-learner 3"); + leader.add_learner((3, get_addr(3))).await??; + + println!("=== metrics after add-learner"); + let x = leader.metrics().await?; + + assert_eq!(&vec![btreeset![1]], x.membership_config.membership().get_joint_config()); + + let nodes_in_cluster = + x.membership_config.nodes().map(|(nid, node)| (*nid, node.clone())).collect::>(); + assert_eq!( + btreemap! { + 1 => BasicNode::new(get_addr(1)), + 2 => BasicNode::new(get_addr(2)), + 3 => BasicNode::new(get_addr(3)), + }, + nodes_in_cluster + ); + + // --- 3. Turn the two learners to members. A member node can vote or elect itself as leader. + + println!("=== change-membership to 1,2,3"); + leader.change_membership(&btreeset! {1,2,3}).await??; + + // --- After change-membership, some cluster state will be seen in the metrics. 
+ // + // ```text + // metrics: RaftMetrics { + // current_leader: Some(1), + // membership_config: EffectiveMembership { + // log_id: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 8 }, + // membership: Membership { learners: {}, configs: [{1, 2, 3}] } + // }, + // leader_metrics: Some(LeaderMetrics { replication: { + // 2: ReplicationMetrics { matched: Some(LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 7 }) }, + // 3: ReplicationMetrics { matched: Some(LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 8 }) }} }) + // } + // ``` + + println!("=== metrics after change-member"); + let x = leader.metrics().await?; + assert_eq!( + &vec![btreeset![1, 2, 3]], + x.membership_config.membership().get_joint_config() + ); + + // --- Try to write some application data through the leader. + + println!("=== write `foo=bar`"); + leader + .write(&types_kv::Request::Set { + key: "foo".to_string(), + value: "bar".to_string(), + }) + .await??; + + // --- Wait for a while to let the replication get done. + + TypeConfig::sleep(Duration::from_millis(500)).await; + + // --- Read it on every node. + + println!("=== read `foo=bar` on node 1"); + let x = leader.read(&("foo".to_string())).await?; + assert_eq!("bar", x); + + println!("=== read `foo=bar` on node 2"); + let client2 = ExampleClient::::new(2, get_addr(2)); + let x = client2.read(&("foo".to_string())).await?; + assert_eq!("bar", x); + + println!("=== read `foo=bar` on node 3"); + let client3 = ExampleClient::::new(3, get_addr(3)); + let x = client3.read(&("foo".to_string())).await?; + assert_eq!("bar", x); + + // --- A write to non-leader will be automatically forwarded to a known leader + + println!("=== write `foo=wow` on node 2"); + client2 + .write(&types_kv::Request::Set { + key: "foo".to_string(), + value: "wow".to_string(), + }) + .await??; + + TypeConfig::sleep(Duration::from_millis(500)).await; + + // --- Read it on every node. 
+ + println!("=== read `foo=wow` on node 1"); + let x = leader.read(&("foo".to_string())).await?; + assert_eq!("wow", x); + + println!("=== read `foo=wow` on node 2"); + let client2 = ExampleClient::::new(2, get_addr(2)); + let x = client2.read(&("foo".to_string())).await?; + assert_eq!("wow", x); + + println!("=== read `foo=wow` on node 3"); + let client3 = ExampleClient::::new(3, get_addr(3)); + let x = client3.read(&("foo".to_string())).await?; + assert_eq!("wow", x); + + println!("=== linearizable_read `foo=wow` on node 1"); + let x = leader.linearizable_read(&("foo".to_string())).await??; + assert_eq!("wow", x); + + println!("=== linearizable_read `foo=wow` on node 2 MUST return LinearizableReadError"); + let x = client2.linearizable_read(&("foo".to_string())).await?; + println!("=== linearize_read on node 2 result: {:?}", x); + match x { + Err(e) => { + let s = e.to_string(); + let expect_err: String = + "has to forward request to: Some(1), Some(BasicNode { addr: \"127.0.0.1:31001\" })".to_string(); + + assert_eq!(s, expect_err); + } + Ok(_) => panic!("MUST return LinearizableReadError"), + } + + println!("=== linearizable_read_auto_forward `foo=wow` on node 2 returns value"); + let x = client2.linearizable_read_auto_forward(&("foo".to_string())).await?; + assert_eq!(x.unwrap(), "wow"); + + Ok(()) +}