共享状态

到目前为止,我们已经有一个工作的键值服务器。但是,存在一个主要的缺陷:状态在连接之间不共享。我们将在本文中修复这个问题。

策略

在 Tokio 中,有几种不同的方法可以共享状态。

  1. 使用互斥锁 (Mutex) 保护共享状态。
  2. 生成一个任务来管理状态,并使用消息传递来操作它。

通常,对于简单数据,您希望使用第一种方法,而对于需要异步工作(如 I/O 原语)的事物,则使用第二种方法。在本章中,共享状态是一个 HashMap,操作是 insertget。这些操作都不是异步的,因此我们将使用 Mutex

后一种方法将在下一章中介绍。

添加 bytes 依赖

Mini-Redis crate 没有使用 Vec<u8>,而是使用了 bytes crate 中的 BytesBytes 的目标是为网络编程提供健壮的字节数组结构。它比 Vec<u8> 增加的最大特性是浅克隆。换句话说,在 Bytes 实例上调用 clone() 不会复制底层数据。相反,Bytes 实例是对某些底层数据的引用计数句柄。Bytes 类型大致相当于 Arc<Vec<u8>>,但具有一些附加功能。

要依赖 bytes,请将以下内容添加到 Cargo.toml[dependencies] 部分

bytes = "1"

初始化 HashMap

HashMap 将在许多任务和潜在的许多线程之间共享。为了支持这一点,它被包装在 Arc<Mutex<_>> 中。

首先,为了方便起见,在使用 use 语句后添加以下类型别名。

use bytes::Bytes;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

然后,更新 main 函数以初始化 HashMap 并将 Arc 句柄 传递给 process 函数。使用 Arc 允许从许多任务(可能在许多线程上运行)并发引用 HashMap。在整个 Tokio 中,术语 句柄 用于引用提供对某些共享状态的访问权限的值。

use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    println!("Listening");

    let db = Arc::new(Mutex::new(HashMap::new()));

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // Clone the handle to the hash map.
        let db = db.clone();

        println!("Accepted");
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

关于使用 std::sync::Mutextokio::sync::Mutex

请注意,使用 std::sync::Mutex不是 tokio::sync::Mutex 来保护 HashMap。一个常见的错误是从异步代码中无条件地使用 tokio::sync::Mutex。异步互斥锁是在跨 .await 调用时锁定的互斥锁。

同步互斥锁在等待获取锁时会阻塞当前线程。反过来,这将阻止其他任务进行处理。但是,切换到 tokio::sync::Mutex 通常没有帮助,因为异步互斥锁在内部使用同步互斥锁。

作为经验法则,只要争用保持在较低水平并且锁未在 .await 调用之间保持,从异步代码中使用同步互斥锁是可以的。

更新 process()

process 函数不再初始化 HashMap。相反,它将共享的 HashMap 句柄作为参数。它还需要在使用 HashMap 之前锁定它。请记住,HashMap 的值的类型现在是 Bytes(我们可以廉价地克隆),因此这也需要更改。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }           
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

.await 期间持有 MutexGuard

您可能会编写如下代码

use std::sync::{Mutex, MutexGuard};

async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

当您尝试生成调用此函数的某些内容时,您将遇到以下错误消息

error: future cannot be sent between threads safely
   --> src/lib.rs:13:5
    |
13  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    |
   ::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
   --> src/lib.rs:7:5
    |
4   |     let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    |         -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7   |     do_something_async().await;
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8   | }
    | - `mut lock` is later dropped here

发生这种情况是因为 std::sync::MutexGuard 类型不是 Send。这意味着您无法将互斥锁发送到另一个线程,并且发生错误是因为 Tokio 运行时可以在每个 .await 之间在线程之间移动任务。为避免这种情况,您应该重构代码,以便互斥锁的析构函数在 .await 之前运行。

// This works!
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}

请注意,这不起作用

use std::sync::{Mutex, MutexGuard};

// This fails too.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
    *lock += 1;
    drop(lock);

    do_something_async().await;
}

这是因为编译器当前仅根据作用域信息计算 future 是否为 Send。编译器有望在未来更新以支持显式删除它,但目前,您必须显式使用作用域。

请注意,此处讨论的错误也在生成章节的 Send bound 部分中讨论。

您不应尝试通过以不需要它是 Send 的方式生成任务来规避此问题,因为如果 Tokio 在任务持有锁时在 .await 处暂停您的任务,则可能会在同一线程上调度运行其他任务,而此其他任务也可能尝试锁定该互斥锁,这将导致死锁,因为等待锁定互斥锁的任务将阻止持有互斥锁的任务释放互斥锁。

请记住,某些互斥锁 crate 为其 MutexGuard 实现了 Send。在这种情况下,即使您在 .await 期间持有 MutexGuard,也不会出现编译器错误。代码可以编译,但会死锁!

我们将在下面讨论一些避免这些问题的方法

重构您的代码,使其不在 .await 期间持有锁

处理互斥锁的最安全方法是将其包装在结构体中,并且仅在该结构体的非异步方法内部锁定互斥锁。

use std::sync::Mutex;

struct CanIncrement {
    mutex: Mutex<i32>,
}
impl CanIncrement {
    // This function is not marked async.
    fn increment(&self) {
        let mut lock = self.mutex.lock().unwrap();
        *lock += 1;
    }
}

async fn increment_and_do_stuff(can_incr: &CanIncrement) {
    can_incr.increment();
    do_something_async().await;
}

这种模式保证您不会遇到 Send 错误,因为互斥锁 guard 不会出现在任何异步函数中。当使用其 MutexGuard 实现 Send 的 crate 时,它还可以保护您免受死锁的影响。

您可以在这篇博客文章中找到更详细的示例。

生成一个任务来管理状态,并使用消息传递来操作它

这是本章开头提到的第二种方法,通常在共享资源是 I/O 资源时使用。有关更多详细信息,请参见下一章。

使用 Tokio 的异步互斥锁

也可以使用 Tokio 提供的 tokio::sync::Mutex 类型。Tokio 互斥锁的主要特点是它可以跨 .await 持有而没有任何问题。尽管如此,异步互斥锁比普通互斥锁更昂贵,通常最好使用其他两种方法之一。

use tokio::sync::Mutex; // note! This uses the Tokio mutex

// This compiles!
// (but restructuring the code would be better in this case)
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    let mut lock = mutex.lock().await;
    *lock += 1;

    do_something_async().await;
} // lock goes out of scope here

任务、线程和争用

当争用最小时,使用阻塞互斥锁来保护短的关键部分是可接受的策略。当锁被争用时,执行任务的线程必须阻塞并等待互斥锁。这不仅会阻塞当前任务,还会阻塞计划在当前线程上的所有其他任务。

默认情况下,Tokio 运行时使用多线程调度器。任务被调度到运行时管理的任意数量的线程上。如果计划执行大量任务,并且它们都需要访问互斥锁,则会发生争用。另一方面,如果使用 current_thread 运行时风格,则互斥锁永远不会被争用。

current_thread 运行时风格是一个轻量级的单线程运行时。当仅生成少量任务并打开少量套接字时,这是一个不错的选择。例如,当在异步客户端库之上提供同步 API 桥接时,此选项效果很好。

如果同步互斥锁上的争用成为问题,则最佳解决方案很少是切换到 Tokio 互斥锁。相反,要考虑的选项是

  • 让专用任务管理状态并使用消息传递。
  • 分片互斥锁。
  • 重构代码以避免互斥锁。

互斥锁分片

在我们的例子中,由于每个都是独立的,因此互斥锁分片将很好地工作。为此,我们将引入 N 个不同的实例,而不是使用单个 Mutex<HashMap<_, _>> 实例。

type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>;

fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Mutex::new(HashMap::new()));
    }
    Arc::new(db)
}

然后,查找任何给定键的单元格变成一个两步过程。首先,键用于标识它属于哪个分片。然后,在 HashMap 中查找键。

let shard = db[hash(key) % db.len()].lock().unwrap();
shard.insert(key, value);

上面概述的简单实现需要使用固定数量的分片,并且一旦创建分片映射,就无法更改分片数量。

dashmap crate 提供了更复杂的分片哈希映射的实现。您可能还想查看诸如 leapfrogflurry 之类的并发哈希表实现,后者是 Java 的 ConcurrentHashMap 数据结构的端口。

在您开始使用任何这些 crate 之前,请确保您构建代码,以便您无法在 .await 期间持有 MutexGuard。如果您不这样做,您要么会遇到编译器错误(在非 Send guard 的情况下),要么您的代码会死锁(在 Send guard 的情况下)。请参阅这篇博客文章中的完整示例和更多上下文。