I/O

Tokio 中的 I/O 操作方式与 std 中非常相似,但它是异步的。Tokio 中有一个用于读取的 trait (AsyncRead) 和一个用于写入的 trait (AsyncWrite)。特定的类型会根据需要实现这些 trait (TcpStream, File, Stdout)。许多数据结构也实现了 AsyncReadAsyncWrite,例如 Vec<u8>&[u8]。这允许在需要读取器或写入器的地方使用字节数组。

本页将介绍 Tokio 中的基本 I/O 读取和写入操作,并通过一些示例进行说明。下一页将介绍更高级的 I/O 示例。

AsyncReadAsyncWrite

这两个 trait 提供了异步地从字节流读取和写入字节流的功能。这些 trait 上的方法通常不直接调用,类似于你不会手动调用 Future trait 中的 poll 方法。相反,你将通过 AsyncReadExtAsyncWriteExt 提供的实用方法来使用它们。

让我们简要看一下这些方法中的几个。所有这些函数都是 async 的,必须与 .await 一起使用。

async fn read()

AsyncReadExt::read 提供了一个异步方法,用于将数据读取到缓冲区中,并返回读取的字节数。

注意:read() 返回 Ok(0) 时,这表示流已关闭。任何后续对 read() 的调用都将立即完成并返回 Ok(0)。对于 TcpStream 实例,这表示套接字的读取端已关闭。

use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];

    // read up to 10 bytes
    let n = f.read(&mut buffer[..]).await?;

    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

async fn read_to_end()

AsyncReadExt::read_to_end 从流中读取所有字节,直到 EOF (文件结束符)。

use tokio::io::{self, AsyncReadExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = Vec::new();

    // read the whole file
    f.read_to_end(&mut buffer).await?;
    Ok(())
}

async fn write()

AsyncWriteExt::write 将缓冲区写入到写入器中,并返回写入的字节数。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    // Writes some prefix of the byte string, but not necessarily all of it.
    let n = file.write(b"some bytes").await?;

    println!("Wrote the first {} bytes of 'some bytes'.", n);
    Ok(())
}

async fn write_all()

AsyncWriteExt::write_all 将整个缓冲区写入到写入器中。

use tokio::io::{self, AsyncWriteExt};
use tokio::fs::File;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut file = File::create("foo.txt").await?;

    file.write_all(b"some bytes").await?;
    Ok(())
}

这两个 trait 都包含许多其他有用的方法。请参阅 API 文档以获取完整列表。

辅助函数

此外,就像 std 一样,tokio::io 模块包含许多有用的实用函数,以及用于处理 标准输入标准输出标准错误 的 API。例如,tokio::io::copy 异步地将读取器的全部内容复制到写入器中。

use tokio::fs::File;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut reader: &[u8] = b"hello";
    let mut file = File::create("foo.txt").await?;

    io::copy(&mut reader, &mut file).await?;
    Ok(())
}

请注意,这利用了字节数组也实现了 AsyncRead 这一事实。

Echo 服务器

让我们练习进行一些异步 I/O 操作。我们将编写一个 echo 服务器。

echo 服务器绑定一个 TcpListener 并在循环中接受入站连接。对于每个入站连接,从套接字读取数据并立即写回套接字。客户端向服务器发送数据,并接收完全相同的数据返回。

我们将实现两次 echo 服务器,使用略有不同的策略。

使用 io::copy()

首先,我们将使用 io::copy 实用程序来实现 echo 逻辑。

你可以将此代码写入一个新的二进制文件

$ touch src/bin/echo-server-copy.rs

你可以使用以下命令启动(或仅检查编译)

$ cargo run --bin echo-server-copy

你将能够使用标准的命令行工具(如 telnet)或通过编写一个简单的客户端(如 tokio::net::TcpStream 文档中找到的客户端)来尝试这个服务器。

这是一个 TCP 服务器,需要一个 accept 循环。将派生一个新的任务来处理每个接受的套接字。

use tokio::io;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            // Copy data here
        });
    }
}

如前所述,此实用函数接受一个读取器和一个写入器,并将数据从一个复制到另一个。但是,我们只有一个 TcpStream。这个单个值实现了 AsyncReadAsyncWrite。由于 io::copy 要求读取器和写入器都使用 &mut,因此该套接字不能用于这两个参数。

// This fails to compile
io::copy(&mut socket, &mut socket).await

拆分读取器 + 写入器

为了解决这个问题,我们必须将套接字拆分为一个读取器句柄和一个写入器句柄。拆分读取器/写入器组合的最佳方式取决于具体类型。

任何读取器 + 写入器类型都可以使用 io::split 实用程序进行拆分。此函数接受单个值并返回单独的读取器和写入器句柄。这两个句柄可以独立使用,包括从不同的任务中使用。

例如,echo 客户端可以像这样处理并发的读取和写入

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> io::Result<()> {
    let socket = TcpStream::connect("127.0.0.1:6142").await?;
    let (mut rd, mut wr) = io::split(socket);

    // Write data in the background
    tokio::spawn(async move {
        wr.write_all(b"hello\r\n").await?;
        wr.write_all(b"world\r\n").await?;

        // Sometimes, the rust type inferencer needs
        // a little help
        Ok::<_, io::Error>(())
    });

    let mut buf = vec![0; 128];

    loop {
        let n = rd.read(&mut buf).await?;

        if n == 0 {
            break;
        }

        println!("GOT {:?}", &buf[..n]);
    }

    Ok(())
}

由于 io::split 支持实现 AsyncRead + AsyncWrite任何 值,并返回独立的句柄,因此在内部 io::split 使用 ArcMutex。这种开销可以通过 TcpStream 避免。TcpStream 提供了两个专门的拆分函数。

TcpStream::split 接受对流的 引用,并返回读取器和写入器句柄。由于使用了引用,因此这两个句柄必须保留在调用 split()同一 任务中。这种专门的 split 是零成本的。不需要 ArcMutexTcpStream 还提供了 into_split,它支持可以跨任务移动的句柄,但仅需付出 Arc 的代价。

由于 io::copy() 在拥有 TcpStream 的同一任务中调用,因此我们可以使用 TcpStream::split。服务器中处理 echo 逻辑的任务变为

tokio::spawn(async move {
    let (mut rd, mut wr) = socket.split();
    
    if io::copy(&mut rd, &mut wr).await.is_err() {
        eprintln!("failed to copy");
    }
});

你可以在这里找到完整的代码。

手动复制

现在让我们看看如何通过手动复制数据来编写 echo 服务器。为此,我们使用 AsyncReadExt::readAsyncWriteExt::write_all

完整的 echo 服务器如下所示

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        tokio::spawn(async move {
            let mut buf = vec![0; 1024];

            loop {
                match socket.read(&mut buf).await {
                    // Return value of `Ok(0)` signifies that the remote has
                    // closed
                    Ok(0) => return,
                    Ok(n) => {
                        // Copy the data back to socket
                        if socket.write_all(&buf[..n]).await.is_err() {
                            // Unexpected socket error. There isn't much we can
                            // do here so just stop processing.
                            return;
                        }
                    }
                    Err(_) => {
                        // Unexpected socket error. There isn't much we can do
                        // here so just stop processing.
                        return;
                    }
                }
            }
        });
    }
}

(你可以将此代码放入 src/bin/echo-server.rs 并使用 cargo run --bin echo-server 启动它)。

让我们分解一下。首先,由于使用了 AsyncReadAsyncWrite 实用程序,因此必须将扩展 trait 引入作用域。

use tokio::io::{self, AsyncReadExt, AsyncWriteExt};

分配缓冲区

策略是从套接字读取一些数据到缓冲区中,然后将缓冲区的内容写回套接字。

let mut buf = vec![0; 1024];

显式避免使用栈缓冲区。回想一下 之前 的内容,我们注意到在 .await 调用之间存活的所有任务数据都必须由任务存储。在这种情况下,buf.await 调用之间使用。所有任务数据都存储在单个分配中。你可以将其视为一个 enum,其中每个变体都是需要在特定 .await 调用中存储的数据。

如果缓冲区由栈数组表示,则它将被内联存储在任务结构中,任务结构可能看起来像这样

struct Task {
    // internal task fields here
    task: enum {
        AwaitingRead {
            socket: TcpStream,
            buf: [BufferType],
        },
        AwaitingWriteAll {
            socket: TcpStream,
            buf: [BufferType],
        }

    }
}

如果使用栈数组作为缓冲区类型,它将 *内联* 存储在任务结构中。这将使任务结构非常大。此外,缓冲区大小通常是页面大小。反过来,这将使 Task 的大小变得笨拙:$page-size + a-few-bytes

编译器对 async 代码块的布局进行了比基本 enum 更进一步的优化。在实践中,变量不会像 enum 那样在变体之间移动。但是,任务结构的大小至少与最大的变量一样大。

因此,通常使用专用分配来分配缓冲区会更有效。

处理 EOF

当 TCP 流的读取端关闭时,调用 read() 会返回 Ok(0)。此时退出读取循环非常重要。忘记在 EOF 时跳出读取循环是 bug 的常见来源。

loop {
    match socket.read(&mut buf).await {
        // Return value of `Ok(0)` signifies that the remote has
        // closed
        Ok(0) => return,
        // ... other cases handled here
    }
}

忘记跳出读取循环通常会导致 100% CPU 的无限循环情况。由于套接字已关闭,socket.read() 立即返回。然后循环永远重复。

完整的代码可以在这里找到。