深度分析Rust异步(翻译)
曾经编写过一篇关于 Rust 异步的文章,以非常肤浅的视角对 Rust 的异步原理进行了描述。现在将 Tokio 官方教程的其中一篇进行翻译,其中非常深入地讲解了 Rust 异步的原理。
原文链接:https://tokio.rs/tokio/tutorial/async
文章内容:
至此,我们已经完成了对异步 Rust 和 Tokio 的相当全面的介绍。现在我们将深入研究 Rust 的异步运行时模型。在本教程的一开始,我们暗示异步 Rust 采用了一种独特的方法。现在,我们来解释一下这意味着什么。
Future
作为快速回顾,我们来看一个非常基本的异步函数。与本教程迄今为止介绍的内容相比,这并不是什么新内容。
1 | use tokio::net::TcpStream; |
我们调用该函数,它返回一些值。我们在这个值上调用 .await 。
1 |
|
my_async_fn() 返回的值是一个 Future。Future 是一个实现标准库提供的 std::future::Future trait 的值。它们是包含正在进行的异步计算的值。
std::future::Future trait 的定义是:
1 | use std::pin::Pin; |
关联类型 Output 是 Future 完成后产生的类型。Pin 类型是 Rust 用来在 async 函数中支持借用的方法。有关 Pin 的更多详细信息,请参阅标准库文档。
与其他语言中 Future 的实现方式不同,Rust Future 并不代表在后台发生的计算,相反,Rust Future 就是计算本身。Future 的所有者负责通过轮询 Future 来推进计算。这是通过调用 Future::poll 来完成的。
实现 Future
让我们实现一个非常简单的 Future 。这个 Future 将:
- 等待到特定的时间点。
- 输出一些文本到 STDOUT。
- 产生一个字符串。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}async fn 作为 Future
在 main 函数中,我们实例化 Future 并对它调用 .await 。从异步函数中,我们可以对任何实现了 Future 的值调用 .await 。反过来,调用 async 函数会返回一个实现了 Future 的匿名类型。在 async fn main() 的情况下,生成的 Future 大致如下:Rust Future 是状态机。在这里,MainFuture 是表示 Future 的可能状态之一的 enum 。Future 从状态 State0 开始。当 poll 被调用时,Future 会尝试尽可能地推进其内部状态。如果 Future 能够完成,则返回包含异步计算输出的 Poll::Ready。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
如果 Future 无法完成,通常是由于它等待的资源尚未准备好,则返回 Poll::Pending 。调用者收到 Poll::Pending 表明 Future 将在稍后完成,调用者应稍后再次调用 poll 。
我们还看到 Future 是由其他 Future 组成的。对外部 Future 调用 poll 会导致调用内部 Future 的 poll 函数。执行器
异步 Rust 函数返回 Future。 必须对 Future 调用 poll 来推进其状态。Future 由其他 Future 组成。那么,问题是,什么对最外层 Future 调用了 poll ?
回想一下前面的内容,要运行异步函数,它们必须被传递给 tokio::spawn 或成为用 #[tokio::main] 注释的主函数。这会导致将生成的最外层 Future 被提交给 Tokio 执行器。执行器负责对最外层 Future 调用 Future::poll ,推动异步计算完成。迷你 Tokio
为了更好地理解这一切是如何结合在一起的,让我们实现我们自己的 Tokio 最小版本!完整代码可以在 这里 找到。这将运行异步块。将创建一个具有请求的延迟的 Delay 实例并等待它。但是,到目前为止,我们的实现存在一个重大缺陷。我们的执行器从不进入休眠状态。执行器不断循环所有生成的 Future 并轮询它们。大多数情况下,Future 尚未准备好执行更多工作并将再次返回 Poll::Pending 。该过程将消耗 CPU 周期,并且通常效率不高。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
理想情况下,我们希望 mini-tokio 仅在 Future 能够取得进展时轮询 Future 。当任务被阻止的资源准备好执行请求的操作时,就会发生这种情况。如果任务想要从 TCP 套接字读取数据,那么我们只想在 TCP 套接字收到数据时轮询任务。在我们的例子中,任务在给定的 Instant 值到达前阻塞。理想情况下,mini-tokio 只会在时间过去后轮询任务。
为了实现这一点,当轮询某个资源并且该资源尚未准备好时,该资源一旦转换为就绪状态就会发送通知。唤醒器
唤醒器是缺失的部分。唤醒器是资源能够通知等待任务的系统,资源已准备好继续执行某些操作。
我们再来看一下 Future::poll 定义:Context 参数有一个 waker() 方法。此方法返回绑定当前任务的唤醒器。唤醒器有一个 wake() 方法。调用此方法向执行器发出信号,表示应安排执行相关任务。资源在转换为就绪状态时调用 wake(),以通知执行器轮询任务将能够取得进展。1
2fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;更新 Delay
我们可以更新Delay以使用唤醒程序:现在,一旦请求的持续时间过去,就会通知调用任务,并且执行器可以确保再次安排该任务。下一步是更新 mini-tokio 以监听唤醒通知。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
我们的 Delay 实现中仍存在一些问题。我们稍后会修复它们。
当 Future 返回 Poll::Pending 时,它必须确保唤醒程序在某个时刻收到信号。忘记这样做会导致任务无限期挂起。
返回 Poll::Pending 后忘记唤醒任务是错误的常见来源。
回想一下 Delay 的第一次迭代。以下是 Future 的实现:在返回 Poll::Pending 之前,我们调用 cx.waker().wake_by_ref()。这是为了满足 Future 约定。通过返回Poll::Pending,我们负责向唤醒程序发出信号。因为我们还没有实现计时器线程,所以我们以内联方式向唤醒程序发出信号。这样做会导致 Future 立即被重新调度,再次执行,并且可能无法完成。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
请注意,您可以比需要更频繁地向唤醒器发出信号。在此特定情况下,即使我们根本没有准备好继续操作,我们也会向唤醒器发出信号。除了浪费一些 CPU 周期外,这样做没有任何问题。但是,这种特定实现会导致繁忙循环。更新迷你 Tokio
下一步是更新 Mini Tokio 以接收唤醒器通知。我们希望执行器仅在被唤醒时运行任务,为此,Mini Tokio 将提供自己的唤醒器。当唤醒器被调用时,其相关任务将排队等待执行。Mini-Tokio 在轮询 Future 时将此唤醒器传递给 Future 。
更新后的 Mini Tokio 将使用通道来存储已调度的任务。通道允许将任务排队以从任何线程执行。唤醒器必须是Send 和 Sync 。
Send 和 Sync trait 是 Rust 提供的与并发相关的标记 trait 。Send 是可以发送到不同线程的类型 。大多数类型是 Send,但类似于 Rc 的类型不是 。Sync 是可以通过不可变引用并发访问的类型。 类型可以是 Send 但不是 Sync 的 - Cell 是一个很好的例子 ,它可以通过不可变引用进行修改,因此并发访问不安全。
更多详细信息,请参阅《Rust编程语言》一书中的相关章节。
更新 MiniTokio 结构。唤醒器是 Sync 并且可以被克隆的。当 wake 被调用时,必须安排任务执行。为了实现这一点,我们有一个通道。当在唤醒器上调用 wake() 时,任务被推入通道的发送部分。我们的 Task 结构将实现唤醒逻辑。为此,它需要包含生成的 Future 和通道发送部分。我们将 Future 放在 TaskFuture 结构中,旁边是 Poll 枚举,以跟踪最新的 Future::poll() 结果,这对于处理虚假唤醒是必要的。在 TaskFuture 的 poll() 方法实现中给出了更多详细信息。1
2
3
4
5
6
7
8
9
10
11use std::sync::mpsc;
use std::sync::Arc;
struct MiniTokio {
scheduled: mpsc::Receiver<Arc<Task>>,
sender: mpsc::Sender<Arc<Task>>,
}
struct Task {
// This will be filled in soon.
}要安排任务,需要克隆 Arc 并通过通道发送。现在,我们需要使用 std::task::Waker 来给我们的 schedule 函数下钩子。标准库提供了一个低级 API,可以使用手动 vtable 构造来执行此操作。此策略为实现者提供了最大的灵活性,但需要一堆不安全的样板代码。 我们将使用 futures crate 提供的 ArcWake 实用程序,而不是直接使用 RawWakerVTable 。这使我们能够实现一个简单的 trait ,便将我们的 Task 结构暴露为唤醒器。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25use std::sync::{Arc, Mutex};
/// A structure holding a future and the result of
/// the latest call to its `poll` method.
struct TaskFuture {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
poll: Poll<()>,
}
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `task_future` at any given time.
// The `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
task_future: Mutex<TaskFuture>,
executor: mpsc::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
将以下依赖项添加到您的 Cargo.toml 中以拉取 futures 。然后实现 futures::task::ArcWake 。1
futures = "0.3"
当上面的定时器线程调用 waker.wake() 时,任务就被推送到通道中。接下来我们在 MiniTokio::run() 函数中实现接收并执行任务。1
2
3
4
5
6
7use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}这里发生了多件事。首先,实现了 MiniTokio::run() 。该函数循环运行,从通道接收已计划任务。由于任务在被唤醒时被推送到通道中,因此这些任务在执行时能够取得进展。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = mpsc::channel();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl TaskFuture {
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
TaskFuture {
future: Box::pin(future),
poll: Poll::Pending,
}
}
fn poll(&mut self, cx: &mut Context<'_>) {
// Spurious wake-ups are allowed, even after a future has
// returned `Ready`. However, polling a future which has
// already returned `Ready` is *not* allowed. For this
// reason we need to check that the future is still pending
// before we call it. Failure to do so can lead to a panic.
if self.poll.is_pending() {
self.poll = self.future.as_mut().poll(cx);
}
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the task_future
let mut task_future = self.task_future.try_lock().unwrap();
// Poll the inner future
task_future.poll(&mut cx);
}
// Spawns a new task with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
task_future: Mutex::new(TaskFuture::new(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
此外,MiniTokio::new() 和 MiniTokio::spawn() 函数被调整为使用通道而不是 VecDeque 。当新任务产生时,它们会被赋予通道发送方部分的克隆,任务可以使用该克隆在运行时安排自身。
Task::poll() 函数使用来自 futures crate 的 ArcWake 实用程序创建唤醒程序。唤醒程序被用于创建 task::Context 。该 task::Context 将传递给 poll 。总结
现在,我们已经看到了异步 Rust 工作原理的端到端示例。Rust 的 async/await 功能由 trait 支持。这允许第三方 crate(如 Tokio)提供执行细节。
- 异步 Rust 操作是惰性的,需要调用者对其进行轮询。
- 唤醒者被传递给 Future ,以将 Future 与调用它的 Task 联系起来。
- 当资源尚未准备好完成操作时,将返回 Poll::Pending 并记录任务的唤醒者。
- 当资源准备就绪时,任务的唤醒者会收到通知。
- 执行器收到通知就安排任务执行。
- 再次轮询任务,这一次资源已准备就绪并且任务取得进展。
一些悬而未决的问题
回想一下,当我们实现 Delay Future 时,我们说过还有一些事情需要解决。Rust 的异步模型允许单个 Future 在执行时跨任务迁移。考虑以下内容:该 poll_fn 函数使用闭包创建一个 Future 实例。上面的代码片段创建一个 Delay 实例,轮询它一次,然后将该 Delay 实例发送到正在等待的新任务中。在此示例中,使用不同的 Waker 实例多次调用 Delay::poll 。发生这种情况时,您必须确保在最近调用的 poll 中传入的 Waker 上调用 wake 。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
在实现 Future 时,每次调用 poll 都可以提供不同的 Waker 实例是个至关重要的假设。poll 函数必须用新的唤醒器更新任何先前记录的唤醒器。
我们之前的 Delay 实现每次轮询时都会生成一个新线程。这很好,但如果轮询过于频繁,效率会非常低下(例如,如果您 select! 轮询该 Future 和其他 Future ,则只要其中一个发生事件,就会轮询两者)。解决此问题的一种方法是记住您是否已经生成了线程,并且仅在尚未生成线程的情况下才生成新线程。但是,如果您这样做,则必须确保线程的 Waker 在以后的轮询调用中更新,否则您将无法唤醒最近的 Waker。
为了修复我们之前的实现,我们可以做这样的事情:这有点复杂,但其理念是,每次调用 poll 时,Future 都会检查提供的唤醒器是否与之前记录的唤醒器匹配。如果两个唤醒器匹配,则无需执行其他操作。如果它们不匹配,则必须更新记录的唤醒器。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
// This is Some when we have spawned a thread, and None otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Check the current instant. If the duration has elapsed, then
// this future has completed so we return `Poll::Ready`.
if Instant::now() >= self.when {
return Poll::Ready(());
}
// The duration has not elapsed. If this is the first time the future
// is called, spawn the timer thread. If the timer thread is already
// running, ensure the stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// By now, the waker is stored and the timer thread is started.
// The duration has not elapsed (recall that we checked for this
// first thing), ergo the future has not completed so we must
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}Notify 实用程序
我们演示了 Delay 如何使用唤醒程序手动实现 Future 。唤醒程序是异步 Rust 工作的基础。通常,没有必要降低到那个级别。例如,在 Delay 的情况下,我们可以通过使用 tokio::sync::Notify 实用程序完全利用 async/await 实现它。此实用程序提供了一种基本的任务通知机制。它处理唤醒程序的细节,包括确保记录的唤醒程序与当前任务匹配。
使用 Notify,我们可以使用 async/await 实现如下 delay 函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify_clone.notify_one();
});
notify.notified().await;
}