使用同步代码桥接

在大多数使用 Tokio 的示例中,我们使用 #[tokio::main] 标记 main 函数,并将整个项目设为异步。

在某些情况下,您可能需要运行一小部分同步代码。有关更多信息,请参阅 spawn_blocking

在其他情况下,将应用程序结构化为主要同步,而较小或逻辑上不同的部分为异步可能更容易。例如,GUI 应用程序可能希望在主线程上运行 GUI 代码,并在另一个线程上在其旁边运行 Tokio 运行时。

此页面解释了如何将 async/await 隔离到项目的一小部分。

#[tokio::main] 宏展开后的内容

#[tokio::main] 宏是一个宏,它将您的 main 函数替换为一个非异步 main 函数,该函数启动一个运行时,然后调用您的代码。例如,这个

#[tokio::main]
async fn main() {
    println!("Hello world");
}

被宏转换为这个

fn main() {
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            println!("Hello world");
        })
}

为了在我们自己的项目中使用 async/await,我们可以做类似的事情,即利用 block_on 方法在适当的地方进入异步上下文。

mini-redis 的同步接口

在本节中,我们将介绍如何通过存储 Runtime 对象并使用其 block_on 方法来构建 mini-redis 的同步接口。在以下各节中,我们将讨论一些替代方法以及何时应使用每种方法。

我们将要包装的接口是异步的 Client 类型。它有几种方法,我们将实现以下方法的阻塞版本

为此,我们引入一个名为 src/clients/blocking_client.rs 的新文件,并使用围绕异步 Client 类型的包装器结构对其进行初始化

use tokio::net::ToSocketAddrs;
use tokio::runtime::Runtime;

pub use crate::clients::client::Message;

/// Established connection with a Redis server.
pub struct BlockingClient {
    /// The asynchronous `Client`.
    inner: crate::clients::Client,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

impl BlockingClient {
    pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;

        // Call the asynchronous connect method using the runtime.
        let inner = rt.block_on(crate::clients::Client::connect(addr))?;

        Ok(BlockingClient { inner, rt })
    }
}

在这里,我们包含了构造函数,作为如何在非异步上下文中执行异步方法的第一个示例。我们使用 Tokio Runtime 类型的 block_on 方法来执行此操作,该方法执行异步方法并返回其结果。

一个重要的细节是使用了 current_thread 运行时。通常在使用 Tokio 时,您将使用默认的 multi_thread 运行时,它将生成一堆后台线程,以便它可以高效地同时运行许多任务。对于我们的用例,我们一次只做一件事,因此通过运行多个线程不会获得任何好处。这使得 current_thread 运行时成为完美的选择,因为它不生成任何线程。

enable_all 调用启用 Tokio 运行时的 IO 和计时器驱动程序。如果它们未启用,则运行时无法执行 IO 或计时器操作。

由于 current_thread 运行时不生成线程,因此它仅在调用 block_on 时才运行。一旦 block_on 返回,在该运行时上生成的所有任务都将冻结,直到您再次调用 block_on。如果生成的任务在不调用 block_on 时必须继续运行,请使用 multi_threaded 运行时。

一旦我们有了这个结构,大多数方法都很容易实现

use bytes::Bytes;
use std::time::Duration;

impl BlockingClient {
    pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
        self.rt.block_on(self.inner.get(key))
    }

    pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
        self.rt.block_on(self.inner.set(key, value))
    }

    pub fn set_expires(
        &mut self,
        key: &str,
        value: Bytes,
        expiration: Duration,
    ) -> crate::Result<()> {
        self.rt.block_on(self.inner.set_expires(key, value, expiration))
    }

    pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
        self.rt.block_on(self.inner.publish(channel, message))
    }
}

Client::subscribe 方法更有趣,因为它将 Client 转换为 Subscriber 对象。我们可以通过以下方式实现它

/// A client that has entered pub/sub mode.
///
/// Once clients subscribe to a channel, they may only perform
/// pub/sub related commands. The `BlockingClient` type is
/// transitioned to a `BlockingSubscriber` type in order to
/// prevent non-pub/sub methods from being called.
pub struct BlockingSubscriber {
    /// The asynchronous `Subscriber`.
    inner: crate::clients::Subscriber,

    /// A `current_thread` runtime for executing operations on the
    /// asynchronous client in a blocking manner.
    rt: Runtime,
}

impl BlockingClient {
    pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
        let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
        Ok(BlockingSubscriber {
            inner: subscriber,
            rt: self.rt,
        })
    }
}

impl BlockingSubscriber {
    pub fn get_subscribed(&self) -> &[String] {
        self.inner.get_subscribed()
    }

    pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
        self.rt.block_on(self.inner.next_message())
    }

    pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.subscribe(channels))
    }

    pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
        self.rt.block_on(self.inner.unsubscribe(channels))
    }
}

因此,subscribe 方法将首先使用运行时将异步 Client 转换为异步 Subscriber。然后,它将存储生成的 Subscriber 以及 Runtime,并使用 block_on 实现各种方法。

请注意,异步 Subscriber 结构有一个名为 get_subscribed 的非异步方法。为了处理这个问题,我们只需直接调用它,而无需涉及运行时。

其他方法

以上部分解释了实现同步包装器的最简单方法,但这并不是唯一的方法。这些方法包括

  • 创建一个 Runtime 并对异步代码调用 block_on
  • 创建一个 Runtime 并在其上 spawn 任务。
  • 在单独的线程中运行 Runtime 并向其发送消息。

我们已经看到了第一种方法。以下概述了其他两种方法。

在运行时上生成任务

Runtime 对象有一个名为 spawn 的方法。当您调用此方法时,您将创建一个新的后台任务以在运行时上运行。例如

use tokio::runtime::Builder;
use tokio::time::{sleep, Duration};

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let mut handles = Vec::with_capacity(10);
    for i in 0..10 {
        handles.push(runtime.spawn(my_bg_task(i)));
    }

    // Do something time-consuming while the background tasks execute.
    std::thread::sleep(Duration::from_millis(750));
    println!("Finished time-consuming task.");

    // Wait for all of them to complete.
    for handle in handles {
        // The `spawn` method returns a `JoinHandle`. A `JoinHandle` is
        // a future, so we can wait for it using `block_on`.
        runtime.block_on(handle).unwrap();
    }
}

async fn my_bg_task(i: u64) {
    // By subtracting, the tasks with larger values of i sleep for a
    // shorter duration.
    let millis = 1000 - 50 * i;
    println!("Task {} sleeping for {} ms.", i, millis);

    sleep(Duration::from_millis(millis)).await;

    println!("Task {} stopping.", i);
}
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.

在上面的示例中,我们在运行时上生成了 10 个后台任务,然后等待所有任务完成。例如,这可能是实现图形应用程序中后台网络请求的一种好方法,因为网络请求太耗时,无法在主 GUI 线程上运行它们。相反,您可以在后台运行的 Tokio 运行时上生成请求,并让任务在请求完成时,甚至在您想要进度条时以增量方式将信息发送回 GUI 代码。

在此示例中,重要的是将运行时配置为 multi_thread 运行时。如果您将其更改为 current_thread 运行时,您会发现耗时的任务在任何后台任务启动之前完成。这是因为在 current_thread 运行时上生成的后台任务仅在调用 block_on 期间执行,因为否则运行时没有地方运行它们。

该示例通过对 spawn 调用返回的 JoinHandle 调用 block_on 来等待生成的任务完成,但这并不是唯一的方法。以下是一些替代方法

  • 使用消息传递通道,例如 tokio::sync::mpsc
  • 修改受例如 Mutex 保护的共享值。对于 GUI 中的进度条,这可能是一个好方法,其中 GUI 每帧读取共享值。

spawn 方法也可在 Handle 类型上使用。可以克隆 Handle 类型以获取运行时的多个句柄,并且每个 Handle 都可以用于在运行时上生成新任务。

发送消息

第三种技术是生成运行时并使用消息传递与其通信。与其他两种方法相比,这涉及更多的样板代码,但它是最灵活的方法。您可以在下面找到一个基本示例

use tokio::runtime::Builder;
use tokio::sync::mpsc;

pub struct Task {
    name: String,
    // info that describes the task
}

async fn handle_task(task: Task) {
    println!("Got task {}", task.name);
}

#[derive(Clone)]
pub struct TaskSpawner {
    spawn: mpsc::Sender<Task>,
}

impl TaskSpawner {
    pub fn new() -> TaskSpawner {
        // Set up a channel for communicating.
        let (send, mut recv) = mpsc::channel(16);

        // Build the runtime for the new thread.
        //
        // The runtime is created before spawning the thread
        // to more cleanly forward errors if the `unwrap()`
        // panics.
        let rt = Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        std::thread::spawn(move || {
            rt.block_on(async move {
                while let Some(task) = recv.recv().await {
                    tokio::spawn(handle_task(task));
                }

                // Once all senders have gone out of scope,
                // the `.recv()` call returns None and it will
                // exit from the while loop and shut down the
                // thread.
            });
        });

        TaskSpawner {
            spawn: send,
        }
    }

    pub fn spawn_task(&self, task: Task) {
        match self.spawn.blocking_send(task) {
            Ok(()) => {},
            Err(_) => panic!("The shared runtime has shut down."),
        }
    }
}

此示例可以以多种方式配置。例如,您可以使用 Semaphore 来限制活动任务的数量,或者您可以在相反的方向上使用通道来向生成器发送响应。当您以这种方式生成运行时时,它是一种 actor