Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/arc_ring_buffer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use alloc::boxed::Box;
use core::sync::atomic::{AtomicU8, Ordering};
use core::{cell::Cell, ptr::NonNull};

use super::IS_ABANDONED;
use super::{Consumer, Producer, RingBuffer};

// Non-public helper type.
//
// A minimal, hand-rolled `Arc`-like handle to a heap-allocated `RingBuffer`.
// `new()` below creates exactly two of these (one inside the `Producer`,
// one inside the `Consumer`); the second one to be dropped deallocates the
// shared buffer (see the `Drop` implementation below).
#[derive(Debug, PartialEq, Eq)]
pub(crate) struct ArcRingBuffer<T> {
    // Points to a `RingBuffer` that was allocated with `Box::new()`
    // and leaked in `new()`; reclaimed in `drop_slow()`.
    ptr: NonNull<RingBuffer<T>>,
}

// SAFETY: If RingBuffer is Send, ArcRingBuffer is as well.
// ArcRingBuffer is just an owning pointer to a heap-allocated RingBuffer;
// moving the handle to another thread only moves access to that RingBuffer,
// which is sound whenever the RingBuffer itself may be sent between threads.
unsafe impl<T> Send for ArcRingBuffer<T> where RingBuffer<T>: Send {}

impl<T> ArcRingBuffer<T> {
    /// Consumes the given `RingBuffer` and returns the only
    /// [`Producer`]/[`Consumer`] pair that will ever refer to it.
    ///
    /// NB: this takes ownership of the `RingBuffer`, making sure that only one
    /// `Producer` and `Consumer` are ever created.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(rb: Box<RingBuffer<T>>) -> (Producer<T>, Consumer<T>) {
        // A freshly created buffer must not be marked as abandoned yet.
        debug_assert_eq!(rb.flags.load(Ordering::Relaxed) & IS_ABANDONED, 0);
        let head = rb.head.load(Ordering::Relaxed);
        let tail = rb.tail.load(Ordering::Relaxed);

        // We leak the `Box` here, but in the `Drop` implementation the pointer
        // will be turned back into a `Box` and its memory will be properly deallocated.
        // `NonNull::from()` converts the (never-null) reference returned by
        // `Box::leak()` without needing an `unsafe` block.
        let ptr = NonNull::from(Box::leak(rb));

        let p = Producer {
            buffer: Self { ptr },
            cached_head: Cell::new(head),
            cached_tail: Cell::new(tail),
        };
        let c = Consumer {
            buffer: Self { ptr },
            cached_head: Cell::new(head),
            cached_tail: Cell::new(tail),
        };
        (p, c)
    }
}

impl<T> Drop for ArcRingBuffer<T> {
    /// Marks the shared ring buffer as abandoned; whichever of the two
    /// handles (producer side or consumer side) is dropped *second*
    /// deallocates the `RingBuffer`.
    fn drop(&mut self) {
        // SAFETY: must point to initialized Storage.
        let flags: &AtomicU8 = unsafe { &self.ptr.as_ref().flags };
        // The "store" part of `fetch_or()` has to use `Release` to make sure that any previous writes
        // to the ring buffer happen before it (in the thread that drops first).
        // The "load" part can be `Relaxed` for the first thread,
        // but it must be `Acquire` for the second one (see below).
        if flags.fetch_or(IS_ABANDONED, Ordering::Release) & IS_ABANDONED == 0 {
            // The flag wasn't set before, so we are the first to drop our
            // producer/consumer and it should not be dropped yet.
        } else {
            // The flag was already set, i.e. the other thread has already dropped its
            // consumer/producer and it can be dropped now.

            // However, since the load of `flags` was `Relaxed`,
            // we have to use `Acquire` here to make sure that reading `head` and `tail`
            // in the destructor happens after this point.

            // Ideally, we would use a memory fence like this:
            //core::sync::atomic::fence(Ordering::Acquire);
            // ... but as long as ThreadSanitizer doesn't support fences,
            // we use load(Acquire) as a work-around to avoid false positives:
            let _ = flags.load(Ordering::Acquire);
            // SAFETY: RingBuffer has been allocated with `Box::new()`.
            unsafe {
                drop_slow(self.ptr);
            }
        }
    }
}

/// Non-inlined part of `ArcRingBuffer::drop()`.
///
/// Kept out-of-line so the common path of `drop()` (the first of the two
/// handles being dropped, which must not deallocate) stays small.
///
/// # Safety
///
/// `ptr` must point to a `RingBuffer<T>` that was allocated with
/// `Box::new()` (or a compatible allocation) and must not be used
/// (nor dropped again) afterwards.
#[inline(never)]
unsafe fn drop_slow<T>(ptr: NonNull<RingBuffer<T>>) {
    // SAFETY: `ArcRingBuffer::new()` obtained this pointer by leaking a `Box`,
    // so the memory has been allocated with `Box::new()` or compatible.
    unsafe {
        // Turn the pointer into a `Box` and immediately drop it, which deallocates the memory.
        drop(Box::from_raw(ptr.as_ptr()));
    }
}

impl<T> core::ops::Deref for ArcRingBuffer<T> {
    type Target = RingBuffer<T>;

    /// Provides shared access to the underlying `RingBuffer`.
    fn deref(&self) -> &Self::Target {
        // SAFETY: The pointee stays allocated for as long as any handle
        // exists, and no mutable references to it are ever created.
        unsafe { &*self.ptr.as_ptr() }
    }
}
53 changes: 24 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,25 @@

extern crate alloc;

use alloc::sync::Arc;
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::cell::Cell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::sync::atomic::{AtomicU8, AtomicUsize, Ordering};

#[allow(dead_code, clippy::undocumented_unsafe_blocks)]
mod cache_padded;
use cache_padded::CachePadded;

const IS_ABANDONED: u8 = 0b10000000;

mod arc_ring_buffer;
pub mod chunks;

use arc_ring_buffer::ArcRingBuffer;

// This is used in the documentation.
#[allow(unused_imports)]
use chunks::WriteChunkUninit;
Expand All @@ -103,6 +108,8 @@ pub struct RingBuffer<T> {
/// This integer is in range `0 .. 2 * capacity`.
tail: CachePadded<AtomicUsize>,

flags: AtomicU8,

/// The buffer holding slots.
data_ptr: *mut T,

Expand Down Expand Up @@ -136,24 +143,14 @@ impl<T> RingBuffer<T> {
#[allow(clippy::new_ret_no_self)]
#[must_use]
pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
let buffer = Arc::new(RingBuffer {
ArcRingBuffer::new(Box::new(Self {
head: CachePadded::new(AtomicUsize::new(0)),
tail: CachePadded::new(AtomicUsize::new(0)),
flags: AtomicU8::new(0),
data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
capacity,
_marker: PhantomData,
});
let p = Producer {
buffer: buffer.clone(),
cached_head: Cell::new(0),
cached_tail: Cell::new(0),
};
let c = Consumer {
buffer,
cached_head: Cell::new(0),
cached_tail: Cell::new(0),
};
(p, c)
}))
}

/// Returns the capacity of the queue.
Expand Down Expand Up @@ -294,7 +291,7 @@ impl<T> Eq for RingBuffer<T> {}
#[derive(Debug, PartialEq, Eq)]
pub struct Producer<T> {
/// A reference to the ring buffer.
buffer: Arc<RingBuffer<T>>,
buffer: ArcRingBuffer<T>,

/// A copy of `buffer.head` for quick access.
///
Expand Down Expand Up @@ -423,9 +420,10 @@ impl<T> Producer<T> {

/// Returns `true` if the corresponding [`Consumer`] has been destroyed.
///
/// Note that since Rust version 1.74.0, this is not synchronizing with the consumer thread
/// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
/// In a future version of `rtrb`, the synchronizing behavior might be restored.
/// Note that since Rust version 1.74.0 and before `rtrb` version 0.4,
/// this was not synchronizing with the consumer thread anymore,
/// see [issue #114](https://github.com/mgeier/rtrb/issues/114).
/// In `rtrb` version 0.4, the synchronizing behavior has been restored.
///
/// # Examples
///
Expand Down Expand Up @@ -460,13 +458,11 @@ impl<T> Producer<T> {
/// # use rtrb::RingBuffer;
/// # let (p, c) = RingBuffer::<i32>::new(1);
/// if p.is_abandoned() {
/// // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
/// std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
/// // The consumer does definitely not exist anymore.
/// }
/// ```
pub fn is_abandoned(&self) -> bool {
Arc::strong_count(&self.buffer) < 2
self.buffer.flags.load(Ordering::Acquire) & IS_ABANDONED != 0
}

/// Returns a read-only reference to the ring buffer.
Expand Down Expand Up @@ -520,7 +516,7 @@ impl<T> Producer<T> {
#[derive(Debug, PartialEq, Eq)]
pub struct Consumer<T> {
/// A reference to the ring buffer.
buffer: Arc<RingBuffer<T>>,
buffer: ArcRingBuffer<T>,

/// A copy of `buffer.head` for quick access.
///
Expand Down Expand Up @@ -702,9 +698,10 @@ impl<T> Consumer<T> {

/// Returns `true` if the corresponding [`Producer`] has been destroyed.
///
/// Note that since Rust version 1.74.0, this is not synchronizing with the producer thread
/// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
/// In a future version of `rtrb`, the synchronizing behavior might be restored.
/// Note that since Rust version 1.74.0 and before `rtrb` version 0.4,
/// this was not synchronizing with the producer thread anymore,
/// see [issue #114](https://github.com/mgeier/rtrb/issues/114).
/// In `rtrb` version 0.4, the synchronizing behavior has been restored.
///
/// # Examples
///
Expand Down Expand Up @@ -738,13 +735,11 @@ impl<T> Consumer<T> {
/// # use rtrb::RingBuffer;
/// # let (p, c) = RingBuffer::<i32>::new(1);
/// if c.is_abandoned() {
/// // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
/// std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
/// // The producer does definitely not exist anymore.
/// }
/// ```
pub fn is_abandoned(&self) -> bool {
Arc::strong_count(&self.buffer) < 2
self.buffer.flags.load(Ordering::Acquire) & IS_ABANDONED != 0
}

/// Returns a read-only reference to the ring buffer.
Expand Down
1 change: 0 additions & 1 deletion tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ fn no_race_with_is_abandoned() {
});
std::thread::yield_now();
if c.is_abandoned() {
std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
unsafe { V = 20 };
}
t.join().unwrap();
Expand Down