@@ -453,34 +453,27 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be
453
453
scheduled for execution. To implement this, we have a channel. When the ` wake() `
454
454
is called on the waker, the task is pushed into the send half of the channel.
455
455
Our ` Task ` structure will implement the wake logic. To do this, it needs to
456
- contain both the spawned future and the channel send half. We place the future
457
- in a ` TaskFuture ` struct alongside a ` Poll ` enum to keep track of the latest
458
- ` Future::poll() ` result, which is needed to handle spurious wake-ups. More
459
- details are given in the implementation of the ` poll() ` method in ` TaskFuture ` .
456
+ contain both the spawned future and the channel send half. We track if a
457
+ ` Future::poll() ` output was ` Ready ` to handle spurious wake-ups. More
458
+ details are given in the implementation of the ` poll() ` method.
460
459
461
460
``` rust
462
461
# use std :: future :: Future ;
463
462
# use std :: pin :: Pin ;
464
463
# use std :: sync :: mpsc;
465
- # use std :: task :: Poll ;
466
464
use std :: sync :: {Arc , Mutex };
467
465
468
- /// A structure holding a future and the result of
469
- /// the latest call to its `poll` method.
470
- struct TaskFuture {
471
- future : Pin <Box <dyn Future <Output = ()> + Send >>,
472
- poll : Poll <()>,
473
- }
474
-
475
466
struct Task {
476
467
// The `Mutex` is to make `Task` implement `Sync`. Only
477
468
// one thread accesses `task_future` at any given time.
478
469
// The `Mutex` is not required for correctness. Real Tokio
479
470
// does not use a mutex here, but real Tokio has
480
471
// more lines of code than can fit in a single tutorial
481
472
// page.
482
- task_future : Mutex <TaskFuture >,
473
+ future : Mutex <Pin < Box < dyn Future < Output = ()> + Send >> >,
483
474
executor : mpsc :: Sender <Arc <Task >>,
475
+ // Must be only switched to `Ready` and never back.
476
+ is_done : RwLock <bool >
484
477
}
485
478
486
479
impl Task {
@@ -529,20 +522,18 @@ channel. Next, we implement receiving and executing the tasks in the
529
522
# use std :: sync :: mpsc;
530
523
# use futures :: task :: {self , ArcWake };
531
524
# use std :: future :: Future ;
525
+ # use std :: ops :: Deref ;
532
526
# use std :: pin :: Pin ;
533
527
# use std :: sync :: {Arc , Mutex };
534
528
# use std :: task :: {Context , Poll };
535
529
# struct MiniTokio {
536
530
# scheduled : mpsc :: Receiver <Arc <Task >>,
537
531
# sender : mpsc :: Sender <Arc <Task >>,
538
532
# }
539
- # struct TaskFuture {
540
- # future : Pin <Box <dyn Future <Output = ()> + Send >>,
541
- # poll : Poll <()>,
542
- # }
543
533
# struct Task {
544
- # task_future : Mutex <TaskFuture >,
534
+ # future : Mutex <Pin < Box < dyn Future < Output = ()> + Send >> >,
545
535
# executor : mpsc :: Sender <Arc <Task >>,
536
+ # is_done : RwLock <bool >
546
537
# }
547
538
# impl ArcWake for Task {
548
539
# fn wake_by_ref (arc_self : & Arc <Self >) {}
@@ -573,38 +564,28 @@ impl MiniTokio {
573
564
}
574
565
}
575
566
576
- impl TaskFuture {
577
- fn new (future : impl Future <Output = ()> + Send + 'static ) -> TaskFuture {
578
- TaskFuture {
579
- future : Box :: pin (future ),
580
- poll : Poll :: Pending ,
581
- }
582
- }
583
-
584
- fn poll (& mut self , cx : & mut Context <'_ >) {
585
- // Spurious wake-ups are allowed, even after a future has
586
- // returned `Ready`. However, polling a future which has
587
- // already returned `Ready` is *not* allowed. For this
588
- // reason we need to check that the future is still pending
589
- // before we call it. Failure to do so can lead to a panic.
590
- if self . poll. is_pending () {
591
- self . poll = self . future. as_mut (). poll (cx );
592
- }
593
- }
594
- }
595
-
596
567
impl Task {
597
568
fn poll (self : Arc <Self >) {
598
- // Create a waker from the `Task` instance. This
599
- // uses the `ArcWake` impl from above.
600
- let waker = task :: waker (self . clone ());
601
- let mut cx = Context :: from_waker (& waker );
602
-
603
- // No other thread ever tries to lock the task_future
604
- let mut task_future = self . task_future. try_lock (). unwrap ();
605
-
606
- // Poll the inner future
607
- task_future . poll (& mut cx );
569
+ // Spurious wake-ups are allowed, even after a future has
570
+ // returned `Ready`. However, polling a future which has
571
+ // already returned `Ready` is **not** allowed. For this
572
+ // reason we need to check that the future is still pending
573
+ // before we call it. Failure to do so can lead to a panic.
574
+ if ! self . is_done. read (). unwrap (). deref () {
575
+ // Create a waker from the `Task` instance. This
576
+ // uses the `ArcWake` impl from above.
577
+ let waker = task :: waker (self . clone ());
578
+ let mut cx = Context :: from_waker (& waker );
579
+
580
+ // No other thread ever tries to lock the task_future
581
+ let mut future = self . future. try_lock (). unwrap ();
582
+
583
+ // Poll the inner future and if `Ready` save that it's done.
584
+ let poll_status = future . as_mut (). poll (& mut cx );
585
+ if poll_status == Poll :: Ready (()) {
586
+ * self . is_done. write (). unwrap () = true ;
587
+ }
588
+ }
608
589
}
609
590
610
591
// Spawns a new task with the given future.
@@ -617,8 +598,9 @@ impl Task {
617
598
F : Future <Output = ()> + Send + 'static ,
618
599
{
619
600
let task = Arc :: new (Task {
620
- task_future : Mutex :: new (TaskFuture :: new (future )),
601
+ future : Mutex :: new (TaskFuture :: new (future )),
621
602
executor : sender . clone (),
603
+ is_done : RwLock :: new (false )
622
604
});
623
605
624
606
let _ = sender . send (task );
@@ -735,59 +717,59 @@ impl Future for Delay {
735
717
// Check the current instant. If the duration has elapsed, then
736
718
// this future has completed so we return `Poll::Ready`.
737
719
if Instant :: now () >= self . when {
738
- return Poll :: Ready (());
739
- }
740
-
741
- // The duration has not elapsed. If this is the first time the future
742
- // is called, spawn the timer thread. If the timer thread is already
743
- // running, ensure the stored `Waker` matches the current task's waker.
744
- if let Some (waker ) = & self . waker {
745
- let mut waker = waker . lock (). unwrap ();
746
-
747
- // Check if the stored waker matches the current task's waker.
748
- // This is necessary as the `Delay` future instance may move to
749
- // a different task between calls to `poll`. If this happens, the
750
- // waker contained by the given `Context` will differ and we
751
- // must update our stored waker to reflect this change.
752
- if ! waker . will_wake (cx . waker ()) {
753
- * waker = cx . waker (). clone ();
754
- }
720
+ Poll :: Ready (());
755
721
} else {
756
- let when = self . when;
757
- let waker = Arc :: new (Mutex :: new (cx . waker (). clone ()));
758
- self . waker = Some (waker . clone ());
759
-
760
- // This is the first time `poll` is called, spawn the timer thread.
761
- thread :: spawn (move || {
762
- let now = Instant :: now ();
763
-
764
- if now < when {
765
- thread :: sleep (when - now );
722
+ // The duration has not elapsed. If this is the first time the future
723
+ // is called, spawn the timer thread. If the timer thread is already
724
+ // running, ensure the stored `Waker` matches the current task's waker.
725
+ if let Some (waker ) = & self . waker {
726
+ let mut waker = waker . lock (). unwrap ();
727
+
728
+ // Check if the stored waker matches the current task's waker.
729
+ // This is necessary as the `Delay` future instance may move to
730
+ // a different task between calls to `poll`. If this happens, the
731
+ // waker contained by the given `Context` will differ and we
732
+ // must update our stored waker to reflect this change.
733
+ if ! waker . will_wake (cx . waker ()) {
734
+ * waker = cx . waker (). clone ();
766
735
}
767
-
768
- // The duration has elapsed. Notify the caller by invoking
769
- // the waker.
770
- let waker = waker . lock (). unwrap ();
771
- waker . wake_by_ref ();
772
- });
736
+ } else {
737
+ let when = self . when;
738
+ let waker = Arc :: new (Mutex :: new (cx . waker (). clone ()));
739
+ self . waker = Some (waker . clone ());
740
+
741
+ // This is the first time `poll` is called, spawn the timer thread.
742
+ thread :: spawn (move || {
743
+ let now = Instant :: now ();
744
+
745
+ if now < when {
746
+ thread :: sleep (when - now );
747
+ }
748
+
749
+ // The duration has elapsed. Notify the caller by invoking
750
+ // the waker.
751
+ let waker = waker . lock (). unwrap ();
752
+ waker . wake_by_ref ();
753
+ });
754
+ }
755
+
756
+ // By now, the waker is stored and the timer thread is started.
757
+ // The duration has not elapsed (recall that we checked for this
758
+ // first thing), ergo the future has not completed so we must
759
+ // return `Poll::Pending`.
760
+ //
761
+ // The `Future` trait contract requires that when `Pending` is
762
+ // returned, the future ensures that the given waker is signalled
763
+ // once the future should be polled again. In our case, by
764
+ // returning `Pending` here, we are promising that we will
765
+ // invoke the given waker included in the `Context` argument
766
+ // once the requested duration has elapsed. We ensure this by
767
+ // spawning the timer thread above.
768
+ //
769
+ // If we forget to invoke the waker, the task will hang
770
+ // indefinitely.
771
+ Poll :: Pending
773
772
}
774
-
775
- // By now, the waker is stored and the timer thread is started.
776
- // The duration has not elapsed (recall that we checked for this
777
- // first thing), ergo the future has not completed so we must
778
- // return `Poll::Pending`.
779
- //
780
- // The `Future` trait contract requires that when `Pending` is
781
- // returned, the future ensures that the given waker is signalled
782
- // once the future should be polled again. In our case, by
783
- // returning `Pending` here, we are promising that we will
784
- // invoke the given waker included in the `Context` argument
785
- // once the requested duration has elapsed. We ensure this by
786
- // spawning the timer thread above.
787
- //
788
- // If we forget to invoke the waker, the task will hang
789
- // indefinitely.
790
- Poll :: Pending
791
773
}
792
774
}
793
775
```
0 commit comments