深入异步
至此,我们已经相当全面地了解了异步 Rust 和 Tokio。现在我们将深入探讨 Rust 的异步运行时模型。在本教程的开头,我们暗示了异步 Rust 采用了一种独特的方法。现在,我们将解释这意味着什么。
Futures
快速回顾一下,让我们来看一个非常基本的异步函数。这与本教程到目前为止所涵盖的内容相比,没有什么新的东西。
use tokio::net::TcpStream;
async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}
我们调用该函数,它返回一些值。我们对该值调用 .await
。
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// Nothing has been printed yet.
what_is_this.await;
// Text has been printed and socket has been
// established and closed.
}
my_async_fn()
返回的值是一个 future。future 是一个实现了标准库提供的 std::future::Future
trait 的值。它们是包含正在进行的异步计算的值。
std::future::Future
trait 的定义是
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
}
关联类型 Output
是 future 完成后产生的值的类型。Pin
类型是 Rust 如何在 async
函数中支持借用的方式。有关更多详细信息,请参阅 标准库 文档。
与其他语言中 future 的实现方式不同,Rust future 并不代表在后台发生的计算,而是 Rust future 就是 计算本身。future 的所有者负责通过轮询 future 来推进计算。这通过调用 Future::poll
完成。
实现 Future
让我们实现一个非常简单的 future。这个 future 将会
- 等待直到特定时刻。
- 将一些文本输出到 STDOUT。
- 产生一个字符串。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
Async fn 作为一个 Future
在 main 函数中,我们实例化 future 并对其调用 .await
。从 async 函数中,我们可以对任何实现了 Future
的值调用 .await
。反过来,调用 async
函数会返回一个实现了 Future
的匿名类型。对于 async fn main()
的情况,生成的 future 大致如下
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
Rust futures 是状态机。这里,MainFuture
被表示为 future 可能状态的 enum
。future 从 State0
状态开始。当调用 poll
时,future 尝试尽可能地推进其内部状态。如果 future 能够完成,则返回 Poll::Ready
,其中包含异步计算的输出。
如果 future 不能 完成,通常是由于它等待的资源尚未就绪,则返回 Poll::Pending
。接收到 Poll::Pending
表明 future 将在稍后完成,调用者应稍后再次调用 poll
。
我们还看到 futures 由其他 futures 组成。调用外部 future 的 poll
会导致调用内部 future 的 poll
函数。
执行器
异步 Rust 函数返回 futures。Futures 必须调用 poll
才能推进其状态。Futures 由其他 futures 组成。那么,问题是,谁在最外层的 future 上调用 poll
呢?
回想一下之前的内容,要运行异步函数,它们必须传递给 tokio::spawn
或作为用 #[tokio::main]
注释的 main 函数。这会将生成的最外层 future 提交给 Tokio 执行器。执行器负责在最外层 future 上调用 Future::poll
,从而驱动异步计算完成。
Mini Tokio
为了更好地理解这一切是如何组合在一起的,让我们实现我们自己的最小版本 Tokio!完整的代码可以在 这里 找到。
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// Spawn a future onto the mini-tokio instance.
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
这会运行异步代码块。创建了一个具有请求延迟的 Delay
实例,并在其上进行了 await 操作。但是,到目前为止,我们的实现存在一个主要的缺陷。我们的执行器永远不会休眠。执行器持续循环所有衍生的 futures 并轮询它们。大多数时候,futures 将无法准备好执行更多工作,并将再次返回 Poll::Pending
。该过程将消耗 CPU 周期,并且通常效率不高。
理想情况下,我们希望 mini-tokio 仅在 future 能够取得进展时才轮询 future。当任务阻塞的资源变为可以执行请求的操作时,就会发生这种情况。如果任务想要从 TCP 套接字读取数据,那么我们只希望在 TCP 套接字收到数据时才轮询该任务。在我们的例子中,任务被阻塞在给定的 Instant
到达时。理想情况下,mini-tokio 只会在该时刻过去后才轮询该任务。
为了实现这一点,当轮询资源并且资源未就绪时,资源将在其转换为就绪状态后发送通知。
Wakers
Wakers 是缺失的部分。这是资源能够通知等待任务资源已变为就绪状态以继续某些操作的系统。
让我们再次查看 Future::poll
的定义
fn poll(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Self::Output>;
poll
的 Context
参数有一个 waker()
方法。此方法返回一个绑定到当前任务的 Waker
。Waker
有一个 wake()
方法。调用此方法会向执行器发出信号,表明应调度关联的任务以进行执行。当资源转换为就绪状态时,资源会调用 wake()
以通知执行器,轮询任务将能够取得进展。
更新 Delay
我们可以更新 Delay
以使用 wakers
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
现在,一旦请求的持续时间过去,调用任务就会收到通知,并且执行器可以确保再次调度该任务。下一步是更新 mini-tokio 以监听唤醒通知。
我们的 Delay
实现仍然存在一些剩余问题。我们稍后会修复它们。
当 future 返回
Poll::Pending
时,它必须确保在某个时候发出 waker 信号。忘记这样做会导致任务无限期挂起。在返回
Poll::Pending
后忘记唤醒任务是 bug 的常见来源。
回想一下 Delay
的第一个迭代版本。这是 future 实现
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
在返回 Poll::Pending
之前,我们调用了 cx.waker().wake_by_ref()
。这是为了满足 future 约定。通过返回 Poll::Pending
,我们有责任发出 waker 信号。因为我们还没有实现计时器线程,所以我们内联发出了 waker 信号。这样做将导致 future 立即重新调度、再次执行,并且可能尚未准备好完成。
请注意,您被允许比必要时更频繁地发出 waker 信号。在这种特定情况下,即使我们根本没有准备好继续操作,我们也发出了 waker 信号。除了浪费一些 CPU 周期外,这没有什么问题。但是,此特定实现将导致忙循环。
更新 Mini Tokio
下一步是更新 Mini Tokio 以接收 waker 通知。我们希望执行器仅在任务被唤醒时才运行任务,为此,Mini Tokio 将提供自己的 waker。当调用 waker 时,其关联的任务将被排队等待执行。Mini-Tokio 在轮询 future 时将此 waker 传递给 future。
更新后的 Mini Tokio 将使用通道来存储计划任务。通道允许从任何线程将任务排队以进行执行。Wakers 必须是 Send
和 Sync
。
Send
和Sync
traits 是 Rust 提供的与并发相关的标记 trait。可以发送到不同线程的类型是Send
。大多数类型都是Send
,但像Rc
这样的类型不是。可以通过不可变引用并发访问的类型是Sync
。一种类型可以是Send
但不是Sync
— 一个很好的例子是Cell
,它可以通过不可变引用进行修改,因此并发访问是不安全的。有关更多详细信息,请参阅 Rust 书中相关的 章节。
更新 MiniTokio
结构体。
use std::sync::mpsc;
use std::sync::Arc;
struct MiniTokio {
scheduled: mpsc::Receiver<Arc<Task>>,
sender: mpsc::Sender<Arc<Task>>,
}
struct Task {
// This will be filled in soon.
}
Wakers 是 Sync
并且可以被克隆。当调用 wake
时,必须调度任务以进行执行。为了实现这一点,我们有一个通道。当在 waker 上调用 wake()
时,任务被推送到通道的发送端。我们的 Task
结构体将实现唤醒逻辑。为此,它需要同时包含衍生的 future 和通道发送端。我们将 future 放在 TaskFuture
结构体中,同时放在 Poll
enum 中,以跟踪最新的 Future::poll()
结果,这对于处理虚假唤醒是必需的。更多详细信息在 TaskFuture
中 poll()
方法的实现中给出。
use std::sync::{Arc, Mutex};
/// A structure holding a future and the result of
/// the latest call to its `poll` method.
struct TaskFuture {
future: Pin<Box<dyn Future<Output = ()> + Send>>,
poll: Poll<()>,
}
struct Task {
// The `Mutex` is to make `Task` implement `Sync`. Only
// one thread accesses `task_future` at any given time.
// The `Mutex` is not required for correctness. Real Tokio
// does not use a mutex here, but real Tokio has
// more lines of code than can fit in a single tutorial
// page.
task_future: Mutex<TaskFuture>,
executor: mpsc::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
为了调度任务,Arc
被克隆并通过通道发送。现在,我们需要将我们的 schedule
函数与 std::task::Waker
连接起来。标准库提供了一个低级 API,可以使用 手动 vtable 构建 来做到这一点。此策略为实现者提供了最大的灵活性,但需要大量不安全的样板代码。我们将使用 futures
crate 提供的 ArcWake
实用程序,而不是直接使用 RawWakerVTable
。这使我们能够实现一个简单的 trait,以将我们的 Task
结构体公开为 waker。
将以下依赖项添加到您的 Cargo.toml
中以拉取 futures
。
futures = "0.3"
然后实现 futures::task::ArcWake
。
use futures::task::{self, ArcWake};
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
当上面的计时器线程调用 waker.wake()
时,任务被推送到通道中。接下来,我们实现在 MiniTokio::run()
函数中接收和执行任务。
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// Initialize a new mini-tokio instance.
fn new() -> MiniTokio {
let (sender, scheduled) = mpsc::channel();
MiniTokio { scheduled, sender }
}
/// Spawn a future onto the mini-tokio instance.
///
/// The given future is wrapped with the `Task` harness and pushed into the
/// `scheduled` queue. The future will be executed when `run` is called.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl TaskFuture {
fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture {
TaskFuture {
future: Box::pin(future),
poll: Poll::Pending,
}
}
fn poll(&mut self, cx: &mut Context<'_>) {
// Spurious wake-ups are allowed, even after a future has
// returned `Ready`. However, polling a future which has
// already returned `Ready` is *not* allowed. For this
// reason we need to check that the future is still pending
// before we call it. Failure to do so can lead to a panic.
if self.poll.is_pending() {
self.poll = self.future.as_mut().poll(cx);
}
}
}
impl Task {
fn poll(self: Arc<Self>) {
// Create a waker from the `Task` instance. This
// uses the `ArcWake` impl from above.
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// No other thread ever tries to lock the task_future
let mut task_future = self.task_future.try_lock().unwrap();
// Poll the inner future
task_future.poll(&mut cx);
}
// Spawns a new task with the given future.
//
// Initializes a new Task harness containing the given future and pushes it
// onto `sender`. The receiver half of the channel will get the task and
// execute it.
fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
task_future: Mutex::new(TaskFuture::new(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
这里发生了很多事情。首先,实现了 MiniTokio::run()
。该函数在一个循环中运行,从通道接收计划任务。当任务被唤醒时被推送到通道中时,这些任务在执行时能够取得进展。
此外,调整了 MiniTokio::new()
和 MiniTokio::spawn()
函数以使用通道而不是 VecDeque
。当衍生新任务时,它们会获得通道发送部分的克隆,任务可以使用该克隆在运行时上调度自身。
Task::poll()
函数使用 futures
crate 中的 ArcWake
实用程序创建 waker。waker 用于创建 task::Context
。该 task::Context
被传递给 poll
。
总结
我们现在已经看到了异步 Rust 如何工作的端到端示例。Rust 的 async/await
功能由 traits 支持。这允许像 Tokio 这样的第三方 crates 提供执行细节。
- 异步 Rust 操作是惰性的,需要调用者轮询它们。
- Wakers 被传递给 futures,以将 future 链接到调用它的任务。
- 当资源未准备好完成操作时,返回
Poll::Pending
并记录任务的 waker。 - 当资源变为就绪时,会通知任务的 waker。
- 执行器接收到通知并调度任务执行。
- 再次轮询任务,这次资源已就绪,任务取得进展。
一些遗留问题
回想一下,当我们实现 Delay
future 时,我们说过还有一些事情需要修复。Rust 的异步模型允许单个 future 在执行时跨任务迁移。考虑以下情况
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
poll_fn
函数使用闭包创建一个 Future
实例。上面的代码片段创建了一个 Delay
实例,轮询一次,然后将 Delay
实例发送到一个新任务,并在其中进行 await 操作。在此示例中,使用不同的 Waker
实例多次调用 Delay::poll
。发生这种情况时,您必须确保在传递给最近一次 poll
调用的 Waker
上调用 wake
。
在实现 future 时,至关重要的是要假设每次调用 poll
可能会提供不同的 Waker
实例。poll 函数必须使用新的 waker 更新任何先前记录的 waker。
我们早期的 Delay
实现每次轮询时都会衍生一个新的线程。这很好,但如果轮询过于频繁(例如,如果您在 future 和其他 future 上使用 select!
,则每当其中任何一个发生事件时都会轮询两者),则效率会非常低。一种方法是记住您是否已经衍生了一个线程,并且仅在您尚未衍生线程时才衍生一个新线程。但是,如果您这样做,则必须确保在后续的 poll 调用中更新线程的 Waker
,否则您将不会唤醒最近的 Waker
。
为了修复我们早期的实现,我们可以这样做
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
// This is Some when we have spawned a thread, and None otherwise.
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Check the current instant. If the duration has elapsed, then
// this future has completed so we return `Poll::Ready`.
if Instant::now() >= self.when {
return Poll::Ready(());
}
// The duration has not elapsed. If this is the first time the future
// is called, spawn the timer thread. If the timer thread is already
// running, ensure the stored `Waker` matches the current task's waker.
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// Check if the stored waker matches the current task's waker.
// This is necessary as the `Delay` future instance may move to
// a different task between calls to `poll`. If this happens, the
// waker contained by the given `Context` will differ and we
// must update our stored waker to reflect this change.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// This is the first time `poll` is called, spawn the timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// The duration has elapsed. Notify the caller by invoking
// the waker.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// By now, the waker is stored and the timer thread is started.
// The duration has not elapsed (recall that we checked for this
// first thing), ergo the future has not completed so we must
// return `Poll::Pending`.
//
// The `Future` trait contract requires that when `Pending` is
// returned, the future ensures that the given waker is signalled
// once the future should be polled again. In our case, by
// returning `Pending` here, we are promising that we will
// invoke the given waker included in the `Context` argument
// once the requested duration has elapsed. We ensure this by
// spawning the timer thread above.
//
// If we forget to invoke the waker, the task will hang
// indefinitely.
Poll::Pending
}
}
这有点复杂,但想法是,在每次调用 poll
时,future 都会检查提供的 waker 是否与先前记录的 waker 匹配。如果两个 waker 匹配,则无需执行任何其他操作。如果它们不匹配,则必须更新记录的 waker。
Notify
实用程序
我们演示了如何使用 wakers 手动实现 Delay
future。Wakers 是异步 Rust 工作原理的基础。通常,没有必要下降到那个级别。例如,对于 Delay
的情况,我们可以完全使用 async/await
通过使用 tokio::sync::Notify
实用程序来实现它。此实用程序提供基本的任务通知机制。它处理 wakers 的细节,包括确保记录的 waker 与当前任务匹配。
使用 Notify
,我们可以使用 async/await
实现一个 delay
函数,如下所示
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify_clone.notify_one();
});
notify.notified().await;
}