diff --git a/src/arc_ring_buffer.rs b/src/arc_ring_buffer.rs
new file mode 100644
index 00000000..ec1836c2
--- /dev/null
+++ b/src/arc_ring_buffer.rs
@@ -0,0 +1,96 @@
+use alloc::boxed::Box;
+use core::sync::atomic::{AtomicU8, Ordering};
+use core::{cell::Cell, ptr::NonNull};
+
+use super::IS_ABANDONED;
+use super::{Consumer, Producer, RingBuffer};
+
+// Non-public helper type.
+#[derive(Debug, PartialEq, Eq)]
+pub(crate) struct ArcRingBuffer<T> {
+    ptr: NonNull<RingBuffer<T>>,
+}
+
+// SAFETY: If RingBuffer<T> is Send, ArcRingBuffer<T> is as well.
+unsafe impl<T> Send for ArcRingBuffer<T> where RingBuffer<T>: Send {}
+
+impl<T> ArcRingBuffer<T> {
+    // NB: this takes ownership of the RingBuffer, making sure that only one
+    // Producer and Consumer are ever created.
+    #[allow(clippy::new_ret_no_self)]
+    pub fn new(rb: Box<RingBuffer<T>>) -> (Producer<T>, Consumer<T>) {
+        debug_assert_eq!(rb.flags.load(Ordering::Relaxed) & IS_ABANDONED, 0);
+        let head = rb.head.load(Ordering::Relaxed);
+        let tail = rb.tail.load(Ordering::Relaxed);
+
+        // We leak the `Box` here, but in the `Drop` implementation the pointer
+        // will be turned back into a `Box` and its memory will be properly deallocated.
+        let ptr = Box::leak(rb);
+        // SAFETY: Pointer from `Box` is always non-null.
+        let ptr = unsafe { NonNull::new_unchecked(ptr) };
+
+        let p = Producer {
+            buffer: Self { ptr },
+            cached_head: Cell::new(head),
+            cached_tail: Cell::new(tail),
+        };
+        let c = Consumer {
+            buffer: Self { ptr },
+            cached_head: Cell::new(head),
+            cached_tail: Cell::new(tail),
+        };
+        (p, c)
+    }
+}
+
+impl<T> Drop for ArcRingBuffer<T> {
+    fn drop(&mut self) {
+        // SAFETY: must point to initialized Storage.
+        let flags: &AtomicU8 = unsafe { &self.ptr.as_ref().flags };
+        // The "store" part of `fetch_or()` has to use `Release` to make sure that any previous writes
+        // to the ring buffer happen before it (in the thread that drops first).
+        // The "load" part can be `Relaxed` for the first thread,
+        // but it must be `Acquire` for the second one (see below).
+        if flags.fetch_or(IS_ABANDONED, Ordering::Release) & IS_ABANDONED == 0 {
+            // The flag wasn't set before, so we are the first to drop our
+            // producer/consumer and it should not be dropped yet.
+        } else {
+            // The flag was already set, i.e. the other thread has already dropped its
+            // consumer/producer and it can be dropped now.
+
+            // However, since the load of `flags` was `Relaxed`,
+            // we have to use `Acquire` here to make sure that reading `head` and `tail`
+            // in the destructor happens after this point.
+
+            // Ideally, we would use a memory fence like this:
+            //core::sync::atomic::fence(Ordering::Acquire);
+            // ... but as long as ThreadSanitizer doesn't support fences,
+            // we use load(Acquire) as a work-around to avoid false positives:
+            let _ = flags.load(Ordering::Acquire);
+            // SAFETY: RingBuffer has been allocated with `Box::new()`.
+            unsafe {
+                drop_slow(self.ptr);
+            }
+        }
+    }
+}
+
+/// Non-inlined part of `ArcRingBuffer::drop()`.
+#[inline(never)]
+unsafe fn drop_slow<T>(ptr: NonNull<RingBuffer<T>>) {
+    // SAFETY: The caller of `ArcRingBuffer::new()` has to guarantee that the memory
+    // has been allocated with `Box::new()` or compatible.
+    unsafe {
+        // Turn the pointer into a `Box` and immediately drop it, which deallocates the memory.
+        drop(Box::from_raw(ptr.as_ptr()));
+    }
+}
+
+impl<T> core::ops::Deref for ArcRingBuffer<T> {
+    type Target = RingBuffer<T>;
+
+    fn deref(&self) -> &Self::Target {
+        // SAFETY: There are never any mutable references.
+        unsafe { self.ptr.as_ref() }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 7b1800b2..4f44900a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -67,20 +67,25 @@
 extern crate alloc;
 
-use alloc::sync::Arc;
+use alloc::boxed::Box;
 use alloc::vec::Vec;
 use core::cell::Cell;
 use core::fmt;
 use core::marker::PhantomData;
 use core::mem::{ManuallyDrop, MaybeUninit};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
 
 #[allow(dead_code, clippy::undocumented_unsafe_blocks)]
 mod cache_padded;
 use cache_padded::CachePadded;
 
+const IS_ABANDONED: u8 = 0b10000000;
+
+mod arc_ring_buffer;
 pub mod chunks;
 
+use arc_ring_buffer::ArcRingBuffer;
+
 // This is used in the documentation.
 #[allow(unused_imports)]
 use chunks::WriteChunkUninit;
@@ -103,6 +108,8 @@ pub struct RingBuffer<T> {
     /// This integer is in range `0 .. 2 * capacity`.
     tail: CachePadded<AtomicUsize>,
 
+    flags: AtomicU8,
+
     /// The buffer holding slots.
     data_ptr: *mut T,
 
@@ -136,24 +143,14 @@ impl<T> RingBuffer<T> {
     #[allow(clippy::new_ret_no_self)]
     #[must_use]
     pub fn new(capacity: usize) -> (Producer<T>, Consumer<T>) {
-        let buffer = Arc::new(RingBuffer {
+        ArcRingBuffer::new(Box::new(Self {
             head: CachePadded::new(AtomicUsize::new(0)),
             tail: CachePadded::new(AtomicUsize::new(0)),
+            flags: AtomicU8::new(0),
             data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
             capacity,
             _marker: PhantomData,
-        });
-        let p = Producer {
-            buffer: buffer.clone(),
-            cached_head: Cell::new(0),
-            cached_tail: Cell::new(0),
-        };
-        let c = Consumer {
-            buffer,
-            cached_head: Cell::new(0),
-            cached_tail: Cell::new(0),
-        };
-        (p, c)
+        }))
     }
 
     /// Returns the capacity of the queue.
@@ -294,7 +291,7 @@ impl<T> Eq for RingBuffer<T> {}
 #[derive(Debug, PartialEq, Eq)]
 pub struct Producer<T> {
     /// A reference to the ring buffer.
-    buffer: Arc<RingBuffer<T>>,
+    buffer: ArcRingBuffer<T>,
 
     /// A copy of `buffer.head` for quick access.
     ///
@@ -423,9 +420,10 @@
 
     /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
     ///
-    /// Note that since Rust version 1.74.0, this is not synchronizing with the consumer thread
-    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
-    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
+    /// Note that since Rust version 1.74.0 and before `rtrb` version 0.4,
+    /// this was not synchronizing with the consumer thread anymore,
+    /// see [issue #114](https://github.com/mgeier/rtrb/issues/114).
+    /// In `rtrb` version 0.4, the synchronizing behavior has been restored.
     ///
     /// # Examples
     ///
@@ -460,13 +458,11 @@
     /// # use rtrb::RingBuffer;
     /// # let (p, c) = RingBuffer::<i32>::new(1);
     /// if p.is_abandoned() {
-    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
-    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
     ///     // The consumer does definitely not exist anymore.
     /// }
     /// ```
     pub fn is_abandoned(&self) -> bool {
-        Arc::strong_count(&self.buffer) < 2
+        self.buffer.flags.load(Ordering::Acquire) & IS_ABANDONED != 0
     }
 
     /// Returns a read-only reference to the ring buffer.
@@ -520,7 +516,7 @@
 #[derive(Debug, PartialEq, Eq)]
 pub struct Consumer<T> {
     /// A reference to the ring buffer.
-    buffer: Arc<RingBuffer<T>>,
+    buffer: ArcRingBuffer<T>,
 
     /// A copy of `buffer.head` for quick access.
     ///
@@ -702,9 +698,10 @@
 
     /// Returns `true` if the corresponding [`Producer`] has been destroyed.
     ///
-    /// Note that since Rust version 1.74.0, this is not synchronizing with the producer thread
-    /// anymore, see <https://github.com/mgeier/rtrb/issues/114>.
-    /// In a future version of `rtrb`, the synchronizing behavior might be restored.
+    /// Note that since Rust version 1.74.0 and before `rtrb` version 0.4,
+    /// this was not synchronizing with the producer thread anymore,
+    /// see [issue #114](https://github.com/mgeier/rtrb/issues/114).
+    /// In `rtrb` version 0.4, the synchronizing behavior has been restored.
     ///
     /// # Examples
     ///
@@ -738,13 +735,11 @@
     /// # use rtrb::RingBuffer;
     /// # let (p, c) = RingBuffer::<i32>::new(1);
     /// if c.is_abandoned() {
-    ///     // This is needed since Rust 1.74.0, see https://github.com/mgeier/rtrb/issues/114:
-    ///     std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
     ///     // The producer does definitely not exist anymore.
     /// }
     /// ```
     pub fn is_abandoned(&self) -> bool {
-        Arc::strong_count(&self.buffer) < 2
+        self.buffer.flags.load(Ordering::Acquire) & IS_ABANDONED != 0
     }
 
     /// Returns a read-only reference to the ring buffer.
diff --git a/tests/lib.rs b/tests/lib.rs
index 5c0cec2f..3d1f9457 100644
--- a/tests/lib.rs
+++ b/tests/lib.rs
@@ -151,7 +151,6 @@ fn no_race_with_is_abandoned() {
     });
     std::thread::yield_now();
    if c.is_abandoned() {
-        std::sync::atomic::fence(std::sync::atomic::Ordering::Acquire);
         unsafe { V = 20 };
     }
     t.join().unwrap();