Skip to content

Commit

Permalink
fix: port fix discard all message on receiver droped
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Jun 26, 2024
1 parent 407375c commit bbff85d
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 45 deletions.
4 changes: 2 additions & 2 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect()),
SenderFlavor::Array(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::List(chan) => chan.release(|c| c.disconnect_senders()),
SenderFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
}
Expand Down Expand Up @@ -1159,7 +1159,7 @@ impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
unsafe {
match &self.flavor {
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
ReceiverFlavor::At(_) => {}
Expand Down
128 changes: 85 additions & 43 deletions crossbeam-channel/src/flavors/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;
Expand Down Expand Up @@ -476,21 +476,102 @@ impl<T> Channel<T> {
Some(self.cap())
}

/// Disconnects the channel and wakes up all blocked senders and receivers.
/// Disconnects senders and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);

if tail & self.mark_bit == 0 {
self.senders.disconnect();
self.receivers.disconnect();
true
} else {
false
}
}

/// Disconnects receivers and wakes up all blocked senders.
///
/// Returns `true` if this call disconnected the channel.
///
/// # Safety
/// May only be called once upon dropping the last receiver. The
/// destruction of all other receivers must have been observed with acquire
/// ordering or stronger.
pub(crate) unsafe fn disconnect_receivers(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
let disconnected = if tail & self.mark_bit == 0 {
self.senders.disconnect();
true
} else {
false
};

unsafe {
self.discard_all_messages(tail);
}

disconnected
}

/// Discards all messages.
///
/// `tail` should be the current (and therefore last) value of `tail`.
///
/// # Panicking
/// If a destructor panics, the remaining messages are leaked, matching the
/// behaviour of the unbounded channel.
///
/// # Safety
/// This method must only be called when dropping the last receiver. The
/// destruction of all other receivers must have been observed with acquire
/// ordering or stronger.
unsafe fn discard_all_messages(&self, tail: usize) {
debug_assert!(self.is_disconnected());

// Only receivers modify `head`, so since we are the last one,
// this value will not change and will not be observed (since
// no new messages can be sent after disconnection).
let mut head = self.head.load(Ordering::Relaxed);
let tail = tail & !self.mark_bit;

let backoff = Backoff::new();
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);

// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);

// If the stamp is ahead of the head by 1, we may drop the message.
if head + 1 == stamp {
head = if index + 1 < self.cap() {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};

unsafe {
(*slot.msg.get()).assume_init_drop();
}
// If the tail equals the head, that means the channel is empty.
} else if tail == head {
return;
// Otherwise, a sender is about to write into the slot, so we need
// to wait for it to update the stamp.
} else {
backoff.spin();
}
}
}

/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
Expand Down Expand Up @@ -521,45 +602,6 @@ impl<T> Channel<T> {
}
}

impl<T> Drop for Channel<T> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
// Get the index of the head.
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();

let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
self.cap() - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap()
};

// Loop over all slots that hold a message and drop them.
for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap() {
hix + i
} else {
hix + i - self.cap()
};

unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
(*slot.msg.get()).assume_init_drop();
}
}
}
}
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

Expand Down
12 changes: 12 additions & 0 deletions crossbeam-channel/tests/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,3 +742,15 @@ fn panic_on_drop() {
// Elements after the panicked element will leak.
assert!(!b);
}

#[test]
fn drop_unreceived() {
let (tx, rx) = bounded::<std::rc::Rc<()>>(1);
let msg = std::rc::Rc::new(());
let weak = std::rc::Rc::downgrade(&msg);
assert!(tx.send(msg).is_ok());
drop(rx);
// Messages should be dropped immediately when the last receiver is destroyed.
assert!(weak.upgrade().is_none());
drop(tx);
}

0 comments on commit bbff85d

Please sign in to comment.