Rust异步超时处理库tower-timeout的使用:为Tower生态提供请求超时控制与可靠性保障
Tower Timeout
Tower Timeout 是为 Tower 中间件生态系统设计的一个异步超时处理库,主要用于为网络请求提供超时控制和可靠性保障。
功能
该库的核心功能是为请求应用超时控制,确保请求能在设定的固定时间期限内完成执行,避免长时间等待或无响应的情况。
许可证
本项目采用 MIT 开源许可证授权,允许自由使用、修改和分发。
贡献
项目欢迎社区贡献,根据默认规则,除非贡献者特别声明,否则所有提交到 Tower 项目的贡献都将被视为 MIT 许可,无需附加任何额外条款或条件。
安装
您可以通过以下两种方式将 tower-timeout 添加到您的 Rust 项目中:
- 使用 Cargo 命令安装:
cargo add tower-timeout
- 或者直接在 Cargo.toml 中添加依赖:
tower-timeout = "0.3.0"
示例代码
以下是内容中提供的基本使用示例:
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_timeout::TimeoutLayer;
use std::time::Duration;
// 定义一个简单的服务
struct MyService;
impl Service<&'static str> for MyService {
type Response = &'static str;
type Error = tower::BoxError;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
futures::future::ready(Ok(req))
}
}
#[tokio::main]
async fn main() {
// 创建服务并应用超时中间件
let mut service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(1))) // 设置1秒超时
.service(MyService);
// 调用服务
let response = service.ready().await.unwrap().call("Hello").await.unwrap();
println!("Response: {}", response);
}
完整示例
以下是一个更完整的示例,展示了如何处理超时情况:
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::sleep;
// 定义一个模拟延迟的服务
struct DelayedService;
impl Service<&'static str> for DelayedService {
type Response = &'static str;
type Error = tower::BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
Box::pin(async move {
// 模拟耗时操作(2秒)
sleep(Duration::from_secs(2)).await;
Ok(req)
})
}
}
#[tokio::main]
async fn main() {
// 创建带有500毫秒超时的服务
let mut service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.service(DelayedService);
match service.ready().await.unwrap().call("Hello").await {
Ok(response) => println!("Success: {}", response),
Err(e) => println!("Error: {}", e), // 这里会超时报错
}
}
分类
- 异步编程
- 网络编程
Rust异步超时处理库tower-timeout的使用指南
概述
tower-timeout
是Tower生态系统中一个专门用于处理请求超时的中间件库。它为基于Tower的服务提供了可靠的超时控制机制,能够有效防止因服务响应过慢而导致的系统资源耗尽问题。
主要特性
- 简单易用的超时控制接口
- 与Tower生态无缝集成
- 支持自定义超时时间
- 提供清晰的超时错误处理
安装方法
在Cargo.toml中添加依赖:
[dependencies]
tower-timeout = "0.1"
tower = "0.4"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 基本超时设置
use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
// 假设有一个实现了tower::Service的服务
let service = MyService::new();
// 添加超时中间件,设置超时时间为500毫秒
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_millis(500)))
.service(service);
2. 处理HTTP请求超时
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_http::ServiceBuilderExt;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use hyper::{Request, Response, Body};
async fn handle_request(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
// 业务处理逻辑
Ok(Response::new(Body::from("Hello World")))
}
#[tokio::main]
async fn main() {
let mut service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(1)))
.service_fn(handle_request);
let request = Request::new(BBody::empty());
match service.ready().await?.call(request).await {
Ok(response) => println!("Got response"),
Err(e) if e.is::<tower_timeout::error::Elapsed>() => {
println!("Request timed out");
}
Err(e) => println!("Error: {}", e),
}
}
3. 自定义错误处理
use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use thiserror::Error;
#[derive(Error, Debug)]
enum MyError {
#[error("Request timed out")]
Timeout,
#[error("Service error: {0}")]
ServiceError(String),
}
async fn my_service(req: String) -> Result<String, MyError> {
// 模拟长时间运行的任务
tokio::time::sleep(Duration::from_secs(2)).await;
Ok("Response".to_string())
}
#[tokio::main]
async fn main() {
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(1)))
.service_fn(|req| async {
my_service(req).await.map_err(|e| match e {
MyError::Timeout => MyError::Timeout,
e => MyError::ServiceError(e.to_string()),
})
});
match service.call("request".to_string()).await {
Ok(res) => println!("Success: {}", res),
Err(MyError::Timeout) => println!("Custom timeout handler"),
Err(e) => println!("Error: {}", e),
}
}
高级用法
1. 动态超时设置
use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
fn get_timeout_duration(request: &str) -> Duration {
if request.starts_with("priority") {
Duration::from_secs(5)
} else {
Duration::from_secs(1)
}
}
#[tokio::main]
async fn main() {
let service = ServiceBuilder::new()
.layer(TimeoutLayer::new_with(|request: &String| {
get_timeout_duration(request)
}))
.service_fn(|req: String| async move {
// 处理请求
Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
});
let response = service.call("priority_request".to_string()).await.unwrap();
println!("{}", response);
}
2. 与其他Tower中间件组合使用
use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
use std::time::Duration;
#[tokio::main]
async fn main() {
let service = ServiceBuilder::new()
// 添加日志记录
.layer(TraceLayer::new_for_http())
// 添加超时控制
.layer(TimeoutLayer::new(Duration::from_secs(1)))
// 添加并发限制
.concurrency_limit(10)
.service_fn(|req: String| async move {
Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
});
let response = service.call("test".to_string()).await.unwrap();
println!("{}", response);
}
最佳实践
-
合理设置超时时间:根据服务特性设置适当的超时时间,API服务通常设置为1-5秒
-
分层超时:在微服务架构中,为不同层级设置递减的超时时间
-
监控与调整:监控超时发生率并根据实际情况调整超时设置
-
优雅降级:超时发生时提供有意义的错误响应或降级服务
-
与重试策略配合:结合tower-retry等中间件实现更健壮的服务
错误处理
tower-timeout
会在超时发生时返回tower_timeout::error::Elapsed
错误。你可以使用error.is::<tower_timeout::error::Elapsed>()
来检查是否是超时错误。
match service.call(request).await {
Ok(response) => handle_success(response),
Err(e) if e.is::<tower_timeout::error::Elapsed>() => {
log::warn!("Request timed out");
handle_timeout()
}
Err(e) => handle_other_errors(e),
}
完整示例
以下是一个结合HTTP服务和自定义错误处理的完整示例:
use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt};
use tower_http::trace::TraceLayer;
use tower_timeout::TimeoutLayer;
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use thiserror::Error;
#[derive(Error, Debug)]
enum AppError {
#[error("Request timeout")]
Timeout,
#[error("Internal server error")]
Internal,
}
async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, AppError> {
// 模拟耗时操作
tokio::time::sleep(Duration::from_secs(2)).await;
Ok(Response::new(Body::from("Hello, World!")))
}
#[tokio::main]
async fn main() {
// 创建服务堆栈
let service = ServiceBuilder::new()
// 添加请求追踪
.layer(TraceLayer::new_for_http())
// 添加1秒超时
.layer(TimeoutLayer::new(Duration::from_secs(1)))
// 处理错误转换
.service_fn(|req| async {
handle_request(req).await.map_err(|e| match e {
AppError::Timeout => AppError::Timeout,
_ => AppError::Internal,
})
});
// 创建HTTP服务器
let make_svc = make_service_fn(|_conn| {
let service = service.clone();
async move {
Ok::<_, std::convert::Infallible>(service_fn(move |req| {
let service = service.clone();
async move {
match service.oneshot(req).await {
Ok(res) => Ok(res),
Err(AppError::Timeout) => {
Ok(Response::builder()
.status(504)
.body(Body::from("Gateway Timeout"))
.unwrap())
}
Err(_) => {
Ok(Response::builder()
.status(500)
.body(Body::from("Internal Server Error"))
.unwrap())
}
}
}
}))
}
});
let addr = ([127, 0, 0, 1], 3000).into();
let server = Server::bind(&addr).serve(make_svc);
println!("Server running on http://{}", addr);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
这个完整示例展示了:
- 使用TimeoutLayer设置1秒超时
- 结合TraceLayer进行请求追踪
- 自定义错误处理逻辑
- 对超时请求返回504状态码
- 对其他错误返回500状态码