通道

既然我们已经了解了一些关于 Tokio 的并发性,让我们将其应用到客户端。将我们之前编写的服务器代码放入一个显式的二进制文件中

$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs

并创建一个新的二进制文件,其中将包含客户端代码

$ touch src/bin/client.rs

在这个文件中,你将编写此页面的代码。每当你想运行它时,你都必须首先在一个单独的终端窗口中启动服务器

$ cargo run --bin server

然后再启动客户端,分别地

$ cargo run --bin client

话虽如此,让我们开始编码吧!

假设我们想要运行两个并发的 Redis 命令。我们可以为每个命令衍生一个任务。然后这两个命令将并发执行。

首先,我们可能会尝试类似这样的方法

use mini_redis::client;

#[tokio::main]
async fn main() {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Spawn two tasks, one gets a key, the other sets a key
    let t1 = tokio::spawn(async {
        let res = client.get("foo").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

这无法编译,因为两个任务都需要以某种方式访问 client。由于 Client 没有实现 Copy,如果没有一些代码来方便这种共享,它将无法编译。此外,Client::set 接受 &mut self,这意味着调用它需要独占访问权限。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex,因为 .await 需要在持有锁的情况下调用。我们可以使用 tokio::sync::Mutex,但这只允许单个正在进行的请求。如果客户端实现了 管道,那么异步互斥锁会导致连接利用不足。

消息传递

答案是使用消息传递。这种模式涉及衍生一个专用任务来管理 client 资源。任何希望发出请求的任务都会向 client 任务发送消息。client 任务代表发送者发出请求,并将响应发送回发送者。

使用这种策略,将建立单个连接。管理 client 的任务能够获得独占访问权限,以便调用 getset。此外,通道充当缓冲区。操作可以在 client 任务繁忙时发送到 client 任务。一旦 client 任务可以处理新请求,它就会从通道中拉取下一个请求。这可以带来更好的吞吐量,并可以扩展以支持连接池。

Tokio 的通道原语

Tokio 提供了许多通道,每个通道服务于不同的目的。

  • mpsc:多生产者、单消费者通道。可以发送多个值。
  • oneshot:单生产者、单消费者通道。可以发送单个值。
  • broadcast:多生产者、多消费者。可以发送多个值。每个接收者都会看到每个值。
  • watch:多生产者、多消费者。可以发送多个值,但不保留历史记录。接收者只会看到最近的值。

如果你需要一个多生产者多消费者通道,其中每个消息只有一个消费者看到,你可以使用 async-channel crate。还有一些通道用于异步 Rust 之外,例如 std::sync::mpsccrossbeam::channel。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。

在本节中,我们将使用 mpsconeshot。其他类型的消息传递通道将在后面的章节中探讨。本节的完整代码可以在这里找到。

定义消息类型

在大多数情况下,当使用消息传递时,接收消息的任务会响应多个命令。在我们的例子中,任务将响应 GETSET 命令。为了对此进行建模,我们首先定义一个 Command 枚举,并为每种命令类型包含一个变体。

use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}

创建通道

main 函数中,创建了一个 mpsc 通道。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Create a new channel with a capacity of at most 32.
    let (tx, mut rx) = mpsc::channel(32);

    // ... Rest comes here
}

mpsc 通道用于向管理 redis 连接的任务发送命令。多生产者能力允许从多个任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是分开使用的。它们可以移动到不同的任务。

通道创建时的容量为 32。如果消息发送速度快于接收速度,通道将存储它们。一旦通道中存储了 32 条消息,调用 send(...).await 将进入睡眠状态,直到接收者删除一条消息。

从多个任务发送是通过克隆 Sender 来完成的。例如

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

两条消息都发送到单个 Receiver 句柄。无法克隆 mpsc 通道的接收者。

当每个 Sender 都超出范围或已被丢弃时,就不可能再向通道发送更多消息。此时,对 Receiverrecv 调用将返回 None,这意味着所有发送者都已消失,通道已关闭。

在我们的管理 Redis 连接的任务示例中,它知道一旦通道关闭就可以关闭 Redis 连接,因为该连接将不再使用。

衍生管理器任务

接下来,衍生一个任务来处理来自通道的消息。首先,建立到 Redis 的客户端连接。然后,通过 Redis 连接发出接收到的命令。

use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
    // Establish a connection to the server
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // Start receiving messages
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});

现在,更新这两个任务以通过通道发送命令,而不是直接在 Redis 连接上发出命令。

// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();

// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "foo".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});

main 函数的底部,我们 .await join 句柄以确保命令在进程退出之前完全完成。

t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();

接收响应

最后一步是从管理器任务接收响应。GET 命令需要获取值,而 SET 命令需要知道操作是否成功完成。

为了传递响应,使用了 oneshot 通道。oneshot 通道是为发送单个值而优化的单生产者、单消费者通道。在我们的例子中,单个值是响应。

mpsc 类似,oneshot::channel() 返回一个发送者和接收者句柄。

use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();

mpsc 不同,没有指定容量,因为容量始终为 1。此外,两个句柄都无法克隆。

为了从管理器任务接收响应,在发送命令之前,创建了一个 oneshot 通道。通道的 Sender 一半包含在发送给管理器任务的命令中。接收一半用于接收响应。

首先,更新 Command 以包含 Sender。为了方便起见,使用类型别名来引用 Sender

use tokio::sync::oneshot;
use bytes::Bytes;

/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;

现在,更新发出命令的任务以包含 oneshot::Sender

let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "foo".to_string(),
        resp: resp_tx,
    };

    // Send the GET request
    tx.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // Send the SET request
    tx2.send(cmd).await.unwrap();

    // Await the response
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

最后,更新管理器任务以通过 oneshot 通道发送响应。

while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // Ignore errors
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // Ignore errors
            let _ = resp.send(res);
        }
    }
}

oneshot::Sender 上调用 send 会立即完成,并且需要 .await。这是因为 oneshot 通道上的 send 将始终立即失败或成功,而无需任何形式的等待。

当接收者一半被丢弃时,在 oneshot 通道上发送值会返回 Err。这表示接收者不再对响应感兴趣。在我们的场景中,接收者取消兴趣是可以接受的事件。resp.send(...) 返回的 Err 不需要处理。

你可以在这里找到完整代码。

反压和有界通道

每当引入并发或排队时,重要的是要确保排队是有界的,并且系统将优雅地处理负载。无界队列最终将填满所有可用内存,并导致系统以不可预测的方式失败。

Tokio 注意避免隐式排队。这很大程度上是因为异步操作是惰性的。考虑以下情况

loop {
    async_op();
}

如果异步操作急切地运行,则循环将重复排队一个新的 async_op 来运行,而不确保之前的操作已完成。这会导致隐式无界排队。基于回调的系统和急切的基于 Future 的系统特别容易受到这种情况的影响。

但是,使用 Tokio 和异步 Rust,上面的代码片段将不会导致 async_op 运行。这是因为从未调用 .await。如果代码片段更新为使用 .await,则循环会在重新开始之前等待操作完成。

loop {
    // Will not repeat until `async_op` completes
    async_op().await;
}

并发和排队必须显式引入。执行此操作的方法包括

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

这样做时,请注意确保并发的总量是有界的。例如,在编写 TCP accept 循环时,请确保打开的套接字总数是有界的。当使用 mpsc::channel 时,选择可管理的通道容量。特定的边界值将是特定于应用程序的。

小心并选择好的边界是编写可靠的 Tokio 应用程序的重要组成部分。