Skip to content
This repository was archived by the owner on Apr 2, 2026. It is now read-only.

Commit 0c40092

Browse files
committed
allow cancelling timers within the same batch
1 parent d7f8230 commit 0c40092

2 files changed

Lines changed: 87 additions & 39 deletions

File tree

core/runtime/jsruntime.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2740,19 +2740,27 @@ impl JsRuntime {
27402740
scope: &mut v8::PinScope<'s, 'i>,
27412741
context_state: &ContextState,
27422742
) -> bool {
2743-
let timers = match context_state.timers.poll_timers(cx) {
2744-
Poll::Ready(timers) => timers,
2743+
let expired = match context_state.timers.poll_timers(cx) {
2744+
Poll::Ready(expired) => expired,
27452745
_ => return false,
27462746
};
27472747

2748-
if timers.is_empty() {
2748+
if expired.is_empty() {
27492749
return false;
27502750
}
27512751

27522752
let traces_enabled = context_state.activity_traces.is_enabled();
27532753
let undefined: v8::Local<v8::Value> = v8::undefined(scope).into();
27542754

2755-
for (timer_id, (callback, depth)) in &timers {
2755+
for (timer_id, timer_type) in &expired {
2756+
// Extract the timer data; if it was cancelled during this dispatch
2757+
// loop (e.g. clearTimeout called from an earlier callback), skip it.
2758+
let Some((callback, depth)) =
2759+
context_state.timers.take_fired_timer(*timer_id, timer_type)
2760+
else {
2761+
continue;
2762+
};
2763+
27562764
if traces_enabled {
27572765
context_state
27582766
.activity_traces
@@ -2764,7 +2772,7 @@ impl JsRuntime {
27642772
let set_timer_depth_cb = context_state.js_set_timer_depth_cb.borrow();
27652773
let set_timer_depth_fn =
27662774
set_timer_depth_cb.as_ref().unwrap().open(scope);
2767-
let depth_val = v8::Integer::new(scope, *depth as i32);
2775+
let depth_val = v8::Integer::new(scope, depth as i32);
27682776
set_timer_depth_fn.call(scope, undefined, &[depth_val.into()]);
27692777
}
27702778

core/web_timeout.rs

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ pub(crate) type WebTimerId = u64;
2828
/// The minimum number of tombstones required to trigger compaction
2929
const COMPACTION_MINIMUM: usize = 16;
3030

31-
#[derive(PartialEq, Eq, PartialOrd, Ord)]
32-
enum TimerType {
31+
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
32+
pub(crate) enum TimerType {
3333
Repeat(NonZeroU64),
3434
Once,
3535
}
@@ -424,40 +424,39 @@ impl<T: Clone, R: Reactor> WebTimers<T, R> {
424424
}
425425

426426
/// Poll for any timers that have completed.
427-
pub fn poll_timers(&self, cx: &mut Context) -> Poll<Vec<(u64, T)>> {
427+
///
428+
/// Returns the IDs and [`TimerType`]s of expired timers. The associated
429+
/// data must be retrieved per-timer via
430+
/// [`take_fired_timer`](Self::take_fired_timer), which allows
431+
/// `cancel_timer` to prevent dispatch of timers that expired in the
432+
/// same batch.
433+
pub fn poll_timers(&self, cx: &mut Context) -> Poll<Vec<(u64, TimerType)>> {
428434
ready!(self.sleep.poll_ready(cx));
429435
let now = R::Instant::now();
430436
let mut timers = self.timers.borrow_mut();
431-
let mut data = self.data_map.borrow_mut();
437+
let data = self.data_map.borrow();
432438
let mut output = vec![];
439+
let mut fired_once_count: usize = 0;
433440

434441
let mut split = timers.split_off(&TimerKey(now, 0, TimerType::Once, false));
435442
std::mem::swap(&mut split, &mut timers);
436443
for TimerKey(_, id, timer_type, is_system_timer) in split {
437-
if let TimerType::Repeat(interval) = timer_type {
438-
if let Some(TimerData { data, .. }) = data.get(&id) {
439-
output.push((id, data.clone()));
440-
timers.insert(TimerKey(
441-
now
442-
.checked_add(Duration::from_millis(interval.into()))
443-
.unwrap(),
444-
id,
445-
timer_type,
446-
is_system_timer,
447-
));
448-
}
449-
} else if let Some(TimerData {
450-
data,
451-
unrefd,
452-
high_res,
453-
}) = data.remove(&id)
454-
{
455-
self.high_res_timer_lock.maybe_unlock(high_res);
456-
if unrefd {
457-
self.unrefd_count.set(self.unrefd_count.get() - 1);
458-
}
459-
output.push((id, data));
444+
if !data.contains_key(&id) {
445+
continue; // tombstone
446+
}
447+
if let TimerType::Repeat(interval) = &timer_type {
448+
timers.insert(TimerKey(
449+
now
450+
.checked_add(Duration::from_millis((*interval).into()))
451+
.unwrap(),
452+
id,
453+
timer_type.clone(),
454+
is_system_timer,
455+
));
456+
} else {
457+
fired_once_count += 1;
460458
}
459+
output.push((id, timer_type));
461460
}
462461

463462
// Ineffective poll: run a front-compaction and try again later
@@ -478,15 +477,19 @@ impl<T: Clone, R: Reactor> WebTimers<T, R> {
478477
return Poll::Pending;
479478
}
480479

481-
if data.is_empty() {
482-
// When the # of running timers hits zero, clear the timer tree.
480+
// Adjust for fired-once timers whose data is still in data_map
481+
// (it will be removed by take_fired_timer).
482+
let pending_data_count = data.len() - fired_once_count;
483+
484+
if pending_data_count == 0 {
485+
// No more pending timers; clear the tree and sleep.
483486
if !timers.is_empty() {
484487
timers.clear();
485488
}
486489
self.sleep.clear();
487490
} else {
488491
// Run compaction when there are enough tombstones to justify cleanup.
489-
let tombstone_count = timers.len() - data.len();
492+
let tombstone_count = timers.len() - pending_data_count;
490493
if tombstone_count > COMPACTION_MINIMUM {
491494
timers.retain(|k| data.contains_key(&k.1));
492495
}
@@ -498,6 +501,37 @@ impl<T: Clone, R: Reactor> WebTimers<T, R> {
498501
Poll::Ready(output)
499502
}
500503

504+
/// Extracts the data for a previously-fired timer. Returns `None` if
505+
/// the timer was cancelled between [`poll_timers`](Self::poll_timers)
506+
/// and this call.
507+
pub fn take_fired_timer(&self, id: u64, timer_type: &TimerType) -> Option<T> {
508+
match timer_type {
509+
TimerType::Repeat(_) => {
510+
self.data_map.borrow().get(&id).map(|td| td.data.clone())
511+
}
512+
TimerType::Once => {
513+
let mut data = self.data_map.borrow_mut();
514+
let TimerData {
515+
data: d,
516+
unrefd,
517+
high_res,
518+
} = data.remove(&id)?;
519+
if data.is_empty() {
520+
self.high_res_timer_lock.clear();
521+
self.unrefd_count.set(0);
522+
self.timers.borrow_mut().clear();
523+
self.sleep.clear();
524+
} else {
525+
self.high_res_timer_lock.maybe_unlock(high_res);
526+
if unrefd {
527+
self.unrefd_count.set(self.unrefd_count.get() - 1);
528+
}
529+
}
530+
Some(d)
531+
}
532+
}
533+
}
534+
501535
/// Is this set of timers empty?
502536
pub fn is_empty(&self) -> bool {
503537
self.data_map.borrow().is_empty()
@@ -647,17 +681,20 @@ mod tests {
647681
runtime.block_on(f)
648682
}
649683

650-
async fn poll_all(timers: &TestTimers) -> Vec<(u64, ())> {
684+
async fn poll_all(timers: &TestTimers) -> Vec<u64> {
651685
timers.assert_consistent();
652686
let len = timers.len();
653687
let mut v = vec![];
654688
while !timers.is_empty() {
655-
let mut batch = poll_fn(|cx| {
689+
let batch = poll_fn(|cx| {
656690
timers.assert_consistent();
657691
timers.poll_timers(cx)
658692
})
659693
.await;
660-
v.append(&mut batch);
694+
for (id, timer_type) in &batch {
695+
timers.take_fired_timer(*id, timer_type);
696+
}
697+
v.extend(batch.into_iter().map(|(id, _)| id));
661698
#[allow(clippy::print_stderr)]
662699
{
663700
eprintln!(
@@ -718,7 +755,10 @@ mod tests {
718755
);
719756

720757
// Poll timers to trigger potential compaction
721-
let _ = poll_fn(|cx| timers.poll_timers(cx)).await;
758+
let fired = poll_fn(|cx| timers.poll_timers(cx)).await;
759+
for (id, timer_type) in &fired {
760+
timers.take_fired_timer(*id, timer_type);
761+
}
722762

723763
let remaining_tombstones = count_tombstones();
724764

0 commit comments

Comments (0)