Tokio-Compat 发布
2019 年 12 月 18 日
Tokio 0.2 的发布是众多贡献者辛勤工作的结晶,并为 Tokio 带来了多项重大改进。使用 std::future
和 async/await 使使用 Tokio 编写异步代码更加符合人体工程学,而新的调度器实现使 Tokio 0.2 的线程池速度提高了 10 倍。然而,更新现有的 Tokio 0.1 项目以使用 0.2 和 std::future
带来了一些新的挑战。因此,我们非常高兴地宣布发布 tokio-compat
crate,通过提供与 Tokio 0.1 和 Tokio 0.2 futures 兼容的运行时,来帮助简化这种过渡。
您可以在 crates.io 和 GitHub 上找到 tokio-compat
。
动机
从 futures
0.1 过渡到 std::future
以及 Tokio 0.2 中 API 的重大更改,使得更新现有项目以受益于 Tokio 0.2 提供的各种改进变得具有挑战性。此外,这些重大更改使得项目难以进行增量迁移:相反,它们必须一次性完成,这需要大量的工作,有时还会使其他工作暂停,直到迁移完成。为了实现增量迁移,我们需要一个兼容层,使我们能够在同一项目中使用针对旧版 Tokio 0.1/futures
0.1 API 和 新版 Tokio 0.2/std::future
API 编写的代码。
futures
crate 的 compat
模块提供了 futures
0.1 和 std::future
future 类型之间的互操作性,例如为实现 futures
0.1 Future
的类型实现 std::future::Future
。这是兼容性故事的基础部分,但就其本身而言,它不足以使大多数项目能够增量更新。大多数使用 Tokio 的代码都依赖于 Tokio 提供的运行时服务。这些运行时服务包括生成其他任务的能力;I/O 驱动程序,它允许任务通过操作系统的异步 I/O API 得到通知,以及定时器。依赖于 Tokio 0.1 运行时服务的 Futures 在尝试访问 Tokio 0.2 运行时上的这些运行时服务(例如通过生成任务或创建定时器)时会发生 panic,即使它们被转换为 std::future::Future
特征也是如此。这是因为新的运行时没有以与 Tokio 0.1 的 API 兼容的方式提供这些服务。
tokio-compat
crate 通过提供一个兼容性运行时来帮助弥合这一差距,该运行时提供与 Tokio 0.1 和 Tokio 0.2 兼容的运行时服务。例如,使用 tokio-compat
,我们可以编写如下代码
use futures_01::future::lazy;
tokio_compat::run(lazy(|| {
// spawn a `futures` 0.1 future using the `spawn` function from the
// `tokio` 0.1 crate:
tokio_01::spawn(lazy(|| {
println!("hello from tokio 0.1!");
Ok(())
}));
// spawn an `async` block future on the same runtime using `tokio`
// 0.2's `spawn`:
tokio_02::spawn(async {
println!("hello from tokio 0.2!");
});
Ok(())
}))
同样,我们可以运行依赖于定时器和 I/O 驱动程序等运行时服务的 0.1 和 0.2 版本的任务
use std::time::{Duration, Instant};
use tokio_compat::prelude::*;
tokio_compat::run_std(async {
// Wait for a `tokio` 0.1 `Delay`...
let when = Instant::now() + Duration::from_millis(10);
tokio_01::timer::Delay::new(when)
// convert the delay future into a `std::future` that we can `await`.
.compat()
.await
.expect("tokio 0.1 timer should work!");
println!("10 ms have elapsed");
// Wait for a `tokio` 0.2 `Delay`...
tokio_02::time::delay_for(Duration::from_millis(20)).await;
println!("20 ms have elapsed");
});
使用 tokio-compat
tokio-compat
的主要用例包括
- 增量迁移应用程序:更新大型项目以使用新的 API 具有挑战性。当这种更改可以逐步进行,逐模块进行,或者通过要求添加到项目中的新代码使用新的 API 并随着现有代码因其他原因而更改而缓慢重写现有代码时,这种更改通常会容易得多。然而,由于 0.1 和 0.2 运行时之间的不兼容性,这对于大多数项目来说并不是真正可行的。相反,有必要一次性更新所有内容,这需要进行一次大的更改,这通常会阻碍其他工作。
tokio-compat
可以允许项目改为增量迁移。 - 在新代码中使用旧版库:如果使用 Tokio 0.2 的新项目需要来自针对 0.1 编写的库的功能,其作者将面临各种选择,但没有一个特别好。他们可以将功能的进度阻塞在依赖项被重写为使用 0.2 上,这可能需要很长时间;他们可以重写他们的项目以使用 0.1,放弃 async/await 的所有优势,并且当依赖项更新时可能需要另一次重写;或者他们可以承担起自行更新依赖项的责任。但是,使用
tokio-compat
,可以在与 Tokio 0.2 futures 相同的运行时上使用来自期望 Tokio 0.1 的库的 futures。
在所有这些情况下,tokio-compat
有望成为临时的必需品:理想情况下,大多数代码都应该过渡到使用 async/await、std::future
和 Tokio 0.2。尽管我们努力使兼容层尽可能轻量,但根据定义,它是系统中复杂性的额外来源。此外,async/await 提供了显著的人体工程学优势,并且使用它的代码更易于理解和修改,因此大多数项目将从迁移到它中获益匪浅。tokio-compat
的作用是使这种过渡更容易和更渐进。
入门指南
tokio-compat
提供的 API 旨在作为 tokio
0.1 API 的直接替代品。tokio-compat
提供的运行时公开的函数与 Tokio 0.1 的运行时具有相同的名称和签名。因此,在许多情况下,开始使用 tokio-compat
非常简单,只需添加
tokio-compat = { version = "0.1", features = ["rt-full"] }
到您的 Cargo.toml,并将引用 tokio
0.1 的 Runtime
模块的导入和路径更改为 tokio_compat
的。因此,例如,
tokio::runtime::run(future);
变为
tokio_compat::runtime::run(future);
同样,
use tokio::runtime::Runtime;
let mut rt = Runtime::new().unwrap();
rt.spawn(future);
rt.shutdown_on_idle()
.wait()
.unwrap();
变为
use tokio_compat::runtime::Runtime;
let mut rt = Runtime::new().unwrap();
rt.spawn(future);
rt.shutdown_on_idle()
.wait()
.unwrap();
只需要将 tokio::runtime
模块和 tokio::run
函数替换为 tokio-compat
的版本。Tokio 0.1 公开的其他 API,例如 tokio::net
,将在兼容性运行时上正常工作(有一些例外,我们稍后会讨论)。在兼容性运行时上运行时,通过 tokio
0.1 的 spawn
函数生成 futures
0.1 任务的代码将正常工作,通过 tokio
0.2 的 spawn
生成 std::future
任务的代码也将正常工作。
此外,tokio-compat
运行时、TaskExecutor
和其他类型也提供与 std::future
兼容的方法。例如,Runtime
既有 spawn
(用于生成 0.1 futures),也有 spawn_std
(用于生成 std::future
future(或 async
函数/块))。有关生成的详细信息,请参阅 tokio-compat
API 文档中的此部分。
一旦项目在兼容性运行时上运行,就可以轻松地逐步迁移到 std::future
和 async/await。一种选择是简单地对现有代码库执行“一对一”的翻译:例如,实现 futures::future::Future
的类型被重写为实现 std::future::Future
,使用 future 组合器的代码被更改为使用这些组合器的 futures
0.3 版本,等等。在许多情况下,所需的更改相当机械,例如更改导入,将 Async::NotReady
重命名为 Poll::Pending
等。但是,std::future::Future
的 poll
方法的 Pin<&mut Self>
接收器类型可能会使迁移手动 Future
实现具有挑战性。诸如 pin-project
之类的 crates 在处理堆栈 pinning 要求时可能会有所帮助。
但是,在大多数情况下,通常更容易重写现有代码以使用 async/await 语法,而不是手动实现 Future
或使用 future 组合器。尽管从修改的代码行数来衡量,这种更改更大,但 async/await 语法的人体工程学优势可以使迁移更容易——通常,可以简单地删除大量样板代码。此外,这还具有产生更符合语言习惯、更易读和更易于维护的代码的好处。对于大多数项目,切换到使用 async/await 语法是推荐的迁移路径。
注意
在当前的 tokio-compat
v0.1 中,需要记住一些“陷阱”。特别是,重要的是要注意,兼容性线程池运行时目前不支持 tokio
0.1 tokio_threadpool::blocking
API。在兼容性运行时上对旧版本的 blocking
的调用目前将失败。将来,tokio-compat
将允许透明地将旧版 blocking
替换为 tokio
0.2 blocking API,但在过渡期间,有必要将此代码转换为调用 tokio
0.2 task::block_in_place
和 task::spawn_blocking
API。由于 tokio::fs
依赖于 blocking API,因此 Tokio 0.1 版本的 tokio::fs
目前也无法在兼容性运行时上工作。有关详细信息,请参阅此处。
此外,重要的是要记住,Tokio 0.1 和 Tokio 0.2 在生成任务和关闭运行时方面提供了略有不同的行为。特别是,Tokio 0.1 跟踪运行时是否空闲(即,它没有 futures 在其上运行),并提供了一个 Runtime::shutdown_on_idle
方法,该方法在运行时变为空闲时关闭运行时。另一方面,Tokio 0.2 的 spawn
函数返回 JoinHandle
,可用于等待已生成任务的完成,并且用户应改为等待这些 JoinHandle
以确定运行时何时应关闭。因此,tokio-compat
同时提供了这两个 API,但重要的是要注意,只有没有 JoinHandle
生成的任务“计入”运行时被视为空闲。有关详细信息,请参阅文档中的此部分。
案例研究:Vector
Vector 是一个高性能的可观测性数据路由器,用于收集和转换日志、指标和事件。Vector 是一个生产应用程序,它依赖于 Tokio 运行时,并且目前依赖于 Tokio 0.1。
在将 Vector 从 Tokio 0.1 切换到 Tokio 0.2 后,其维护人员观察到基准测试的性能显着提高
| 基准测试组 | 时间 (0.1) | 吞吐量 (0.1) | 时间 (0.2) | 吞吐量 (0.2) | 加速 | | --------------------------- | ------------: | --------------: | ------------: | ---------------: | ------: | | 批处理 10mb,批大小 2mb | 5.5±0.14ms | 1732.8 MB/s | 5.1±0.15ms | 1864.0 MB/s | x 1.08 | | buffers/in-memory | 199.3±2.35ms | 47.8 MB/s | 99.6±4.40ms | 95.7 MB/s | x 2.00 | | http/http_gzip | 191.1±3.17ms | 49.9 MB/s | 94.3±6.24ms | 101.1 MB/s | x 2.03 | | http/http_no_compression | 191.8±4.68ms | 49.7 MB/s | 91.2±2.56ms | 104.6 MB/s | x 2.10 | | interconnected | 238.7±5.66ms | 79.9 MB/s | 209.7±5.32ms | 91.0 MB/s | x 1.14 | | pipe | 198.4±0.60ms | 48.1 MB/s | 106.0±6.52ms | 90.0 MB/s | x 1.87 | | pipe_with_many_writers | 131.7±12.56ms | 72.4 MB/s | 150.7±10.56ms | 63.3 MB/s | x 0.86 | | pipe_with_tiny_lines | 165.7±0.77ms | 589.2 KB/s | 84.1±0.66ms | 1161.4 KB/s | x 1.97 | | transforms | 275.7±3.28ms | 38.0 MB/s | 167.5±8.07ms | 62.6 MB/s | x 1.65 |
请注意,在许多这些基准测试中,Tokio 0.2 版本(使用 tokio-compat
)比 Tokio 0.1 版本快两倍,仅仅是通过更改使用的 Tokio 运行时。
更新 Vector 以使用兼容性运行时所需的更改表面积也非常小。这是完整的差异
diff --git a/src/runtime.rs b/src/runtime.rs
index 07005a7d..4d64a106 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -1,15 +1,16 @@
use futures::future::{ExecuteError, Executor, Future};
+use futures_util::future::{FutureExt, TryFutureExt};
use std::io;
-use tokio::runtime::Builder;
+use tokio_compat::runtime::Builder;
pub struct Runtime {
- rt: tokio::runtime::Runtime,
+ rt: tokio_compat::runtime::Runtime,
}
impl Runtime {
pub fn new() -> io::Result<Self> {
Ok(Runtime {
- rt: tokio::runtime::Runtime::new()?,
+ rt: tokio_compat::runtime::Runtime::new()?,
})
}
@@ -47,17 +48,17 @@ impl Runtime {
}
pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
- self.rt.shutdown_on_idle()
+ self.rt.shutdown_on_idle().unit_error().boxed().compat()
}
pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
- self.rt.shutdown_now()
+ self.rt.shutdown_now().unit_error().boxed().compat()
}
}
#[derive(Clone, Debug)]
pub struct TaskExecutor {
- inner: tokio::runtime::TaskExecutor,
+ inner: tokio_compat::runtime::TaskExecutor,
}
impl TaskExecutor {
@@ -71,6 +72,7 @@ where
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
- self.inner.execute(future)
+ self.inner.spawn(future);
+ Ok(())
}
}
diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs
index 1b99328b..880374b2 100644
--- a/src/sinks/kafka.rs
+++ b/src/sinks/kafka.rs
@@ -6,7 +6,7 @@ use crate::{
topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
};
use futures::{
- future::{self, poll_fn, IntoFuture},
+ future::{self, IntoFuture},
stream::FuturesUnordered,
Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
};
@@ -213,18 +213,14 @@ impl Sink for KafkaSink {
fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();
- let check = poll_fn(move || {
- tokio_threadpool::blocking(|| {
- consumer
- .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
- .map(|_| ())
- .map_err(|err| err.into())
- })
- })
- .map_err(|err| err.into())
- .and_then(|result| result.into_future());
+ let task = tokio02::task::block_in_place(|| {
+ consumer
+ .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
+ .map(|_| ())
+ .map_err(|err| err.into())
+ });
- Box::new(check)
+ Box::new(task.into_future())
}
(感谢 @LucioFranco 在 Vector 中试用 tokio-compat
!)
结论
自 tokio-core
时代以来,许多主要的开源项目一直在使用 Tokio。Rust 中的异步编程自那时以来取得了巨大的进步,我们希望 tokio-compat
使所有 Tokio 用户都能轻松地开始从新的 std::future
/Tokio 0.2 生态系统中看到好处,并且尽可能轻松。当然,与所有 0.1 版本一样,还有更多工作要做,包括
- 无缝支持
tokio-threadpool
0.1 的blocking
API tokio-io
0.1 的AsyncRead
和AsyncWrite
特征的兼容层- 重新实现 Tokio 0.1 中已在 0.2 中删除的 API
- 必要的错误修复和性能改进
您可以在 GitHub 上的 tokio-compat
存储库中找到它。与往常一样,我们非常欢迎来自 Tokio 社区的反馈和贡献。如果您需要帮助,或想讨论 tokio-compat
的详细信息,请加入我们的 Discord —— 欢迎所有人!