Tokio-Compat 发布

2019 年 12 月 18 日

Tokio 0.2 的发布是众多贡献者辛勤工作的结晶,并为 Tokio 带来了多项重大改进。使用 std::future 和 async/await 使使用 Tokio 编写异步代码更加符合人体工程学,而新的调度器实现使 Tokio 0.2 的线程池速度提高了 10 倍。然而,更新现有的 Tokio 0.1 项目以使用 0.2 和 std::future 带来了一些新的挑战。因此,我们非常高兴地宣布发布 tokio-compat crate,通过提供与 Tokio 0.1 和 Tokio 0.2 futures 兼容的运行时,来帮助简化这种过渡。

您可以在 crates.ioGitHub 上找到 tokio-compat

动机

futures 0.1 过渡到 std::future 以及 Tokio 0.2 中 API 的重大更改,使得更新现有项目以受益于 Tokio 0.2 提供的各种改进变得具有挑战性。此外,这些重大更改使得项目难以进行增量迁移:相反,它们必须一次性完成,这需要大量的工作,有时还会使其他工作暂停,直到迁移完成。为了实现增量迁移,我们需要一个兼容层,使我们能够在同一项目中使用针对旧版 Tokio 0.1/futures 0.1 API 新版 Tokio 0.2/std::future API 编写的代码。

futures crate 的 compat 模块提供了 futures 0.1 和 std::future future 类型之间的互操作性,例如为实现 futures 0.1 Future 的类型实现 std::future::Future。这是兼容性故事的基础部分,但就其本身而言,它不足以使大多数项目能够增量更新。大多数使用 Tokio 的代码都依赖于 Tokio 提供的运行时服务。这些运行时服务包括生成其他任务的能力;I/O 驱动程序,它允许任务通过操作系统的异步 I/O API 得到通知,以及定时器。依赖于 Tokio 0.1 运行时服务的 Futures 在尝试访问 Tokio 0.2 运行时上的这些运行时服务(例如通过生成任务或创建定时器)时会发生 panic,即使它们被转换为 std::future::Future 特征也是如此。这是因为新的运行时没有以与 Tokio 0.1 的 API 兼容的方式提供这些服务。

tokio-compat crate 通过提供一个兼容性运行时来帮助弥合这一差距,该运行时提供与 Tokio 0.1 Tokio 0.2 兼容的运行时服务。例如,使用 tokio-compat,我们可以编写如下代码

use futures_01::future::lazy;

tokio_compat::run(lazy(|| {
    // spawn a `futures` 0.1 future using the `spawn` function from the
    // `tokio` 0.1 crate:
    tokio_01::spawn(lazy(|| {
        println!("hello from tokio 0.1!");
        Ok(())
    }));

    // spawn an `async` block future on the same runtime using `tokio`
    // 0.2's `spawn`:
    tokio_02::spawn(async {
        println!("hello from tokio 0.2!");
    });

    Ok(())
}))

同样,我们可以运行依赖于定时器和 I/O 驱动程序等运行时服务的 0.1 和 0.2 版本的任务

use std::time::{Duration, Instant};
use tokio_compat::prelude::*;

tokio_compat::run_std(async {
    // Wait for a `tokio` 0.1 `Delay`...
    let when = Instant::now() + Duration::from_millis(10);
    tokio_01::timer::Delay::new(when)
        // convert the delay future into a `std::future` that we can `await`.
        .compat()
        .await
        .expect("tokio 0.1 timer should work!");
    println!("10 ms have elapsed");

    // Wait for a `tokio` 0.2 `Delay`...
    tokio_02::time::delay_for(Duration::from_millis(20)).await;
    println!("20 ms have elapsed");
});

使用 tokio-compat

tokio-compat 的主要用例包括

  • 增量迁移应用程序:更新大型项目以使用新的 API 具有挑战性。当这种更改可以逐步进行,逐模块进行,或者通过要求添加到项目中的代码使用新的 API 并随着现有代码因其他原因而更改而缓慢重写现有代码时,这种更改通常会容易得多。然而,由于 0.1 和 0.2 运行时之间的不兼容性,这对于大多数项目来说并不是真正可行的。相反,有必要一次性更新所有内容,这需要进行一次大的更改,这通常会阻碍其他工作。tokio-compat 可以允许项目改为增量迁移。
  • 在新代码中使用旧版库:如果使用 Tokio 0.2 的新项目需要来自针对 0.1 编写的库的功能,其作者将面临各种选择,但没有一个特别好。他们可以将功能的进度阻塞在依赖项被重写为使用 0.2 上,这可能需要很长时间;他们可以重写他们的项目以使用 0.1,放弃 async/await 的所有优势,并且当依赖项更新时可能需要另一次重写;或者他们可以承担起自行更新依赖项的责任。但是,使用 tokio-compat,可以在与 Tokio 0.2 futures 相同的运行时上使用来自期望 Tokio 0.1 的库的 futures。

在所有这些情况下,tokio-compat 有望成为临时的必需品:理想情况下,大多数代码都应该过渡到使用 async/await、std::future 和 Tokio 0.2。尽管我们努力使兼容层尽可能轻量,但根据定义,它是系统中复杂性的额外来源。此外,async/await 提供了显著的人体工程学优势,并且使用它的代码更易于理解和修改,因此大多数项目将从迁移到它中获益匪浅。tokio-compat 的作用是使这种过渡更容易和更渐进。

入门指南

tokio-compat 提供的 API 旨在作为 tokio 0.1 API 的直接替代品。tokio-compat 提供的运行时公开的函数与 Tokio 0.1 的运行时具有相同的名称和签名。因此,在许多情况下,开始使用 tokio-compat 非常简单,只需添加

tokio-compat = { version = "0.1", features = ["rt-full"] }

到您的 Cargo.toml,并将引用 tokio 0.1 的 Runtime 模块的导入和路径更改为 tokio_compat 的。因此,例如,

tokio::runtime::run(future);

变为

tokio_compat::runtime::run(future);

同样,

use tokio::runtime::Runtime;

let mut rt = Runtime::new().unwrap();

rt.spawn(future);

rt.shutdown_on_idle()
    .wait()
    .unwrap();

变为

use tokio_compat::runtime::Runtime;

let mut rt = Runtime::new().unwrap();

rt.spawn(future);

rt.shutdown_on_idle()
    .wait()
    .unwrap();

只需要将 tokio::runtime 模块和 tokio::run 函数替换为 tokio-compat 的版本。Tokio 0.1 公开的其他 API,例如 tokio::net,将在兼容性运行时上正常工作(有一些例外,我们稍后会讨论)。在兼容性运行时上运行时,通过 tokio 0.1 的 spawn 函数生成 futures 0.1 任务的代码将正常工作,通过 tokio 0.2 的 spawn 生成 std::future 任务的代码也将正常工作。

此外,tokio-compat 运行时、TaskExecutor 和其他类型提供与 std::future 兼容的方法。例如,Runtime 既有 spawn(用于生成 0.1 futures),也有 spawn_std(用于生成 std::future future(或 async 函数/块))。有关生成的详细信息,请参阅 tokio-compat API 文档中的此部分

一旦项目在兼容性运行时上运行,就可以轻松地逐步迁移到 std::future 和 async/await。一种选择是简单地对现有代码库执行“一对一”的翻译:例如,实现 futures::future::Future 的类型被重写为实现 std::future::Future,使用 future 组合器的代码被更改为使用这些组合器的 futures 0.3 版本,等等。在许多情况下,所需的更改相当机械,例如更改导入,将 Async::NotReady 重命名为 Poll::Pending 等。但是,std::future::Futurepoll 方法的 Pin<&mut Self> 接收器类型可能会使迁移手动 Future 实现具有挑战性。诸如 pin-project 之类的 crates 在处理堆栈 pinning 要求时可能会有所帮助。

但是,在大多数情况下,通常更容易重写现有代码以使用 async/await 语法,而不是手动实现 Future 或使用 future 组合器。尽管从修改的代码行数来衡量,这种更改更大,但 async/await 语法的人体工程学优势可以使迁移更容易——通常,可以简单地删除大量样板代码。此外,这还具有产生更符合语言习惯、更易读和更易于维护的代码的好处。对于大多数项目,切换到使用 async/await 语法是推荐的迁移路径。

注意

在当前的 tokio-compat v0.1 中,需要记住一些“陷阱”。特别是,重要的是要注意,兼容性线程池运行时目前支持 tokio 0.1 tokio_threadpool::blocking API。在兼容性运行时上对旧版本的 blocking 的调用目前将失败。将来,tokio-compat 将允许透明地将旧版 blocking 替换为 tokio 0.2 blocking API,但在过渡期间,有必要将此代码转换为调用 tokio 0.2 task::block_in_placetask::spawn_blocking API。由于 tokio::fs 依赖于 blocking API,因此 Tokio 0.1 版本的 tokio::fs 目前也无法在兼容性运行时上工作。有关详细信息,请参阅此处

此外,重要的是要记住,Tokio 0.1 和 Tokio 0.2 在生成任务和关闭运行时方面提供了略有不同的行为。特别是,Tokio 0.1 跟踪运行时是否空闲(即,它没有 futures 在其上运行),并提供了一个 Runtime::shutdown_on_idle 方法,该方法在运行时变为空闲时关闭运行时。另一方面,Tokio 0.2 的 spawn 函数返回 JoinHandle,可用于等待已生成任务的完成,并且用户应改为等待这些 JoinHandle 以确定运行时何时应关闭。因此,tokio-compat 同时提供了这两个 API,但重要的是要注意,只有没有 JoinHandle 生成的任务“计入”运行时被视为空闲。有关详细信息,请参阅文档中的此部分

案例研究:Vector

Vector 是一个高性能的可观测性数据路由器,用于收集和转换日志、指标和事件。Vector 是一个生产应用程序,它依赖于 Tokio 运行时,并且目前依赖于 Tokio 0.1。

在将 Vector 从 Tokio 0.1 切换到 Tokio 0.2 后,其维护人员观察到基准测试的性能显着提高

| 基准测试组 | 时间 (0.1) | 吞吐量 (0.1) | 时间 (0.2) | 吞吐量 (0.2) | 加速 | | --------------------------- | ------------: | --------------: | ------------: | ---------------: | ------: | | 批处理 10mb,批大小 2mb | 5.5±0.14ms | 1732.8 MB/s | 5.1±0.15ms | 1864.0 MB/s | x 1.08 | | buffers/in-memory | 199.3±2.35ms | 47.8 MB/s | 99.6±4.40ms | 95.7 MB/s | x 2.00 | | http/http_gzip | 191.1±3.17ms | 49.9 MB/s | 94.3±6.24ms | 101.1 MB/s | x 2.03 | | http/http_no_compression | 191.8±4.68ms | 49.7 MB/s | 91.2±2.56ms | 104.6 MB/s | x 2.10 | | interconnected | 238.7±5.66ms | 79.9 MB/s | 209.7±5.32ms | 91.0 MB/s | x 1.14 | | pipe | 198.4±0.60ms | 48.1 MB/s | 106.0±6.52ms | 90.0 MB/s | x 1.87 | | pipe_with_many_writers | 131.7±12.56ms | 72.4 MB/s | 150.7±10.56ms | 63.3 MB/s | x 0.86 | | pipe_with_tiny_lines | 165.7±0.77ms | 589.2 KB/s | 84.1±0.66ms | 1161.4 KB/s | x 1.97 | | transforms | 275.7±3.28ms | 38.0 MB/s | 167.5±8.07ms | 62.6 MB/s | x 1.65 |

请注意,在许多这些基准测试中,Tokio 0.2 版本(使用 tokio-compat)比 Tokio 0.1 版本快两倍,仅仅是通过更改使用的 Tokio 运行时。

更新 Vector 以使用兼容性运行时所需的更改表面积也非常小。这是完整的差异

diff --git a/src/runtime.rs b/src/runtime.rs
index 07005a7d..4d64a106 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -1,15 +1,16 @@
 use futures::future::{ExecuteError, Executor, Future};
+use futures_util::future::{FutureExt, TryFutureExt};
 use std::io;
-use tokio::runtime::Builder;
+use tokio_compat::runtime::Builder;

 pub struct Runtime {
-    rt: tokio::runtime::Runtime,
+    rt: tokio_compat::runtime::Runtime,
 }

 impl Runtime {
     pub fn new() -> io::Result<Self> {
         Ok(Runtime {
-            rt: tokio::runtime::Runtime::new()?,
+            rt: tokio_compat::runtime::Runtime::new()?,
         })
     }

@@ -47,17 +48,17 @@ impl Runtime {
     }

     pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_on_idle()
+        self.rt.shutdown_on_idle().unit_error().boxed().compat()
     }

     pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_now()
+        self.rt.shutdown_now().unit_error().boxed().compat()
     }
 }

 #[derive(Clone, Debug)]
 pub struct TaskExecutor {
-    inner: tokio::runtime::TaskExecutor,
+    inner: tokio_compat::runtime::TaskExecutor,
 }

 impl TaskExecutor {
@@ -71,6 +72,7 @@ where
     F: Future<Item = (), Error = ()> + Send + 'static,
 {
     fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
-        self.inner.execute(future)
+        self.inner.spawn(future);
+        Ok(())
     }
 }
diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs
index 1b99328b..880374b2 100644
--- a/src/sinks/kafka.rs
+++ b/src/sinks/kafka.rs
@@ -6,7 +6,7 @@ use crate::{
     topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
 };
 use futures::{
-    future::{self, poll_fn, IntoFuture},
+    future::{self, IntoFuture},
     stream::FuturesUnordered,
     Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
 };
@@ -213,18 +213,14 @@ impl Sink for KafkaSink {
 fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
     let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();

-    let check = poll_fn(move || {
-        tokio_threadpool::blocking(|| {
-            consumer
-                .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
-                .map(|_| ())
-                .map_err(|err| err.into())
-        })
-    })
-    .map_err(|err| err.into())
-    .and_then(|result| result.into_future());
+    let task = tokio02::task::block_in_place(|| {
+        consumer
+            .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
+            .map(|_| ())
+            .map_err(|err| err.into())
+    });

-    Box::new(check)
+    Box::new(task.into_future())
 }

(感谢 @LucioFranco 在 Vector 中试用 tokio-compat!)

结论

tokio-core 时代以来,许多主要的开源项目一直在使用 Tokio。Rust 中的异步编程自那时以来取得了巨大的进步,我们希望 tokio-compat 使所有 Tokio 用户都能轻松地开始从新的 std::future/Tokio 0.2 生态系统中看到好处,并且尽可能轻松。当然,与所有 0.1 版本一样,还有更多工作要做,包括

  • 无缝支持 tokio-threadpool 0.1 的 blocking API
  • tokio-io 0.1 的 AsyncReadAsyncWrite 特征的兼容层
  • 重新实现 Tokio 0.1 中已在 0.2 中删除的 API
  • 必要的错误修复和性能改进

您可以在 GitHub 上的 tokio-compat 存储库中找到它。与往常一样,我们非常欢迎来自 Tokio 社区的反馈和贡献。如果您需要帮助,或想讨论 tokio-compat 的详细信息,请加入我们的 Discord —— 欢迎所有人!

— Eliza Weisman (@hawkw)