发明Service特质

让我们想象一下如果你从头开始,你会如何发明 Service 背后的动机

Tower是一个模块化和可重复使用的组件库,用于构建强大的网络客户端和服务器。其核心是 Service 特性。Service 是一个异步函数,它接受一个请求并产生一个响应。然而,其设计的某些方面可能并不那么明显。与其解释今天存在于Tower中的 Service 特性,不如让我们想象一下如果你从头开始,你会如何发明 Service 背后的动机。

想象一下,你正在用Rust构建一个小小的HTTP框架。这个框架将允许用户通过提供接收请求并回复一些响应的代码来实现一个HTTP服务器。

你可能有一个这样的API:

// Create a server that listens on port 3000
let server = Server::new("127.0.0.1:3000").await?;

// Somehow run the user's application
server.run(the_users_application).await?;

问题是,the_users_application应该是什么?

最简单的可能是:

fn handle_request(request: HttpRequest) -> HttpResponse {
    // ...
}

其中 HttpRequestHttpResponse 是由我们的框架提供的一些结构体。

有了这个,我们可以像这样实现 Server::run :

impl Server {
    async fn run<F>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> HttpResponse,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Call the handler provided by the user
            let response = handler(request);

            write_http_response(connection, response).await?;
        }
    }
}

在这里,我们有一个异步函数 run,它接受一个接受 HttpRequest 并返回 HttpResponse 的闭包。

这意味着用户可以像这样使用我们的服务器:

fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else {
        HttpResponse::not_found()
    }
}

// Run the server and handle requests using our `handle_request` function
server.run(handle_request).await?;

这并不坏。它使用户可以很容易地运行HTTP服务器,而不必担心任何低层次的细节问题。

然而,我们目前的设计有一个问题:我们不能异步地处理请求。想象一下,我们的用户在处理请求的同时需要查询数据库或向其他一些服务器发送请求。目前,这需要在我们等待处理程序产生响应时进行阻塞。如果我们希望我们的服务器能够处理大量的并发连接,我们需要在等待该请求异步完成的同时能够为其他请求提供服务。让我们通过让处理程序函数返回一个 future 来解决这个问题:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        // `handler` now returns a generic type `Fut`...
        F: Fn(HttpRequest) -> Fut,
        // ...which is a `Future` whose `Output` is an `HttpResponse`
        Fut: Future<Output = HttpResponse>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Await the future returned by `handler`
            let response = handler(request).await;

            write_http_response(connection, response).await?;
        }
    }
}

使用这个API和以前非常相似:

// Now an async function
async fn handle_request(request: HttpRequest) -> HttpResponse {
    if request.path() == "/" {
        HttpResponse::ok("Hello, World!")
    } else if request.path() == "/important-data" {
        // We can now do async stuff in here
        let some_data = fetch_data_from_database().await;
        make_response(some_data)
    } else {
        HttpResponse::not_found()
    }
}

// Running the server is the same
server.run(handle_request).await?;

这就好得多了,因为我们的请求处理现在可以调用其他异步函数。然而,仍然缺少一些东西。如果我们的处理程序遇到了错误,不能产生响应怎么办?让我们让它返回一个 Result:

impl Server {
    async fn run<F, Fut>(self, handler: F) -> Result<(), Error>
    where
        F: Fn(HttpRequest) -> Fut,
        // The response future is now allowed to fail
        Fut: Future<Output = Result<HttpResponse, Error>>,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // Pattern match on the result of the response future
            match handler(request).await {
                Ok(response) => write_http_response(connection, response).await?,
                Err(error) => handle_error_somehow(error, connection),
            }
        }
    }
}

添加更多的行为

现在,假设我们想确保所有的请求及时完成或失败,而不是让客户端无限期地等待一个可能永远不会到达的响应。我们可以通过给每个请求添加一个超时来做到这一点。超时设置了一个处理程序允许的最大持续时间的限制。如果它在这个时间内没有产生一个响应,就会返回一个错误。这允许客户端重试该请求或向用户报告一个错误,而不是永远等待。

你的第一个想法可能是修改Server,使其可以配置一个超时。然后它在每次调用处理程序时都会应用这个超时。然而,事实证明,你实际上可以在不修改Server的情况下添加一个超时。使用 tokio::time::timeout,我们可以做一个新的处理函数,调用之前的 handle_request,但超时时间为 30 秒。

async fn handler_with_timeout(request: HttpRequest) -> Result<HttpResponse, Error> {
    let result = tokio::time::timeout(
        Duration::from_secs(30),
        handle_request(request)
    ).await;

    match result {
        Ok(Ok(response)) => Ok(response),
        Ok(Err(error)) => Err(error),
        Err(_timeout_elapsed) => Err(Error::timeout()),
    }
}

这提供了一个相当好的关注点分离。我们能够在不改变任何现有代码的情况下增加一个超时功能。

让我们用这种方式再增加一个功能。想象一下,我们正在构建一个JSON API,因此希望在所有的响应上都有一个 Content-Type: application/json 头。我们可以用类似的方式包装 handler_with_timeout,并像这样修改响应:

async fn handler_with_timeout_and_content_type(
    request: HttpRequest,
) -> Result<HttpResponse, Error> {
    let mut response = handler_with_timeout(request).await?;
    response.set_header("Content-Type", "application/json");
    Ok(response)
}

我们现在有了一个处理程序,它可以处理一个HTTP请求,时间不超过30秒,并且总是有正确的Content-Type头,所有这些都不用修改我们原来的handle_request函数或Server结构。

设计可以以这种方式扩展的库是非常强大的,因为它允许用户通过分层的新行为来扩展库的功能,而不必等待库的维护者为其添加支持。

它也使测试更容易,因为你可以将你的代码分解成小的隔离单元,并为它们编写细粒度的测试,而不必担心所有其他的部分。

然而,有一个问题。我们目前的设计让我们通过将一个处理函数包裹在一个新的处理函数中来组成新的行为,该处理函数实现了该行为,然后调用内部函数。这很有效,但如果我们想增加很多额外的功能,它就不能很好地扩展。想象一下,我们有许多 handle_with_* 函数,每个函数都增加了一点新的行为。要硬编码哪一个中间处理程序调用哪一个,将变得很有挑战性。我们目前的链条是

  1. handler_with_timeout_and_content_type 调用
  2. handler_with_timeout 调用
  3. handle_request 实际处理请求

如果我们能以某种方式组合(compose)这三个函数而不需要硬编码确切的顺序,那就更好了。比如说:

let final_handler = with_content_type(with_timeout(handle_request));

同时仍然能够像以前一样运行我们的处理程序:

server.run(final_handler).await?;

你可以尝试将 with_content_typewith_timeout 作为函数来实现,该函数接受一个 F: Fn(HttpRequest) -> Future<Output = Result<HttpResponse, Error>> 类型的参数,并返回一个像 impl Fn(HttpRequest) -> impl Future<Output = Result<HttpResponse, Error>> 的闭包,但由于Rust今天允许 impl Trait 的限制,这实际上不可能。特别是 impl Fn() -> impl Future 是不允许的。使用 Box 是可能的,但这有一个我们想避免的性能代价。

除了调用处理程序之外,你也不能为其添加其他行为,但为什么有必要这样做,我们会再讨论。

Handler特性

让我们尝试另一种方法。与其说 Server::run 接受一个闭包(Fn(HttpRequest) -> ...),不如说我们做一个新的trait来封装同样的 async fn(HttpRequest) -> Result<HttpResponse, Error>:

trait Handler {
    async fn call(&mut self, request: HttpRequest) -> Result<HttpResponse, Error>;
}

有了这样一个特性,我们就可以写出实现它的具体类型,这样我们就不用一直和 Fn 打交道了。

然而,Rust目前不支持异步trait方法,所以我们有两个选择:

  1. call 返回一个封箱(boxed)的 future,如 Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>。这就是 async-trait 箱的作用。

  2. Handler 添加一个相关类型的 Future,这样用户就可以选择自己的类型。

让我们选择方案二,因为它是最灵活的。有具体的 future 类型的用户可以使用该类型,而不需要付出Box的代价,不关心的用户仍然可以使用Pin<Box<…>。

trait Handler {
    type Future: Future<Output = Result<HttpResponse, Error>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future;
}

我们仍然必须要求 Handler::Future 实现 Future,其输出类型为 Result<HttpResponse, Error>,因为那是 Server::run 所要求的。

call 获取 &mut self 很有用,因为它允许处理程序在必要时更新其内部状态。

让我们把原来的 handle_request 函数转换成这个特性的实现。

struct RequestHandler;

impl Handler for RequestHandler {
    // We use `Pin<Box<...>>` here for simplicity, but could also define our
    // own `Future` type to avoid the overhead
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            // same implementation as we had before
            if request.path() == "/" {
                Ok(HttpResponse::ok("Hello, World!"))
            } else if request.path() == "/important-data" {
                let some_data = fetch_data_from_database().await?;
                Ok(make_response(some_data))
            } else {
                Ok(HttpResponse::not_found())
            }
        })
    }
}

支持超时如何?请记住,我们的目标是让我们能够将不同的功能组合在一起,而不需要修改每个单独的部分。

如果我们像这样定义一个通用的超时结构,会怎么样呢:

struct Timeout<T> {
    // T will be some type that implements `Handler`
    inner_handler: T,
    duration: Duration,
}

然后我们可以为 Timeout<T> 实现 Handler,并委托给 THandler 实现:

impl<T> Handler for Timeout<T>
where
    T: Handler,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        Box::pin(async move {
            let result = tokio::time::timeout(
                self.duration,
                self.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

这里重要的一行是 self.inner_handler.call(request)。这就是我们委托给内部处理程序并让它做它的事情的地方。我们不知道它是什么,我们只知道它完成后会产生一个 Result<HttpResponse, Error>

但这段代码并没有完全被编译。我们得到一个这样的错误:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
   --> src/lib.rs:145:29
    |
144 |       fn call(&mut self, request: HttpRequest) -> Self::Future {
    |               --------- this data with an anonymous lifetime `'_`...
145 |           Box::pin(async move {
    |  _____________________________^
146 | |             let result = tokio::time::timeout(
147 | |                 self.duration,
148 | |                 self.inner_handler.call(request),
...   |
155 | |             }
156 | |         })
    | |_________^ ...is captured here, requiring it to live as long as `'static`

问题是,我们正在捕获一个 &mut self,并将其移入一个异步块。这意味着我们的 future 的生命周期与 &mut self 的生命周期相关。这对我们来说并不适用,因为我们可能想在多个线程上运行我们的响应 future 以获得更好的性能,或者产生多个响应 future 并将它们全部并行运行。如果对处理程序的引用存在于futures中,这是不可能的。

相反,我们需要将 &mut self 转换为拥有的 self。这正是 Clone 所做的。

// this must be `Clone` for `Timeout<T>` to be `Clone`
#[derive(Clone)]
struct RequestHandler;

impl Handler for RequestHandler {
    // ...
}

#[derive(Clone)]
struct Timeout<T> {
    inner_handler: T,
    duration: Duration,
}

impl<T> Handler for Timeout<T>
where
    T: Handler + Clone,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        // Get an owned clone of `&mut self`
        let mut this = self.clone();

        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(_timeout) => Err(Error::timeout()),
            }
        })
    }
}

注意,在这种情况下,克隆是非常廉价的,因为 RequestHandler 没有任何数据,而 Timeout<T> 只增加了一个 Duration(这个会被 Copy)。

又近了一步。我们现在得到一个不同的错误:

error[E0310]: the parameter type `T` may not live long enough
   --> src/lib.rs:149:9
    |
140 |   impl<T> Handler for Timeout<T>
    |        - help: consider adding an explicit lifetime bound...: `T: 'static`
...
149 | /         Box::pin(async move {
150 | |             let result = tokio::time::timeout(
151 | |                 this.duration,
152 | |                 this.inner_handler.call(request),
...   |
159 | |             }
160 | |         })
    | |__________^ ...so that the type `impl Future` will meet its required lifetime bounds

现在的问题是,T 可以是任何一种类型。它甚至可以是一个包含引用的类型,比如 Vec<&'a str>。然而,这并不可行,原因与之前一样。我们需要响应的 future 有一个 'static 生命周期,这样我们就可以更容易地传递它。

编译器实际上告诉了我们什么是修复。添加 T: 'static:

impl<T> Handler for Timeout<T>
where
    T: Handler + Clone + 'static,
{
    // ...
}

响应的 future 现在满足了'static 生命周期的要求,因为它不包含引用(而 T 包含的任何引用都是 'static )。现在,我们的代码可以编译了。

让我们创建一个类似的 handler 结构体,在响应中添加 Content-Type 头:

#[derive(Clone)]
struct JsonContentType<T> {
    inner_handler: T,
}

impl<T> Handler for JsonContentType<T>
where
    T: Handler + Clone + 'static,
{
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: HttpRequest) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let mut response = this.inner_handler.call(request).await?;
            response.set_header("Content-Type", "application/json");
            Ok(response)
        })
    }
}

注意这与 Timeout 的模式非常相似。

接下来我们修改 Server::run 以接受我们新的 Handler 特性:

impl Server {
    async fn run<T>(self, mut handler: T) -> Result<(), Error>
    where
        T: Handler,
    {
        let listener = TcpListener::bind(self.addr).await?;

        loop {
            let mut connection = listener.accept().await?;
            let request = read_http_request(&mut connection).await?;

            // have to call `Handler::call` here
            match handler.call(request).await {
                Ok(response) => write_http_response(connection, response).await?,
                Err(error) => handle_error_somehow(error, connection),
            }
        }
    }
}

我们现在可以把我们的三个 handler 组合在一起:

JsonContentType {
    inner_handler: Timeout {
        inner_handler: RequestHandler,
        duration: Duration::from_secs(30),
    },
}

而如果我们给我们的类型添加一些 new 方法,它们就会变得更容易构成:

let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

// `handler` has type `JsonContentType<Timeout<RequestHandler>>`

server.run(handler).await

这样做效果很好! 我们现在可以在不修改 RequestHandler 实现的情况下为其增加额外的功能。理论上,我们可以把我们的 JsonContentTypeTimeout handler 放到一个crate中,然后在 crates.io 上作为库发布给其他人使用。

让handler更灵活

我们的小 Handler 特性工作得很好,但目前它只支持我们的 HttpRequestHttpResponse 类型。如果这些是泛型,那么用户可以使用他们想要的任何类型,那就更好了。

我们让请求成为特质的泛型参数,这样一个特定的服务就可以接受许多不同类型的请求。这允许定义可用于不同协议的处理程序,而不仅仅是HTTP。我们使响应成为关联类型,因为对于任何给定的请求类型,只能有一个(关联的)响应类型:相应的调用返回的类型

trait Handler<Request> {
    type Response;

    // Error should also be an associated type. No reason for that to be a
    // hardcoded type
    type Error;

    // Our future type from before, but now it's output must use
    // the associated `Response` and `Error` types
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    // `call` is unchanged, but note that `Request` here is our generic
    // `Request` type parameter and not the `HttpRequest` type we've used
    // until now
    fn call(&mut self, request: Request) -> Self::Future;
}

我们对 RequestHandler 的实现现在变成了:

impl Handler<HttpRequest> for RequestHandler {
    type Response = HttpResponse;
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Error>>>>;

    fn call(&mut self, request: Request) -> Self::Future {
        // same as before
    }
}

Timeout<T> 有点不同。由于它封装了其他 handler 并添加了一个异步超时,它实际上并不关心请求或响应类型是什么,只要它所封装的 handler 使用相同的类型。

Error 类型则有点不同。因为 tokio::time::timeout 返回 Result<T, tokio::time::error::Elapsed>,我们必须能够将tokio::time::error::Elapsed 转换成内部 handler 的错误类型。

如果我们把所有这些东西放在一起,我们得到:

// `Timeout` accepts any request of type `R` as long as `T`
// accepts the same type of request
impl<R, T> Handler<R> for Timeout<T>
where
    // The actual type of request must not contain
    // references. The compiler would tell us to add
    // this if we didn't
    R: 'static,
    // `T` must accept requests of type `R`
    T: Handler<R> + Clone + 'static,
    // We must be able to convert an `Elapsed` into
    // `T`'s error type
    T::Error: From<tokio::time::error::Elapsed>,
{
    // Our response type is the same as `T`'s, since we
    // don't have to modify it
    type Response = T::Response;

    // Error type is also the same
    type Error = T::Error;

    // Future must output a `Result` with the correct types
    type Future = Pin<Box<dyn Future<Output = Result<T::Response, T::Error>>>>;

    fn call(&mut self, request: R) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let result = tokio::time::timeout(
                this.duration,
                this.inner_handler.call(request),
            ).await;

            match result {
                Ok(Ok(response)) => Ok(response),
                Ok(Err(error)) => Err(error),
                Err(elapsed) => {
                    // Convert the error
                    Err(T::Error::from(elapsed))
                }
            }
        })
    }
}

JsonContentType 有点不同。它不关心请求或错误类型,但它关心响应类型。它必须是 Response,以便我们可以调用 set_header

因此,其实现是:

// Again a generic request type
impl<R, T> Handler<R> for JsonContentType<T>
where
    R: 'static,
    // `T` must accept requests of any type `R` and return
    // responses of type `HttpResponse`
    T: Handler<R, Response = HttpResponse> + Clone + 'static,
{
    type Response = HttpResponse;

    // Our error type is whatever `T`'s error type is
    type Error = T::Error;

    type Future = Pin<Box<dyn Future<Output = Result<Response, T::Error>>>>;

    fn call(&mut self, request: R) -> Self::Future {
        let mut this = self.clone();

        Box::pin(async move {
            let mut response = this.inner_handler.call(request).await?;
            response.set_header("Content-Type", "application/json");
            Ok(response)
        })
    }
}

最后,传递给 Server::runhandler 必须使用 HttpRequestHttpResponse

impl Server {
    async fn run<T>(self, mut handler: T) -> Result<(), Error>
    where
        T: Handler<HttpRequest, Response = HttpResponse>,
    {
        // ...
    }
}

创建服务器并没有改变:

let handler = RequestHandler;
let handler = Timeout::new(handler, Duration::from_secs(30));
let handler = JsonContentType::new(handler);

server.run(handler).await

因此,我们现在有了一个 Handler 特性,使我们有可能将我们的应用程序分解成独立的小部分,并重新使用它们。不错啊

“如果我告诉你…”

到目前为止,我们只讨论了服务器端的事情。但是,我们的 Handler 特性实际上也适合HTTP客户端。我们可以想象一个客户端 Handler,它接受一些请求并异步地将其发送给互联网上的某人。我们的 Timeout 包装器在这里也很有用。JsonContentType 可能不是,因为设置响应头不是客户端的工作。

由于我们的 Handler 特性对于定义服务器和客户端都很有用,Handler 可能不是一个合适的名字。客户端并不处理请求,它将请求发送给服务器,然后由服务器来处理它。让我们改称我们的trait为 Service

trait Service<Request> {
    type Response;
    type Error;
    type Future: Future<Output = Result<Self::Response, Self::Error>>;

    fn call(&mut self, request: Request) -> Self::Future;
}

这实际上几乎是Tower中定义的 Service 特性。如果你能跟到这里,你现在已经了解了 Tower 的大部分内容。除了 Service trait,Tower还提供了一些实用工具,通过包装一些同样实现 Service 的其他类型来实现 Service,就像我们对 TimeoutJsonContentType 所做的那样。这些服务的组成方式与我们到目前为止所做的类似。

一些由 Tower 提供的 Service 实例:

  • Timeout - 这与我们已经建立的超时基本相同。

  • Retry - 自动重试失败的请求。

  • RateLimit - 限制一个服务在一段时间内收到的请求数量。

TimeoutJsonContentType 这样的类型通常被称为中间件(middleware),因为它们包裹着另一个 Service 并以某种方式转换请求或响应。像 RequestHandler 这样的类型通常被称为叶子服务(leaf service),因为它们位于嵌套服务树的叶子上。实际的响应通常在叶子服务中产生,并由中间件修改。

唯一需要讨论的是 backpressurepoll_ready

背压

想象一下,你想写一个速率限制的中间件,包裹一个服务,并对底层服务将收到的最大并发请求数进行限制。如果你有一些服务对它能处理的负载量有一个硬性的上限,这将是非常有用的。

在我们目前的 Service 特性中,我们并没有一个好的方法来实现这样的东西。我们可以试试:

impl<R, T> Service<R> for ConcurrencyLimit<T> {
    fn call(&mut self, request: R) -> Self::Future {
        // 1. Check a counter for the number of requests currently being
        //    processed.
        // 2. If there is capacity left send the request to `T`
        //    and increment the counter.
        // 3. If not somehow wait until capacity becomes available.
        // 4. When the response has been produced, decrement the counter.
    }
}

内容出处: https://tokio.rs/blog/2021-05-14-inventing-the-service-trait