Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pretty_assertions = { version = "1.0.0" }
proc-macro2 = { version = "1.0" }
quote = { version = "1.0" }
rand = { version = "0.9" }
rkyv = { version = "0.8.15" }
semver = { version = "1.0.14" }
serde = { version = "1.0.114", features = ["derive", "rc"] }
serde_json = { version = "1.0.57" }
Expand Down Expand Up @@ -82,10 +83,13 @@ exclude = [

"examples/log-mem",
"examples/sm-mem",
"examples/sm-mem-rkyv",
"examples/rocksstore",
"examples/types-kv",
"examples/types-kv-rkyv",

"examples/raft-kv-memstore",
"examples/raft-kv-memstore-rkyv",
"examples/raft-kv-memstore-grpc",
"examples/raft-kv-memstore-single-threaded",
"examples/raft-kv-memstore-network-v2",
Expand Down
8 changes: 6 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ This directory contains example applications demonstrating different implementat
- **State Machine**: StateMachine implementation for application state
- **RaftNetwork Impl**: Transport protocol and client library used
- **RaftNetwork**: Interface version (RaftNetwork vs RaftNetworkV2)
- **Client**: HTTP/gRPC client library for application requests
- **Server**: Web framework for handling incoming requests
- **Client**: Client transport/library used by the example
- **Server**: Server runtime/framework used to accept requests
- **Special Features**: Unique characteristics of each example

| Example | Log | State Machine | RaftNetwork Impl | RaftNetwork | Client | Server | Special Features |
|---------|-----|---------------|------------------|-------------|--------|--------|------------------|
| [raft-kv-memstore] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetwork | reqwest | actix-web | Basic example |
| [raft-kv-memstore-rkyv] | [log-mem] | in-memory (`rkyv`) | TCP + length-prefixed `rkyv` frames | RaftNetworkV2 | custom TCP (tests) | tokio `TcpListener` | custom wire protocol + `rkyv` serialization |
| [raft-kv-rocksdb] | [rocksstore] | [rocksstore] | HTTP/reqwest([network-v1]) | RaftNetwork | reqwest | actix-web | Persistent storage |
| [raft-kv-memstore-network-v2] | [log-mem] | [sm-mem] | HTTP/reqwest | RaftNetworkV2 | reqwest | actix-web | Network V2 interface |
| [multi-raft-kv] | [log-mem] | [sm-mem] | HTTP/channel | GroupRouter | channel | in-memory | Multi-Raft groups |
Expand All @@ -30,6 +31,7 @@ This directory contains example applications demonstrating different implementat
### Storage Implementations
- **[log-mem]** - In-memory Raft Log Store using `std::collections::BTreeMap`
- **[sm-mem]** - In-memory KV State Machine implementation
- **[sm-mem-rkyv]** - In-memory KV State Machine implementation with `rkyv` snapshots
- **[rocksstore]** - RocksDB-based persistent storage using `rocksdb` crate

### Backward Compatibility (since 0.10)
Expand All @@ -48,6 +50,7 @@ The following symbolic links are provided for backward compatibility:

<!-- Reference Links -->
[raft-kv-memstore]: raft-kv-memstore/
[raft-kv-memstore-rkyv]: raft-kv-memstore-rkyv/
[raft-kv-rocksdb]: raft-kv-rocksdb/
[raft-kv-memstore-network-v2]: raft-kv-memstore-network-v2/
[raft-kv-memstore-grpc]: raft-kv-memstore-grpc/
Expand All @@ -56,6 +59,7 @@ The following symbolic links are provided for backward compatibility:
[multi-raft-kv]: multi-raft-kv/
[log-mem]: log-mem/
[sm-mem]: sm-mem/
[sm-mem-rkyv]: sm-mem-rkyv/
[rocksstore]: rocksstore/
[network-v1]: network-v1-http/
[types-kv]: types-kv/
Expand Down
5 changes: 5 additions & 0 deletions examples/raft-kv-memstore-rkyv/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
vendor
.idea

/*.log
40 changes: 40 additions & 0 deletions examples/raft-kv-memstore-rkyv/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "raft-kv-memstore-rkyv"
version = "0.1.0"
readme = "README.md"

edition = "2024"
authors = [
"Sainath Singineedi <sainathsingineedi2222@gmail.com>",
]
categories = ["algorithms", "asynchronous", "data-structures"]
description = "An example distributed key-value store built upon `openraft`."
homepage = "https://github.com/databendlabs/openraft"
keywords = ["raft", "consensus"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/databendlabs/openraft"

[[bin]]
name = "raft-key-value"
path = "src/bin/main.rs"

[dependencies]
log-mem = { path = "../log-mem", features = [] }
openraft = { path = "../../openraft", features = ["type-alias", "rkyv", "serde"] }
tokio = { version = "1.49.0", features = ["net", "io-util", "rt"] }
clap = { version = "4.1.11", features = ["derive", "env"] }
futures = { version = "0.3.31" }
tracing = { version = "0.1.29" }
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
rkyv = { version = "0.8.15" }
serde = { version = "1", features = ["derive"] }
bincode = { version = "2.0.1", features = ["serde"] }

[dev-dependencies]
anyhow = { version = "1.0.63" }
maplit = { version = "1.0.2" }

[features]

[package.metadata.docs.rs]
all-features = true
80 changes: 80 additions & 0 deletions examples/raft-kv-memstore-rkyv/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Distributed Key-Value Store with OpenRaft over TCP + rkyv

Demonstrates a distributed key-value store built on OpenRaft using a custom TCP transport and `rkyv`-serialized wire messages.

## Key Features Demonstrated

- **Custom TCP networking**: Raft RPCs are exchanged over `tokio::net::TcpStream`.
- **Length-prefixed framing**: Each request/response is sent as `u32` length + payload bytes.
- **`rkyv` serialization**: Wire RPC enums and snapshot/state-machine data use zero-copy-friendly `rkyv` formats.
- **`RaftNetworkV2` implementation**: Implements `append_entries`, `vote`, `full_snapshot`, `transfer_leader`, and `backoff`.
- **In-memory storage**: `log-mem::LogStore` plus an in-memory `StateMachineStore` with snapshot support.

## Overview

This example includes:

- **Storage**:
- In-memory log store (`log-mem`)
- In-memory state machine (`BTreeMap<String, String>`) and snapshot build/install logic in `src/store/mod.rs`
- **Network**:
- A TCP-based `RaftNetworkFactory` and `RaftNetworkV2` implementation in `src/network/mod.rs`
- Request/response wire enums (`WireRequest`, `WireResponse`) serialized with `rkyv`
- **Application process**:
- Node bootstrap in `src/app.rs`
- Binary entrypoint and CLI args in `src/bin/main.rs`

## Testing

- **Storage conformance** (`src/test_store.rs`):
- Runs OpenRaft's storage test suite against this example's in-memory store.
- **Message serialization tests** (`tests/test_rkyv_messages.rs`):
- Validates `rkyv` round-trip for vote/append/snapshot/transfer-leader/write message types.
- **Single-process transport test** (`tests/test_cluster.rs`):
- Spawns 3 nodes and verifies framed TCP Raft RPC behavior.
- **Multi-process smoke test** (`./test-cluster.sh`):
- Builds the binary, starts 5 local nodes, probes TCP endpoints, and tails recent logs.

## Architecture

**Key code locations:**

- `src/bin/main.rs`: CLI and runtime bootstrap.
- `src/app.rs`: Raft config, store/network setup, and TCP listener loop.
- `src/network/mod.rs`: TCP wire protocol, framing, request handling, and `RaftNetworkV2`.
- `src/store/mod.rs`: State machine and snapshot implementation.
- `src/raft.rs`: App data types (`SetRequest`, `Response`, `Node`, `Vote`, `Entry`).
- `tests/`: integration tests for transport and serialization.

## Running

### Build

```shell
cargo build
```

### Run tests

```shell
cargo test
```

### Start a 3-node cluster manually

```bash
# Terminal 1
./target/debug/raft-key-value --id 1 --addr 127.0.0.1:5051

# Terminal 2
./target/debug/raft-key-value --id 2 --addr 127.0.0.1:5052

# Terminal 3
./target/debug/raft-key-value --id 3 --addr 127.0.0.1:5053
```

### Multi-process smoke test

```bash
./test-cluster.sh
```
53 changes: 53 additions & 0 deletions examples/raft-kv-memstore-rkyv/src/app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::sync::Arc;

use openraft::Config;
use tokio::net::TcpListener;
use tracing::info;
use tracing::warn;

use crate::NodeId;
use crate::network::Network;
use crate::network::process_socket;
use crate::store::LogStore;
use crate::store::StateMachineStore;
use crate::typ::*;

pub async fn start_raft_app(
node_id: NodeId,
http_addr: String,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Create a configuration for the raft instance.
let config = Arc::new(
Config {
heartbeat_interval: 500,
election_timeout_min: 1500,
election_timeout_max: 3000,
..Default::default()
}
.validate()?,
);

// Create stores and network
let log_store = LogStore::default();
let state_machine_store = Arc::new(StateMachineStore::default());
let network = Network {};

// Create Raft instance and keep it alive for the lifetime of this server task.
let raft: Raft = Raft::new(node_id, config.clone(), network, log_store, state_machine_store.clone()).await?;

// Bind tcp socket
let listener = TcpListener::bind(&http_addr).await?;
info!("Node {node_id} listening on {http_addr}");

loop {
let (socket, peer_addr) = listener.accept().await?;
info!("Accepted incoming connection from {peer_addr}");

let raft = raft.clone();
tokio::spawn(async move {
if let Err(e) = process_socket(socket, raft).await {
warn!("Failed to process socket from {peer_addr}: {e}");
}
});
}
}
30 changes: 30 additions & 0 deletions examples/raft-kv-memstore-rkyv/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use clap::Parser;
use openraft::AsyncRuntime;
use raft_kv_memstore_rkyv::TypeConfig;
use raft_kv_memstore_rkyv::app::start_raft_app;

#[derive(Parser, Clone, Debug)]
#[clap(author, version, about, long_about = None)]
pub struct Opt {
#[clap(long)]
pub id: u64,

#[clap(long)]
/// Network address to bind the server to (e.g., "127.0.0.1:50051")
pub addr: String,
}

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize tracing first, before any logging happens
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_file(true)
.with_line_number(true)
.init();

// Parse the parameters passed by arguments.
let options = Opt::parse();

let mut rt = <TypeConfig as openraft::RaftTypeConfig>::AsyncRuntime::new(1);
rt.block_on(start_raft_app(options.id, options.addr))
}
28 changes: 28 additions & 0 deletions examples/raft-kv-memstore-rkyv/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#![allow(clippy::uninlined_format_args)]

pub mod app;
pub mod network;
pub mod raft;
pub mod store;

#[path = "../../utils/declare_types.rs"]
pub mod typ;

#[cfg(test)]
mod test_store;

openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig:
D = raft::SetRequest,
R = raft::Response,
LeaderId = raft::LeaderId,
Vote = raft::Vote,
Entry = raft::Entry,
Node = raft::Node,
SnapshotData = Vec<u8>,
);

pub type NodeId = u64;
pub type LogStore = store::LogStore;
pub type StateMachineStore = store::StateMachineStore;
Loading