发明 Service trait
2021年5月17日
Tower 是一个模块化和可重用组件的库,用于构建健壮的网络客户端和服务器。其核心是 Service
trait。一个 Service
是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能不是立即显而易见的。与其解释今天 Tower 中存在的 Service
trait,不如让我们通过想象如果你从头开始,你可能会如何发明它,来看看 Service
背后的动机。
想象一下你正在 Rust 中构建一个小型的 HTTP 框架。该框架将允许用户通过提供接收请求并回复一些响应的代码来实现 HTTP 服务器。
你可能会有一个像这样的 API
// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;
// Somehow run the user's application
server.run(the_users_application).await?;
问题是,the_users_application
应该是什么?
最简单的可能有效的方法是
fn handle_request(request: HttpRequest) -> HttpResponse {
// ...
}
其中 HttpRequest
和 HttpResponse
是我们的框架提供的一些结构体。
有了这个,我们可以像这样实现 Server::run
impl Server {
async fn run<F>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> HttpResponse,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
task::spawn(async move {
// Call the handler provided by the user
let response = handler(request);
write_http_response(connection, response).await?;
});
}
}
}
在这里,我们有一个异步函数 run
,它接受一个闭包,该闭包接受一个 HttpRequest
并返回一个 HttpResponse
。
这意味着用户可以像这样使用我们的 Server
fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else {
HttpResponse::not_found()
}
}
// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;
这还不错。它使用户可以轻松运行 HTTP 服务器,而无需担心任何底层细节。
然而,我们当前的设计有一个问题:我们无法异步处理请求。想象一下,我们的用户在处理请求时需要查询数据库或向其他服务器发送请求。目前,这将需要 阻塞,同时我们等待处理程序生成响应。如果我们希望我们的服务器能够处理大量并发连接,我们需要能够在等待该请求异步完成时为其他请求提供服务。让我们通过让处理函数返回一个 future 来修复这个问题
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
// `handler` now returns a generic type `Fut`...
F: Fn(HttpRequest) -> Fut,
// ...which is a `Future` whose `Output` is an `HttpResponse`
Fut: Future<Output = HttpResponse>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
task::spawn(async move {
// Await the future returned by `handler`
let response = handler(request).await;
write_http_response(connection, response).await?;
});
}
}
}
使用这个 API 与之前非常相似
// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else if request.path() == "/important-data" {
// We can now do async stuff in here
let some_data = fetch_data_from_database().await;
make_response(some_data)
} else {
HttpResponse::not_found()
}
}
// Running the server is the same
server.run(handle_request).await?;
这要好得多,因为我们的请求处理现在可以调用其他异步函数。但是,仍然缺少一些东西。如果我们的处理程序遇到错误并且无法生成响应怎么办?让我们使其返回一个 Result
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> Fut,
// The response future is now allowed to fail
Fut: Future<Output = Result<HttpResponse, Error>>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
task::spawn(async move {
// Pattern match on the result of the response future
match handler(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
});
}
}
}
添加更多行为
现在,假设我们想确保所有请求都在及时的方式内完成或失败,而不是让客户端无限期地等待可能永远不会到达的响应。我们可以通过为每个请求添加超时来做到这一点。超时设置了 handler
允许花费的最长时间限制。如果它在该时间量内没有生成响应,则会返回错误。这允许客户端重试该请求或向用户报告错误,而不是永远等待。
你的第一个想法可能是修改 Server
,以便可以配置超时。然后,它将在每次调用 handler
时应用该超时。然而,事实证明你实际上可以在不修改 Server
的情况下添加超时。使用 tokio::time::timeout
,我们可以创建一个新的处理函数,该函数调用我们之前的 handle_request
,但超时时间为 30 秒
async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
let result = tokio::time::timeout(
Duration::from_secs(30),
handle_request(request)
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout_elapsed) => Err(Error::timeout()),
}
}
这提供了非常好的关注点分离。我们能够在不更改任何现有代码的情况下添加超时。
让我们以这种方式添加另一个功能。想象一下我们正在构建一个 JSON API,因此希望所有响应都带有 Content-Type: application/json
标头。我们可以以类似的方式包装 handler_with_timeout
,并像这样修改响应
async fn handler_with_timeout_and_content_type(
request: HttpRequest,
) -> Result<HttpResponse, Error> {
let mut response = handler_with_timeout(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
}
我们现在有一个处理程序,它将处理 HTTP 请求,不超过 30 秒,并且始终具有正确的 Content-Type
标头,所有这些都无需修改我们原始的 handle_request
函数或 Server
结构体。
设计可以以这种方式扩展的库非常强大,因为它允许用户通过分层添加新行为来扩展库的功能,而无需等待库维护人员添加对其的支持。
它还使测试更容易,因为你可以将代码分解为小的隔离单元并为其编写细粒度的测试,而无需担心所有其他部分。
但是,有一个问题。我们当前的设计允许我们通过将处理函数包装在一个实现该行为然后调用内部函数的新处理函数中来组合新行为。这可行,但是如果我们想添加大量额外的功能,它的扩展性不是很好。想象一下,我们有很多 handle_with_*
函数,每个函数都添加了一点新行为。必须硬编码中间处理程序调用哪个处理程序的链将变得具有挑战性。我们当前的链是
handler_with_timeout_and_content_type
调用handler_with_timeout
调用handle_request
实际处理请求
如果我们能够以某种方式组合这三个函数,而无需硬编码确切的顺序,那就太好了。类似这样
let final_handler = with_content_type(with_timeout(handle_request));
同时仍然能够像以前一样运行我们的处理程序
server.run(final_handler).await?;
你可以尝试将 with_content_type
和 with_timeout
实现为接受 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>>
类型的参数并返回类似 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>>
的闭包的函数,但这实际上是不可能的,因为 Rust 今天允许 impl Trait
的位置存在限制。具体来说,不允许使用 impl Fn() -> impl Future
。使用 Box
是可能的,但这会带来我们希望避免的性能成本。
你也不能在调用处理程序之外向其添加其他行为,但为什么这是必要的,我们稍后会回到这一点。
Handler
trait
让我们尝试另一种方法。与其让 Server::run
接受闭包 (Fn(HttpRequest) -> ...
),不如让我们创建一个新的 trait,封装相同的 async fn(HttpRequest) -> Result<HttpResponse, Error>
trait Handler {
async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}
拥有像这样的 trait 允许我们编写实现它的具体类型,因此我们不必一直处理 Fn
。
然而,Rust 目前不支持异步 trait 方法,因此我们有两个选择
- 使
call
返回一个 boxed future,例如Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>
。这是 async-trait crate 所做的事情。 - 向
Handler
添加关联的type Future
,以便用户可以选择自己的类型。
让我们选择选项二,因为它是最灵活的。具有具体 future 类型的用户可以使用它,而无需 Box
的成本,而不在乎的用户仍然可以使用 Pin<Box<...>>
。
trait Handler {
type Future: Future<Output = Result<HttpResponse, Error>>;
fn call(&mut self, request: HttpRequest) -> Self::Future;
}
我们仍然必须要求 Handler::Future
实现 Future
,其输出类型为 Result<HttpResponse, Error>
,因为这是 Server::run
所要求的。
让 call
接受 &mut self
是有用的,因为它允许处理程序在必要时更新其内部状态1。
让我们将我们原始的 handle_request
函数转换为此 trait 的实现
struct RequestHandler;
impl Handler for RequestHandler {
// We use `Pin<Box<...>>` here for simplicity, but could also define our
// own `Future` type to avoid the overhead
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
// same implementation as we had before
if request.path() == "/" {
Ok(HttpResponse::ok("Hello, World!"))
} else if request.path() == "/important-data" {
let some_data = fetch_data_from_database().await?;
Ok(make_response(some_data))
} else {
Ok(HttpResponse::not_found())
}
})
}
}
如何支持超时?请记住,我们旨在实现的解决方案是允许我们将不同功能部分组合在一起而无需修改每个单独部分的功能。
如果我们定义一个通用的 Timeout
结构体,像这样
struct Timeout<T> {
// T will be some type that implements `Handler`
inner_handler: T,
duration: Duration,
}
然后我们可以为 Timeout<T>
实现 Handler
,并委托给 T
的 Handler
实现
impl<T> Handler for Timeout<T>
where
T: Handler,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
let result = tokio::time::timeout(
self.duration,
self.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
这里重要的一行是 self.inner_handler.call(request)
。这是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>
。
但是这段代码不能完全编译。我们收到类似这样的错误
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/lib.rs:145:29
|
144 | fn call(&mut self, request: HttpRequest) -> Self::Future {
| --------- this data with an anonymous lifetime `'_`...
145 | Box::pin(async move {
| _____________________________^
146 | | let result = tokio::time::timeout(
147 | | self.duration,
148 | | self.inner_handler.call(request),
... |
155 | | }
156 | | })
| |_________^ ...is captured here, requiring it to live as long as `'static`
问题是我们正在捕获 &mut self
并将其移动到异步块中。这意味着我们的 future 的生命周期与 &mut self
的生命周期绑定。这对我们不起作用,因为我们可能希望在多个线程上运行我们的响应 future 以获得更好的性能,或者生成多个响应 future 并并行运行它们。如果处理程序的引用存在于 future 内部,那是不可能的2。
相反,我们需要将 &mut self
转换为拥有的 self
。这正是 Clone
所做的
// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;
impl Handler for RequestHandler {
// ...
}
#[derive(Clone)]
struct Timeout<T> {
inner_handler: T,
duration: Duration,
}
impl<T> Handler for Timeout<T>
where
T: Handler + Clone,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
// Get an owned clone of `&mut self`
let mut this = self.clone();
Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
请注意,在这种情况下克隆非常便宜,因为 RequestHandler
没有数据,而 Timeout<T>
仅添加了 Duration
(它是 Copy
)。
更进一步。我们现在收到不同的错误
error[E0310]: the parameter type `T` may not live long enough
--> src/lib.rs:149:9
|
140 | impl<T> Handler for Timeout<T>
| - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | / Box::pin(async move {
150 | | let result = tokio::time::timeout(
151 | | this.duration,
152 | | this.inner_handler.call(request),
... |
159 | | }
160 | | })
| |__________^ ...so that the type `impl Future` will meet its required lifetime bounds
现在的问题是 T
可以是任何类型。它甚至可以是包含引用的类型,例如 Vec<&'a str>
。然而,由于与之前相同的原因,这不起作用。我们需要响应 future 具有 'static
生命周期,以便我们可以更轻松地传递它。
编译器实际上告诉我们修复方法是什么。添加 T: 'static
impl<T> Handler for Timeout<T>
where
T: Handler + Clone + 'static,
{
// ...
}
响应 future 现在满足 'static
生命周期要求,因为它不包含引用(并且 T
包含的任何引用都是 'static
)。现在,我们的代码可以编译了!
让我们为在响应中添加 Content-Type
标头创建一个类似的处理程序结构体
#[derive(Clone)]
struct JsonContentType<T> {
inner_handler: T,
}
impl<T> Handler for JsonContentType<T>
where
T: Handler + Clone + 'static,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}
请注意,这遵循与 Timeout
非常相似的模式。
接下来,我们修改 Server::run
以接受我们新的 Handler
trait
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
task::spawn(async move {
// have to call `Handler::call` here
match handler.call(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
});
}
}
}
我们现在可以将我们的三个处理程序组合在一起
JsonContentType {
inner_handler: Timeout {
inner_handler: RequestHandler,
duration: Duration::from_secs(30),
},
}
如果我们为我们的类型添加一些 new
方法,它们会变得更容易组合
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);
// `handler` has type `JsonContentType<Timeout<RequestHandler>>`
server.run(handler).await
这效果很好!我们现在能够向 RequestHandler
分层添加额外的功能,而无需修改其实现。从理论上讲,我们可以将我们的 JsonContentType
和 Timeout
处理程序放入一个 crate 中,并将其作为库发布在 crates.io 上供其他人使用!
使 Handler
更灵活
我们的小型 Handler
trait 运行良好,但目前它仅支持我们的 HttpRequest
和 HttpResponse
类型。如果这些是泛型的,那就太好了,这样用户可以使用他们想要的任何类型。
我们将请求设为 trait 的泛型类型参数,以便给定的服务可以接受许多不同类型的请求。这允许定义可用于不同协议的处理程序,而不仅仅是 HTTP。我们将响应设为关联类型,因为对于任何给定的请求类型,只能有一个(关联的)响应类型:相应的调用返回的类型!
trait Handler<Request> {
type Response;
// Error should also be an associated type. No reason for that to be a
// hardcoded type
type Error;
// Our future type from before, but now it's output must use
// the associated `Response` and `Error` types
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// `call` is unchanged, but note that `Request` here is our generic
// `Request` type parameter and not the `HttpRequest` type we've used
// until now
fn call(&mut self, request: Request) -> Self::Future;
}
我们对 RequestHandler
的实现现在变为
impl Handler<HttpRequest> for RequestHandler {
type Response = HttpResponse;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: Request) -> Self::Future {
// same as before
}
}
Timeout<T>
有点不同。由于它包装了其他 Handler
并添加了异步超时,因此它实际上并不关心请求或响应类型是什么,只要它包装的 Handler
使用相同的类型即可。
Error
类型有点不同。由于 tokio::time::timeout
返回 Result<T, tokio::time::error::Elapsed>
,我们必须能够将 tokio::time::error::Elapsed
转换为内部 Handler
的错误类型。
如果我们把所有这些东西放在一起,我们得到
// `Timeout` accepts any request of type `R` as long as `T`
// accepts the same type of request
impl<R, T> Handler<R> for Timeout<T>
where
// The actual type of request must not contain
// references. The compiler would tell us to add
// this if we didn't
R: 'static,
// `T` must accept requests of type `R`
T: Handler<R> + Clone + 'static,
// We must be able to convert an `Elapsed` into
// `T`'s error type
T::Error: From<tokio::time::error::Elapsed>,
{
// Our response type is the same as `T`'s, since we
// don't have to modify it
type Response = T::Response;
// Error type is also the same
type Error = T::Error;
// Future must output a `Result` with the correct types
type Future = Pin<Box<dyn Future<Output = Result<T::Response, T::Error>>>>;
fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(elapsed) => {
// Convert the error
Err(T::Error::from(elapsed))
}
}
})
}
}
JsonContentType
有点不同。它不关心请求或错误类型,但它确实关心响应类型。它必须是 Response
,以便我们可以调用 set_header
。
因此,实现是
// Again a generic request type
impl<R, T> Handler<R> for JsonContentType<T>
where
R: 'static,
// `T` must accept requests of any type `R` and return
// responses of type `HttpResponse`
T: Handler<R, Response = HttpResponse> + Clone + 'static,
{
type Response = HttpResponse;
// Our error type is whatever `T`'s error type is
type Error = T::Error;
type Future = Pin<Box<dyn Future<Output = Result<Response, T::Error>>>>;
fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}
最后,传递给 Server::run
的 Handler
必须使用 HttpRequest
和 HttpResponse
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler<HttpRequest, Response = HttpResponse>,
{
// ...
}
}
创建服务器没有改变
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);
server.run(handler).await
所以,我们现在有了一个 Handler
trait,它可以将我们的应用程序分解为小的独立部分并重用它们。不错!
“如果我告诉你……”
到目前为止,我们只讨论了服务器端的事情。但是,我们的 Handler
trait 实际上也适用于 HTTP 客户端。可以想象一个客户端 Handler
,它接受一些请求并异步地将其发送给互联网上的某人。我们的 Timeout
包装器实际上在这里也很有用。JsonContentType
可能不是,因为设置响应标头不是客户端的工作。
由于我们的 Handler
trait 对于定义服务器和客户端都很有用,因此 Handler
可能不是一个合适的名称。客户端不处理请求,它将其发送到服务器,服务器处理请求。让我们将我们的 trait 改名为 Service
trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn call(&mut self, request: Request) -> Self::Future;
}
这实际上几乎是 Tower 中定义的 Service
trait。如果你能够跟随到这一点,你现在已经理解了 Tower 的大部分内容。除了 Service
trait 之外,Tower 还提供了几个实用程序,它们通过包装一些也实现了 Service
的其他类型来实现 Service
,就像我们对 Timeout
和 JsonContentType
所做的那样。这些服务可以以类似于我们迄今为止所做的方式进行组合。
Tower 提供的一些示例服务
像 Timeout
和 JsonContentType
这样的类型通常称为中间件,因为它们包装了另一个 Service
并以某种方式转换请求或响应。像 RequestHandler
这样的类型通常称为叶子服务,因为它们位于嵌套服务树的叶子上。实际的响应通常在叶子服务中生成,并由中间件修改。
剩下要讨论的唯一事情是背压和 poll_ready
。
背压
想象一下你想编写一个速率限制中间件,它包装一个 Service
并限制底层服务将接收的最大并发请求数。如果你有一些服务对其可以处理的负载量有硬性上限,这将非常有用。
使用我们当前的 Service
trait,我们实际上没有一个好的方法来实现这样的东西。我们可以尝试
impl<R, T> Service<R> for ConcurrencyLimit<T> {
fn call(&mut self, request: R) -> Self::Future {
// 1. Check a counter for the number of requests currently being
// processed.
// 2. If there is capacity left send the request to `T`
// and increment the counter.
// 3. If not somehow wait until capacity becomes available.
// 4. When the response has been produced, decrement the counter.
}
}
如果没有剩余容量,我们必须等待,并以某种方式在容量可用时收到通知。此外,我们必须在等待时将请求保存在内存中(也称为缓冲)。这意味着等待容量的请求越多,我们的程序将使用的内存就越多 --- 如果产生的请求多于我们的服务可以处理的请求,我们可能会耗尽内存!更健壮的方法是在我们确定服务有能力处理请求时才为请求分配空间。否则,我们可能会在使用大量内存缓冲请求的同时等待我们的服务准备就绪。
如果 Service
有一个像这样的方法就好了
trait Service<R> {
async fn ready(&mut self);
}
ready
将是一个异步函数,当服务有足够的容量来接收一个新请求时完成。然后,我们将要求用户在执行 service.call(request).await
之前首先调用 service.ready().await
。
将“调用服务”与“保留容量”分开也解锁了新的用例,例如能够维护一组“就绪服务”,我们在后台保持更新,以便当请求到达时,我们已经有一个就绪服务可以发送给它,而不必首先等待它准备就绪。
有了这种设计,ConcurrencyLimit
可以在 ready
内部跟踪容量,并且在有足够容量之前不允许用户调用 call
。
不关心容量的 Service
可以直接从 ready
返回,或者如果它们包装了某个内部 Service
,它们可以委托给其 ready
方法。
但是,我们仍然无法在 trait 内部定义异步函数。我们可以向 Service
添加另一个关联类型,称为 ReadyFuture
,但是必须返回 Future
会给我们带来我们之前遇到的相同的生命周期问题。如果有一种方法可以解决这个问题,那就太好了。
相反,我们可以从 Future
trait 中获得一些灵感,并定义一个名为 poll_ready
的方法
use std::task::{Context, Poll};
trait Service<R> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()>;
}
这意味着如果服务超出容量,poll_ready
将返回 Poll::Pending
,并在容量可用时使用来自 Context
的唤醒器通知调用者。此时,可以再次调用 poll_ready
,如果它返回 Poll::Ready(())
,则容量已保留,可以调用 call
。
请注意,从技术上讲,没有什么可以阻止用户在没有首先确保服务就绪的情况下调用 call
。但是,这样做被认为是违反了 Service
API 契约,并且允许实现如果对未就绪的服务调用 call
则 panic
!
poll_ready
不返回 Future
也意味着我们能够快速检查服务是否就绪,而无需被迫等待它准备就绪。如果我们调用 poll_ready
并返回 Poll::Pending
,我们可以简单地决定做其他事情而不是等待。除其他外,这使你可以构建负载均衡器,通过服务返回 Poll::Pending
的频率来估计服务的负载,并将请求发送到负载最小的服务。
仍然可以使用类似 futures::future::poll_fn
(或 tower::ServiceExt::ready
)的东西来获得一个在容量可用时解析的 Future
。
这种服务与其调用者就其容量进行通信的概念称为“背压传播”。你可以将其视为服务向其调用者施压,并告诉他们如果他们产生请求太快则减速。基本思想是你应该不要将请求发送到没有能力处理它的服务。相反,你应该等待(缓冲)、丢弃请求(负载削减)或以其他方式处理容量不足的情况。你可以在这里 和 这里了解更多关于背压的一般概念。
最后,在保留容量时也可能发生一些错误,因此 poll_ready
可能应该返回 Poll<Result<(), Self::Error>>
。通过此更改,我们现在已经到达了完整的 tower::Service
trait
pub trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}
许多中间件不添加自己的背压,而只是委托给包装的服务的 poll_ready
实现。但是,中间件中的背压确实启用了一些有趣的用例,例如各种类型的速率限制、负载平衡和自动缩放。
由于你永远不知道 Service
可能由哪些中间件组成,因此不要忘记 poll_ready
非常重要。
有了这一切,调用服务最常见的方式是
use tower::{
Service,
// for the `ready` method
ServiceExt,
};
let response = service
// wait for the service to have capacity
.ready().await?
// send the request
.call(request).await?;
脚注
1: 关于 call
是否应该接受 Pin<&mut Self>
还是不接受,已经进行了一些讨论,但到目前为止,我们已决定使用普通的 &mut self
,这意味着处理程序(哎呀,服务)必须是 Unpin
。在实践中,这很少成为问题。更多详情请参见 此处。
2: 更准确地说,这需要响应 future 为 'static
的原因是,编写 Box<dyn Future>
实际上变成了 Box<dyn Future + 'static>
,匿名生命周期 fn call(&'_ mut self, ...)
不满足此要求。在未来,Rust 编译器团队计划添加一个名为 泛型关联类型 的功能,这将使我们能够解决这个问题。泛型关联类型将允许我们将响应 future 定义为 type Future<'a>
,并将 call
定义为 fn call<'a>(&'a mut self, ...) -> Self::Future<'a>
,但目前响应 future 必须是 'static
。