深度分析Rust异步(翻译)

曾经编写过一篇关于 Rust 异步的文章,以非常肤浅的视角对 Rust 的异步原理进行了描述。现在将 Tokio 官方教程的其中一篇进行翻译,其中非常深入地讲解了 Rust 异步的原理。
原文链接:https://tokio.rs/tokio/tutorial/async
文章内容:
至此,我们已经完成了对异步 Rust 和 Tokio 的相当全面的介绍。现在我们将深入研究 Rust 的异步运行时模型。在本教程的一开始,我们暗示异步 Rust 采用了一种独特的方法。现在,我们来解释一下这意味着什么。

Future

作为快速回顾,我们来看一个非常基本的异步函数。与本教程迄今为止介绍的内容相比,这并不是什么新内容。

1
2
3
4
5
6
7
use tokio::net::TcpStream;

async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}

我们调用该函数,它返回一些值。我们在这个值上调用 .await 。

1
2
3
4
5
6
7
8
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}

my_async_fn() 返回的值是一个 Future。Future 是一个实现标准库提供的 std::future::Future trait 的值。它们是包含正在进行的异步计算的值。
std::future::Future trait 的定义是:

1
2
3
4
5
6
7
8
9
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
type Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}

关联类型 Output 是 Future 完成后产生的类型。Pin 类型是 Rust 用来在 async 函数中支持借用的方法。有关 Pin 的更多详细信息,请参阅标准库文档。
与其他语言中 Future 的实现方式不同,Rust Future 并不代表在后台发生的计算,相反,Rust Future 就是计算本身。Future 的所有者负责通过轮询 Future 来推进计算。这是通过调用 Future::poll 来完成的。

实现 Future

让我们实现一个非常简单的 Future 。这个 Future 将:

  1. 等待到特定的时间点。
  2. 输出一些文本到 STDOUT。
  3. 产生一个字符串。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};

    struct Delay {
    when: Instant,
    }

    impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<&'static str>
    {
    if Instant::now() >= self.when {
    println!("Hello world");
    Poll::Ready("done")
    } else {
    // Ignore this line for now.
    cx.waker().wake_by_ref();
    Poll::Pending
    }
    }
    }

    #[tokio::main]
    async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
    }

    async fn 作为 Future

    在 main 函数中,我们实例化 Future 并对它调用 .await 。从异步函数中,我们可以对任何实现了 Future 的值调用 .await 。反过来,调用 async 函数会返回一个实现了 Future 的匿名类型。在 async fn main() 的情况下,生成的 Future 大致如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};

    enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
    }

    impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<()>
    {
    use MainFuture::*;

    loop {
    match *self {
    State0 => {
    let when = Instant::now() +
    Duration::from_millis(10);
    let future = Delay { when };
    *self = State1(future);
    }
    State1(ref mut my_future) => {
    match Pin::new(my_future).poll(cx) {
    Poll::Ready(out) => {
    assert_eq!(out, "done");
    *self = Terminated;
    return Poll::Ready(());
    }
    Poll::Pending => {
    return Poll::Pending;
    }
    }
    }
    Terminated => {
    panic!("future polled after completion")
    }
    }
    }
    }
    }
    Rust Future 是状态机。在这里,MainFuture 是表示 Future 的可能状态之一的 enum 。Future 从状态 State0 开始。当 poll 被调用时,Future 会尝试尽可能地推进其内部状态。如果 Future 能够完成,则返回包含异步计算输出的 Poll::Ready。
    如果 Future 无法完成,通常是由于它等待的资源尚未准备好,则返回 Poll::Pending 。调用者收到 Poll::Pending 表明 Future 将在稍后完成,调用者应稍后再次调用 poll 。
    我们还看到 Future 是由其他 Future 组成的。对外部 Future 调用 poll 会导致调用内部 Future 的 poll 函数。

    执行器

    异步 Rust 函数返回 Future。 必须对 Future 调用 poll 来推进其状态。Future 由其他 Future 组成。那么,问题是,什么对最外层 Future 调用了 poll ?
    回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn 或成为用 #[tokio::main] 注释的主函数。这会导致将生成的最外层 Future 被提交给 Tokio 执行器。执行器负责对最外层 Future 调用 Future::poll ,推动异步计算完成。

    迷你 Tokio

    为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的 Tokio 最小版本!完整代码可以在 这里 找到。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    use std::collections::VecDeque;
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};
    use futures::task;

    fn main() {
    let mut mini_tokio = MiniTokio::new();

    mini_tokio.spawn(async {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
    });

    mini_tokio.run();
    }

    struct MiniTokio {
    tasks: VecDeque<Task>,
    }

    type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

    impl MiniTokio {
    fn new() -> MiniTokio {
    MiniTokio {
    tasks: VecDeque::new(),
    }
    }

    /// Spawn a future onto the mini-tokio instance.
    fn spawn<F>(&mut self, future: F)
    where
    F: Future<Output = ()> + Send + 'static,
    {
    self.tasks.push_back(Box::pin(future));
    }

    fn run(&mut self) {
    let waker = task::noop_waker();
    let mut cx = Context::from_waker(&waker);

    while let Some(mut task) = self.tasks.pop_front() {
    if task.as_mut().poll(&mut cx).is_pending() {
    self.tasks.push_back(task);
    }
    }
    }
    }
    这将运行异步块。将创建一个具有请求的延迟的 Delay 实例并等待它。但是,到目前为止,我们的实现存在一个重大缺陷。我们的执行器从不进入休眠状态。执行器不断循环所有生成的 Future 并轮询它们。大多数情况下,Future 尚未准备好执行更多工作并将再次返回 Poll::Pending 。该过程将消耗 CPU 周期,并且通常效率不高。
    理想情况下,我们希望 mini-tokio 仅在 Future 能够取得进展时轮询 Future 。当任务被阻止的资源准备好执行请求的操作时,就会发生这种情况。如果任务想要从 TCP 套接字读取数据,那么我们只想在 TCP 套接字收到数据时轮询任务。在我们的例子中,任务在给定的 Instant 值到达前阻塞。理想情况下,mini-tokio 只会在时间过去后轮询任务。
    为了实现这一点,当轮询某个资源并且该资源尚未准备好时,该资源一旦转换为就绪状态就会发送通知。

    唤醒器

    唤醒器是缺失的部分。唤醒器是资源能够通知等待任务的系统,资源已准备好继续执行某些操作。
    我们再来看一下 Future::poll 定义:
    1
    2
    fn poll(self: Pin<&mut Self>, cx: &mut Context)
    -> Poll<Self::Output>;
    Context 参数有一个 waker() 方法。此方法返回绑定当前任务的唤醒器。唤醒器有一个 wake() 方法。调用此方法向执行器发出信号,表示应安排执行相关任务。资源在转换为就绪状态时调用 wake(),以通知执行器轮询任务将能够取得进展。

    更新 Delay

    我们可以更新Delay以使用唤醒程序:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::time::{Duration, Instant};
    use std::thread;

    struct Delay {
    when: Instant,
    }

    impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<&'static str>
    {
    if Instant::now() >= self.when {
    println!("Hello world");
    Poll::Ready("done")
    } else {
    // Get a handle to the waker for the current task
    let waker = cx.waker().clone();
    let when = self.when;

    // Spawn a timer thread.
    thread::spawn(move || {
    let now = Instant::now();

    if now < when {
    thread::sleep(when - now);
    }

    waker.wake();
    });

    Poll::Pending
    }
    }
    }
    现在,一旦请求的持续时间过去,就会通知调用任务,并且执行器可以确保再次安排该任务。下一步是更新 mini-tokio 以监听唤醒通知。
    我们的 Delay 实现中仍存在一些问题。我们稍后会修复它们。
    当 Future 返回 Poll::Pending 时,它必须确保唤醒程序在某个时刻收到信号。忘记这样做会导致任务无限期挂起。
    返回 Poll::Pending 后忘记唤醒任务是错误的常见来源。
    回想一下 Delay 的第一次迭代。以下是 Future 的实现:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
    -> Poll<&'static str>
    {
    if Instant::now() >= self.when {
    println!("Hello world");
    Poll::Ready("done")
    } else {
    // Ignore this line for now.
    cx.waker().wake_by_ref();
    Poll::Pending
    }
    }
    }
    在返回 Poll::Pending 之前,我们调用 cx.waker().wake_by_ref()。这是为了满足 Future 约定。通过返回Poll::Pending,我们负责向唤醒程序发出信号。因为我们还没有实现计时器线程,所以我们以内联方式向唤醒程序发出信号。这样做会导致 Future 立即被重新调度,再次执行,并且可能无法完成。
    请注意,您可以比需要更频繁地向唤醒器发出信号。在此特定情况下,即使我们根本没有准备好继续操作,我们也会向唤醒器发出信号。除了浪费一些 CPU 周期外,这样做没有任何问题。但是,这种特定实现会导致繁忙循环。

    更新迷你 Tokio

    下一步是更新 Mini Tokio 以接收唤醒器通知。我们希望执行器仅在被唤醒时运行任务,为此,Mini Tokio 将提供自己的唤醒器。当唤醒器被调用时,其相关任务将排队等待执行。Mini-Tokio 在轮询 Future 时将此唤醒器传递给 Future 。
    更新后的 Mini Tokio 将使用通道来存储已调度的任务。通道允许将任务排队以从任何线程执行。唤醒器必须是Send 和 Sync 。
    Send 和 Sync trait 是 Rust 提供的与并发相关的标记 trait 。Send 是可以发送到不同线程的类型 。大多数类型是 Send,但类似于 Rc 的类型不是 。Sync 是可以通过不可变引用并发访问的类型。 类型可以是 Send 但不是 Sync 的 - Cell 是一个很好的例子 ,它可以通过不可变引用进行修改,因此并发访问不安全。
    更多详细信息,请参阅《Rust编程语言》一书中的相关章节
    更新 MiniTokio 结构。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    use std::sync::mpsc;
    use std::sync::Arc;

    struct MiniTokio {
    scheduled: mpsc::Receiver<Arc<Task>>,
    sender: mpsc::Sender<Arc<Task>>,
    }

    struct Task {
    // This will be filled in soon.
    }
    唤醒器是 Sync 并且可以被克隆的。当 wake 被调用时,必须安排任务执行。为了实现这一点,我们有一个通道。当在唤醒器上调用 wake() 时,任务被推入通道的发送部分。我们的 Task 结构将实现唤醒逻辑。为此,它需要包含生成的 Future 和通道发送部分。我们将 Future 放在 TaskFuture 结构中,旁边是 Poll 枚举,以跟踪最新的 Future::poll() 结果,这对于处理虚假唤醒是必要的。在 TaskFuture 的 poll() 方法实现中给出了更多详细信息。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    use std::sync::{Arc, Mutex};

    /// A structure holding a future and the result of
    /// the latest call to its `poll` method.
    struct TaskFuture {
    future: Pin<Box<dyn Future<Output = ()> + Send>>,
    poll: Poll<()>,
    }

    struct Task {
    // The `Mutex` is to make `Task` implement `Sync`. Only
    // one thread accesses `task_future` at any given time.
    // The `Mutex` is not required for correctness. Real Tokio
    // does not use a mutex here, but real Tokio has
    // more lines of code than can fit in a single tutorial
    // page.
    task_future: Mutex<TaskFuture>,
    executor: mpsc::Sender<Arc<Task>>,
    }

    impl Task {
    fn schedule(self: &Arc<Self>) {
    self.executor.send(self.clone());
    }
    }
    要安排任务,需要克隆 Arc 并通过通道发送。现在,我们需要使用 std::task::Waker 来给我们的 schedule 函数下钩子。标准库提供了一个低级 API,可以使用手动 vtable 构造来执行此操作。此策略为实现者提供了最大的灵活性,但需要一堆不安全的样板代码。 我们将使用 futures crate 提供的 ArcWake 实用程序,而不是直接使用 RawWakerVTable 。这使我们能够实现一个简单的 trait ,便将我们的 Task 结构暴露为唤醒器。
    将以下依赖项添加到您的 Cargo.toml 中以拉取 futures 。
    1
    futures = "0.3"
    然后实现 futures::task::ArcWake 。
    1
    2
    3
    4
    5
    6
    7
    use futures::task::{self, ArcWake};
    use std::sync::Arc;
    impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
    arc_self.schedule();
    }
    }
    当上面的定时器线程调用 waker.wake() 时,任务就被推送到通道中。接下来我们在 MiniTokio::run() 函数中实现接收并执行任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    impl MiniTokio {
    fn run(&self) {
    while let Ok(task) = self.scheduled.recv() {
    task.poll();
    }
    }

    /// Initialize a new mini-tokio instance.
    fn new() -> MiniTokio {
    let (sender, scheduled) = mpsc::channel();

    MiniTokio { scheduled, sender }
    }

    /// Spawn a future onto the mini-tokio instance.
    ///
    /// The given future is wrapped with the `Task` harness and pushed into the
    /// `scheduled` queue. The future will be executed when `run` is called.
    fn spawn<F>(&self, future: F)
    where
    F: Future<Output = ()> + Send + 'static,
    {
    Task::spawn(future, &self.sender);
    }
    }

    impl TaskFuture {
    fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
    TaskFuture {
    future: Box::pin(future),
    poll: Poll::Pending,
    }
    }

    fn poll(&mut self, cx: &mut Context<'_>) {
    // Spurious wake-ups are allowed, even after a future has
    // returned `Ready`. However, polling a future which has
    // already returned `Ready` is *not* allowed. For this
    // reason we need to check that the future is still pending
    // before we call it. Failure to do so can lead to a panic.
    if self.poll.is_pending() {
    self.poll = self.future.as_mut().poll(cx);
    }
    }
    }

    impl Task {
    fn poll(self: Arc<Self>) {
    // Create a waker from the `Task` instance. This
    // uses the `ArcWake` impl from above.
    let waker = task::waker(self.clone());
    let mut cx = Context::from_waker(&waker);

    // No other thread ever tries to lock the task_future
    let mut task_future = self.task_future.try_lock().unwrap();

    // Poll the inner future
    task_future.poll(&mut cx);
    }

    // Spawns a new task with the given future.
    //
    // Initializes a new Task harness containing the given future and pushes it
    // onto `sender`. The receiver half of the channel will get the task and
    // execute it.
    fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>)
    where
    F: Future<Output = ()> + Send + 'static,
    {
    let task = Arc::new(Task {
    task_future: Mutex::new(TaskFuture::new(future)),
    executor: sender.clone(),
    });

    let _ = sender.send(task);
    }
    }
    这里发生了多件事。首先,实现了 MiniTokio::run() 。该函数循环运行,从通道接收已计划任务。由于任务在被唤醒时被推送到通道中,因此这些任务在执行时能够取得进展。
    此外,MiniTokio::new() 和 MiniTokio::spawn() 函数被调整为使用通道而不是 VecDeque 。当新任务产生时,它们会被赋予通道发送方部分的克隆,任务可以使用该克隆在运行时安排自身。
    Task::poll() 函数使用来自 futures crate 的 ArcWake 实用程序创建唤醒程序。唤醒程序被用于创建 task::Context 。该 task::Context 将传递给 poll 。

    总结

    现在,我们已经看到了异步 Rust 工作原理的端到端示例。Rust 的 async/await 功能由 trait 支持。这允许第三方 crate(如 Tokio)提供执行细节。
  • 异步 Rust 操作是惰性的,需要调用者对其进行轮询。
  • 唤醒者被传递给 Future ,以将 Future 与调用它的 Task 联系起来。
  • 当资源尚未准备好完成操作时,将返回 Poll::Pending 并记录任务的唤醒者。
  • 当资源准备就绪时,任务的唤醒者会收到通知。
  • 执行器收到通知就安排任务执行。
  • 再次轮询任务,这一次资源已准备就绪并且任务取得进展。

    一些悬而未决的问题

    回想一下,当我们实现 Delay Future 时,我们说过还有一些事情需要解决。Rust 的异步模型允许单个 Future 在执行时跨任务迁移。考虑以下内容:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    use futures::future::poll_fn;
    use std::future::Future;
    use std::pin::Pin;

    #[tokio::main]
    async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let mut delay = Some(Delay { when });

    poll_fn(move |cx| {
    let mut delay = delay.take().unwrap();
    let res = Pin::new(&mut delay).poll(cx);
    assert!(res.is_pending());
    tokio::spawn(async move {
    delay.await;
    });

    Poll::Ready(())
    }).await;
    }
    该 poll_fn 函数使用闭包创建一个 Future 实例。上面的代码片段创建一个 Delay 实例,轮询它一次,然后将该 Delay 实例发送到正在等待的新任务中。在此示例中,使用不同的 Waker 实例多次调用 Delay::poll 。发生这种情况时,您必须确保在最近调用的 poll 中传入的 Waker 上调用 wake 。
    在实现 Future 时,每次调用 poll 都可以提供不同的 Waker 实例是个至关重要的假设。poll 函数必须用新的唤醒器更新任何先前记录的唤醒器。
    我们之前的 Delay 实现每次轮询时都会生成一个新线程。这很好,但如果轮询过于频繁,效率会非常低下(例如,如果您 select! 轮询该 Future 和其他 Future ,则只要其中一个发生事件,就会轮询两者)。解决此问题的一种方法是记住您是否已经生成了线程,并且仅在尚未生成线程的情况下才生成新线程。但是,如果您这样做,则必须确保线程的 Waker 在以后的轮询调用中更新,否则您将无法唤醒最近的 Waker。
    为了修复我们之前的实现,我们可以做这样的事情:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};
    use std::thread;
    use std::time::{Duration, Instant};

    struct Delay {
    when: Instant,
    // This is Some when we have spawned a thread, and None otherwise.
    waker: Option<Arc<Mutex<Waker>>>,
    }

    impl Future for Delay {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
    // Check the current instant. If the duration has elapsed, then
    // this future has completed so we return `Poll::Ready`.
    if Instant::now() >= self.when {
    return Poll::Ready(());
    }

    // The duration has not elapsed. If this is the first time the future
    // is called, spawn the timer thread. If the timer thread is already
    // running, ensure the stored `Waker` matches the current task's waker.
    if let Some(waker) = &self.waker {
    let mut waker = waker.lock().unwrap();

    // Check if the stored waker matches the current task's waker.
    // This is necessary as the `Delay` future instance may move to
    // a different task between calls to `poll`. If this happens, the
    // waker contained by the given `Context` will differ and we
    // must update our stored waker to reflect this change.
    if !waker.will_wake(cx.waker()) {
    *waker = cx.waker().clone();
    }
    } else {
    let when = self.when;
    let waker = Arc::new(Mutex::new(cx.waker().clone()));
    self.waker = Some(waker.clone());

    // This is the first time `poll` is called, spawn the timer thread.
    thread::spawn(move || {
    let now = Instant::now();

    if now < when {
    thread::sleep(when - now);
    }

    // The duration has elapsed. Notify the caller by invoking
    // the waker.
    let waker = waker.lock().unwrap();
    waker.wake_by_ref();
    });
    }

    // By now, the waker is stored and the timer thread is started.
    // The duration has not elapsed (recall that we checked for this
    // first thing), ergo the future has not completed so we must
    // return `Poll::Pending`.
    //
    // The `Future` trait contract requires that when `Pending` is
    // returned, the future ensures that the given waker is signalled
    // once the future should be polled again. In our case, by
    // returning `Pending` here, we are promising that we will
    // invoke the given waker included in the `Context` argument
    // once the requested duration has elapsed. We ensure this by
    // spawning the timer thread above.
    //
    // If we forget to invoke the waker, the task will hang
    // indefinitely.
    Poll::Pending
    }
    }
    这有点复杂,但其理念是,每次调用 poll 时,Future 都会检查提供的唤醒器是否与之前记录的唤醒器匹配。如果两个唤醒器匹配,则无需执行其他操作。如果它们不匹配,则必须更新记录的唤醒器。

    Notify 实用程序

    我们演示了 Delay 如何使用唤醒程序手动实现 Future 。唤醒程序是异步 Rust 工作的基础。通常,没有必要降低到那个级别。例如,在 Delay 的情况下,我们可以通过使用 tokio::sync::Notify 实用程序完全利用 async/await 实现它。此实用程序提供了一种基本的任务通知机制。它处理唤醒程序的细节,包括确保记录的唤醒程序与当前任务匹配。
    使用 Notify,我们可以使用 async/await 实现如下 delay 函数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    use tokio::sync::Notify;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use std::thread;

    async fn delay(dur: Duration) {
    let when = Instant::now() + dur;
    let notify = Arc::new(Notify::new());
    let notify_clone = notify.clone();

    thread::spawn(move || {
    let now = Instant::now();

    if now < when {
    thread::sleep(when - now);
    }

    notify_clone.notify_one();
    });


    notify.notified().await;
    }