帧处理

现在我们将应用刚刚学到的关于 I/O 的知识,并实现 Mini-Redis 的帧处理层。帧处理是将字节流转换为帧流的过程。帧是在两个对等方之间传输的数据单元。Redis 协议帧定义如下

use bytes::Bytes;

enum Frame {
    Simple(String),
    Error(String),
    Integer(u64),
    Bulk(Bytes),
    Null,
    Array(Vec<Frame>),
}

请注意,帧仅包含数据,没有任何语义。命令解析和实现发生在更高的层级。

对于 HTTP,帧可能如下所示

enum HttpFrame {
    RequestHead {
        method: Method,
        uri: Uri,
        version: Version,
        headers: HeaderMap,
    },
    ResponseHead {
        status: StatusCode,
        version: Version,
        headers: HeaderMap,
    },
    BodyChunk {
        chunk: Bytes,
    },
}

为了实现 Mini-Redis 的帧处理,我们将实现一个 Connection 结构体,它包装了一个 TcpStream 并读取/写入 mini_redis::Frame 值。

use tokio::net::TcpStream;
use mini_redis::{Frame, Result};

struct Connection {
    stream: TcpStream,
    // ... other fields here
}

impl Connection {
    /// Read a frame from the connection.
    /// 
    /// Returns `None` if EOF is reached
    pub async fn read_frame(&mut self)
        -> Result<Option<Frame>>
    {
        // implementation here
    }

    /// Write a frame to the connection.
    pub async fn write_frame(&mut self, frame: &Frame)
        -> Result<()>
    {
        // implementation here
    }
}

您可以在这里找到 Redis 线路协议的详细信息。完整的 Connection 代码可以在这里找到。

缓冲读取

read_frame 方法等待接收到整个帧后才返回。单次调用 TcpStream::read() 可能会返回任意数量的数据。它可能包含一个完整的帧、一个部分帧或多个帧。如果收到部分帧,则数据会被缓冲,并从套接字读取更多数据。如果收到多个帧,则返回第一个帧,其余数据将被缓冲,直到下次调用 read_frame

如果您还没有创建,请创建一个名为 connection.rs 的新文件。

touch src/connection.rs

为了实现这一点,Connection 需要一个读取缓冲区字段。数据从套接字读取到读取缓冲区中。当解析帧时,相应的数据将从缓冲区中移除。

我们将使用 BytesMut 作为缓冲区类型。这是 Bytes 的可变版本。

use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

接下来,我们实现 read_frame() 方法。

use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        // Attempt to parse a frame from the buffered data. If
        // enough data has been buffered, the frame is
        // returned.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // There is not enough buffered data to read a frame.
        // Attempt to read more data from the socket.
        //
        // On success, the number of bytes is returned. `0`
        // indicates "end of stream".
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            // The remote closed the connection. For this to be
            // a clean shutdown, there should be no data in the
            // read buffer. If there is, this means that the
            // peer closed the socket while sending a frame.
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

让我们分解一下。read_frame 方法在一个循环中运行。首先,调用 self.parse_frame()。这将尝试从 self.buffer 解析一个 redis 帧。如果有足够的数据来解析帧,则帧将返回给 read_frame() 的调用者。否则,我们尝试从套接字读取更多数据到缓冲区中。读取更多数据后,再次调用 parse_frame()。这次,如果已接收到足够的数据,则解析可能会成功。

当从流中读取时,返回值为 0 表示不会再从对等方接收到更多数据。如果读取缓冲区中仍然有数据,则表示已收到部分帧,并且连接正在被突然终止。这是一个错误条件,并返回 Err

Buf trait

当从流中读取时,会调用 read_buf。此版本的读取函数接受一个实现了 BufMut 的值,该值来自 bytes crate。

首先,考虑一下我们将如何使用 read() 实现相同的读取循环。可以使用 Vec<u8> 代替 BytesMut

use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: vec![0; 4096],
            cursor: 0,
        }
    }
}

以及 Connection 上的 read_frame() 函数

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self)
    -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Ensure the buffer has capacity
        if self.buffer.len() == self.cursor {
            // Grow the buffer
            self.buffer.resize(self.cursor * 2, 0);
        }

        // Read into the buffer, tracking the number
        // of bytes read
        let n = self.stream.read(
            &mut self.buffer[self.cursor..]).await?;

        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            // Update our cursor
            self.cursor += n;
        }
    }
}

当使用字节数组和 read 时,我们还必须维护一个游标来跟踪已缓冲多少数据。我们必须确保将缓冲区的空闲部分传递给 read()。否则,我们将覆盖缓冲的数据。如果我们的缓冲区已满,我们必须增加缓冲区的大小才能继续读取。在 parse_frame() 中(未包含),我们需要解析 self.buffer[..self.cursor] 包含的数据。

因为将字节数组与游标配对非常常见,所以 bytes crate 提供了一个抽象,表示字节数组和游标。Buf trait 由可以从中读取数据的类型实现。BufMut trait 由可以向其中写入数据的类型实现。当将 T: BufMut 传递给 read_buf() 时,缓冲区的内部游标会自动被 read_buf 更新。因此,在我们的 read_frame 版本中,我们不需要管理自己的游标。

此外,当使用 Vec<u8> 时,缓冲区必须被初始化vec![0; 4096] 分配一个 4096 字节的数组,并将零写入每个条目。当调整缓冲区大小时,新的容量也必须用零初始化。初始化过程不是免费的。当使用 BytesMutBufMut 时,容量是未初始化的BytesMut 抽象防止我们读取未初始化的内存。这使我们避免了初始化步骤。

解析

现在,让我们看一下 parse_frame() 函数。解析分两步完成。

  1. 确保已缓冲完整的帧,并找到帧的结束索引。
  2. 解析帧。

mini-redis crate 为我们提供了用于这两个步骤的函数

  1. Frame::check
  2. Frame::parse

我们还将重用 Buf 抽象来提供帮助。Buf 被传递到 Frame::check 中。当 check 函数迭代传入的缓冲区时,内部游标将前进。当 check 返回时,缓冲区的内部游标指向帧的末尾。

对于 Buf 类型,我们将使用 std::io::Cursor<&[u8]>

use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;

fn parse_frame(&mut self)
    -> Result<Option<Frame>>
{
    // Create the `T: Buf` type.
    let mut buf = Cursor::new(&self.buffer[..]);

    // Check whether a full frame is available
    match Frame::check(&mut buf) {
        Ok(_) => {
            // Get the byte length of the frame
            let len = buf.position() as usize;

            // Reset the internal cursor for the
            // call to `parse`.
            buf.set_position(0);

            // Parse the frame
            let frame = Frame::parse(&mut buf)?;

            // Discard the frame from the buffer
            self.buffer.advance(len);

            // Return the frame to the caller.
            Ok(Some(frame))
        }
        // Not enough data has been buffered
        Err(Incomplete) => Ok(None),
        // An error was encountered
        Err(e) => Err(e.into()),
    }
}

完整的 Frame::check 函数可以在这里找到。我们不会完整地介绍它。

需要注意的相关事项是使用了 Buf 的“字节迭代器”风格的 API。这些 API 获取数据并前进内部游标。例如,为了解析帧,首先检查第一个字节以确定帧的类型。使用的函数是 Buf::get_u8。这会获取当前游标位置的字节并将游标前进一位。

Buf trait 上还有更多有用的方法。查看 API 文档 了解更多详情。

缓冲写入

帧处理 API 的另一半是 write_frame(frame) 函数。此函数将整个帧写入套接字。为了最大限度地减少 write 系统调用,写入将被缓冲。维护一个写入缓冲区,帧在写入套接字之前被编码到该缓冲区。然而,与 read_frame() 不同,并非始终将整个帧缓冲到字节数组中,然后再写入套接字。

考虑一个 bulk stream 帧。正在写入的值是 Frame::Bulk(Bytes)。bulk 帧的线路格式是帧头,它由 $ 字符后跟数据长度(以字节为单位)组成。帧的大部分是 Bytes 值的内容。如果数据很大,将其复制到中间缓冲区将是昂贵的。

为了实现缓冲写入,我们将使用 BufWriter 结构体。此结构体使用 T: AsyncWrite 初始化,并自身实现 AsyncWrite。当在 BufWriter 上调用 write 时,写入不会直接发送到内部 writer,而是发送到缓冲区。当缓冲区满时,内容会被刷新到内部 writer,并且内部缓冲区会被清除。还有一些优化允许在某些情况下绕过缓冲区。

作为教程的一部分,我们不会尝试完整实现 write_frame()。请参阅完整的实现这里

首先,更新 Connection 结构体

use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;

pub struct Connection {
    stream: BufWriter<TcpStream>,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream: BufWriter::new(stream),
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

接下来,实现 write_frame()

use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Array(_val) => unimplemented!(),
    }

    self.stream.flush().await;

    Ok(())
}

此处使用的函数由 AsyncWriteExt 提供。它们在 TcpStream 上也可用,但不建议在没有中间缓冲区的情况下发出单字节写入。

该函数以调用 self.stream.flush().await 结束。因为 BufWriter 将写入存储在中间缓冲区中,所以调用 write 并不能保证数据已写入套接字。在返回之前,我们希望将帧写入套接字。调用 flush() 将缓冲区中任何待处理的数据写入套接字。

另一种选择是在 write_frame()调用 flush()。而是,在 Connection 上提供一个 flush() 函数。这将允许调用者在写入缓冲区中排队多个小帧,然后通过一个 write 系统调用将它们全部写入套接字。这样做会使 Connection API 复杂化。简洁是 Mini-Redis 的目标之一,因此我们决定在 fn write_frame() 中包含 flush().await 调用。