Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 58 additions & 26 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ struct Tail {
/// Number of active receivers.
rx_cnt: usize,

/// True if the channel is closed.
closed: bool,
/// True if there are any strong senders.
has_senders: bool,

/// Receivers waiting for a value.
waiters: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
Expand Down Expand Up @@ -566,7 +566,7 @@ impl<T> Sender<T> {
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: receiver_count,
closed: receiver_count == 0,
has_senders: true,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),
Expand Down Expand Up @@ -887,24 +887,12 @@ impl<T> Sender<T> {
/// # }
/// ```
pub async fn closed(&self) {
loop {
let notified = self.shared.notify_last_rx_drop.notified();

{
// Ensure the lock drops if the channel isn't closed
let tail = self.shared.tail.lock();
if tail.closed {
return;
}
}

notified.await;
}
self.shared.closed_for_senders().await;
Comment on lines 889 to +890
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was initially going to say this changes behavior of the existing function, but I guess it doesn't because if you can call this method then there is a sender, so the closed for receivers condition does not matter here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the closed boolean was a bit overloaded - if there were no receivers, senders could observe that there are no receivers; if there were no senders, receivers could observe that there are no senders.

I'll just add that has_receivers is now the same as rx_cnt != 0 and has_senders is the same as num_tx.load(Ordering::Relaxed) != 0. The first one can be safely deleted because they are both in a locked Tail. I'm not entirely sure about the other one since num_tx is in Shared and not in Tail so it's not behind the same lock as the boolean currently is. I'm just mentioning this if you'd want me to delete has_receivers so that it cannot diverge and if I should look closer into deleting has_senders if it's safe.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can delete the fields entirely, then that's great. But don't do it if it's too complex.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I've deleted just has_receivers. I'm pretty sure that's safe. I'm not so sure about the other one so I'll leave it alone.

}

fn close_channel(&self) {
let mut tail = self.shared.tail.lock();
tail.closed = true;
tail.has_senders = false;

self.shared.notify_rx(tail);
}
Expand All @@ -926,13 +914,6 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {

assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers");

if tail.rx_cnt == 0 {
// Potentially need to re-open the channel, if a new receiver has been added between calls
// to poll(). Note that we use rx_cnt == 0 instead of is_closed since is_closed also
// applies if the sender has been dropped
tail.closed = false;
}

tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");
let next = tail.pos;

Expand Down Expand Up @@ -1053,6 +1034,27 @@ impl<T> Shared<T> {

wakers.wake_all();
}

async fn closed_for_senders(&self) {
cooperative(async {
crate::trace::async_trace_leaf().await;

loop {
let notified = self.notify_last_rx_drop.notified();

{
// Ensure the lock drops if the channel isn't closed
let tail = self.tail.lock();
if tail.rx_cnt == 0 {
return;
}
}

notified.await;
}
})
.await;
}
}

impl<T> Clone for Sender<T> {
Expand Down Expand Up @@ -1102,6 +1104,37 @@ impl<T> WeakSender<T> {
}
}

/// A future which completes when the number of [Receiver]s subscribed to this channel reaches
/// zero, regardless of whether strong senders still exist.
///
/// # Examples
///
/// ```
/// use futures::FutureExt;
/// use tokio::sync::broadcast;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, mut rx1) = broadcast::channel::<u32>(16);
/// let mut rx2 = tx.subscribe();
///
/// let _ = tx.send(10);
/// let weak = tx.downgrade();
Comment thread
martin-g marked this conversation as resolved.
/// drop(tx);
///
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// drop(rx1);
/// assert!(weak.closed().now_or_never().is_none());
///
/// assert_eq!(rx2.recv().await.unwrap(), 10);
/// drop(rx2);
/// assert!(weak.closed().now_or_never().is_some());
/// # }
/// ```
pub async fn closed(&self) {
self.shared.closed_for_senders().await;
}

/// Returns the number of [`Sender`] handles.
pub fn strong_count(&self) -> usize {
self.shared.num_tx.load(Acquire)
Expand Down Expand Up @@ -1256,7 +1289,7 @@ impl<T> Receiver<T> {
// At this point the channel is empty for *this* receiver. If
// it's been closed, then that's what we return, otherwise we
// set a waker and return empty.
if tail.closed {
if !tail.has_senders {
return Err(TryRecvError::Closed);
}

Expand Down Expand Up @@ -1555,7 +1588,6 @@ impl<T> Drop for Receiver<T> {

if remaining_rx == 0 {
self.shared.notify_last_rx_drop.notify_waiters();
tail.closed = true;
}

drop(tail);
Expand Down
34 changes: 34 additions & 0 deletions tokio/tests/sync_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,38 +673,72 @@ fn broadcast_sender_closed() {
assert_ready!(task.poll());
}

#[test]
fn broadcast_weak_sender_closed() {
let (tx, rx) = broadcast::channel::<()>(1);
let rx2 = tx.subscribe();
let weak = tx.downgrade();
drop(tx);

let mut task = task::spawn(weak.closed());
assert_pending!(task.poll());

drop(rx);
assert!(!task.is_woken());
assert_pending!(task.poll());

drop(rx2);
assert!(task.is_woken());
assert_ready!(task.poll());
}

#[test]
fn broadcast_sender_closed_with_extra_subscribe() {
let (tx, rx) = broadcast::channel::<()>(1);
let rx2 = tx.subscribe();
let weak = tx.downgrade();

let mut task = task::spawn(tx.closed());
let mut weak_task = task::spawn(weak.closed());
assert_pending!(task.poll());
assert_pending!(weak_task.poll());

drop(rx);
assert!(!task.is_woken());
assert!(!weak_task.is_woken());
assert_pending!(task.poll());
assert_pending!(weak_task.poll());

drop(rx2);
assert!(task.is_woken());
assert!(weak_task.is_woken());

let rx3 = tx.subscribe();
assert_pending!(task.poll());
assert_pending!(weak_task.poll());

drop(rx3);
assert!(task.is_woken());
assert!(weak_task.is_woken());
assert_ready!(task.poll());
assert_ready!(weak_task.poll());

let mut task2 = task::spawn(tx.closed());
assert_ready!(task2.poll());
let mut weak_task2 = task::spawn(weak.closed());
assert_ready!(weak_task2.poll());

let rx4 = tx.subscribe();
let mut task3 = task::spawn(tx.closed());
assert_pending!(task3.poll());
let mut weak_task3 = task::spawn(weak.closed());
assert_pending!(weak_task3.poll());

drop(rx4);
assert!(task3.is_woken());
assert_ready!(task3.poll());
assert!(weak_task3.is_woken());
assert_ready!(weak_task3.poll());
}

#[tokio::test]
Expand Down