#[tokio::main] asyncfnmain() { letwhat_is_this = my_async_fn(); // Nothing has been printed yet. what_is_this.await; // Text has been printed and socket has been // established and closed. }
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant};
enumMainFuture { // Initialized, never polled State0, // Waiting on `Delay`, i.e. the `future.await` line. State1(Delay), // The future has completed. Terminated, }
implFutureforMainFuture { typeOutput = ();
fnpoll(mutself: Pin<&mutSelf>, cx: &mut Context<'_>) -> Poll<()> { use MainFuture::*;
use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use futures::task;
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use std::thread;
/// A timer whose only state is the instant at which it is due.
struct Delay {
    /// The deadline; the accompanying `Future` impl compares it against
    /// `Instant::now()` when polled.
    when: Instant,
}
implFutureforDelay { typeOutput = &'staticstr;
fnpoll(self: Pin<&mutSelf>, cx: &mut Context<'_>) -> Poll<&'staticstr> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Get a handle to the waker for the current task letwaker = cx.waker().clone(); letwhen = self.when;
/// A structure holding a future and the result of
/// the latest call to its `poll` method.
struct TaskFuture {
    /// The wrapped future: boxed, pinned, and `Send`.
    future: Pin<Box<dyn Future<Output = ()> + Send>>,
    /// Result of the most recent poll of `future`.
    poll: Poll<()>,
}
structTask { // The `Mutex` is to make `Task` implement `Sync`. Only // one thread accesses `task_future` at any given time. // The `Mutex` is not required for correctness. Real Tokio // does not use a mutex here, but real Tokio has // more lines of code than can fit in a single tutorial // page. task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, }
/// Initialize a new mini-tokio instance. fnnew() -> MiniTokio { let (sender, scheduled) = mpsc::channel();
MiniTokio { scheduled, sender } }
/// Spawn a future onto the mini-tokio instance. /// /// The given future is wrapped with the `Task` harness and pushed into the /// `scheduled` queue. The future will be executed when `run` is called. fnspawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { Task::spawn(future, &self.sender); } }
fnpoll(&mutself, cx: &mut Context<'_>) { // Spurious wake-ups are allowed, even after a future has // returned `Ready`. However, polling a future which has // already returned `Ready` is *not* allowed. For this // reason we need to check that the future is still pending // before we call it. Failure to do so can lead to a panic. ifself.poll.is_pending() { self.poll = self.future.as_mut().poll(cx); } } }
implTask { fnpoll(self: Arc<Self>) { // Create a waker from the `Task` instance. This // uses the `ArcWake` impl from above. letwaker = task::waker(self.clone()); letmut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the task_future letmut task_future = self.task_future.try_lock().unwrap();
// Poll the inner future task_future.poll(&mut cx); }
// Spawns a new task with the given future. // // Initializes a new Task harness containing the given future and pushes it // onto `sender`. The receiver half of the channel will get the task and // execute it. fnspawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>) where F: Future<Output = ()> + Send + 'static, { lettask = Arc::new(Task { task_future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), });
use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant};
/// A timer holding its deadline and, once a timer thread has been started,
/// the waker shared with that thread.
struct Delay {
    /// The instant at which the delay is due.
    when: Instant,
    // This is Some when we have spawned a thread, and None otherwise.
    waker: Option<Arc<Mutex<Waker>>>,
}
implFutureforDelay { typeOutput = ();
fnpoll(mutself: Pin<&mutSelf>, cx: &mut Context<'_>) -> Poll<()> { // Check the current instant. If the duration has elapsed, then // this future has completed so we return `Poll::Ready`. if Instant::now() >= self.when { return Poll::Ready(()); }
// The duration has not elapsed. If this is the first time the future // is called, spawn the timer thread. If the timer thread is already // running, ensure the stored `Waker` matches the current task's waker. ifletSome(waker) = &self.waker { letmut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker. // This is necessary as the `Delay` future instance may move to // a different task between calls to `poll`. If this happens, the // waker contained by the given `Context` will differ and we // must update our stored waker to reflect this change. if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } } else { letwhen = self.when; letwaker = Arc::new(Mutex::new(cx.waker().clone())); self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread. thread::spawn(move || { letnow = Instant::now();
if now < when { thread::sleep(when - now); }
// The duration has elapsed. Notify the caller by invoking // the waker. letwaker = waker.lock().unwrap(); waker.wake_by_ref(); }); }
// By now, the waker is stored and the timer thread is started. // The duration has not elapsed (recall that we checked for this // first thing), ergo the future has not completed so we must // return `Poll::Pending`. // // The `Future` trait contract requires that when `Pending` is // returned, the future ensures that the given waker is signalled // once the future should be polled again. In our case, by // returning `Pending` here, we are promising that we will // invoke the given waker included in the `Context` argument // once the requested duration has elapsed. We ensure this by // spawning the timer thread above. // // If we forget to invoke the waker, the task will hang // indefinitely. Poll::Pending } }