Skip to content

Commit 58b5f01

Browse files
committed
Simply error, and discard oldest message rather than most recent one first
Note that this is a very naive implementation of force_send, that works when used within a single thread, but can easily fail when multiple threads/cores are involved.
1 parent f59101d commit 58b5f01

File tree

5 files changed

+47
-42
lines changed

5 files changed

+47
-42
lines changed

crossbeam-channel/src/channel.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -418,29 +418,38 @@ impl<T> Sender<T> {
418418
/// the channel is full. The returned error contains the original message.
419419
///
420420
/// If called on a zero-capacity channel, this method will send the message only if there
421-
/// happens to be a receive operation on the other side of the channel at the same time.
421+
/// happens to be a pending receive operation on the other side of the channel at the same time,
422+
/// otherwise it will return the value to the sender.
422423
///
423424
/// ```
424425
/// use crossbeam_channel::{bounded, ForceSendError};
425426
///
426427
/// let (s, r) = bounded(3);
428+
///
427429
/// assert_eq!(s.force_send(0), Ok(None));
428430
/// assert_eq!(s.force_send(1), Ok(None));
429431
/// assert_eq!(s.force_send(2), Ok(None));
430-
/// assert_eq!(s.force_send(3), Ok(Some(2)));
431-
/// assert_eq!(r.recv(), Ok(0));
432-
/// assert_eq!(s.force_send(4), Ok(None));
432+
/// assert_eq!(s.force_send(3), Ok(Some(0)));
433+
///
433434
/// assert_eq!(r.recv(), Ok(1));
435+
///
436+
/// assert_eq!(s.force_send(4), Ok(None));
437+
///
438+
/// assert_eq!(r.recv(), Ok(2));
434439
/// assert_eq!(r.recv(), Ok(3));
440+
///
435441
/// assert_eq!(s.force_send(5), Ok(None));
436442
/// assert_eq!(s.force_send(6), Ok(None));
437-
/// assert_eq!(s.force_send(7), Ok(Some(6)));
438-
/// assert_eq!(s.force_send(8), Ok(Some(7)));
439-
/// assert_eq!(r.recv(), Ok(4));
440-
/// assert_eq!(r.recv(), Ok(5));
443+
/// assert_eq!(s.force_send(7), Ok(Some(4)));
444+
/// assert_eq!(s.force_send(8), Ok(Some(5)));
445+
///
446+
/// assert_eq!(r.recv(), Ok(6));
447+
/// assert_eq!(r.recv(), Ok(7));
441448
/// assert_eq!(r.recv(), Ok(8));
449+
///
442450
/// drop(r);
443-
/// assert_eq!(s.force_send(9), Err(ForceSendError::Disconnected(9)));
451+
///
452+
/// assert_eq!(s.force_send(9), Err(ForceSendError(9)));
444453
/// ``````
445454
pub fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
446455
match &self.flavor {

crossbeam-channel/src/err.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,13 @@ pub struct SendError<T>(pub T);
1313

1414
/// An error returned from the [`force_send`] method.
1515
///
16-
/// The error contains the message being sent so it can be recovered.
16+
/// The message could not be sent because the channel is disconnected.
17+
///
18+
/// The error contains the message so it can be recovered.
1719
///
1820
/// [`force_send`]: super::Sender::force_send
1921
#[derive(PartialEq, Eq, Clone, Copy)]
20-
pub enum ForceSendError<T> {
21-
/// The message could not be sent because the channel is disconnected.
22-
Disconnected(T),
23-
}
22+
pub struct ForceSendError<T>(pub T);
2423

2524
/// An error returned from the [`try_send`] method.
2625
///
@@ -223,15 +222,15 @@ impl<T> TrySendError<T> {
223222
impl<T> fmt::Debug for ForceSendError<T> {
224223
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
225224
match *self {
226-
Self::Disconnected(..) => "Disconnected(..)".fmt(f),
225+
Self(..) => "ForceSendError(..)".fmt(f),
227226
}
228227
}
229228
}
230229

231230
impl<T> fmt::Display for ForceSendError<T> {
232231
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233232
match *self {
234-
Self::Disconnected(..) => "sending on a disconnected channel".fmt(f),
233+
Self(..) => "sending on a disconnected channel".fmt(f),
235234
}
236235
}
237236
}
@@ -241,7 +240,7 @@ impl<T: Send> error::Error for ForceSendError<T> {}
241240
impl<T> From<SendError<T>> for ForceSendError<T> {
242241
fn from(err: SendError<T>) -> Self {
243242
match err {
244-
SendError(t) => Self::Disconnected(t),
243+
SendError(t) => Self(t),
245244
}
246245
}
247246
}
@@ -263,14 +262,9 @@ impl<T> ForceSendError<T> {
263262
/// ```
264263
pub fn into_inner(self) -> T {
265264
match self {
266-
Self::Disconnected(v) => v,
265+
Self(v) => v,
267266
}
268267
}
269-
270-
/// Returns `true` if the send operation failed because the channel is disconnected.
271-
pub fn is_disconnected(&self) -> bool {
272-
matches!(self, Self::Disconnected(_))
273-
}
274268
}
275269

276270
impl<T> fmt::Debug for SendTimeoutError<T> {

crossbeam-channel/src/flavors/array.rs

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -326,24 +326,26 @@ impl<T> Channel<T> {
326326
}
327327

328328
/// Force send a message into the channel. Only fails if the channel is disconnected
329-
pub(crate) fn force_send(&self, mut msg: T) -> Result<Option<T>, ForceSendError<T>> {
330-
let mut token = Token::default();
331-
if self.start_send(&mut token) {
332-
match unsafe { self.write(&mut token, msg) } {
333-
Ok(()) => Ok(None),
334-
Err(msg) => Err(ForceSendError::Disconnected(msg)),
335-
}
336-
} else {
337-
let tail = self.tail.load(Ordering::Acquire);
338-
let prev_index = match tail & (self.mark_bit - 1) {
339-
0 => self.cap() - 1,
340-
x => x - 1,
341-
};
342-
let queued_msg =
343-
unsafe { (*self.buffer.get_unchecked(prev_index).msg.get()).assume_init_mut() };
344-
std::mem::swap(&mut msg, queued_msg);
345-
Ok(Some(msg))
329+
///
330+
/// Note that this is currently a naive implementation to make the sequential test pass
331+
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
332+
if self.is_disconnected() {
333+
return Err(ForceSendError(msg));
346334
}
335+
336+
let old_msg = if self.is_full() {
337+
let Ok(old_msg) = self.try_recv() else {
338+
return Err(ForceSendError(msg));
339+
};
340+
Some(old_msg)
341+
} else {
342+
None
343+
};
344+
345+
self.try_send(msg)
346+
.map_err(|e| ForceSendError(e.into_inner()))?;
347+
348+
Ok(old_msg)
347349
}
348350

349351
/// Sends a message into the channel.

crossbeam-channel/src/flavors/list.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ impl<T> Channel<T> {
421421
pub(crate) fn force_send(&self, msg: T) -> Result<Option<T>, ForceSendError<T>> {
422422
match self.send(msg, None) {
423423
Ok(()) => Ok(None),
424-
Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError::Disconnected(err)),
424+
Err(SendTimeoutError::Disconnected(err)) => Err(ForceSendError(err)),
425425
Err(SendTimeoutError::Timeout(_)) => unreachable!(),
426426
}
427427
}

crossbeam-channel/src/flavors/zero.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ impl<T> Channel<T> {
230230
}
231231
Ok(None)
232232
} else if inner.is_disconnected {
233-
Err(ForceSendError::Disconnected(msg))
233+
Err(ForceSendError(msg))
234234
} else {
235235
Ok(Some(msg))
236236
}

0 commit comments

Comments
 (0)