生成任务 (Spawning)

我们将要转换方向,开始开发 Redis 服务器。

首先,将上一节中的客户端 SET/GET 代码移动到一个示例文件中。这样,我们可以针对我们的服务器运行它。

$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs

然后创建一个新的、空的 src/main.rs 并继续。

接受套接字

我们的 Redis 服务器需要做的第一件事是接受入站 TCP 套接字。这可以通过将 tokio::net::TcpListener 绑定到端口 6379 来完成。

Tokio 的许多类型与其 Rust 标准库中的同步等价物同名。在有意义的情况下,Tokio 公开了与 std 相同的 API,但使用 async fn

然后,套接字在一个循环中被接受。每个套接字都被处理,然后被关闭。现在,我们将读取命令,将其打印到 stdout,并以错误响应。

src/main.rs

use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        let (socket, _) = listener.accept().await.unwrap();
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

现在,运行这个接受循环

$ cargo run

在一个单独的终端窗口中,运行 hello-redis 示例(来自上一节的 SET/GET 命令)

$ cargo run --example hello-redis

输出应该是

Error: "unimplemented"

在服务器终端中,输出是

GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])

并发

我们的服务器有一个小问题(除了只响应错误之外)。它一次处理一个入站请求。当接受连接时,服务器会停留在接受循环阻塞中,直到响应完全写入套接字。

我们希望我们的 Redis 服务器处理许多并发请求。为此,我们需要添加一些并发性。

并发和并行不是同一件事。如果您在两个任务之间交替,那么您是在并发地处理这两个任务,但不是并行地处理。要使其符合并行的条件,您需要两个人,一个人专门负责一个任务。

使用 Tokio 的优势之一是,异步代码允许您并发地处理许多任务,而无需使用普通线程并行处理它们。实际上,Tokio 可以在单个线程上并发运行许多任务!

为了并发地处理连接,为每个入站连接生成一个新的任务。连接在这个任务上被处理。

接受循环变为

use tokio::net::TcpListener;

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        let (socket, _) = listener.accept().await.unwrap();
        // A new task is spawned for each inbound socket. The socket is
        // moved to the new task and processed there.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

任务

Tokio 任务是一个异步的绿色线程。它们通过将 async 块传递给 tokio::spawn 来创建。tokio::spawn 函数返回一个 JoinHandle,调用者可以使用它与生成的任务进行交互。async 块可能有一个返回值。调用者可以使用 .awaitJoinHandle 上获取返回值。

例如

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        // Do some async work
        "return value"
    });

    // Do some other work

    let out = handle.await.unwrap();
    println!("GOT {}", out);
}

等待 JoinHandle 返回一个 Result。当任务在执行期间遇到错误时,JoinHandle 将返回一个 Err。当任务发生 panic,或者任务被运行时强制关闭时,就会发生这种情况。

任务是由调度器管理的执行单元。生成任务会将其提交给 Tokio 调度器,然后调度器确保任务在有工作要做时执行。生成的任务可能在与生成它的线程相同的线程上执行,也可能在不同的运行时线程上执行。任务也可能在生成后在线程之间移动。

Tokio 中的任务非常轻量级。在底层,它们只需要一个分配和 64 字节的内存。应用程序应该随意生成数千个,甚至数百万个任务。

'static 约束

当您在 Tokio 运行时上生成任务时,其类型的生命周期必须是 'static。这意味着生成的任务不能包含对任务外部拥有的数据的任何引用。

一个常见的误解是 'static 总是意味着“永远存在”,但事实并非如此。仅仅因为一个值是 'static 并不意味着您有内存泄漏。您可以在 Common Rust Lifetime Misconceptions 中阅读更多内容。

例如,以下代码将无法编译

use tokio::task;

#[tokio::main]
async fn main() {
    let v = vec![1, 2, 3];

    task::spawn(async {
        println!("Here's a vec: {:?}", v);
    });
}

尝试编译此代码会导致以下错误

error[E0373]: async block may outlive the current function, but
              it borrows `v`, which is owned by the current function
 --> src/main.rs:7:23
  |
7 |       task::spawn(async {
  |  _______________________^
8 | |         println!("Here's a vec: {:?}", v);
  | |                                        - `v` is borrowed here
9 | |     });
  | |_____^ may outlive borrowed value `v`
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:7:17
  |
7 |       task::spawn(async {
  |  _________________^
8 | |         println!("Here's a vector: {:?}", v);
9 | |     });
  | |_____^
help: to force the async block to take ownership of `v` (and any other
      referenced variables), use the `move` keyword
  |
7 |     task::spawn(async move {
8 |         println!("Here's a vec: {:?}", v);
9 |     });
  |

发生这种情况是因为,默认情况下,变量不会被移动到 async 块中。v 向量仍然由 main 函数拥有。println! 行借用了 v。rust 编译器会很有帮助地向我们解释这一点,甚至建议了修复方法!将第 7 行更改为 task::spawn(async move { 将指示编译器移动 v 到生成的任务中。现在,任务拥有其所有数据,使其成为 'static

如果单个数据片段必须从多个任务并发访问,则必须使用同步原语(例如 Arc)共享它。

请注意,错误消息讨论的是参数类型超出 'static 生命周期。这种术语可能相当令人困惑,因为 'static 生命周期持续到程序结束,所以如果它超出它,难道您不会有内存泄漏吗?解释是,必须超出 'static 生命周期的是类型,而不是,并且值可能在其类型不再有效之前被销毁。

当我们说一个值是 'static 时,所有这意味着保持该值永远存在是不会不正确的。这很重要,因为编译器无法推断新生成的任务会存在多久。我们必须确保允许任务永远存在,以便 Tokio 可以使任务在需要时运行。

信息框前面链接的文章使用术语“受 'static 约束”而不是“其类型超出 'static”或“值是 'static”来指代 T: 'static。这些都意味着相同的事情,但与“用 'static 注释”不同,例如 &'static T

Send 约束

tokio::spawn 生成的任务必须实现 Send。这允许 Tokio 运行时在任务在 .await 处挂起时在线程之间移动任务。

所有.await 调用之间保持的数据都是 Send 时,任务才是 Send 的。这有点微妙。当 .await 被调用时,任务会屈服于调度器。下次任务执行时,它会从上次屈服的点恢复。为了使这项工作正常进行,任务必须保存之后使用的所有状态。如果此状态是 Send,即可以跨线程移动,则任务本身也可以跨线程移动。相反,如果状态不是 Send,那么任务也不是。

例如,这可以工作

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

这不行

use tokio::task::yield_now;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;

        println!("{}", rc);
    });
}

尝试编译代码片段会导致

error: future cannot be sent between threads safely
   --> src/main.rs:6:5
    |
6   |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: [..]spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ---- required by this bound in
    |                          `tokio::task::spawn::spawn`
    |
    = help: within `impl std::future::Future`, the trait
    |       `std::marker::Send` is not  implemented for
    |       `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:10:9
    |
7   |         let rc = Rc::new("hello");
    |             -- has type `std::rc::Rc<&str>` which is not `Send`
...
10  |         yield_now().await;
    |         ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
    |                           used later
11  |         println!("{}", rc);
12  |     });
    |     - `rc` is later dropped here

我们将在下一章中更深入地讨论此错误的特殊情况。

存储值

我们现在将实现 process 函数来处理传入的命令。我们将使用 HashMap 来存储值。SET 命令将插入到 HashMap 中,而 GET 值将加载它们。此外,我们将使用一个循环来接受每个连接的多个命令。

use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};

async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;

    // A hashmap is used to store data
    let mut db = HashMap::new();

    // Connection, provided by `mini-redis`, handles parsing frames from
    // the socket
    let mut connection = Connection::new(socket);

    // Use `read_frame` to receive a command from the connection.
    while let Some(frame) = connection.read_frame().await.unwrap() {
        let response = match Command::from_frame(frame).unwrap() {
            Set(cmd) => {
                // The value is stored as `Vec<u8>`
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            Get(cmd) => {
                if let Some(value) = db.get(cmd.key()) {
                    // `Frame::Bulk` expects data to be of type `Bytes`. This
                    // type will be covered later in the tutorial. For now,
                    // `&Vec<u8>` is converted to `Bytes` using `into()`.
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };

        // Write the response to the client
        connection.write_frame(&response).await.unwrap();
    }
}

现在,启动服务器

$ cargo run

并在一个单独的终端窗口中,运行 hello-redis 示例

$ cargo run --example hello-redis

现在,输出将是

got value from the server; result=Some(b"world")

我们现在可以获取和设置值,但存在一个问题:这些值在连接之间不共享。如果另一个套接字连接并尝试 GET hello 键,它将找不到任何内容。

您可以在 这里 找到完整的代码。

在下一节中,我们将实现为所有套接字持久化数据。