生成任务 (Spawning)
我们将要转换方向,开始开发 Redis 服务器。
首先,将上一节中的客户端 SET
/GET
代码移动到一个示例文件中。这样,我们可以针对我们的服务器运行它。
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后创建一个新的、空的 src/main.rs
并继续。
接受套接字
我们的 Redis 服务器需要做的第一件事是接受入站 TCP 套接字。这可以通过将 tokio::net::TcpListener
绑定到端口 6379 来完成。
Tokio 的许多类型与其 Rust 标准库中的同步等价物同名。在有意义的情况下,Tokio 公开了与
std
相同的 API,但使用async fn
。
然后,套接字在一个循环中被接受。每个套接字都被处理,然后被关闭。现在,我们将读取命令,将其打印到 stdout,并以错误响应。
src/main.rs
use tokio::net::{TcpListener, TcpStream};
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
let (socket, _) = listener.accept().await.unwrap();
process(socket).await;
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
现在,运行这个接受循环
$ cargo run
在一个单独的终端窗口中,运行 hello-redis
示例(来自上一节的 SET
/GET
命令)
$ cargo run --example hello-redis
输出应该是
Error: "unimplemented"
在服务器终端中,输出是
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发
我们的服务器有一个小问题(除了只响应错误之外)。它一次处理一个入站请求。当接受连接时,服务器会停留在接受循环阻塞中,直到响应完全写入套接字。
我们希望我们的 Redis 服务器处理许多并发请求。为此,我们需要添加一些并发性。
并发和并行不是同一件事。如果您在两个任务之间交替,那么您是在并发地处理这两个任务,但不是并行地处理。要使其符合并行的条件,您需要两个人,一个人专门负责一个任务。
使用 Tokio 的优势之一是,异步代码允许您并发地处理许多任务,而无需使用普通线程并行处理它们。实际上,Tokio 可以在单个线程上并发运行许多任务!
为了并发地处理连接,为每个入站连接生成一个新的任务。连接在这个任务上被处理。
接受循环变为
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (socket, _) = listener.accept().await.unwrap();
// A new task is spawned for each inbound socket. The socket is
// moved to the new task and processed there.
tokio::spawn(async move {
process(socket).await;
});
}
}
任务
Tokio 任务是一个异步的绿色线程。它们通过将 async
块传递给 tokio::spawn
来创建。tokio::spawn
函数返回一个 JoinHandle
,调用者可以使用它与生成的任务进行交互。async
块可能有一个返回值。调用者可以使用 .await
在 JoinHandle
上获取返回值。
例如
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
// Do some async work
"return value"
});
// Do some other work
let out = handle.await.unwrap();
println!("GOT {}", out);
}
等待 JoinHandle
返回一个 Result
。当任务在执行期间遇到错误时,JoinHandle
将返回一个 Err
。当任务发生 panic,或者任务被运行时强制关闭时,就会发生这种情况。
任务是由调度器管理的执行单元。生成任务会将其提交给 Tokio 调度器,然后调度器确保任务在有工作要做时执行。生成的任务可能在与生成它的线程相同的线程上执行,也可能在不同的运行时线程上执行。任务也可能在生成后在线程之间移动。
Tokio 中的任务非常轻量级。在底层,它们只需要一个分配和 64 字节的内存。应用程序应该随意生成数千个,甚至数百万个任务。
'static
约束
当您在 Tokio 运行时上生成任务时,其类型的生命周期必须是 'static
。这意味着生成的任务不能包含对任务外部拥有的数据的任何引用。
一个常见的误解是
'static
总是意味着“永远存在”,但事实并非如此。仅仅因为一个值是'static
并不意味着您有内存泄漏。您可以在 Common Rust Lifetime Misconceptions 中阅读更多内容。
例如,以下代码将无法编译
use tokio::task;
#[tokio::main]
async fn main() {
let v = vec![1, 2, 3];
task::spawn(async {
println!("Here's a vec: {:?}", v);
});
}
尝试编译此代码会导致以下错误
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
发生这种情况是因为,默认情况下,变量不会被移动到 async 块中。v
向量仍然由 main
函数拥有。println!
行借用了 v
。rust 编译器会很有帮助地向我们解释这一点,甚至建议了修复方法!将第 7 行更改为 task::spawn(async move {
将指示编译器移动 v
到生成的任务中。现在,任务拥有其所有数据,使其成为 'static
。
如果单个数据片段必须从多个任务并发访问,则必须使用同步原语(例如 Arc
)共享它。
请注意,错误消息讨论的是参数类型超出 'static
生命周期。这种术语可能相当令人困惑,因为 'static
生命周期持续到程序结束,所以如果它超出它,难道您不会有内存泄漏吗?解释是,必须超出 'static
生命周期的是类型,而不是值,并且值可能在其类型不再有效之前被销毁。
当我们说一个值是 'static
时,所有这意味着保持该值永远存在是不会不正确的。这很重要,因为编译器无法推断新生成的任务会存在多久。我们必须确保允许任务永远存在,以便 Tokio 可以使任务在需要时运行。
信息框前面链接的文章使用术语“受 'static
约束”而不是“其类型超出 'static
”或“值是 'static
”来指代 T: 'static
。这些都意味着相同的事情,但与“用 'static
注释”不同,例如 &'static T
。
Send
约束
由 tokio::spawn
生成的任务必须实现 Send
。这允许 Tokio 运行时在任务在 .await
处挂起时在线程之间移动任务。
当所有在 .await
调用之间保持的数据都是 Send
时,任务才是 Send
的。这有点微妙。当 .await
被调用时,任务会屈服于调度器。下次任务执行时,它会从上次屈服的点恢复。为了使这项工作正常进行,任务必须保存之后使用的所有状态。如果此状态是 Send
,即可以跨线程移动,则任务本身也可以跨线程移动。相反,如果状态不是 Send
,那么任务也不是。
例如,这可以工作
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
这不行
use tokio::task::yield_now;
use std::rc::Rc;
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}
尝试编译代码片段会导致
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将在下一章中更深入地讨论此错误的特殊情况。
存储值
我们现在将实现 process
函数来处理传入的命令。我们将使用 HashMap
来存储值。SET
命令将插入到 HashMap
中,而 GET
值将加载它们。此外,我们将使用一个循环来接受每个连接的多个命令。
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// A hashmap is used to store data
let mut db = HashMap::new();
// Connection, provided by `mini-redis`, handles parsing frames from
// the socket
let mut connection = Connection::new(socket);
// Use `read_frame` to receive a command from the connection.
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
// The value is stored as `Vec<u8>`
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
// `Frame::Bulk` expects data to be of type `Bytes`. This
// type will be covered later in the tutorial. For now,
// `&Vec<u8>` is converted to `Bytes` using `into()`.
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
现在,启动服务器
$ cargo run
并在一个单独的终端窗口中,运行 hello-redis
示例
$ cargo run --example hello-redis
现在,输出将是
got value from the server; result=Some(b"world")
我们现在可以获取和设置值,但存在一个问题:这些值在连接之间不共享。如果另一个套接字连接并尝试 GET
hello
键,它将找不到任何内容。
您可以在 这里 找到完整的代码。
在下一节中,我们将实现为所有套接字持久化数据。