diff --git a/content/tokio/tutorial/async.md b/content/tokio/tutorial/async.md index 57216bab..9f8da89b 100644 --- a/content/tokio/tutorial/async.md +++ b/content/tokio/tutorial/async.md @@ -453,25 +453,16 @@ Wakers are `Sync` and can be cloned. When `wake` is called, the task must be scheduled for execution. To implement this, we have a channel. When the `wake()` is called on the waker, the task is pushed into the send half of the channel. Our `Task` structure will implement the wake logic. To do this, it needs to -contain both the spawned future and the channel send half. We place the future -in a `TaskFuture` struct alongside a `Poll` enum to keep track of the latest -`Future::poll()` result, which is needed to handle spurious wake-ups. More -details are given in the implementation of the `poll()` method in `TaskFuture`. +contain both the spawned future and the channel send half. We track if a +`Future::poll()` output was `Ready` to handle spurious wake-ups. More +details are given in the implementation of the `poll()` method. ```rust # use std::future::Future; # use std::pin::Pin; # use std::sync::mpsc; -# use std::task::Poll; use std::sync::{Arc, Mutex}; -/// A structure holding a future and the result of -/// the latest call to its `poll` method. -struct TaskFuture { - future: Pin + Send>>, - poll: Poll<()>, -} - struct Task { // The `Mutex` is to make `Task` implement `Sync`. Only // one thread accesses `task_future` at any given time. @@ -479,8 +470,10 @@ struct Task { // does not use a mutex here, but real Tokio has // more lines of code than can fit in a single tutorial // page. - task_future: Mutex, + future: Mutex + Send>>>, executor: mpsc::Sender>, + // Must be only switched to `Ready` and never back. + is_done: RwLock } impl Task { @@ -529,6 +522,7 @@ channel. Next, we implement receiving and executing the tasks in the # use std::sync::mpsc; # use futures::task::{self, ArcWake}; # use std::future::Future; +# use std::ops::Deref; # use std::pin::Pin; # use std::sync::{Arc, Mutex}; # use std::task::{Context, Poll}; @@ -536,13 +530,10 @@ channel. Next, we implement receiving and executing the tasks in the # scheduled: mpsc::Receiver>, # sender: mpsc::Sender>, # } -# struct TaskFuture { -# future: Pin + Send>>, -# poll: Poll<()>, -# } # struct Task { -# task_future: Mutex, +# future: Mutex + Send>>>, # executor: mpsc::Sender>, +# is_done: RwLock # } # impl ArcWake for Task { # fn wake_by_ref(arc_self: &Arc) {} @@ -573,38 +564,28 @@ impl MiniTokio { } } -impl TaskFuture { - fn new(future: impl Future + Send + 'static) -> TaskFuture { - TaskFuture { - future: Box::pin(future), - poll: Poll::Pending, - } - } - - fn poll(&mut self, cx: &mut Context<'_>) { - // Spurious wake-ups are allowed, even after a future has - // returned `Ready`. However, polling a future which has - // already returned `Ready` is *not* allowed. For this - // reason we need to check that the future is still pending - // before we call it. Failure to do so can lead to a panic. - if self.poll.is_pending() { - self.poll = self.future.as_mut().poll(cx); - } - } -} - impl Task { fn poll(self: Arc) { - // Create a waker from the `Task` instance. This - // uses the `ArcWake` impl from above. - let waker = task::waker(self.clone()); - let mut cx = Context::from_waker(&waker); - - // No other thread ever tries to lock the task_future - let mut task_future = self.task_future.try_lock().unwrap(); - - // Poll the inner future - task_future.poll(&mut cx); + // Spurious wake-ups are allowed, even after a future has + // returned `Ready`. However, polling a future which has + // already returned `Ready` is **not** allowed. For this + // reason we need to check that the future is still pending + // before we call it. Failure to do so can lead to a panic. + if !self.is_done.read().unwrap().deref() { + // Create a waker from the `Task` instance. This + // uses the `ArcWake` impl from above. + let waker = task::waker(self.clone()); + let mut cx = Context::from_waker(&waker); + + // No other thread ever tries to lock the task_future + let mut future = self.future.try_lock().unwrap(); + + // Poll the inner future and if `Ready` save that it's done. + let poll_status = future.as_mut().poll(&mut cx); + if poll_status == Poll::Ready(()) { + *self.is_done.write().unwrap() = true; + } + } } // Spawns a new task with the given future. @@ -617,8 +598,9 @@ impl Task { F: Future + Send + 'static, { let task = Arc::new(Task { - task_future: Mutex::new(TaskFuture::new(future)), + future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), + is_done: RwLock::new(false) }); let _ = sender.send(task); @@ -735,59 +717,59 @@ impl Future for Delay { // Check the current instant. If the duration has elapsed, then // this future has completed so we return `Poll::Ready`. if Instant::now() >= self.when { - return Poll::Ready(()); - } - - // The duration has not elapsed. If this is the first time the future - // is called, spawn the timer thread. If the timer thread is already - // running, ensure the stored `Waker` matches the current task's waker. - if let Some(waker) = &self.waker { - let mut waker = waker.lock().unwrap(); - - // Check if the stored waker matches the current task's waker. - // This is necessary as the `Delay` future instance may move to - // a different task between calls to `poll`. If this happens, the - // waker contained by the given `Context` will differ and we - // must update our stored waker to reflect this change. - if !waker.will_wake(cx.waker()) { - *waker = cx.waker().clone(); - } + Poll::Ready(()); } else { - let when = self.when; - let waker = Arc::new(Mutex::new(cx.waker().clone())); - self.waker = Some(waker.clone()); - - // This is the first time `poll` is called, spawn the timer thread. - thread::spawn(move || { - let now = Instant::now(); - - if now < when { - thread::sleep(when - now); + // The duration has not elapsed. If this is the first time the future + // is called, spawn the timer thread. If the timer thread is already + // running, ensure the stored `Waker` matches the current task's waker. + if let Some(waker) = &self.waker { + let mut waker = waker.lock().unwrap(); + + // Check if the stored waker matches the current task's waker. + // This is necessary as the `Delay` future instance may move to + // a different task between calls to `poll`. If this happens, the + // waker contained by the given `Context` will differ and we + // must update our stored waker to reflect this change. + if !waker.will_wake(cx.waker()) { + *waker = cx.waker().clone(); } - - // The duration has elapsed. Notify the caller by invoking - // the waker. - let waker = waker.lock().unwrap(); - waker.wake_by_ref(); - }); + } else { + let when = self.when; + let waker = Arc::new(Mutex::new(cx.waker().clone())); + self.waker = Some(waker.clone()); + + // This is the first time `poll` is called, spawn the timer thread. + thread::spawn(move || { + let now = Instant::now(); + + if now < when { + thread::sleep(when - now); + } + + // The duration has elapsed. Notify the caller by invoking + // the waker. + let waker = waker.lock().unwrap(); + waker.wake_by_ref(); + }); + } + + // By now, the waker is stored and the timer thread is started. + // The duration has not elapsed (recall that we checked for this + // first thing), ergo the future has not completed so we must + // return `Poll::Pending`. + // + // The `Future` trait contract requires that when `Pending` is + // returned, the future ensures that the given waker is signalled + // once the future should be polled again. In our case, by + // returning `Pending` here, we are promising that we will + // invoke the given waker included in the `Context` argument + // once the requested duration has elapsed. We ensure this by + // spawning the timer thread above. + // + // If we forget to invoke the waker, the task will hang + // indefinitely. + Poll::Pending } - - // By now, the waker is stored and the timer thread is started. - // The duration has not elapsed (recall that we checked for this - // first thing), ergo the future has not completed so we must - // return `Poll::Pending`. - // - // The `Future` trait contract requires that when `Pending` is - // returned, the future ensures that the given waker is signalled - // once the future should be polled again. In our case, by - // returning `Pending` here, we are promising that we will - // invoke the given waker included in the `Context` argument - // once the requested duration has elapsed. We ensure this by - // spawning the timer thread above. - // - // If we forget to invoke the waker, the task will hang - // indefinitely. - Poll::Pending } } ```