Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

[![Rust CI](https://github.com/anza-xyz/shaq/actions/workflows/ci.yml/badge.svg)](https://github.com/anza-xyz/shaq/actions/workflows/ci.yml)

shaq is a **SHAred Queue**: a simple shared-memory SPSC (Single Producer Single Consumer) and MPMC (Multi Producer Multi Consumer) FIFO queue.
shaq is a **SHAred Queue**: a simple shared-memory SPSC (Single Producer Single Consumer), MPMC (Multi Producer Multi Consumer), and broadcast FIFO queue.
It is designed for efficient inter-thread or inter-process communication using a lock-free, memory-mapped queue.
157 changes: 156 additions & 1 deletion examples/enqueue_dequeue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use common::{
run_total_throughput_loop, setup_exit_handler, Item, SYNC_CADENCE,
};
use shaq::{
broadcast::{Consumer as BroadcastConsumer, Producer as BroadcastProducer},
mpmc::{Consumer as MpmcConsumer, Producer as MpmcProducer},
spsc::{Consumer as SpscConsumer, Producer as SpscProducer},
};
Expand All @@ -19,6 +20,7 @@ const QUEUE_SIZE: usize = 16 * 1024 * 1024;
// Benchmark mode selected on the command line; determines which queue
// flavor (and how many worker threads) the example drives.
enum Mode {
    // Single producer, single consumer over one SPSC queue.
    Spsc,
    // `producers` writer threads and `consumers` reader threads sharing one MPMC queue.
    Mpmc { producers: usize, consumers: usize },
    // `producers` writer threads; each of `consumers` readers observes every item
    // via its own broadcast cursor (fan-out, not work-sharing).
    Broadcast { producers: usize, consumers: usize },
}

struct Config {
Expand All @@ -34,6 +36,10 @@ fn main() {
producers,
consumers,
} => run_mpmc(producers, consumers, config.verbose),
Mode::Broadcast {
producers,
consumers,
} => run_broadcast(producers, consumers, config.verbose),
}
}

Expand Down Expand Up @@ -76,6 +82,19 @@ fn parse_config_or_exit() -> Config {
consumers,
}
}
Some("broadcast") => {
let producers = parse_usize_arg(positional.get(1).cloned(), 2, "producers");
let consumers = parse_usize_arg(positional.get(2).cloned(), 2, "consumers");
if positional.len() > 3 {
eprintln!("Too many arguments for broadcast mode");
print_usage();
std::process::exit(2);
}
Mode::Broadcast {
producers,
consumers,
}
}
Some(mode) => {
eprintln!("Unknown mode: {mode}");
print_usage();
Expand All @@ -99,7 +118,7 @@ fn parse_usize_arg(value: Option<String>, default: usize, name: &str) -> usize {

fn print_usage() {
eprintln!(
"Usage: cargo run --example enqueue_dequeue -- [-v|--verbose] [spsc|mpmc [producers] [consumers]]"
"Usage: cargo run --example enqueue_dequeue -- [-v|--verbose] [spsc|mpmc [producers] [consumers]|broadcast [producers] [consumers]]"
);
}

Expand Down Expand Up @@ -309,6 +328,88 @@ fn run_mpmc(producers: usize, consumers: usize, verbose: bool) {
cleanup_queue_file(queue_path);
}

/// Drives the broadcast-queue benchmark: spawns `producers` writer threads
/// sharing one `BroadcastProducer` handle and `consumers` reader threads that
/// each join the queue independently, then reports total throughput until the
/// exit flag is raised (Ctrl-C via `setup_exit_handler`).
///
/// NOTE(review): reuses `mpmc_core_ids` for pinning — presumably the same
/// core-assignment policy is intended for broadcast; confirm.
fn run_broadcast(producers: usize, consumers: usize, verbose: bool) {
    let (consumer_cores, producer_cores) = mpmc_core_ids(consumers, producers);
    let exit = setup_exit_handler();
    let queue_path = "/tmp/shaq_broadcast";
    let queue_file = prepare_queue_file(queue_path);
    // Shared counters the throughput loop reads: items written, and how often
    // a producer reserve / consumer read attempt came back empty-handed.
    let total_items_produced = Arc::new(AtomicU64::new(0));
    let producer_reserve_failures = Arc::new(AtomicU64::new(0));
    let consumer_reserve_failures = Arc::new(AtomicU64::new(0));

    // Create the queue region once, then drop the handle; the long-lived
    // producer handle below is obtained via join().
    // SAFETY: This thread uniquely creates the queue.
    unsafe {
        let _ = BroadcastProducer::<Item>::create(&queue_file, QUEUE_SIZE).unwrap();
    }
    // SAFETY: Queue was created above; joining once and sharing handles is safe.
    let producer = Arc::new(unsafe { BroadcastProducer::<Item>::join(&queue_file) }.unwrap());

    let mut handles = Vec::new();

    // Consumers are spawned first so they are observing before producers start.
    for (idx, core_id) in consumer_cores.into_iter().enumerate() {
        let exit = exit.clone();
        // Each consumer thread gets its own file handle to join through.
        let queue_file = queue_file.try_clone().unwrap();
        let consumer_reserve_failures = consumer_reserve_failures.clone();
        handles.push(
            std::thread::Builder::new()
                .name(format!("shaqBroadcastConsumer{idx}"))
                .spawn(move || {
                    if let Some(core_id) = core_id {
                        println!("Consumer {idx} core id: {}", core_id.id);
                        core_affinity::set_for_current(core_id);
                    }

                    // SAFETY: Queue was created above; each broadcast consumer maintains
                    // its own local cursor and can join independently.
                    let consumer = unsafe { BroadcastConsumer::<Item>::join(&queue_file) }.unwrap();
                    run_broadcast_consumer(consumer, exit, consumer_reserve_failures);
                })
                .unwrap(),
        );
    }

    for (idx, core_id) in producer_cores.into_iter().enumerate() {
        let exit = exit.clone();
        // All producer threads share the single joined producer handle.
        let producer = producer.clone();
        // Per-producer progress reporting is only enabled in verbose mode.
        let report_prefix = verbose.then(|| format!("Producer {idx}"));
        handles.push(
            std::thread::Builder::new()
                .name(format!("shaqBroadcastProducer{idx}"))
                .spawn({
                    let total_items_produced = total_items_produced.clone();
                    let producer_reserve_failures = producer_reserve_failures.clone();
                    move || {
                        if let Some(core_id) = core_id {
                            println!("Producer {idx} core id: {}", core_id.id);
                            core_affinity::set_for_current(core_id);
                        }

                        run_broadcast_producer(
                            producer,
                            exit,
                            report_prefix,
                            total_items_produced,
                            producer_reserve_failures,
                        );
                    }
                })
                .unwrap(),
        );
    }

    // Blocks on the main thread, printing aggregate throughput until `exit` is set.
    run_total_throughput_loop::<Item>(
        exit.clone(),
        total_items_produced,
        producer_reserve_failures,
        consumer_reserve_failures,
    );
    for handle in handles {
        handle.join().unwrap();
    }

    // Remove the backing file so the next run starts from a fresh queue.
    cleanup_queue_file(queue_path);
}

fn run_mpmc_producer(
producer: Arc<MpmcProducer<Item>>,
exit: Arc<AtomicBool>,
Expand Down Expand Up @@ -346,6 +447,60 @@ fn run_mpmc_consumer(
});
}

/// Producer loop body for the broadcast benchmark.
///
/// Repeatedly reserves a write batch of up to `SYNC_CADENCE` slots, fills each
/// reserved `Item` with the byte `42`, and reports the number of items written
/// back to `run_producer_loop` (which accumulates into `total_items_produced`).
/// A failed reservation bumps `producer_reserve_failures` and yields `None`.
fn run_broadcast_producer(
    producer: Arc<BroadcastProducer<Item>>,
    exit: Arc<AtomicBool>,
    report_prefix: Option<String>,
    total_items_produced: Arc<AtomicU64>,
    producer_reserve_failures: Arc<AtomicU64>,
) {
    run_producer_loop::<Item, _>(exit, report_prefix, total_items_produced, move || {
        // SAFETY: every reserved slot is fully written below before the
        // closure returns, upholding reserve_write_batch()'s contract.
        let reserved = unsafe { producer.reserve_write_batch(SYNC_CADENCE) };
        match reserved {
            None => {
                // Queue is full (or otherwise unreservable) right now; count it.
                producer_reserve_failures.fetch_add(1, Ordering::Relaxed);
                None
            }
            Some(mut batch) => {
                let slot_count = batch.len();
                for slot in 0..slot_count {
                    // SAFETY: `slot` is in-bounds for the reserved batch, so
                    // as_mut() points at a valid contiguous slot.
                    unsafe {
                        batch.as_mut(slot).data.fill(42);
                    }
                }
                Some(slot_count)
            }
        }
    });
}

/// Consumer loop body for the broadcast benchmark.
///
/// Uses the zero-copy read path: take a direct batch, touch each item, then
/// commit the local cursor advance. If the commit fails or the read reports
/// the consumer was lapped (`Err`), the failure is counted and the cursor is
/// resynchronized to the oldest available entry before retrying.
fn run_broadcast_consumer(
    mut consumer: BroadcastConsumer<Item>,
    exit: Arc<AtomicBool>,
    consumer_reserve_failures: Arc<AtomicU64>,
) {
    run_consumer_loop(exit, move || {
        // SAFETY: This benchmark intentionally uses the zero-copy broadcast API
        // and validates before committing the local cursor advance.
        match unsafe { consumer.try_read_direct_batch(SYNC_CADENCE) } {
            Ok(Some(batch)) => {
                // Touch every item so the read is not optimized away and the
                // data path is actually exercised.
                for index in 0..batch.len() {
                    // SAFETY: The benchmark only touches the item before
                    // validating and committing the batch.
                    unsafe {
                        let _ = batch.as_ref(index).data[0];
                    }
                }
                // Commit can fail if a producer overwrote the slots we just
                // read; treat that like a reserve failure and resync.
                if batch.commit().is_err() {
                    consumer_reserve_failures.fetch_add(1, Ordering::Relaxed);
                    consumer.sync_to_oldest();
                }
            }
            // Nothing new to read; spin again.
            Ok(None) => {}
            // Consumer fell behind and items were skipped; resync to oldest.
            Err(_skipped) => {
                consumer_reserve_failures.fetch_add(1, Ordering::Relaxed);
                consumer.sync_to_oldest();
            }
        }
    });
}

fn spsc_core_ids() -> (Option<core_affinity::CoreId>, Option<core_affinity::CoreId>) {
core_affinity::get_core_ids()
.map(|cores| {
Expand Down
Loading
Loading