发明Service特质
Service
背后的动机Tower是一个模块化和可重复使用的组件库,用于构建强大的网络客户端和服务器。其核心是 Service
特性。Service
是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能并不那么明显。与其解释今天存在于Tower中的 Service
特性,不如让我们想象一下如果你从头开始,你会如何发明 Service
背后的动机。
想象一下,你正在用Rust构建一个小小的HTTP框架。这个框架将允许用户通过提供接收请求并回复一些响应的代码来实现一个HTTP服务器。
你可能有一个这样的API:
// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;
// Somehow run the user's application
server.run(the_users_application).await?;
问题是,the_users_application应该是什么?
最简单的可能是:
fn handle_request(request: HttpRequest) -> HttpResponse {
// ...
}
其中 HttpRequest
和 HttpResponse
是由我们的框架提供的一些结构体。
有了这个,我们可以像这样实现 Server::run
:
impl Server {
async fn run<F>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> HttpResponse,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Call the handler provided by the user
let response = handler(request);
write_http_response(connection, response).await?;
}
}
}
在这里,我们有一个异步函数 run
,它接受一个接受 HttpRequest 并返回 HttpResponse 的闭包。
这意味着用户可以像这样使用我们的服务器:
fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else {
HttpResponse::not_found()
}
}
// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;
这并不坏。它使用户可以很容易地运行HTTP服务器,而不必担心任何低层次的细节问题。
然而,我们目前的设计有一个问题:我们不能异步地处理请求。想象一下,我们的用户在处理请求的同时需要查询数据库或向其他一些服务器发送请求。目前,这需要在我们等待处理程序产生响应时进行阻塞。如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时能够为其他请求提供服务。让我们通过让处理程序函数返回一个 future 来解决这个问题:
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
// `handler` now returns a generic type `Fut`...
F: Fn(HttpRequest) -> Fut,
// ...which is a `Future` whose `Output` is an `HttpResponse`
Fut: Future<Output = HttpResponse>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Await the future returned by `handler`
let response = handler(request).await;
write_http_response(connection, response).await?;
}
}
}
使用这个API和以前非常相似:
// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
if request.path() == "/" {
HttpResponse::ok("Hello, World!")
} else if request.path() == "/important-data" {
// We can now do async stuff in here
let some_data = fetch_data_from_database().await;
make_response(some_data)
} else {
HttpResponse::not_found()
}
}
// Running the server is the same
server.run(handle_request).await?;
这就好得多了,因为我们的请求处理现在可以调用其他异步函数。然而,仍然缺少一些东西。如果我们的处理程序遇到了错误,不能产生响应怎么办?让我们让它返回一个 Result
:
impl Server {
async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
where
F: Fn(HttpRequest) -> Fut,
// The response future is now allowed to fail
Fut: Future<Output = Result<HttpResponse, Error>>,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// Pattern match on the result of the response future
match handler(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
}
}
}
添加更多的行为
现在,假设我们想确保所有的请求及时完成或失败,而不是让客户端无限期地等待一个可能永远不会到达的响应。我们可以通过给每个请求添加一个超时来做到这一点。超时设置了一个处理程序允许的最大持续时间的限制。如果它在这个时间内没有产生一个响应,就会返回一个错误。这允许客户端重试该请求或向用户报告一个错误,而不是永远等待。
你的第一个想法可能是修改Server,使其可以配置一个超时。然后它在每次调用处理程序时都会应用这个超时。然而,事实证明,你实际上可以在不修改Server的情况下添加一个超时。使用 tokio::time::timeout
,我们可以做一个新的处理函数,调用之前的 handle_request
,但超时时间为 30 秒。
async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
let result = tokio::time::timeout(
Duration::from_secs(30),
handle_request(request)
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout_elapsed) => Err(Error::timeout()),
}
}
这提供了一个相当好的关注点分离。我们能够在不改变任何现有代码的情况下增加一个超时功能。
让我们用这种方式再增加一个功能。想象一下,我们正在构建一个JSON API,因此希望在所有的响应上都有一个 Content-Type: application/json
头。我们可以用类似的方式包装 handler_with_timeout
,并像这样修改响应:
async fn handler_with_timeout_and_content_type(
request: HttpRequest,
) -> Result<HttpResponse, Error> {
let mut response = handler_with_timeout(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
}
我们现在有了一个处理程序,它可以处理一个HTTP请求,时间不超过30秒,并且总是有正确的Content-Type头,所有这些都不用修改我们原来的handle_request函数或Server结构。
设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过分层的新行为来扩展库的功能,而不必等待库的维护者为其添加支持。
它也使测试更容易,因为你可以将你的代码分解成小的隔离单元,并为它们编写细粒度的测试,而不必担心所有其他的部分。
然而,有一个问题。我们目前的设计让我们通过将一个处理函数包裹在一个新的处理函数中来组成新的行为,该处理函数实现了该行为,然后调用内部函数。这很有效,但如果我们想增加很多额外的功能,它就不能很好地扩展。想象一下,我们有许多 handle_with_* 函数,每个函数都增加了一点新的行为。要硬编码哪一个中间处理程序调用哪一个,将变得很有挑战性。我们目前的链条是
handler_with_timeout_and_content_type
调用handler_with_timeout
调用handle_request
实际处理请求
如果我们能以某种方式组合(compose)这三个函数而不需要硬编码确切的顺序,那就更好了。比如说:
let final_handler = with_content_type(with_timeout(handle_request));
同时仍然能够像以前一样运行我们的处理程序:
server.run(final_handler).await?;
你可以尝试将 with_content_type
和 with_timeout
作为函数来实现,该函数接受一个 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>>
类型的参数,并返回一个像 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>>
的闭包,但由于Rust今天允许 impl Trait
的限制,这实际上不可能。特别是 impl Fn() -> impl Future
是不允许的。使用 Box
是可能的,但这有一个我们想避免的性能代价。
除了调用处理程序之外,你也不能为其添加其他行为,但为什么有必要这样做,我们会再讨论。
Handler特性
让我们尝试另一种方法。与其说 Server::run
接受一个闭包(Fn(HttpRequest) -> ...
),不如说我们做一个新的trait来封装同样的 async fn(HttpRequest) -> Result<HttpResponse, Error>
:
trait Handler {
async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}
有了这样一个特性,我们就可以写出实现它的具体类型,这样我们就不用一直和 Fn
打交道了。
然而,Rust目前不支持异步trait方法,所以我们有两个选择:
-
让
call
返回一个封箱(boxed)的 future,如Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>
。这就是async-trait
箱的作用。 -
给
Handler
添加一个相关类型的Future
,这样用户就可以选择自己的类型。
让我们选择方案二,因为它是最灵活的。有具体的 future 类型的用户可以使用该类型,而不需要付出Box的代价,不关心的用户仍然可以使用Pin<Box<…>。
trait Handler {
type Future: Future<Output = Result<HttpResponse, Error>>;
fn call(&mut self, request: HttpRequest) -> Self::Future;
}
我们仍然必须要求 Handler::Future
实现 Future
,其输出类型为 Result<HttpResponse, Error>
,因为那是 Server::run
所要求的。
让 call
获取 &mut self
很有用,因为它允许处理程序在必要时更新其内部状态。
让我们把原来的 handle_request
函数转换成这个特性的实现。
struct RequestHandler;
impl Handler for RequestHandler {
// We use `Pin<Box<...>>` here for simplicity, but could also define our
// own `Future` type to avoid the overhead
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
// same implementation as we had before
if request.path() == "/" {
Ok(HttpResponse::ok("Hello, World!"))
} else if request.path() == "/important-data" {
let some_data = fetch_data_from_database().await?;
Ok(make_response(some_data))
} else {
Ok(HttpResponse::not_found())
}
})
}
}
支持超时如何?请记住,我们的目标是让我们能够将不同的功能组合在一起,而不需要修改每个单独的部分。
如果我们像这样定义一个通用的超时结构,会怎么样呢:
struct Timeout<T> {
// T will be some type that implements `Handler`
inner_handler: T,
duration: Duration,
}
然后我们可以为 Timeout<T>
实现 Handler
,并委托给 T
的 Handler
实现:
impl<T> Handler for Timeout<T>
where
T: Handler,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
Box::pin(async move {
let result = tokio::time::timeout(
self.duration,
self.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
这里重要的一行是 self.inner_handler.call(request)
。这就是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>
。
但这段代码并没有完全被编译。我们得到一个这样的错误:
error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/lib.rs:145:29
|
144 | fn call(&mut self, request: HttpRequest) -> Self::Future {
| --------- this data with an anonymous lifetime `'_`...
145 | Box::pin(async move {
| _____________________________^
146 | | let result = tokio::time::timeout(
147 | | self.duration,
148 | | self.inner_handler.call(request),
... |
155 | | }
156 | | })
| |_________^ ...is captured here, requiring it to live as long as `'static`
问题是,我们正在捕获一个 &mut self
,并将其移入一个异步块。这意味着我们的 future 的生命周期与 &mut self
的生命周期相关。这对我们来说并不适用,因为我们可能想在多个线程上运行我们的响应 future 以获得更好的性能,或者产生多个响应 future 并将它们全部并行运行。如果对处理程序的引用存在于futures中,这是不可能的。
相反,我们需要将 &mut self
转换为拥有的 self
。这正是 Clone
所做的。
// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;
impl Handler for RequestHandler {
// ...
}
#[derive(Clone)]
struct Timeout<T> {
inner_handler: T,
duration: Duration,
}
impl<T> Handler for Timeout<T>
where
T: Handler + Clone,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
// Get an owned clone of `&mut self`
let mut this = self.clone();
Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(_timeout) => Err(Error::timeout()),
}
})
}
}
注意,在这种情况下,克隆是非常廉价的,因为 RequestHandler
没有任何数据,而 Timeout<T>
只增加了一个 Duration
(这个会被 Copy
)。
又近了一步。我们现在得到一个不同的错误:
error[E0310]: the parameter type `T` may not live long enough
--> src/lib.rs:149:9
|
140 | impl<T> Handler for Timeout<T>
| - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | / Box::pin(async move {
150 | | let result = tokio::time::timeout(
151 | | this.duration,
152 | | this.inner_handler.call(request),
... |
159 | | }
160 | | })
| |__________^ ...so that the type `impl Future` will meet its required lifetime bounds
现在的问题是,T
可以是任何一种类型。它甚至可以是一个包含引用的类型,比如 Vec<&'a str>
。然而,这并不可行,原因与之前一样。我们需要响应的 future 有一个 'static
生命周期,这样我们就可以更容易地传递它。
编译器实际上告诉了我们什么是修复。添加 T: 'static
:
impl<T> Handler for Timeout<T>
where
T: Handler + Clone + 'static,
{
// ...
}
响应的 future 现在满足了'static
生命周期的要求,因为它不包含引用(而 T
包含的任何引用都是 'static
)。现在,我们的代码可以编译了。
让我们创建一个类似的 handler 结构体,在响应中添加 Content-Type
头:
#[derive(Clone)]
struct JsonContentType<T> {
inner_handler: T,
}
impl<T> Handler for JsonContentType<T>
where
T: Handler + Clone + 'static,
{
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: HttpRequest) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}
注意这与 Timeout
的模式非常相似。
接下来我们修改 Server::run
以接受我们新的 Handler
特性:
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler,
{
let listener = TcpListener::bind(self.addr).await?;
loop {
let mut connection = listener.accept().await?;
let request = read_http_request(&mut connection).await?;
// have to call `Handler::call` here
match handler.call(request).await {
Ok(response) => write_http_response(connection, response).await?,
Err(error) => handle_error_somehow(error, connection),
}
}
}
}
我们现在可以把我们的三个 handler 组合在一起:
JsonContentType {
inner_handler: Timeout {
inner_handler: RequestHandler,
duration: Duration::from_secs(30),
},
}
而如果我们给我们的类型添加一些 new
方法,它们就会变得更容易构成:
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);
// `handler` has type `JsonContentType<Timeout<RequestHandler>>`
server.run(handler).await
这样做效果很好! 我们现在可以在不修改 RequestHandler
实现的情况下为其增加额外的功能。理论上,我们可以把我们的 JsonContentType
和 Timeout
handler 放到一个crate中,然后在 crates.io
上作为库发布给其他人使用。
让handler更灵活
我们的小 Handler
特性工作得很好,但目前它只支持我们的 HttpRequest
和 HttpResponse
类型。如果这些是泛型,那么用户可以使用他们想要的任何类型,那就更好了。
我们让请求成为特质的泛型参数,这样一个特定的服务就可以接受许多不同类型的请求。这允许定义可用于不同协议的处理程序,而不仅仅是HTTP。我们使响应成为关联类型,因为对于任何给定的请求类型,只能有一个(关联的)响应类型:相应的调用返回的类型
trait Handler<Request> {
type Response;
// Error should also be an associated type. No reason for that to be a
// hardcoded type
type Error;
// Our future type from before, but now it's output must use
// the associated `Response` and `Error` types
type Future: Future<Output = Result<Self::Response, Self::Error>>;
// `call` is unchanged, but note that `Request` here is our generic
// `Request` type parameter and not the `HttpRequest` type we've used
// until now
fn call(&mut self, request: Request) -> Self::Future;
}
我们对 RequestHandler
的实现现在变成了:
impl Handler<HttpRequest> for RequestHandler {
type Response = HttpResponse;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;
fn call(&mut self, request: Request) -> Self::Future {
// same as before
}
}
Timeout<T>
有点不同。由于它封装了其他 handler
并添加了一个异步超时,它实际上并不关心请求或响应类型是什么,只要它所封装的 handler 使用相同的类型。
Error
类型则有点不同。因为 tokio::time::timeout
返回 Result<T, tokio::time::error::Elapsed>
,我们必须能够将tokio::time::error::Elapsed
转换成内部 handler
的错误类型。
如果我们把所有这些东西放在一起,我们得到:
// `Timeout` accepts any request of type `R` as long as `T`
// accepts the same type of request
impl<R, T> Handler<R> for Timeout<T>
where
// The actual type of request must not contain
// references. The compiler would tell us to add
// this if we didn't
R: 'static,
// `T` must accept requests of type `R`
T: Handler<R> + Clone + 'static,
// We must be able to convert an `Elapsed` into
// `T`'s error type
T::Error: From<tokio::time::error::Elapsed>,
{
// Our response type is the same as `T`'s, since we
// don't have to modify it
type Response = T::Response;
// Error type is also the same
type Error = T::Error;
// Future must output a `Result` with the correct types
type Future = Pin<Box<dyn Future<Output = Result<T::Response, T::Error>>>>;
fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let result = tokio::time::timeout(
this.duration,
this.inner_handler.call(request),
).await;
match result {
Ok(Ok(response)) => Ok(response),
Ok(Err(error)) => Err(error),
Err(elapsed) => {
// Convert the error
Err(T::Error::from(elapsed))
}
}
})
}
}
JsonContentType
有点不同。它不关心请求或错误类型,但它关心响应类型。它必须是 Response
,以便我们可以调用 set_header
。
因此,其实现是:
// Again a generic request type
impl<R, T> Handler<R> for JsonContentType<T>
where
R: 'static,
// `T` must accept requests of any type `R` and return
// responses of type `HttpResponse`
T: Handler<R, Response = HttpResponse> + Clone + 'static,
{
type Response = HttpResponse;
// Our error type is whatever `T`'s error type is
type Error = T::Error;
type Future = Pin<Box<dyn Future<Output = Result<Response, T::Error>>>>;
fn call(&mut self, request: R) -> Self::Future {
let mut this = self.clone();
Box::pin(async move {
let mut response = this.inner_handler.call(request).await?;
response.set_header("Content-Type", "application/json");
Ok(response)
})
}
}
最后,传递给 Server::run
的 handler
必须使用 HttpRequest
和 HttpResponse
。
impl Server {
async fn run<T>(self, mut handler: T) -> Result<(), Error>
where
T: Handler<HttpRequest, Response = HttpResponse>,
{
// ...
}
}
创建服务器并没有改变:
let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);
server.run(handler).await
因此,我们现在有了一个 Handler
特性,使我们有可能将我们的应用程序分解成独立的小部分,并重新使用它们。不错啊
“如果我告诉你…”
到目前为止,我们只讨论了服务器端的事情。但是,我们的 Handler
特性实际上也适合HTTP客户端。我们可以想象一个客户端 Handler
,它接受一些请求并异步地将其发送给互联网上的某人。我们的 Timeout
包装器在这里也很有用。JsonContentType
可能不是,因为设置响应头不是客户端的工作。
由于我们的 Handler
特性对于定义服务器和客户端都很有用,Handler
可能不是一个合适的名字。客户端并不处理请求,它将请求发送给服务器,然后由服务器来处理它。让我们改称我们的trait为 Service
。
trait Service<Request> {
type Response;
type Error;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn call(&mut self, request: Request) -> Self::Future;
}
这实际上几乎是Tower中定义的 Service 特性。如果你能跟到这里,你现在已经了解了 Tower 的大部分内容。除了 Service trait,Tower还提供了一些实用工具,通过包装一些同样实现 Service 的其他类型来实现 Service,就像我们对 Timeout
和 JsonContentType
所做的那样。这些服务的组成方式与我们到目前为止所做的类似。
一些由 Tower 提供的 Service 实例:
-
Timeout - 这与我们已经建立的超时基本相同。
-
Retry - 自动重试失败的请求。
-
RateLimit - 限制一个服务在一段时间内收到的请求数量。
像 Timeout
和 JsonContentType
这样的类型通常被称为中间件(middleware),因为它们包裹着另一个 Service
并以某种方式转换请求或响应。像 RequestHandler
这样的类型通常被称为叶子服务(leaf service),因为它们位于嵌套服务树的叶子上。实际的响应通常在叶子服务中产生,并由中间件修改。
唯一需要讨论的是 backpressure
和 poll_ready
。
背压
想象一下,你想写一个速率限制的中间件,包裹一个服务,并对底层服务将收到的最大并发请求数进行限制。如果你有一些服务对它能处理的负载量有一个硬性的上限,这将是非常有用的。
在我们目前的 Service 特性中,我们并没有一个好的方法来实现这样的东西。我们可以试试:
impl<R, T> Service<R> for ConcurrencyLimit<T> {
fn call(&mut self, request: R) -> Self::Future {
// 1. Check a counter for the number of requests currently being
// processed.
// 2. If there is capacity left send the request to `T`
// and increment the counter.
// 3. If not somehow wait until capacity becomes available.
// 4. When the response has been produced, decrement the counter.
}
}
内容出处: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait