Skip to content

Commit e874de3

Browse files
authored
Removed race condition by joining create and cancel timer queues. (#137)
* Removed race condition by joining create and cancel timer queues. * Removed race condition by joining create and cancel timer queues. * Removed debug prints * Changed formatting to please cargo fmt * Re-added semicolon that had been deleted by cargo fmt
1 parent 9766f72 commit e874de3

File tree

3 files changed

+19
-25
lines changed

3 files changed

+19
-25
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "message-io"
3-
version = "0.14.7"
3+
version = "0.14.8"
44
authors = ["lemunozm <[email protected]>"]
55
edition = "2018"
66
readme = "README.md"

src/events.rs

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,20 @@ pub fn split<E: Send + 'static>() -> (EventSender<E>, EventReceiver<E>) {
2626
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
2727
pub struct TimerId(Instant);
2828

29+
// Internal enum to enqueue different timer commands in a single queue
30+
enum TimerCommand<E> {
31+
Create(E),
32+
Cancel,
33+
}
34+
2935
/// A generic and synchronized queue where the user can send and receive events.
3036
/// See [`EventSender`] to see how send events.
3137
/// This entity can be used as an utility for the [`crate::network`] module redirecting the
3238
/// network events to process them later from here.
3339
pub struct EventReceiver<E> {
3440
event_sender: EventSender<E>, // Should be before receiver in order to drop first.
3541
receiver: Receiver<E>,
36-
timer_receiver: Receiver<(Instant, E)>,
37-
remove_timer_receiver: Receiver<Instant>,
42+
timer_receiver: Receiver<(Instant, TimerCommand<E>)>,
3843
priority_receiver: Receiver<E>,
3944
timers: BTreeMap<Instant, E>,
4045
}
@@ -46,18 +51,11 @@ where E: Send + 'static
4651
fn default() -> Self {
4752
let (sender, receiver) = crossbeam_channel::unbounded();
4853
let (timer_sender, timer_receiver) = crossbeam_channel::unbounded();
49-
let (remove_timer_sender, remove_timer_receiver) = crossbeam_channel::unbounded();
5054
let (priority_sender, priority_receiver) = crossbeam_channel::unbounded();
5155
EventReceiver {
52-
event_sender: EventSender::new(
53-
sender,
54-
timer_sender,
55-
remove_timer_sender,
56-
priority_sender,
57-
),
56+
event_sender: EventSender::new(sender, timer_sender, priority_sender),
5857
receiver,
5958
timer_receiver,
60-
remove_timer_receiver,
6159
priority_receiver,
6260
timers: BTreeMap::new(),
6361
}
@@ -76,11 +74,10 @@ where E: Send + 'static
7674

7775
fn enque_timers(&mut self) {
7876
for timer in self.timer_receiver.try_iter() {
79-
self.timers.insert(timer.0, timer.1);
80-
}
81-
82-
for timer_instant in self.remove_timer_receiver.try_iter() {
83-
self.timers.remove(&timer_instant);
77+
match timer.1 {
78+
TimerCommand::Create(e) => self.timers.insert(timer.0, e),
79+
TimerCommand::Cancel => self.timers.remove(&timer.0),
80+
};
8481
}
8582
}
8683

@@ -175,8 +172,7 @@ where E: Send + 'static
175172
/// This type can only be generated by the receiver `EventReceiver`.
176173
pub struct EventSender<E> {
177174
sender: Sender<E>,
178-
timer_sender: Sender<(Instant, E)>,
179-
remove_timer_sender: Sender<Instant>,
175+
timer_sender: Sender<(Instant, TimerCommand<E>)>,
180176
priority_sender: Sender<E>,
181177
}
182178

@@ -185,11 +181,10 @@ where E: Send + 'static
185181
{
186182
fn new(
187183
sender: Sender<E>,
188-
timer_sender: Sender<(Instant, E)>,
189-
remove_timer_sender: Sender<Instant>,
184+
timer_sender: Sender<(Instant, TimerCommand<E>)>,
190185
priority_sender: Sender<E>,
191186
) -> EventSender<E> {
192-
EventSender { sender, timer_sender, remove_timer_sender, priority_sender }
187+
EventSender { sender, timer_sender, priority_sender }
193188
}
194189

195190
/// Send instantly an event to the event queue.
@@ -210,14 +205,14 @@ where E: Send + 'static
210205
/// [`EventSender::cancel_timer()`] be called.
211206
pub fn send_with_timer(&self, event: E, duration: Duration) -> TimerId {
212207
let when = Instant::now() + duration;
213-
self.timer_sender.send((when, event)).ok();
208+
self.timer_sender.send((when, TimerCommand::Create(event))).ok();
214209
TimerId(when)
215210
}
216211

217212
/// Remove a timer previously sent by [`EventSender::send_with_timer()`].
218213
/// The timer will not be receive by the [`EventReceiver`].
219214
pub fn cancel_timer(&self, timer_id: TimerId) {
220-
self.remove_timer_sender.send(timer_id.0).ok();
215+
self.timer_sender.send((timer_id.0, TimerCommand::Cancel)).ok();
221216
}
222217
}
223218

@@ -228,7 +223,6 @@ where E: Send + 'static
228223
EventSender::new(
229224
self.sender.clone(),
230225
self.timer_sender.clone(),
231-
self.remove_timer_sender.clone(),
232226
self.priority_sender.clone(),
233227
)
234228
}

0 commit comments

Comments
 (0)