Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_closed, is_empty and len to mpsc::Receiver and mpsc::UnboundedReceiver #6348

Merged
merged 15 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ impl<T> Block<T> {
Some(Read::Value(value.assume_init()))
}

/// Returns true if there is a value in the slot to be consumed
///
/// # Safety
///
/// To maintain safety, the caller must ensure:
///
/// * No concurrent access to the slot.
pub(crate) fn has_value(&self, slot_index: usize) -> bool {
let offset = offset(slot_index);
let ready_bits = self.header.ready_slots.load(Acquire);
is_ready(ready_bits, offset)
}

/// Writes a value to the block at the given offset.
///
/// # Safety
Expand Down Expand Up @@ -195,6 +208,11 @@ impl<T> Block<T> {
self.header.ready_slots.fetch_or(TX_CLOSED, Release);
}

pub(crate) unsafe fn is_closed(&self) -> bool {
let ready_bits = self.header.ready_slots.load(Acquire);
is_tx_closed(ready_bits)
}

/// Resets the block to a blank state. This enables reusing blocks in the
/// channel.
///
Expand Down
67 changes: 67 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,73 @@ impl<T> Receiver<T> {
self.chan.close();
}

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed. The channel is closed
/// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
///
/// [`Sender`]: crate::sync::mpsc::Sender
/// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (_tx, mut rx) = mpsc::channel::<()>(10);
/// assert!(!rx.is_closed());
///
/// rx.close();
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(10);
/// assert!(rx.is_empty());
///
/// tx.send(0).await.unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
}

/// Returns the number of messages in the channel.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(10);
/// assert_eq!(0, rx.len());
///
/// tx.send(0).await.unwrap();
/// assert_eq!(1, rx.len());
/// }
/// ```
pub fn len(&self) -> usize {
self.chan.len()
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
Expand Down
27 changes: 27 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,33 @@ impl<T, S: Semaphore> Rx<T, S> {
self.inner.notify_rx_closed.notify_waiters();
}

pub(crate) fn is_closed(&self) -> bool {
// There two internal states that can represent a closed channel
//
// 1. When `close` is called.
// In this case, the inner semaphore will be closed.
//
// 2. When all senders are dropped.
// In this case, the semaphore remains unclosed, and the `index` in the list won't
// reach the tail position. It is necessary to check the list if the last block is
// `closed`.
self.inner.semaphore.is_closed() || self.inner.tx_count.load(Acquire) == 0
}

pub(crate) fn is_empty(&self) -> bool {
self.inner.rx_fields.with(|rx_fields_ptr| {
let rx_fields = unsafe { &*rx_fields_ptr };
rx_fields.list.is_empty(&self.inner.tx)
})
}

pub(crate) fn len(&self) -> usize {
self.inner.rx_fields.with(|rx_fields_ptr| {
let rx_fields = unsafe { &*rx_fields_ptr };
rx_fields.list.len(&self.inner.tx)
})
}

/// Receive the next value
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read;
Expand Down
27 changes: 27 additions & 0 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,15 @@ impl<T> Tx<T> {
let _ = Box::from_raw(block.as_ptr());
}
}

pub(crate) fn is_closed(&self) -> bool {
let tail = self.block_tail.load(Acquire);

unsafe {
let tail_block = &*tail;
tail_block.is_closed()
}
}
}

impl<T> fmt::Debug for Tx<T> {
Expand All @@ -230,6 +239,24 @@ impl<T> fmt::Debug for Tx<T> {
}

impl<T> Rx<T> {
pub(crate) fn is_empty(&self, tx: &Tx<T>) -> bool {
let block = unsafe { self.head.as_ref() };
if block.has_value(self.index) {
return false;
}

// It is possible that a block has no value "now" but the list is still not empty.
// To be sure, it is necessary to check the length of the list.
self.len(tx) == 0
}

pub(crate) fn len(&self, tx: &Tx<T>) -> usize {
// When all the senders are dropped, there will be a last block in the tail position,
// but it will be closed
let tail_position = tx.tail_position.load(Acquire);
tail_position - self.index - (tx.is_closed() as usize)
}
Comment on lines +253 to +258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a test for whether the length is correct in all of the following cases?

  • There are still senders, but Receiver::close has been called.
  • There are no more senders, but Receiver::close has not been called.
  • There are no more senders, and Receiver::close has been called before the last sender dropped.
  • There are no more senders, and Receiver::close has been called after the last sender dropped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the scenarios were missing, but I have added them now.


/// Pops the next value off the queue.
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed
Expand Down
67 changes: 67 additions & 0 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,73 @@ impl<T> UnboundedReceiver<T> {
self.chan.close();
}

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed. The channel is closed
/// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
///
/// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
/// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
/// assert!(!rx.is_closed());
///
/// rx.close();
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel();
/// assert!(rx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&self) -> bool {
self.chan.is_empty()
}

/// Returns the number of messages in the channel.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::unbounded_channel();
/// assert_eq!(0, rx.len());
///
/// tx.send(0).unwrap();
/// assert_eq!(1, rx.len());
/// }
/// ```
pub fn len(&self) -> usize {
self.chan.len()
}

/// Polls to receive the next message on this channel.
///
/// This method returns:
Expand Down
34 changes: 34 additions & 0 deletions tokio/src/sync/tests/loom_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,37 @@ fn try_recv() {
}
});
}

#[test]
fn len_nonzero_after_send() {
loom::model(|| {
let (send, recv) = mpsc::channel(10);
let send2 = send.clone();

let join = thread::spawn(move || {
block_on(send2.send("message2")).unwrap();
});

block_on(send.send("message1")).unwrap();
assert!(recv.len() != 0);

join.join().unwrap();
});
}

#[test]
fn nonempty_after_send() {
loom::model(|| {
let (send, recv) = mpsc::channel(10);
let send2 = send.clone();

let join = thread::spawn(move || {
block_on(send2.send("message2")).unwrap();
});

block_on(send.send("message1")).unwrap();
assert!(!recv.is_empty());

join.join().unwrap();
});
}