Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/san.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,5 @@ RUSTFLAGS="${RUSTFLAGS:-} -Z sanitizer=memory" \
# TODO: Use x86_64-unknown-linux-gnutsan once https://github.com/rust-lang/rust/pull/152757 merged
# Run thread sanitizer
cargo clean
TSAN_OPTIONS="${TSAN_OPTIONS:-} suppressions=$(pwd)/ci/tsan" \
RUSTFLAGS="${RUSTFLAGS:-} -Z sanitizer=thread" \
cargo test -Z build-std --all --all-features --release --target x86_64-unknown-linux-gnu --tests --exclude benchmarks -- --test-threads=1
10 changes: 0 additions & 10 deletions ci/tsan

This file was deleted.

1 change: 1 addition & 0 deletions crossbeam-deque/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ std = ["crossbeam-epoch/std", "crossbeam-utils/std"]
[dependencies]
crossbeam-epoch = { version = "0.9.17", path = "../crossbeam-epoch", default-features = false }
crossbeam-utils = { version = "0.8.18", path = "../crossbeam-utils", default-features = false }
atomic-maybe-uninit = { version = "0.3.15", git = "https://github.com/taiki-e/atomic-maybe-uninit", branch = "memcpy" }

[dev-dependencies]
fastrand = "2"
Expand Down
9 changes: 6 additions & 3 deletions crossbeam-deque/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ use std::env;

fn main() {
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rustc-check-cfg=cfg(crossbeam_sanitize_thread)");
println!("cargo:rustc-check-cfg=cfg(crossbeam_sanitize_thread,crossbeam_sanitize_any)");

// `cfg(sanitize = "..")` is not stabilized.
let sanitize = env::var("CARGO_CFG_SANITIZE").unwrap_or_default();
if sanitize.contains("thread") {
println!("cargo:rustc-cfg=crossbeam_sanitize_thread");
if !sanitize.is_empty() {
println!("cargo:rustc-cfg=crossbeam_sanitize_any");
if sanitize.contains("thread") {
println!("cargo:rustc-cfg=crossbeam_sanitize_thread");
}
}
}
1 change: 1 addition & 0 deletions crossbeam-deque/src/atomic_memcpy.rs
19 changes: 5 additions & 14 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use core::{
sync::atomic::{self, AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
};

use atomic_maybe_uninit::PerByteAtomicMaybeUninit;
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use crossbeam_utils::{Backoff, CachePadded};

use crate::alloc_helper::Global;
use crate::{alloc_helper::Global, atomic_memcpy};

// Minimum buffer capacity.
const MIN_CAP: usize = 64;
Expand Down Expand Up @@ -43,7 +44,7 @@ impl<T> Buffer<T> {

let ptr = Box::into_raw(
(0..cap)
.map(|_| MaybeUninit::<T>::uninit())
.map(|_| PerByteAtomicMaybeUninit::new(MaybeUninit::<T>::uninit()))
.collect::<Box<[_]>>(),
)
.cast::<T>();
Expand All @@ -70,23 +71,13 @@ impl<T> Buffer<T> {
}

/// Writes `task` into the specified `index`.
///
/// This method might be concurrently called with another `read` at the same index, which is
/// technically speaking a data race and therefore UB. We should use an atomic store here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
unsafe { ptr::write_volatile(self.at(index).cast::<MaybeUninit<T>>(), task) }
unsafe { atomic_memcpy::store(self.at(index), task) }
}

/// Reads a task from the specified `index`.
///
/// This method might be concurrently called with another `write` at the same index, which is
/// technically speaking a data race and therefore UB. We should use an atomic load here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile load instead.
unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
unsafe { ptr::read_volatile(self.at(index).cast::<MaybeUninit<T>>()) }
unsafe { atomic_memcpy::load(self.at(index)) }
}
}

Expand Down
2 changes: 2 additions & 0 deletions crossbeam-deque/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ extern crate std;

#[cfg(feature = "std")]
mod alloc_helper;
#[cfg(feature = "std")]
mod atomic_memcpy;

#[cfg(feature = "std")]
mod deque;
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std = []
atomic = ["atomic-maybe-uninit"]

[dependencies]
atomic-maybe-uninit = { version = "0.3.4", optional = true }
atomic-maybe-uninit = { version = "0.3.15", optional = true, git = "https://github.com/taiki-e/atomic-maybe-uninit", branch = "memcpy" }

# Enable the use of loom for concurrency testing.
#
Expand Down
8 changes: 5 additions & 3 deletions crossbeam-utils/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ include!("build-common.rs");
fn main() {
println!("cargo:rerun-if-changed=no_atomic.rs");
println!(
"cargo:rustc-check-cfg=cfg(crossbeam_no_atomic,crossbeam_sanitize_thread,crossbeam_atomic_cell_force_fallback)"
"cargo:rustc-check-cfg=cfg(crossbeam_no_atomic,crossbeam_sanitize_thread,crossbeam_sanitize_any,crossbeam_atomic_cell_force_fallback)"
);

let target = match env::var("TARGET") {
Expand All @@ -41,10 +41,12 @@ fn main() {
}

// `cfg(sanitize = "..")` is not stabilized.
if let Ok(sanitize) = env::var("CARGO_CFG_SANITIZE") {
let sanitize = env::var("CARGO_CFG_SANITIZE").unwrap_or_default();
if !sanitize.is_empty() {
println!("cargo:rustc-cfg=crossbeam_sanitize_any");
println!("cargo:rustc-cfg=crossbeam_atomic_cell_force_fallback");
if sanitize.contains("thread") {
println!("cargo:rustc-cfg=crossbeam_sanitize_thread");
}
println!("cargo:rustc-cfg=crossbeam_atomic_cell_force_fallback");
}
}
117 changes: 58 additions & 59 deletions crossbeam-utils/src/atomic/atomic_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use core::{
ptr,
};

use super::seq_lock::SeqLock;
use super::{
atomic_memcpy::{self, store as write},
seq_lock::SeqLock,
};
use crate::{
CachePadded,
primitive::sync::atomic::{self, Ordering},
Expand Down Expand Up @@ -407,10 +410,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = value.wrapping_add(val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old.wrapping_add(val))) }
old
}
}
Expand Down Expand Up @@ -446,10 +449,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = value.wrapping_sub(val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old.wrapping_sub(val))) }
old
}
}
Expand Down Expand Up @@ -483,10 +486,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value &= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old & val)) }
old
}
}
Expand Down Expand Up @@ -520,10 +523,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = !(old & val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(!(old & val))) }
old
}
}
Expand Down Expand Up @@ -557,10 +560,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value |= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old | val)) }
old
}
}
Expand Down Expand Up @@ -594,10 +597,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value ^= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old ^ val)) }
old
}
}
Expand Down Expand Up @@ -632,10 +635,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = cmp::max(old, val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(cmp::max(old, val))) }
old
}
}
Expand Down Expand Up @@ -670,10 +673,10 @@ macro_rules! impl_arithmetic {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = cmp::min(old, val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(cmp::min(old, val))) }
old
}
}
Expand Down Expand Up @@ -792,10 +795,10 @@ impl AtomicCell<bool> {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value &= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old & val)) }
old
}
}
Expand Down Expand Up @@ -835,10 +838,10 @@ impl AtomicCell<bool> {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value = !(old & val);
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(!(old & val))) }
old
}
}
Expand Down Expand Up @@ -875,10 +878,10 @@ impl AtomicCell<bool> {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value |= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old | val)) }
old
}
}
Expand Down Expand Up @@ -915,10 +918,10 @@ impl AtomicCell<bool> {
}
},
{
let _guard = lock(self.as_ptr() as usize).write();
let value = unsafe { &mut *(self.as_ptr()) };
let old = *value;
*value ^= val;
let dst = self.as_ptr();
let _guard = lock(dst as usize).write();
let old = unsafe { ptr::read(dst) };
unsafe { write(dst, MaybeUninit::new(old ^ val)) }
old
}
}
Expand Down Expand Up @@ -1045,13 +1048,7 @@ where

// Try doing an optimistic read first.
if let Some(stamp) = lock.optimistic_read() {
// We need a volatile read here because other threads might concurrently modify the
// value. In theory, data races are *always* UB, even if we use volatile reads and
// discard the data when a data race is detected. The proper solution would be to
// do atomic reads and atomic writes, but we can't atomically read and write all
// kinds of data since `AtomicU8` is not available on stable Rust yet.
// Load as `MaybeUninit` because we may load a value that is not valid as `T`.
let val = unsafe { ptr::read_volatile(src.cast::<MaybeUninit<T>>()) };
let val = unsafe { atomic_memcpy::load(src) };

if lock.validate_read(stamp) {
return unsafe { val.assume_init() };
Expand Down Expand Up @@ -1082,7 +1079,7 @@ unsafe fn atomic_store<T>(dst: *mut T, val: T) {
},
{
let _guard = lock(dst as usize).write();
unsafe { ptr::write(dst, val) }
unsafe { write(dst, MaybeUninit::new(val)) }
}
}
}
Expand All @@ -1102,7 +1099,9 @@ unsafe fn atomic_swap<T>(dst: *mut T, val: T) -> T {
},
{
let _guard = lock(dst as usize).write();
unsafe { ptr::replace(dst, val) }
let old = unsafe { ptr::read(dst.cast::<MaybeUninit<T>>()) };
unsafe { write(dst, MaybeUninit::new(val)) }
unsafe { old.assume_init() }
}
}
}
Expand Down Expand Up @@ -1156,7 +1155,7 @@ where

let old = unsafe { ptr::read(dst) };
if T::eq(&old, &current) {
unsafe { ptr::write(dst, new) }
unsafe { write(dst, MaybeUninit::new(new)) }
Ok(old)
} else {
// The value hasn't been changed. Drop the guard without incrementing the stamp.
Expand Down
Loading