Skip to content

Commit 0f05c78

Browse files
committed
Add receiver.disconnect() for atomic graceful shutdown
1 parent 99ec614 commit 0f05c78

File tree

4 files changed

+101
-13
lines changed

4 files changed

+101
-13
lines changed

crossbeam-channel/src/channel.rs

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,19 +1149,47 @@ impl<T> Receiver<T> {
11491149
_ => false,
11501150
}
11511151
}
1152+
1153+
/// Disconnects this from the channel.
1154+
///
1155+
/// If this is the last one connected to the channel, sender start to fail and
1156+
/// the returned iterator can be used to drain any remaining sent messages. Otherwise, it
1157+
/// always returns [`None`].
1158+
pub fn disconnect(self) -> DisconnectIter<T> {
1159+
let was_last = unsafe {
1160+
match &self.flavor {
1161+
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1162+
_ => {
1163+
panic!();
1164+
}
1165+
}
1166+
};
1167+
if let Some(true) = was_last {
1168+
DisconnectIter {
1169+
last_receiver: Some(self),
1170+
}
1171+
} else {
1172+
std::mem::forget(self);
1173+
DisconnectIter {
1174+
last_receiver: None,
1175+
}
1176+
}
1177+
}
11521178
}
11531179

11541180
impl<T> Drop for Receiver<T> {
11551181
fn drop(&mut self) {
11561182
unsafe {
11571183
match &self.flavor {
11581184
ReceiverFlavor::Array(chan) => chan.release(|c| c.disconnect()),
1159-
ReceiverFlavor::List(chan) => chan.release(|c| c.disconnect_receivers()),
1185+
ReceiverFlavor::List(chan) => {
1186+
chan.release(|c| c.disconnect_receivers_and_discard_messages())
1187+
}
11601188
ReceiverFlavor::Zero(chan) => chan.release(|c| c.disconnect()),
1161-
ReceiverFlavor::At(_) => {}
1162-
ReceiverFlavor::Tick(_) => {}
1163-
ReceiverFlavor::Never(_) => {}
1164-
}
1189+
ReceiverFlavor::At(_) => None,
1190+
ReceiverFlavor::Tick(_) => None,
1191+
ReceiverFlavor::Never(_) => None,
1192+
};
11651193
}
11661194
}
11671195
}
@@ -1289,6 +1317,39 @@ pub struct TryIter<'a, T> {
12891317
receiver: &'a Receiver<T>,
12901318
}
12911319

1320+
/// A non-blocking draining iterator over unreceived messages after channel termination.
1321+
///
1322+
/// Each call to [`next`] returns a message if there is still more to be received. The iterator
1323+
/// never blocks waiting for the next message as the channel don't allow sending anymore.
1324+
///
1325+
/// [`next`]: Iterator::next
1326+
pub struct DisconnectIter<T> {
1327+
last_receiver: Option<Receiver<T>>,
1328+
}
1329+
1330+
impl<T> FusedIterator for DisconnectIter<T> {}
1331+
1332+
impl<T> Iterator for DisconnectIter<T> {
1333+
type Item = T;
1334+
1335+
fn next(&mut self) -> Option<Self::Item> {
1336+
self.last_receiver
1337+
.as_ref()
1338+
.and_then(|r| match r.try_recv() {
1339+
Ok(msg) => Some(msg),
1340+
Err(TryRecvError::Disconnected) => None,
1341+
Err(TryRecvError::Empty) => unreachable!(),
1342+
})
1343+
}
1344+
}
1345+
1346+
impl<T> DisconnectIter<T> {
1347+
/// Returns true if this was returned from the last receiver's disconnection.
1348+
pub fn is_last_receiver(&self) -> bool {
1349+
self.last_receiver.is_some()
1350+
}
1351+
}
1352+
12921353
impl<T> Iterator for TryIter<'_, T> {
12931354
type Item = T;
12941355

@@ -1303,6 +1364,26 @@ impl<T> fmt::Debug for TryIter<'_, T> {
13031364
}
13041365
}
13051366

1367+
impl<T> fmt::Debug for DisconnectIter<T> {
1368+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1369+
f.pad("DisconnectIter { .. }")
1370+
}
1371+
}
1372+
1373+
impl<T> Drop for DisconnectIter<T> {
1374+
fn drop(&mut self) {
1375+
if let Some(last_receiver) = self.last_receiver.take() {
1376+
match &last_receiver.flavor {
1377+
ReceiverFlavor::List(chan) => chan.discard_all_messages(),
1378+
_ => {
1379+
panic!();
1380+
}
1381+
}
1382+
std::mem::forget(last_receiver);
1383+
}
1384+
}
1385+
}
1386+
13061387
/// A blocking iterator over messages in a channel.
13071388
///
13081389
/// Each call to [`next`] blocks waiting for the next message and then returns it. However, if the

crossbeam-channel/src/counter.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,18 @@ impl<C> Receiver<C> {
118118
/// Releases the receiver reference.
119119
///
120120
/// Function `disconnect` will be called if this is the last receiver reference.
121-
pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
121+
pub(crate) unsafe fn release<T, F: FnOnce(&C) -> T>(&self, disconnect: F) -> Option<T> {
122+
let mut from_callback = None;
123+
122124
if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
123-
disconnect(&self.counter().chan);
125+
from_callback = Some(disconnect(&self.counter().chan));
124126

125127
if self.counter().destroy.swap(true, Ordering::AcqRel) {
126128
drop(Box::from_raw(self.counter));
127129
}
128130
}
131+
132+
from_callback
129133
}
130134
}
131135

crossbeam-channel/src/flavors/list.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -549,10 +549,8 @@ impl<T> Channel<T> {
549549
/// Disconnects receivers.
550550
///
551551
/// Returns `true` if this call disconnected the channel.
552-
pub(crate) fn disconnect_receivers(&self) -> bool {
553-
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
554-
555-
if tail & MARK_BIT == 0 {
552+
pub(crate) fn disconnect_receivers_and_discard_messages(&self) -> bool {
553+
if self.disconnect_receivers() {
556554
// If receivers are dropped first, discard all messages to free
557555
// memory eagerly.
558556
self.discard_all_messages();
@@ -562,10 +560,15 @@ impl<T> Channel<T> {
562560
}
563561
}
564562

563+
pub(crate) fn disconnect_receivers(&self) -> bool {
564+
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
565+
tail & MARK_BIT == 0
566+
}
567+
565568
/// Discards all messages.
566569
///
567570
/// This method should only be called when all receivers are dropped.
568-
fn discard_all_messages(&self) {
571+
pub(crate) fn discard_all_messages(&self) {
569572
let backoff = Backoff::new();
570573
let mut tail = self.tail.index.load(Ordering::Acquire);
571574
loop {

crossbeam-channel/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ cfg_if! {
359359

360360
pub use crate::channel::{after, at, never, tick};
361361
pub use crate::channel::{bounded, unbounded};
362-
pub use crate::channel::{IntoIter, Iter, TryIter};
362+
pub use crate::channel::{IntoIter, Iter, TryIter, DisconnectIter};
363363
pub use crate::channel::{Receiver, Sender};
364364

365365
pub use crate::select::{Select, SelectedOperation};

0 commit comments

Comments
 (0)