I/O
Tokio 中的 I/O 操作方式与 std
中非常相似,但它是异步的。Tokio 中有一个用于读取的 trait (AsyncRead
) 和一个用于写入的 trait (AsyncWrite
)。特定的类型会根据需要实现这些 trait (TcpStream
, File
, Stdout
)。许多数据结构也实现了 AsyncRead
和 AsyncWrite
,例如 Vec<u8>
和 &[u8]
。这允许在需要读取器或写入器的地方使用字节数组。
本页将介绍 Tokio 中的基本 I/O 读取和写入操作,并通过一些示例进行说明。下一页将介绍更高级的 I/O 示例。
AsyncRead
和 AsyncWrite
这两个 trait 提供了异步地从字节流读取和写入字节流的功能。这些 trait 上的方法通常不直接调用,类似于你不会手动调用 Future
trait 中的 poll
方法。相反,你将通过 AsyncReadExt
和 AsyncWriteExt
提供的实用方法来使用它们。
让我们简要看一下这些方法中的几个。所有这些函数都是 async
的,必须与 .await
一起使用。
async fn read()
AsyncReadExt::read
提供了一个异步方法,用于将数据读取到缓冲区中,并返回读取的字节数。
注意: 当 read()
返回 Ok(0)
时,这表示流已关闭。任何后续对 read()
的调用都将立即完成并返回 Ok(0)
。对于 TcpStream
实例,这表示套接字的读取端已关闭。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
async fn read_to_end()
AsyncReadExt::read_to_end
从流中读取所有字节,直到 EOF (文件结束符)。
use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer).await?;
Ok(())
}
async fn write()
AsyncWriteExt::write
将缓冲区写入到写入器中,并返回写入的字节数。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
// Writes some prefix of the byte string, but not necessarily all of it.
let n = file.write(b"some bytes").await?;
println!("Wrote the first {} bytes of 'some bytes'.", n);
Ok(())
}
async fn write_all()
AsyncWriteExt::write_all
将整个缓冲区写入到写入器中。
use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut file = File::create("foo.txt").await?;
file.write_all(b"some bytes").await?;
Ok(())
}
这两个 trait 都包含许多其他有用的方法。请参阅 API 文档以获取完整列表。
辅助函数
此外,就像 std
一样,tokio::io
模块包含许多有用的实用函数,以及用于处理 标准输入、标准输出 和 标准错误 的 API。例如,tokio::io::copy
异步地将读取器的全部内容复制到写入器中。
use tokio::fs::File;
use tokio::io;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut reader: &[u8] = b"hello";
let mut file = File::create("foo.txt").await?;
io::copy(&mut reader, &mut file).await?;
Ok(())
}
请注意,这利用了字节数组也实现了 AsyncRead
这一事实。
Echo 服务器
让我们练习进行一些异步 I/O 操作。我们将编写一个 echo 服务器。
echo 服务器绑定一个 TcpListener
并在循环中接受入站连接。对于每个入站连接,从套接字读取数据并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据返回。
我们将实现两次 echo 服务器,使用略有不同的策略。
使用 io::copy()
首先,我们将使用 io::copy
实用程序来实现 echo 逻辑。
你可以将此代码写入一个新的二进制文件
$ touch src/bin/echo-server-copy.rs
你可以使用以下命令启动(或仅检查编译)
$ cargo run --bin echo-server-copy
你将能够使用标准的命令行工具(如 telnet
)或通过编写一个简单的客户端(如 tokio::net::TcpStream
文档中找到的客户端)来尝试这个服务器。
这是一个 TCP 服务器,需要一个 accept 循环。将派生一个新的任务来处理每个接受的套接字。
use tokio::io;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
// Copy data here
});
}
}
如前所述,此实用函数接受一个读取器和一个写入器,并将数据从一个复制到另一个。但是,我们只有一个 TcpStream
。这个单个值实现了 AsyncRead
和 AsyncWrite
。由于 io::copy
要求读取器和写入器都使用 &mut
,因此该套接字不能用于这两个参数。
// This fails to compile
io::copy(&mut socket, &mut socket).await
拆分读取器 + 写入器
为了解决这个问题,我们必须将套接字拆分为一个读取器句柄和一个写入器句柄。拆分读取器/写入器组合的最佳方式取决于具体类型。
任何读取器 + 写入器类型都可以使用 io::split
实用程序进行拆分。此函数接受单个值并返回单独的读取器和写入器句柄。这两个句柄可以独立使用,包括从不同的任务中使用。
例如,echo 客户端可以像这样处理并发的读取和写入
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> io::Result<()> {
let socket = TcpStream::connect("127.0.0.1:6142").await?;
let (mut rd, mut wr) = io::split(socket);
// Write data in the background
tokio::spawn(async move {
wr.write_all(b"hello\r\n").await?;
wr.write_all(b"world\r\n").await?;
// Sometimes, the rust type inferencer needs
// a little help
Ok::<_, io::Error>(())
});
let mut buf = vec![0; 128];
loop {
let n = rd.read(&mut buf).await?;
if n == 0 {
break;
}
println!("GOT {:?}", &buf[..n]);
}
Ok(())
}
由于 io::split
支持实现 AsyncRead + AsyncWrite
的 任何 值,并返回独立的句柄,因此在内部 io::split
使用 Arc
和 Mutex
。这种开销可以通过 TcpStream
避免。TcpStream
提供了两个专门的拆分函数。
TcpStream::split
接受对流的 引用,并返回读取器和写入器句柄。由于使用了引用,因此这两个句柄必须保留在调用 split()
的 同一 任务中。这种专门的 split
是零成本的。不需要 Arc
或 Mutex
。TcpStream
还提供了 into_split
,它支持可以跨任务移动的句柄,但仅需付出 Arc
的代价。
由于 io::copy()
在拥有 TcpStream
的同一任务中调用,因此我们可以使用 TcpStream::split
。服务器中处理 echo 逻辑的任务变为
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
if io::copy(&mut rd, &mut wr).await.is_err() {
eprintln!("failed to copy");
}
});
你可以在这里找到完整的代码。
手动复制
现在让我们看看如何通过手动复制数据来编写 echo 服务器。为此,我们使用 AsyncReadExt::read
和 AsyncWriteExt::write_all
。
完整的 echo 服务器如下所示
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6142").await?;
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
// Unexpected socket error. There isn't much we can
// do here so just stop processing.
return;
}
}
Err(_) => {
// Unexpected socket error. There isn't much we can do
// here so just stop processing.
return;
}
}
}
});
}
}
(你可以将此代码放入 src/bin/echo-server.rs
并使用 cargo run --bin echo-server
启动它)。
让我们分解一下。首先,由于使用了 AsyncRead
和 AsyncWrite
实用程序,因此必须将扩展 trait 引入作用域。
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
分配缓冲区
策略是从套接字读取一些数据到缓冲区中,然后将缓冲区的内容写回套接字。
let mut buf = vec![0; 1024];
显式避免使用栈缓冲区。回想一下 之前 的内容,我们注意到在 .await
调用之间存活的所有任务数据都必须由任务存储。在这种情况下,buf
在 .await
调用之间使用。所有任务数据都存储在单个分配中。你可以将其视为一个 enum
,其中每个变体都是需要在特定 .await
调用中存储的数据。
如果缓冲区由栈数组表示,则它将被内联存储在任务结构中,任务结构可能看起来像这样
struct Task {
// internal task fields here
task: enum {
AwaitingRead {
socket: TcpStream,
buf: [BufferType],
},
AwaitingWriteAll {
socket: TcpStream,
buf: [BufferType],
}
}
}
如果使用栈数组作为缓冲区类型,它将 *内联* 存储在任务结构中。这将使任务结构非常大。此外,缓冲区大小通常是页面大小。反过来,这将使 Task
的大小变得笨拙:$page-size + a-few-bytes
。
编译器对 async 代码块的布局进行了比基本 enum
更进一步的优化。在实践中,变量不会像 enum
那样在变体之间移动。但是,任务结构的大小至少与最大的变量一样大。
因此,通常使用专用分配来分配缓冲区会更有效。
处理 EOF
当 TCP 流的读取端关闭时,调用 read()
会返回 Ok(0)
。此时退出读取循环非常重要。忘记在 EOF 时跳出读取循环是 bug 的常见来源。
loop {
match socket.read(&mut buf).await {
// Return value of `Ok(0)` signifies that the remote has
// closed
Ok(0) => return,
// ... other cases handled here
}
}
忘记跳出读取循环通常会导致 100% CPU 的无限循环情况。由于套接字已关闭,socket.read()
立即返回。然后循环永远重复。
完整的代码可以在这里找到。