通道
既然我们已经了解了一些关于 Tokio 的并发性,让我们将其应用到客户端。将我们之前编写的服务器代码放入一个显式的二进制文件中
$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs
并创建一个新的二进制文件,其中将包含客户端代码
$ touch src/bin/client.rs
在这个文件中,你将编写此页面的代码。每当你想运行它时,你都必须首先在一个单独的终端窗口中启动服务器
$ cargo run --bin server
然后再启动客户端,分别地
$ cargo run --bin client
话虽如此,让我们开始编码吧!
假设我们想要运行两个并发的 Redis 命令。我们可以为每个命令衍生一个任务。然后这两个命令将并发执行。
首先,我们可能会尝试类似这样的方法
use mini_redis::client;
#[tokio::main]
async fn main() {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async {
let res = client.get("foo").await;
});
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
这无法编译,因为两个任务都需要以某种方式访问 client
。由于 Client
没有实现 Copy
,如果没有一些代码来方便这种共享,它将无法编译。此外,Client::set
接受 &mut self
,这意味着调用它需要独占访问权限。我们可以为每个任务打开一个连接,但这并不理想。我们不能使用 std::sync::Mutex
,因为 .await
需要在持有锁的情况下调用。我们可以使用 tokio::sync::Mutex
,但这只允许单个正在进行的请求。如果客户端实现了 管道,那么异步互斥锁会导致连接利用不足。
消息传递
答案是使用消息传递。这种模式涉及衍生一个专用任务来管理 client
资源。任何希望发出请求的任务都会向 client
任务发送消息。client
任务代表发送者发出请求,并将响应发送回发送者。
使用这种策略,将建立单个连接。管理 client
的任务能够获得独占访问权限,以便调用 get
和 set
。此外,通道充当缓冲区。操作可以在 client
任务繁忙时发送到 client
任务。一旦 client
任务可以处理新请求,它就会从通道中拉取下一个请求。这可以带来更好的吞吐量,并可以扩展以支持连接池。
Tokio 的通道原语
Tokio 提供了许多通道,每个通道服务于不同的目的。
- mpsc:多生产者、单消费者通道。可以发送多个值。
- oneshot:单生产者、单消费者通道。可以发送单个值。
- broadcast:多生产者、多消费者。可以发送多个值。每个接收者都会看到每个值。
- watch:多生产者、多消费者。可以发送多个值,但不保留历史记录。接收者只会看到最近的值。
如果你需要一个多生产者多消费者通道,其中每个消息只有一个消费者看到,你可以使用 async-channel
crate。还有一些通道用于异步 Rust 之外,例如 std::sync::mpsc
和 crossbeam::channel
。这些通道通过阻塞线程来等待消息,这在异步代码中是不允许的。
在本节中,我们将使用 mpsc 和 oneshot。其他类型的消息传递通道将在后面的章节中探讨。本节的完整代码可以在这里找到。
定义消息类型
在大多数情况下,当使用消息传递时,接收消息的任务会响应多个命令。在我们的例子中,任务将响应 GET
和 SET
命令。为了对此进行建模,我们首先定义一个 Command
枚举,并为每种命令类型包含一个变体。
use bytes::Bytes;
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
创建通道
在 main
函数中,创建了一个 mpsc
通道。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
// Create a new channel with a capacity of at most 32.
let (tx, mut rx) = mpsc::channel(32);
// ... Rest comes here
}
mpsc
通道用于向管理 redis 连接的任务发送命令。多生产者能力允许从多个任务发送消息。创建通道会返回两个值,一个发送者和一个接收者。这两个句柄是分开使用的。它们可以移动到不同的任务。
通道创建时的容量为 32。如果消息发送速度快于接收速度,通道将存储它们。一旦通道中存储了 32 条消息,调用 send(...).await
将进入睡眠状态,直到接收者删除一条消息。
从多个任务发送是通过克隆 Sender
来完成的。例如
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();
tokio::spawn(async move {
tx.send("sending from first handle").await.unwrap();
});
tokio::spawn(async move {
tx2.send("sending from second handle").await.unwrap();
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
两条消息都发送到单个 Receiver
句柄。无法克隆 mpsc
通道的接收者。
当每个 Sender
都超出范围或已被丢弃时,就不可能再向通道发送更多消息。此时,对 Receiver
的 recv
调用将返回 None
,这意味着所有发送者都已消失,通道已关闭。
在我们的管理 Redis 连接的任务示例中,它知道一旦通道关闭就可以关闭 Redis 连接,因为该连接将不再使用。
衍生管理器任务
接下来,衍生一个任务来处理来自通道的消息。首先,建立到 Redis 的客户端连接。然后,通过 Redis 连接发出接收到的命令。
use mini_redis::client;
// The `move` keyword is used to **move** ownership of `rx` into the task.
let manager = tokio::spawn(async move {
// Establish a connection to the server
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// Start receiving messages
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
}
Set { key, val } => {
client.set(&key, val).await;
}
}
}
});
现在,更新这两个任务以通过通道发送命令,而不是直接在 Redis 连接上发出命令。
// The `Sender` handles are moved into the tasks. As there are two
// tasks, we need a second `Sender`.
let tx2 = tx.clone();
// Spawn two tasks, one gets a key, the other sets a key
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "foo".to_string(),
};
tx.send(cmd).await.unwrap();
});
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
在 main
函数的底部,我们 .await
join 句柄以确保命令在进程退出之前完全完成。
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
接收响应
最后一步是从管理器任务接收响应。GET
命令需要获取值,而 SET
命令需要知道操作是否成功完成。
为了传递响应,使用了 oneshot
通道。oneshot
通道是为发送单个值而优化的单生产者、单消费者通道。在我们的例子中,单个值是响应。
与 mpsc
类似,oneshot::channel()
返回一个发送者和接收者句柄。
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel();
与 mpsc
不同,没有指定容量,因为容量始终为 1。此外,两个句柄都无法克隆。
为了从管理器任务接收响应,在发送命令之前,创建了一个 oneshot
通道。通道的 Sender
一半包含在发送给管理器任务的命令中。接收一半用于接收响应。
首先,更新 Command
以包含 Sender
。为了方便起见,使用类型别名来引用 Sender
。
use tokio::sync::oneshot;
use bytes::Bytes;
/// Multiple different commands are multiplexed over a single channel.
#[derive(Debug)]
enum Command {
Get {
key: String,
resp: Responder<Option<Bytes>>,
},
Set {
key: String,
val: Bytes,
resp: Responder<()>,
},
}
/// Provided by the requester and used by the manager task to send
/// the command response back to the requester.
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
现在,更新发出命令的任务以包含 oneshot::Sender
。
let t1 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Get {
key: "foo".to_string(),
resp: resp_tx,
};
// Send the GET request
tx.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
let t2 = tokio::spawn(async move {
let (resp_tx, resp_rx) = oneshot::channel();
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
resp: resp_tx,
};
// Send the SET request
tx2.send(cmd).await.unwrap();
// Await the response
let res = resp_rx.await;
println!("GOT = {:?}", res);
});
最后,更新管理器任务以通过 oneshot
通道发送响应。
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, resp } => {
let res = client.get(&key).await;
// Ignore errors
let _ = resp.send(res);
}
Command::Set { key, val, resp } => {
let res = client.set(&key, val).await;
// Ignore errors
let _ = resp.send(res);
}
}
}
在 oneshot::Sender
上调用 send
会立即完成,并且不需要 .await
。这是因为 oneshot
通道上的 send
将始终立即失败或成功,而无需任何形式的等待。
当接收者一半被丢弃时,在 oneshot 通道上发送值会返回 Err
。这表示接收者不再对响应感兴趣。在我们的场景中,接收者取消兴趣是可以接受的事件。resp.send(...)
返回的 Err
不需要处理。
你可以在这里找到完整代码。
反压和有界通道
每当引入并发或排队时,重要的是要确保排队是有界的,并且系统将优雅地处理负载。无界队列最终将填满所有可用内存,并导致系统以不可预测的方式失败。
Tokio 注意避免隐式排队。这很大程度上是因为异步操作是惰性的。考虑以下情况
loop {
async_op();
}
如果异步操作急切地运行,则循环将重复排队一个新的 async_op
来运行,而不确保之前的操作已完成。这会导致隐式无界排队。基于回调的系统和急切的基于 Future 的系统特别容易受到这种情况的影响。
但是,使用 Tokio 和异步 Rust,上面的代码片段将不会导致 async_op
运行。这是因为从未调用 .await
。如果代码片段更新为使用 .await
,则循环会在重新开始之前等待操作完成。
loop {
// Will not repeat until `async_op` completes
async_op().await;
}
并发和排队必须显式引入。执行此操作的方法包括
tokio::spawn
select!
join!
mpsc::channel
这样做时,请注意确保并发的总量是有界的。例如,在编写 TCP accept 循环时,请确保打开的套接字总数是有界的。当使用 mpsc::channel
时,选择可管理的通道容量。特定的边界值将是特定于应用程序的。
小心并选择好的边界是编写可靠的 Tokio 应用程序的重要组成部分。