帧处理
现在我们将应用刚刚学到的关于 I/O 的知识,并实现 Mini-Redis 的帧处理层。帧处理是将字节流转换为帧流的过程。帧是在两个对等方之间传输的数据单元。Redis 协议帧定义如下
use bytes::Bytes;
enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}
请注意,帧仅包含数据,没有任何语义。命令解析和实现发生在更高的层级。
对于 HTTP,帧可能如下所示
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}
为了实现 Mini-Redis 的帧处理,我们将实现一个 Connection
结构体,它包装了一个 TcpStream
并读取/写入 mini_redis::Frame
值。
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};
struct Connection {
stream: TcpStream,
// ... other fields here
}
impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
}
/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
}
}
您可以在这里找到 Redis 线路协议的详细信息。完整的 Connection
代码可以在这里找到。
缓冲读取
read_frame
方法等待接收到整个帧后才返回。单次调用 TcpStream::read()
可能会返回任意数量的数据。它可能包含一个完整的帧、一个部分帧或多个帧。如果收到部分帧,则数据会被缓冲,并从套接字读取更多数据。如果收到多个帧,则返回第一个帧,其余数据将被缓冲,直到下次调用 read_frame
。
如果您还没有创建,请创建一个名为 connection.rs
的新文件。
touch src/connection.rs
为了实现这一点,Connection
需要一个读取缓冲区字段。数据从套接字读取到读取缓冲区中。当解析帧时,相应的数据将从缓冲区中移除。
我们将使用 BytesMut
作为缓冲区类型。这是 Bytes
的可变版本。
use bytes::BytesMut;
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}
接下来,我们实现 read_frame()
方法。
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
// Attempt to parse a frame from the buffered data. If
// enough data has been buffered, the frame is
// returned.
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// There is not enough buffered data to read a frame.
// Attempt to read more data from the socket.
//
// On success, the number of bytes is returned. `0`
// indicates "end of stream".
if 0 == self.stream.read_buf(&mut self.buffer).await? {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, this means that the
// peer closed the socket while sending a frame.
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
让我们分解一下。read_frame
方法在一个循环中运行。首先,调用 self.parse_frame()
。这将尝试从 self.buffer
解析一个 redis 帧。如果有足够的数据来解析帧,则帧将返回给 read_frame()
的调用者。否则,我们尝试从套接字读取更多数据到缓冲区中。读取更多数据后,再次调用 parse_frame()
。这次,如果已接收到足够的数据,则解析可能会成功。
当从流中读取时,返回值为 0
表示不会再从对等方接收到更多数据。如果读取缓冲区中仍然有数据,则表示已收到部分帧,并且连接正在被突然终止。这是一个错误条件,并返回 Err
。
Buf
trait
当从流中读取时,会调用 read_buf
。此版本的读取函数接受一个实现了 BufMut
的值,该值来自 bytes
crate。
首先,考虑一下我们将如何使用 read()
实现相同的读取循环。可以使用 Vec<u8>
代替 BytesMut
。
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: vec![0; 4096],
cursor: 0,
}
}
}
以及 Connection
上的 read_frame()
函数
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// Ensure the buffer has capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}
// Read into the buffer, tracking the number
// of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}
当使用字节数组和 read
时,我们还必须维护一个游标来跟踪已缓冲多少数据。我们必须确保将缓冲区的空闲部分传递给 read()
。否则,我们将覆盖缓冲的数据。如果我们的缓冲区已满,我们必须增加缓冲区的大小才能继续读取。在 parse_frame()
中(未包含),我们需要解析 self.buffer[..self.cursor]
包含的数据。
因为将字节数组与游标配对非常常见,所以 bytes
crate 提供了一个抽象,表示字节数组和游标。Buf
trait 由可以从中读取数据的类型实现。BufMut
trait 由可以向其中写入数据的类型实现。当将 T: BufMut
传递给 read_buf()
时,缓冲区的内部游标会自动被 read_buf
更新。因此,在我们的 read_frame
版本中,我们不需要管理自己的游标。
此外,当使用 Vec<u8>
时,缓冲区必须被初始化。vec![0; 4096]
分配一个 4096 字节的数组,并将零写入每个条目。当调整缓冲区大小时,新的容量也必须用零初始化。初始化过程不是免费的。当使用 BytesMut
和 BufMut
时,容量是未初始化的。BytesMut
抽象防止我们读取未初始化的内存。这使我们避免了初始化步骤。
解析
现在,让我们看一下 parse_frame()
函数。解析分两步完成。
- 确保已缓冲完整的帧,并找到帧的结束索引。
- 解析帧。
mini-redis
crate 为我们提供了用于这两个步骤的函数
我们还将重用 Buf
抽象来提供帮助。Buf
被传递到 Frame::check
中。当 check
函数迭代传入的缓冲区时,内部游标将前进。当 check
返回时,缓冲区的内部游标指向帧的末尾。
对于 Buf
类型,我们将使用 std::io::Cursor<&[u8]>
。
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;
fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}
完整的 Frame::check
函数可以在这里找到。我们不会完整地介绍它。
需要注意的相关事项是使用了 Buf
的“字节迭代器”风格的 API。这些 API 获取数据并前进内部游标。例如,为了解析帧,首先检查第一个字节以确定帧的类型。使用的函数是 Buf::get_u8
。这会获取当前游标位置的字节并将游标前进一位。
Buf
trait 上还有更多有用的方法。查看 API 文档 了解更多详情。
缓冲写入
帧处理 API 的另一半是 write_frame(frame)
函数。此函数将整个帧写入套接字。为了最大限度地减少 write
系统调用,写入将被缓冲。维护一个写入缓冲区,帧在写入套接字之前被编码到该缓冲区。然而,与 read_frame()
不同,并非始终将整个帧缓冲到字节数组中,然后再写入套接字。
考虑一个 bulk stream 帧。正在写入的值是 Frame::Bulk(Bytes)
。bulk 帧的线路格式是帧头,它由 $
字符后跟数据长度(以字节为单位)组成。帧的大部分是 Bytes
值的内容。如果数据很大,将其复制到中间缓冲区将是昂贵的。
为了实现缓冲写入,我们将使用 BufWriter
结构体。此结构体使用 T: AsyncWrite
初始化,并自身实现 AsyncWrite
。当在 BufWriter
上调用 write
时,写入不会直接发送到内部 writer,而是发送到缓冲区。当缓冲区满时,内容会被刷新到内部 writer,并且内部缓冲区会被清除。还有一些优化允许在某些情况下绕过缓冲区。
作为教程的一部分,我们不会尝试完整实现 write_frame()
。请参阅完整的实现这里。
首先,更新 Connection
结构体
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}
接下来,实现 write_frame()
。
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;
async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}
self.stream.flush().await;
Ok(())
}
此处使用的函数由 AsyncWriteExt
提供。它们在 TcpStream
上也可用,但不建议在没有中间缓冲区的情况下发出单字节写入。
write_u8
将单个字节写入 writer。write_all
将整个切片写入 writer。write_decimal
由 mini-redis 实现。
该函数以调用 self.stream.flush().await
结束。因为 BufWriter
将写入存储在中间缓冲区中,所以调用 write
并不能保证数据已写入套接字。在返回之前,我们希望将帧写入套接字。调用 flush()
将缓冲区中任何待处理的数据写入套接字。
另一种选择是在 write_frame()
中不调用 flush()
。而是,在 Connection
上提供一个 flush()
函数。这将允许调用者在写入缓冲区中排队多个小帧,然后通过一个 write
系统调用将它们全部写入套接字。这样做会使 Connection
API 复杂化。简洁是 Mini-Redis 的目标之一,因此我们决定在 fn write_frame()
中包含 flush().await
调用。