使用同步代码桥接
在大多数使用 Tokio 的示例中,我们使用 #[tokio::main]
标记 main 函数,并将整个项目设为异步。
在某些情况下,您可能需要运行一小部分同步代码。有关更多信息,请参阅 spawn_blocking
。
在其他情况下,将应用程序结构化为主要同步,而较小或逻辑上不同的部分为异步可能更容易。例如,GUI 应用程序可能希望在主线程上运行 GUI 代码,并在另一个线程上在其旁边运行 Tokio 运行时。
此页面解释了如何将 async/await 隔离到项目的一小部分。
#[tokio::main]
宏展开后的内容
#[tokio::main]
宏是一个宏,它将您的 main 函数替换为一个非异步 main 函数,该函数启动一个运行时,然后调用您的代码。例如,这个
#[tokio::main]
async fn main() {
println!("Hello world");
}
被宏转换为这个
fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}
为了在我们自己的项目中使用 async/await,我们可以做类似的事情,即利用 block_on
方法在适当的地方进入异步上下文。
mini-redis 的同步接口
在本节中,我们将介绍如何通过存储 Runtime
对象并使用其 block_on
方法来构建 mini-redis 的同步接口。在以下各节中,我们将讨论一些替代方法以及何时应使用每种方法。
我们将要包装的接口是异步的 Client
类型。它有几种方法,我们将实现以下方法的阻塞版本
为此,我们引入一个名为 src/clients/blocking_client.rs
的新文件,并使用围绕异步 Client
类型的包装器结构对其进行初始化
use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;
pub use crate::clients::client::Message;
/// Established connection with a Redis server.
pub struct BlockingClient {
/// The asynchronous `Client`.
inner: crate::clients::Client,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
impl BlockingClient {
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
// Call the asynchronous connect method using the runtime.
let inner = rt.block_on(crate::clients::Client::connect(addr))?;
Ok(BlockingClient { inner, rt })
}
}
在这里,我们包含了构造函数,作为如何在非异步上下文中执行异步方法的第一个示例。我们使用 Tokio Runtime
类型的 block_on
方法来执行此操作,该方法执行异步方法并返回其结果。
一个重要的细节是使用了 current_thread
运行时。通常在使用 Tokio 时,您将使用默认的 multi_thread
运行时,它将生成一堆后台线程,以便它可以高效地同时运行许多任务。对于我们的用例,我们一次只做一件事,因此通过运行多个线程不会获得任何好处。这使得 current_thread
运行时成为完美的选择,因为它不生成任何线程。
enable_all
调用启用 Tokio 运行时的 IO 和计时器驱动程序。如果它们未启用,则运行时无法执行 IO 或计时器操作。
由于
current_thread
运行时不生成线程,因此它仅在调用block_on
时才运行。一旦block_on
返回,在该运行时上生成的所有任务都将冻结,直到您再次调用block_on
。如果生成的任务在不调用block_on
时必须继续运行,请使用multi_threaded
运行时。
一旦我们有了这个结构,大多数方法都很容易实现
use bytes::Bytes;
use std::time::Duration;
impl BlockingClient {
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
self.rt.block_on(self.inner.get(key))
}
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
self.rt.block_on(self.inner.set(key, value))
}
pub fn set_expires(
&mut self,
key: &str,
value: Bytes,
expiration: Duration,
) -> crate::Result<()> {
self.rt.block_on(self.inner.set_expires(key, value, expiration))
}
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
self.rt.block_on(self.inner.publish(channel, message))
}
}
Client::subscribe
方法更有趣,因为它将 Client
转换为 Subscriber
对象。我们可以通过以下方式实现它
/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
/// The asynchronous `Subscriber`.
inner: crate::clients::Subscriber,
/// A `current_thread` runtime for executing operations on the
/// asynchronous client in a blocking manner.
rt: Runtime,
}
impl BlockingClient {
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
Ok(BlockingSubscriber {
inner: subscriber,
rt: self.rt,
})
}
}
impl BlockingSubscriber {
pub fn get_subscribed(&self) -> &[String] {
self.inner.get_subscribed()
}
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
self.rt.block_on(self.inner.next_message())
}
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.subscribe(channels))
}
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
self.rt.block_on(self.inner.unsubscribe(channels))
}
}
因此,subscribe
方法将首先使用运行时将异步 Client
转换为异步 Subscriber
。然后,它将存储生成的 Subscriber
以及 Runtime
,并使用 block_on
实现各种方法。
请注意,异步 Subscriber
结构有一个名为 get_subscribed
的非异步方法。为了处理这个问题,我们只需直接调用它,而无需涉及运行时。
其他方法
以上部分解释了实现同步包装器的最简单方法,但这并不是唯一的方法。这些方法包括
我们已经看到了第一种方法。以下概述了其他两种方法。
在运行时上生成任务
Runtime
对象有一个名为 spawn
的方法。当您调用此方法时,您将创建一个新的后台任务以在运行时上运行。例如
use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};
fn main() {
let runtime = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let mut handles = Vec::with_capacity(10);
for i in 0..10 {
handles.push(runtime.spawn(my_bg_task(i)));
}
// Do something time-consuming while the background tasks execute.
std::thread::sleep(Duration::from_millis(750));
println!("Finished time-consuming task.");
// Wait for all of them to complete.
for handle in handles {
// The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
// a future, so we can wait for it using `block_on`.
runtime.block_on(handle).unwrap();
}
}
async fn my_bg_task(i: u64) {
// By subtracting, the tasks with larger values of i sleep for a
// shorter duration.
let millis = 1000 - 50 * i;
println!("Task {} sleeping for {} ms.", i, millis);
sleep(Duration::from_millis(millis)).await;
println!("Task {} stopping.", i);
}
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.
在上面的示例中,我们在运行时上生成了 10 个后台任务,然后等待所有任务完成。例如,这可能是实现图形应用程序中后台网络请求的一种好方法,因为网络请求太耗时,无法在主 GUI 线程上运行它们。相反,您可以在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成时,甚至在您想要进度条时以增量方式将信息发送回 GUI 代码。
在此示例中,重要的是将运行时配置为 multi_thread
运行时。如果您将其更改为 current_thread
运行时,您会发现耗时的任务在任何后台任务启动之前完成。这是因为在 current_thread
运行时上生成的后台任务仅在调用 block_on
期间执行,因为否则运行时没有地方运行它们。
该示例通过对 spawn
调用返回的 JoinHandle
调用 block_on
来等待生成的任务完成,但这并不是唯一的方法。以下是一些替代方法
- 使用消息传递通道,例如
tokio::sync::mpsc
。 - 修改受例如
Mutex
保护的共享值。对于 GUI 中的进度条,这可能是一个好方法,其中 GUI 每帧读取共享值。
spawn
方法也可在 Handle
类型上使用。可以克隆 Handle
类型以获取运行时的多个句柄,并且每个 Handle
都可以用于在运行时上生成新任务。
发送消息
第三种技术是生成运行时并使用消息传递与其通信。与其他两种方法相比,这涉及更多的样板代码,但它是最灵活的方法。您可以在下面找到一个基本示例
use tokio::runtime::Builder;
use tokio::sync::mpsc;
pub struct Task {
name: String,
// info that describes the task
}
async fn handle_task(task: Task) {
println!("Got task {}", task.name);
}
#[derive(Clone)]
pub struct TaskSpawner {
spawn: mpsc::Sender<Task>,
}
impl TaskSpawner {
pub fn new() -> TaskSpawner {
// Set up a channel for communicating.
let (send, mut recv) = mpsc::channel(16);
// Build the runtime for the new thread.
//
// The runtime is created before spawning the thread
// to more cleanly forward errors if the `unwrap()`
// panics.
let rt = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
std::thread::spawn(move || {
rt.block_on(async move {
while let Some(task) = recv.recv().await {
tokio::spawn(handle_task(task));
}
// Once all senders have gone out of scope,
// the `.recv()` call returns None and it will
// exit from the while loop and shut down the
// thread.
});
});
TaskSpawner {
spawn: send,
}
}
pub fn spawn_task(&self, task: Task) {
match self.spawn.blocking_send(task) {
Ok(()) => {},
Err(_) => panic!("The shared runtime has shut down."),
}
}
}
此示例可以以多种方式配置。例如,您可以使用 Semaphore
来限制活动任务的数量,或者您可以在相反的方向上使用通道来向生成器发送响应。当您以这种方式生成运行时时,它是一种 actor。