流是一个异步的值序列。它相当于 Rust 的 std::iter::Iterator 的异步版本,并由 Stream trait 表示。流可以在 async 函数中迭代。它们也可以使用适配器进行转换。 Tokio 在 StreamExt trait 上提供了许多常用的适配器。

Tokio 在一个单独的 crate 中提供流支持:tokio-stream

tokio-stream = "0.1"

目前,Tokio 的 Stream 实用程序存在于 tokio-stream crate 中。一旦 Stream trait 在 Rust 标准库中稳定下来,Tokio 的流实用程序将被移动到 tokio crate 中。

迭代

目前,Rust 编程语言不支持异步 for 循环。相反,迭代流是使用 while let 循环与 StreamExt::next() 结合完成的。

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    let mut stream = tokio_stream::iter(&[1, 2, 3]);

    while let Some(v) = stream.next().await {
        println!("GOT = {:?}", v);
    }
}

与迭代器类似,next() 方法返回 Option<T>,其中 T 是流的值类型。接收到 None 表示流迭代已终止。

Mini-Redis 广播

让我们来看一个稍微复杂一点的示例,使用 Mini-Redis 客户端。

完整代码可以在这里找到。

use tokio_stream::StreamExt;
use mini_redis::client;

async fn publish() -> mini_redis::Result<()> {
    let mut client = client::connect("127.0.0.1:6379").await?;

    // Publish some data
    client.publish("numbers", "1".into()).await?;
    client.publish("numbers", "two".into()).await?;
    client.publish("numbers", "3".into()).await?;
    client.publish("numbers", "four".into()).await?;
    client.publish("numbers", "five".into()).await?;
    client.publish("numbers", "6".into()).await?;
    Ok(())
}

async fn subscribe() -> mini_redis::Result<()> {
    let client = client::connect("127.0.0.1:6379").await?;
    let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
    let messages = subscriber.into_stream();

    tokio::pin!(messages);

    while let Some(msg) = messages.next().await {
        println!("got = {:?}", msg);
    }

    Ok(())
}

#[tokio::main]
async fn main() -> mini_redis::Result<()> {
    tokio::spawn(async {
        publish().await
    });

    subscribe().await?;

    println!("DONE");

    Ok(())
}

生成一个任务以在 “numbers” 通道上向 Mini-Redis 服务器发布消息。然后,在主任务中,我们订阅 “numbers” 通道并显示接收到的消息。

订阅后,在返回的 subscriber 上调用 into_stream()。这会消耗 Subscriber,返回一个在消息到达时产生消息的流。在我们开始迭代消息之前,请注意流使用 tokio::pin! pinned 到堆栈。在流上调用 next() 需要流是 pinned 的。into_stream() 函数返回一个 pinned 的流,我们必须显式地 pin 它才能迭代它。

当 Rust 值不能再在内存中移动时,它就是 “pinned” 的。pinned 值的一个关键属性是可以获取指向 pinned 数据的指针,并且调用者可以确信指针保持有效。async/await 使用此功能来支持跨 .await 点借用数据。

如果我们忘记 pin 流,我们会收到这样的错误

error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  --> streams/src/main.rs:29:36
   |
29 |     while let Some(msg) = messages.next().await {
   |                                    ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
   |
   = note: required because it appears within the type `impl Future`
   = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
   = note: required because it appears within the type `impl Stream`
   = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
   = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
   = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
   = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

如果您遇到这样的错误消息,请尝试 pin 该值!

在尝试运行此代码之前,请启动 Mini-Redis 服务器

$ mini-redis-server

然后尝试运行代码。我们将看到消息输出到 STDOUT。

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })

一些早期消息可能会被丢弃,因为订阅和发布之间存在竞争。程序永远不会退出。只要服务器处于活动状态,对 Mini-Redis 通道的订阅就会保持活动状态。

让我们看看如何使用流来扩展这个程序。

适配器

接受 Stream 并返回另一个 Stream 的函数通常称为 “流适配器”,因为它们是 “适配器模式” 的一种形式。常见的流适配器包括 maptakefilter

让我们更新 Mini-Redis 以使其退出。在收到三条消息后,停止迭代消息。这是使用 take 完成的。此适配器将流限制为最多产生 n 条消息。

let messages = subscriber
    .into_stream()
    .take(3);

再次运行程序,我们得到

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })

这次程序结束了。

现在,让我们将流限制为个位数数字。我们将通过检查消息长度来检查这一点。我们使用 filter 适配器来删除任何不匹配谓词的消息。

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .take(3);

再次运行程序,我们得到

got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })

请注意,应用适配器的顺序很重要。先调用 filter 然后调用 take 与先调用 take 然后调用 filter 是不同的。

最后,我们将通过剥离输出的 Ok(Message { ... }) 部分来整理输出。这是使用 map 完成的。因为这是在 filter 之后应用的,所以我们知道消息是 Ok,因此我们可以使用 unwrap()

let messages = subscriber
    .into_stream()
    .filter(|msg| match msg {
        Ok(msg) if msg.content.len() == 1 => true,
        _ => false,
    })
    .map(|msg| msg.unwrap().content)
    .take(3);

现在,输出是

got = b"1"
got = b"3"
got = b"6"

另一种选择是将 filtermap 步骤组合成一个使用 filter_map 的单一调用。

还有更多可用的适配器。请参阅此处的列表。

实现 Stream

Stream trait 与 Future trait 非常相似。

use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Stream {
    type Item;

    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

Stream::poll_next() 函数非常像 Future::poll,不同之处在于它可以重复调用以从流中接收多个值。正如我们在异步深入中看到的那样,当流准备好返回值时,将返回 Poll::Pending。任务的 waker 被注册。一旦流应该再次被轮询,waker 就会被通知。

size_hint() 方法的使用方式与 迭代器 的使用方式相同。

通常,在手动实现 Stream 时,它是通过组合 future 和其他流来完成的。作为一个示例,让我们以我们在异步深入中实现的 Delay future 为基础。我们将把它转换为一个流,该流以 10 毫秒的间隔产生 () 三次。

use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

struct Interval {
    rem: usize,
    delay: Delay,
}

impl Interval {
    fn new() -> Self {
        Self {
            rem: 3,
            delay: Delay { when: Instant::now() }
        }
    }
}

impl Stream for Interval {
    type Item = ();

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<()>>
    {
        if self.rem == 0 {
            // No more delays
            return Poll::Ready(None);
        }

        match Pin::new(&mut self.delay).poll(cx) {
            Poll::Ready(_) => {
                let when = self.delay.when + Duration::from_millis(10);
                self.delay = Delay { when };
                self.rem -= 1;
                Poll::Ready(Some(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

async-stream

使用 Stream trait 手动实现流可能很乏味。不幸的是,Rust 编程语言尚不支持用于定义流的 async/await 语法。这正在开发中,但尚未准备好。

async-stream crate 可用作临时解决方案。此 crate 提供了一个 stream! 宏,可将输入转换为流。使用此 crate,上面的间隔可以像这样实现

use async_stream::stream;
use std::time::{Duration, Instant};

stream! {
    let mut when = Instant::now();
    for _ in 0..3 {
        let delay = Delay { when };
        delay.await;
        yield ();
        when += Duration::from_millis(10);
    }
}