Select

到目前为止,当我们需要向系统添加并发时,我们生成了一个新的任务。现在我们将介绍使用 Tokio 并发执行异步代码的其他方法。

tokio::select!

tokio::select! 宏允许等待多个异步计算,并在单个计算完成时返回。

例如

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("one");
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

使用了两个 oneshot 通道。任何一个通道都可能先完成。select! 语句等待两个通道,并将 val 绑定到任务返回的值。当 tx1tx2 完成时,将执行关联的代码块。

完成的分支会被丢弃。在示例中,计算正在等待每个通道的 oneshot::Receiver。尚未完成的通道的 oneshot::Receiver 会被丢弃。

取消

在异步 Rust 中,取消是通过丢弃 future 来执行的。回顾 “异步深入”,异步 Rust 操作是使用 future 实现的,而 future 是惰性的。操作只有在 future 被轮询时才会进行。如果 future 被丢弃,操作将无法进行,因为所有关联的状态都已被丢弃。

话虽如此,有时异步操作会生成后台任务或启动在后台运行的其他操作。例如,在上面的示例中,生成了一个任务来发回消息。通常,该任务将执行一些计算来生成值。

Future 或其他类型可以实现 Drop 来清理后台资源。Tokio 的 oneshot::Receiver 通过向 Sender 半发送关闭通知来实现 Drop。发送者半可以接收此通知并通过丢弃正在进行的操作来中止它。

use tokio::sync::oneshot;

async fn some_operation() -> String {
    // Compute value here
}

#[tokio::main]
async fn main() {
    let (mut tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        // Select on the operation and the oneshot's
        // `closed()` notification.
        tokio::select! {
            val = some_operation() => {
                let _ = tx1.send(val);
            }
            _ = tx1.closed() => {
                // `some_operation()` is canceled, the
                // task completes and `tx1` is dropped.
            }
        }
    });

    tokio::spawn(async {
        let _ = tx2.send("two");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Future 的实现

为了帮助更好地理解 select! 的工作原理,让我们看看假设的 Future 实现会是什么样子。这是一个简化版本。在实践中,select! 包含额外的功能,例如随机选择要首先轮询的分支。

use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // use tx1 and tx2

    MySelect {
        rx1,
        rx2,
    }.await;
}

MySelect future 包含来自每个分支的 future。当 MySelect 被轮询时,第一个分支被轮询。如果它已准备好,则使用该值并且 MySelect 完成。在 .await 接收到来自 future 的输出后,future 被丢弃。这导致两个分支的 future 都被丢弃。由于一个分支未完成,因此操作被有效取消。

回顾上一节

当 future 返回 Poll::Pending 时,它必须确保在未来的某个时刻唤醒器 (waker) 被发出信号。忘记这样做会导致任务无限期地挂起。

MySelect 实现中没有显式使用 Context 参数。相反,唤醒器要求通过将 cx 传递给内部 future 来满足。由于内部 future 也必须满足唤醒器要求,因此仅在从内部 future 接收到 Poll::Pending 时才返回 Poll::PendingMySelect 也满足唤醒器要求。

语法

select! 宏可以处理两个以上的分支。当前限制为 64 个分支。每个分支的结构如下

<pattern> = <async expression> => <handler>,

当评估 select 宏时,所有 <async expression>s 被聚合并并发执行。当一个表达式完成时,结果会与 <pattern> 匹配。如果结果与模式匹配,则所有剩余的异步表达式都会被丢弃,并执行 <handler><handler> 表达式可以访问由 <pattern> 建立的任何绑定。

<pattern> 的基本情况是变量名,异步表达式的结果绑定到变量名,并且 <handler> 可以访问该变量。这就是为什么在原始示例中,val 用于 <pattern>,而 <handler> 能够访问 val

如果 <pattern> 匹配异步计算的结果,那么剩余的异步表达式将继续并发执行,直到下一个表达式完成。此时,相同的逻辑将应用于该结果。

由于 select! 接受任何异步表达式,因此可以定义更复杂的计算来进行选择。

在这里,我们选择一个 oneshot 通道的输出和一个 TCP 连接。

use tokio::net::TcpStream;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // Spawn a task that sends a message over the oneshot
    tokio::spawn(async move {
        tx.send("done").unwrap();
    });

    tokio::select! {
        socket = TcpStream::connect("localhost:3465") => {
            println!("Socket connected {:?}", socket);
        }
        msg = rx => {
            println!("received message first {:?}", msg);
        }
    }
}

在这里,我们选择一个 oneshot 和从 TcpListener 接受的套接字。

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        tx.send(()).unwrap();
    });

    let mut listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        _ = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {}
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

accept 循环运行直到遇到错误或 rx 接收到值。_ 模式表示我们对异步计算的返回值不感兴趣。

返回值

tokio::select! 宏返回评估后的 <handler> 表达式的结果。

async fn computation1() -> String {
    // .. computation
}

async fn computation2() -> String {
    // .. computation
}

#[tokio::main]
async fn main() {
    let out = tokio::select! {
        res1 = computation1() => res1,
        res2 = computation2() => res2,
    };

    println!("Got = {}", out);
}

因此,要求每个分支的 <handler> 表达式都评估为相同的类型。如果不需要 select! 表达式的输出,则最佳实践是将表达式评估为 ()

错误

使用 ? 运算符会传播表达式中的错误。其工作方式取决于 ? 是从异步表达式还是从处理程序中使用。在异步表达式中使用 ? 会将错误从异步表达式中传播出去。这使得异步表达式的输出成为 Result。从处理程序中使用 ? 会立即将错误从 select! 表达式中传播出去。让我们再次看一下 accept 循环示例

use tokio::net::TcpListener;
use tokio::sync::oneshot;
use std::io;

#[tokio::main]
async fn main() -> io::Result<()> {
    // [setup `rx` oneshot channel]

    let listener = TcpListener::bind("localhost:3465").await?;

    tokio::select! {
        res = async {
            loop {
                let (socket, _) = listener.accept().await?;
                tokio::spawn(async move { process(socket) });
            }

            // Help the rust type inferencer out
            Ok::<_, io::Error>(())
        } => {
            res?;
        }
        _ = rx => {
            println!("terminating accept loop");
        }
    }

    Ok(())
}

注意 listener.accept().await?? 运算符将错误从该表达式传播出去并传递给 res 绑定。如果发生错误,res 将被设置为 Err(_)。然后,在处理程序中,再次使用 ? 运算符。res? 语句会将错误从 main 函数中传播出去。

模式匹配

回顾一下,select! 宏分支语法定义为

<pattern> = <async expression> => <handler>,

到目前为止,我们只对 <pattern> 使用了变量绑定。但是,可以使用任何 Rust 模式。例如,假设我们正在从多个 MPSC 通道接收,我们可能会这样做

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel(128);
    let (mut tx2, mut rx2) = mpsc::channel(128);

    tokio::spawn(async move {
        // Do something w/ `tx1` and `tx2`
    });

    tokio::select! {
        Some(v) = rx1.recv() => {
            println!("Got {:?} from rx1", v);
        }
        Some(v) = rx2.recv() => {
            println!("Got {:?} from rx2", v);
        }
        else => {
            println!("Both channels closed");
        }
    }
}

在本示例中,select! 表达式等待从 rx1rx2 接收值。如果一个通道关闭,recv() 将返回 None。这匹配模式,并且该分支被禁用。select! 表达式将继续等待剩余的分支。

请注意,此 select! 表达式包含一个 else 分支。select! 表达式必须评估为一个值。当使用模式匹配时,可能没有任何分支匹配其关联的模式。如果发生这种情况,则评估 else 分支。

借用

生成任务时,生成的异步表达式必须拥有其所有数据。select! 宏没有此限制。每个分支的异步表达式都可以借用数据并并发操作。根据 Rust 的借用规则,多个异步表达式可以不可变地借用单个数据片段,或者单个异步表达式可以可变地借用单个数据片段。

让我们看一些例子。在这里,我们同时将相同的数据发送到两个不同的 TCP 目标。

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use std::io;
use std::net::SocketAddr;

async fn race(
    data: &[u8],
    addr1: SocketAddr,
    addr2: SocketAddr
) -> io::Result<()> {
    tokio::select! {
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr1).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        Ok(_) = async {
            let mut socket = TcpStream::connect(addr2).await?;
            socket.write_all(data).await?;
            Ok::<_, io::Error>(())
        } => {}
        else => {}
    };

    Ok(())
}

data 变量正在被两个异步表达式不可变地借用。当其中一个操作成功完成时,另一个操作将被丢弃。因为我们对 Ok(_) 进行模式匹配,所以如果一个表达式失败,另一个表达式将继续执行。

当涉及到每个分支的 <handler> 时,select! 保证只运行一个 <handler>。因此,每个 <handler> 都可以可变地借用相同的数据。

例如,这会在两个处理程序中修改 out

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    let mut out = String::new();

    tokio::spawn(async move {
        // Send values on `tx1` and `tx2`.
    });

    tokio::select! {
        _ = rx1 => {
            out.push_str("rx1 completed");
        }
        _ = rx2 => {
            out.push_str("rx2 completed");
        }
    }

    println!("{}", out);
}

循环

select! 宏通常在循环中使用。本节将介绍一些示例,以展示在循环中使用 select! 宏的常见方法。我们从选择多个通道开始

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx1, mut rx1) = mpsc::channel(128);
    let (tx2, mut rx2) = mpsc::channel(128);
    let (tx3, mut rx3) = mpsc::channel(128);

    loop {
        let msg = tokio::select! {
            Some(msg) = rx1.recv() => msg,
            Some(msg) = rx2.recv() => msg,
            Some(msg) = rx3.recv() => msg,
            else => { break }
        };

        println!("Got {:?}", msg);
    }

    println!("All channels have been closed.");
}

此示例选择三个通道接收器。当在任何通道上收到消息时,它将被写入 STDOUT。当一个通道关闭时,recv() 返回 None。通过使用模式匹配,select! 宏继续等待剩余的通道。当所有通道都关闭时,将评估 else 分支并终止循环。

select! 宏随机选择分支以首先检查是否准备就绪。当多个通道具有挂起的值时,将随机选择一个通道进行接收。这是为了处理接收循环处理消息的速度慢于消息被推入通道的情况,这意味着通道开始填满。如果 select! 没有随机选择一个分支首先检查,则在循环的每次迭代中,将首先检查 rx1。如果 rx1 始终包含新消息,则永远不会检查剩余的通道。

如果在评估 select! 时,多个通道具有挂起的消息,则只有一个通道弹出一个值。所有其他通道保持不变,并且它们的消息保留在这些通道中,直到下一次循环迭代。没有消息丢失。

恢复异步操作

现在我们将展示如何在对 select! 的多次调用中运行异步操作。在本示例中,我们有一个项目类型为 i32 的 MPSC 通道和一个异步函数。我们希望运行异步函数,直到它完成或在通道上收到偶数。

async fn action() {
    // Some asynchronous logic
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);    
    
    let operation = action();
    tokio::pin!(operation);
    
    loop {
        tokio::select! {
            _ = &mut operation => break,
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    break;
                }
            }
        }
    }
}

请注意,不是在 select! 宏中调用 action(),而是在循环外部调用它。action() 的返回值被赋值给 operation没有调用 .await。然后我们在 operation 上调用 tokio::pin!

select! 循环内部,我们没有传入 operation,而是传入 &mut operationoperation 变量正在跟踪正在进行的异步操作。循环的每次迭代都使用相同的操作,而不是发出对 action() 的新调用。

另一个 select! 分支从通道接收消息。如果消息是偶数,我们就完成了循环。否则,再次启动 select!

这是我们第一次使用 tokio::pin!。我们暂不深入研究 pinning 的细节。需要注意的是,要 .await 一个引用,被引用的值必须是 pinned 的或实现 Unpin

如果我们删除 tokio::pin! 行并尝试编译,我们会得到以下错误

error[E0599]: no method named `poll` found for struct
     `std::pin::Pin<&mut &mut impl std::future::Future>`
     in the current scope
  --> src/main.rs:16:9
   |
16 | /         tokio::select! {
17 | |             _ = &mut operation => break,
18 | |             Some(v) = rx.recv() => {
19 | |                 if v % 2 == 0 {
...  |
22 | |             }
23 | |         }
   | |_________^ method not found in
   |             `std::pin::Pin<&mut &mut impl std::future::Future>`
   |
   = note: the method `poll` exists but the following trait bounds
            were not satisfied:
           `impl std::future::Future: std::marker::Unpin`
           which is required by
           `&mut impl std::future::Future: std::future::Future`

虽然我们在 上一章 中介绍了 Future,但此错误仍然不是很清楚。如果你遇到关于在尝试对引用调用 .awaitFuture 未实现的错误,那么 future 可能需要被 pinned。

标准库 上阅读更多关于 Pin 的信息。

修改分支

让我们看一个稍微复杂的循环。我们有

  1. 一个 i32 值的通道。
  2. 要对 i32 值执行的异步操作。

我们想要实现的逻辑是

  1. 等待通道上的偶数
  2. 使用偶数作为输入启动异步操作。
  3. 等待操作完成,但同时监听通道上更多的偶数。
  4. 如果在现有操作完成之前收到新的偶数,则中止现有操作,并使用新的偶数重新启动它。
async fn action(input: Option<i32>) -> Option<String> {
    // If the input is `None`, return `None`.
    // This could also be written as `let i = input?;`
    let i = match input {
        Some(input) => input,
        None => return None,
    };
    // async logic here
}

#[tokio::main]
async fn main() {
    let (mut tx, mut rx) = tokio::sync::mpsc::channel(128);
    
    let mut done = false;
    let operation = action(None);
    tokio::pin!(operation);
    
    tokio::spawn(async move {
        let _ = tx.send(1).await;
        let _ = tx.send(3).await;
        let _ = tx.send(2).await;
    });
    
    loop {
        tokio::select! {
            res = &mut operation, if !done => {
                done = true;

                if let Some(v) = res {
                    println!("GOT = {}", v);
                    return;
                }
            }
            Some(v) = rx.recv() => {
                if v % 2 == 0 {
                    // `.set` is a method on `Pin`.
                    operation.set(action(Some(v)));
                    done = false;
                }
            }
        }
    }
}

我们使用与上一个示例类似的策略。异步函数在循环外部调用并赋值给 operationoperation 变量被 pinned。循环在 operation 和通道接收器上进行选择。

注意 action 如何将 Option<i32> 作为参数。在我们收到第一个偶数之前,我们需要将 operation 实例化为某些内容。我们使 action 接受 Option 并返回 Option。如果传入 None,则返回 None。在第一次循环迭代中,operation 会立即完成并返回 None

此示例使用了一些新的语法。第一个分支包含 , if !done。这是一个分支先决条件。在解释其工作原理之前,让我们看看如果省略先决条件会发生什么。省略 , if !done 并运行示例会导致以下输出

thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

尝试在 operation 完成之后使用它时,会发生此错误。通常,在使用 .await 时,被等待的值会被消耗掉。在本示例中,我们等待的是一个引用。这意味着 operation 在完成之后仍然存在。

为了避免此 panic,我们必须注意在 operation 完成后禁用第一个分支。done 变量用于跟踪 operation 是否完成。select! 分支可以包含先决条件。此先决条件在 select! 等待分支之前进行检查。如果条件评估为 false,则禁用该分支。done 变量初始化为 false。当 operation 完成时,done 被设置为 true。下一个循环迭代将禁用 operation 分支。当从通道收到偶数消息时,operation 将被重置,并且 done 被设置为 false

每个任务的并发

tokio::spawnselect! 都允许运行并发的异步操作。但是,用于运行并发操作的策略有所不同。tokio::spawn 函数接受一个异步操作并生成一个新的任务来运行它。任务是 Tokio 运行时调度的对象。两个不同的任务由 Tokio 独立调度。它们可能在不同的操作系统线程上同时运行。因此,生成的任务与生成的线程具有相同的限制:不允许借用。

select!在同一任务上并发运行所有分支。由于 select! 宏的所有分支都在同一任务上执行,因此它们永远不会同时运行。select! 宏在单个任务上多路复用异步操作。