共享状态
到目前为止,我们已经有一个工作的键值服务器。但是,存在一个主要的缺陷:状态在连接之间不共享。我们将在本文中修复这个问题。
策略
在 Tokio 中,有几种不同的方法可以共享状态。
- 使用互斥锁 (Mutex) 保护共享状态。
- 生成一个任务来管理状态,并使用消息传递来操作它。
通常,对于简单数据,您希望使用第一种方法,而对于需要异步工作(如 I/O 原语)的事物,则使用第二种方法。在本章中,共享状态是一个 HashMap
,操作是 insert
和 get
。这些操作都不是异步的,因此我们将使用 Mutex
。
后一种方法将在下一章中介绍。
添加 bytes
依赖
Mini-Redis crate 没有使用 Vec<u8>
,而是使用了 bytes
crate 中的 Bytes
。Bytes
的目标是为网络编程提供健壮的字节数组结构。它比 Vec<u8>
增加的最大特性是浅克隆。换句话说,在 Bytes
实例上调用 clone()
不会复制底层数据。相反,Bytes
实例是对某些底层数据的引用计数句柄。Bytes
类型大致相当于 Arc<Vec<u8>>
,但具有一些附加功能。
要依赖 bytes
,请将以下内容添加到 Cargo.toml
的 [dependencies]
部分
bytes = "1"
初始化 HashMap
HashMap
将在许多任务和潜在的许多线程之间共享。为了支持这一点,它被包装在 Arc<Mutex<_>>
中。
首先,为了方便起见,在使用 use
语句后添加以下类型别名。
use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
然后,更新 main
函数以初始化 HashMap
并将 Arc
句柄 传递给 process
函数。使用 Arc
允许从许多任务(可能在许多线程上运行)并发引用 HashMap
。在整个 Tokio 中,术语 句柄 用于引用提供对某些共享状态的访问权限的值。
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
关于使用 std::sync::Mutex
和 tokio::sync::Mutex
请注意,使用 std::sync::Mutex
而不是 tokio::sync::Mutex
来保护 HashMap
。一个常见的错误是从异步代码中无条件地使用 tokio::sync::Mutex
。异步互斥锁是在跨 .await
调用时锁定的互斥锁。
同步互斥锁在等待获取锁时会阻塞当前线程。反过来,这将阻止其他任务进行处理。但是,切换到 tokio::sync::Mutex
通常没有帮助,因为异步互斥锁在内部使用同步互斥锁。
作为经验法则,只要争用保持在较低水平并且锁未在 .await
调用之间保持,从异步代码中使用同步互斥锁是可以的。
更新 process()
process 函数不再初始化 HashMap
。相反,它将共享的 HashMap
句柄作为参数。它还需要在使用 HashMap
之前锁定它。请记住,HashMap
的值的类型现在是 Bytes
(我们可以廉价地克隆),因此这也需要更改。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
在 .await
期间持有 MutexGuard
您可能会编写如下代码
use std::sync::{Mutex, MutexGuard};
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
当您尝试生成调用此函数的某些内容时,您将遇到以下错误消息
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
发生这种情况是因为 std::sync::MutexGuard
类型不是 Send
。这意味着您无法将互斥锁发送到另一个线程,并且发生错误是因为 Tokio 运行时可以在每个 .await
之间在线程之间移动任务。为避免这种情况,您应该重构代码,以便互斥锁的析构函数在 .await
之前运行。
// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
请注意,这不起作用
use std::sync::{Mutex, MutexGuard};
// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
drop(lock);
do_something_async().await;
}
这是因为编译器当前仅根据作用域信息计算 future 是否为 Send
。编译器有望在未来更新以支持显式删除它,但目前,您必须显式使用作用域。
请注意,此处讨论的错误也在生成章节的 Send bound 部分中讨论。
您不应尝试通过以不需要它是 Send
的方式生成任务来规避此问题,因为如果 Tokio 在任务持有锁时在 .await
处暂停您的任务,则可能会在同一线程上调度运行其他任务,而此其他任务也可能尝试锁定该互斥锁,这将导致死锁,因为等待锁定互斥锁的任务将阻止持有互斥锁的任务释放互斥锁。
请记住,某些互斥锁 crate 为其 MutexGuard 实现了 Send
。在这种情况下,即使您在 .await
期间持有 MutexGuard,也不会出现编译器错误。代码可以编译,但会死锁!
我们将在下面讨论一些避免这些问题的方法
重构您的代码,使其不在 .await
期间持有锁
处理互斥锁的最安全方法是将其包装在结构体中,并且仅在该结构体的非异步方法内部锁定互斥锁。
use std::sync::Mutex;
struct CanIncrement {
mutex: Mutex<i32>,
}
impl CanIncrement {
// This function is not marked async.
fn increment(&self) {
let mut lock = self.mutex.lock().unwrap();
*lock += 1;
}
}
async fn increment_and_do_stuff(can_incr: &CanIncrement) {
can_incr.increment();
do_something_async().await;
}
这种模式保证您不会遇到 Send
错误,因为互斥锁 guard 不会出现在任何异步函数中。当使用其 MutexGuard
实现 Send
的 crate 时,它还可以保护您免受死锁的影响。
您可以在这篇博客文章中找到更详细的示例。
生成一个任务来管理状态,并使用消息传递来操作它
这是本章开头提到的第二种方法,通常在共享资源是 I/O 资源时使用。有关更多详细信息,请参见下一章。
使用 Tokio 的异步互斥锁
也可以使用 Tokio 提供的 tokio::sync::Mutex
类型。Tokio 互斥锁的主要特点是它可以跨 .await
持有而没有任何问题。尽管如此,异步互斥锁比普通互斥锁更昂贵,通常最好使用其他两种方法之一。
use tokio::sync::Mutex; // note! This uses the Tokio mutex
// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
let mut lock = mutex.lock().await;
*lock += 1;
do_something_async().await;
} // lock goes out of scope here
任务、线程和争用
当争用最小时,使用阻塞互斥锁来保护短的关键部分是可接受的策略。当锁被争用时,执行任务的线程必须阻塞并等待互斥锁。这不仅会阻塞当前任务,还会阻塞计划在当前线程上的所有其他任务。
默认情况下,Tokio 运行时使用多线程调度器。任务被调度到运行时管理的任意数量的线程上。如果计划执行大量任务,并且它们都需要访问互斥锁,则会发生争用。另一方面,如果使用 current_thread
运行时风格,则互斥锁永远不会被争用。
current_thread
运行时风格是一个轻量级的单线程运行时。当仅生成少量任务并打开少量套接字时,这是一个不错的选择。例如,当在异步客户端库之上提供同步 API 桥接时,此选项效果很好。
如果同步互斥锁上的争用成为问题,则最佳解决方案很少是切换到 Tokio 互斥锁。相反,要考虑的选项是
- 让专用任务管理状态并使用消息传递。
- 分片互斥锁。
- 重构代码以避免互斥锁。
互斥锁分片
在我们的例子中,由于每个键都是独立的,因此互斥锁分片将很好地工作。为此,我们将引入 N
个不同的实例,而不是使用单个 Mutex<HashMap<_, _>>
实例。
type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Mutex::new(HashMap::new()));
}
Arc::new(db)
}
然后,查找任何给定键的单元格变成一个两步过程。首先,键用于标识它属于哪个分片。然后,在 HashMap
中查找键。
let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);
上面概述的简单实现需要使用固定数量的分片,并且一旦创建分片映射,就无法更改分片数量。
dashmap crate 提供了更复杂的分片哈希映射的实现。您可能还想查看诸如 leapfrog 和 flurry 之类的并发哈希表实现,后者是 Java 的 ConcurrentHashMap
数据结构的端口。
在您开始使用任何这些 crate 之前,请确保您构建代码,以便您无法在 .await
期间持有 MutexGuard
。如果您不这样做,您要么会遇到编译器错误(在非 Send guard 的情况下),要么您的代码会死锁(在 Send guard 的情况下)。请参阅这篇博客文章中的完整示例和更多上下文。