Rust异步超时处理库tower-timeout的使用:为Tower生态提供请求超时控制与可靠性保障

Tower Timeout

Tower Timeout 是为 Tower 中间件生态系统设计的一个异步超时处理库,主要用于为网络请求提供超时控制和可靠性保障。

功能

该库的核心功能是为请求应用超时控制,确保请求能在设定的固定时间期限内完成执行,避免长时间等待或无响应的情况。

许可证

本项目采用 MIT 开源许可证授权,允许自由使用、修改和分发。

贡献

项目欢迎社区贡献,根据默认规则,除非贡献者特别声明,否则所有提交到 Tower 项目的贡献都将被视为 MIT 许可,无需附加任何额外条款或条件。

安装

您可以通过以下两种方式将 tower-timeout 添加到您的 Rust 项目中:

  1. 使用 Cargo 命令安装:
cargo add tower-timeout
  1. 或者直接在 Cargo.toml 中添加依赖:
tower-timeout = "0.3.0"

示例代码

以下是内容中提供的基本使用示例:

use tower::{Service, ServiceBuilder, ServiceExt};
use tower_timeout::TimeoutLayer;
use std::time::Duration;

// 定义一个简单的服务
struct MyService;

impl Service<&'static str> for MyService {
    type Response = &'static str;
    type Error = tower::BoxError;
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        futures::future::ready(Ok(req))
    }
}

#[tokio::main]
async fn main() {
    // 创建服务并应用超时中间件
    let mut service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_secs(1))) // 设置1秒超时
        .service(MyService);

    // 调用服务
    let response = service.ready().await.unwrap().call("Hello").await.unwrap();
    println!("Response: {}", response);
}

完整示例

以下是一个更完整的示例,展示了如何处理超时情况:

use tower::{Service, ServiceBuilder, ServiceExt};
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::sleep;

// 定义一个模拟延迟的服务
struct DelayedService;

impl Service<&'static str> for DelayedService {
    type Response = &'static str;
    type Error = tower::BoxError;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        Box::pin(async move {
            // 模拟耗时操作(2秒)
            sleep(Duration::from_secs(2)).await;
            Ok(req)
        })
    }
}

#[tokio::main]
async fn main() {
    // 创建带有500毫秒超时的服务
    let mut service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_millis(500)))
        .service(DelayedService);

    match service.ready().await.unwrap().call("Hello").await {
        Ok(response) => println!("Success: {}", response),
        Err(e) => println!("Error: {}", e), // 这里会超时报错
    }
}

分类

  • 异步编程
  • 网络编程

1 回复

Rust异步超时处理库tower-timeout的使用指南

概述

tower-timeout是Tower生态系统中一个专门用于处理请求超时的中间件库。它为基于Tower的服务提供了可靠的超时控制机制,能够有效防止因服务响应过慢而导致的系统资源耗尽问题。

主要特性

  • 简单易用的超时控制接口
  • 与Tower生态无缝集成
  • 支持自定义超时时间
  • 提供清晰的超时错误处理

安装方法

在Cargo.toml中添加依赖:

[dependencies]
tower-timeout = "0.1"
tower = "0.4"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 基本超时设置

use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;

// 假设有一个实现了tower::Service的服务
let service = MyService::new();

// 添加超时中间件,设置超时时间为500毫秒
let service = ServiceBuilder::new()
    .layer(TimeoutLayer::new(Duration::from_millis(500)))
    .service(service);

2. 处理HTTP请求超时

use tower::{Service, ServiceBuilder, ServiceExt};
use tower_http::ServiceBuilderExt;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use hyper::{Request, Response, Body};

async fn handle_request(req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
    // 业务处理逻辑
    Ok(Response::new(Body::from("Hello World")))
}

#[tokio::main]
async fn main() {
    let mut service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_secs(1)))
        .service_fn(handle_request);
    
    let request = Request::new(BBody::empty());
    match service.ready().await?.call(request).await {
        Ok(response) => println!("Got response"),
        Err(e) if e.is::<tower_timeout::error::Elapsed>() => {
            println!("Request timed out");
        }
        Err(e) => println!("Error: {}", e),
    }
}

3. 自定义错误处理

use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
enum MyError {
    #[error("Request timed out")]
    Timeout,
    #[error("Service error: {0}")]
    ServiceError(String),
}

async fn my_service(req: String) -> Result<String, MyError> {
    // 模拟长时间运行的任务
    tokio::time::sleep(Duration::from_secs(2)).await;
    Ok("Response".to_string())
}

#[tokio::main]
async fn main() {
    let service = ServiceBuilder::new()
        .layer(TimeoutLayer::new(Duration::from_secs(1)))
        .service_fn(|req| async {
            my_service(req).await.map_err(|e| match e {
                MyError::Timeout => MyError::Timeout,
                e => MyError::ServiceError(e.to_string()),
            })
        });
    
    match service.call("request".to_string()).await {
        Ok(res) => println!("Success: {}", res),
        Err(MyError::Timeout) => println!("Custom timeout handler"),
        Err(e) => println!("Error: {}", e),
    }
}

高级用法

1. 动态超时设置

use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use std::time::Duration;

fn get_timeout_duration(request: &str) -> Duration {
    if request.starts_with("priority") {
        Duration::from_secs(5)
    } else {
        Duration::from_secs(1)
    }
}

#[tokio::main]
async fn main() {
    let service = ServiceBuilder::new()
        .layer(TimeoutLayer::new_with(|request: &String| {
            get_timeout_duration(request)
        }))
        .service_fn(|req: String| async move {
            // 处理请求
            Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
        });
    
    let response = service.call("priority_request".to_string()).await.unwrap();
    println!("{}", response);
}

2. 与其他Tower中间件组合使用

use tower::ServiceBuilder;
use tower_timeout::TimeoutLayer;
use tower_http::trace::TraceLayer;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let service = ServiceBuilder::new()
        // 添加日志记录
        .layer(TraceLayer::new_for_http())
        // 添加超时控制
        .layer(TimeoutLayer::new(Duration::from_secs(1)))
        // 添加并发限制
        .concurrency_limit(10)
        .service_fn(|req: String| async move {
            Ok::<_, std::convert::Infallible>(format!("Processed: {}", req))
        });
    
    let response = service.call("test".to_string()).await.unwrap();
    println!("{}", response);
}

最佳实践

  1. 合理设置超时时间:根据服务特性设置适当的超时时间,API服务通常设置为1-5秒

  2. 分层超时:在微服务架构中,为不同层级设置递减的超时时间

  3. 监控与调整:监控超时发生率并根据实际情况调整超时设置

  4. 优雅降级:超时发生时提供有意义的错误响应或降级服务

  5. 与重试策略配合:结合tower-retry等中间件实现更健壮的服务

错误处理

tower-timeout会在超时发生时返回tower_timeout::error::Elapsed错误。你可以使用error.is::<tower_timeout::error::Elapsed>()来检查是否是超时错误。

match service.call(request).await {
    Ok(response) => handle_success(response),
    Err(e) if e.is::<tower_timeout::error::Elapsed>() => {
        log::warn!("Request timed out");
        handle_timeout()
    }
    Err(e) => handle_other_errors(e),
}

完整示例

以下是一个结合HTTP服务和自定义错误处理的完整示例:

use std::time::Duration;
use tower::{ServiceBuilder, ServiceExt};
use tower_http::trace::TraceLayer;
use tower_timeout::TimeoutLayer;
use hyper::{Body, Request, Response, Server};
use hyper::service::{make_service_fn, service_fn};
use thiserror::Error;

#[derive(Error, Debug)]
enum AppError {
    #[error("Request timeout")]
    Timeout,
    #[error("Internal server error")]
    Internal,
}

async fn handle_request(_req: Request<Body>) -> Result<Response<Body>, AppError> {
    // 模拟耗时操作
    tokio::time::sleep(Duration::from_secs(2)).await;
    Ok(Response::new(Body::from("Hello, World!")))
}

#[tokio::main]
async fn main() {
    // 创建服务堆栈
    let service = ServiceBuilder::new()
        // 添加请求追踪
        .layer(TraceLayer::new_for_http())
        // 添加1秒超时
        .layer(TimeoutLayer::new(Duration::from_secs(1)))
        // 处理错误转换
        .service_fn(|req| async {
            handle_request(req).await.map_err(|e| match e {
                AppError::Timeout => AppError::Timeout,
                _ => AppError::Internal,
            })
        });

    // 创建HTTP服务器
    let make_svc = make_service_fn(|_conn| {
        let service = service.clone();
        async move {
            Ok::<_, std::convert::Infallible>(service_fn(move |req| {
                let service = service.clone();
                async move {
                    match service.oneshot(req).await {
                        Ok(res) => Ok(res),
                        Err(AppError::Timeout) => {
                            Ok(Response::builder()
                                .status(504)
                                .body(Body::from("Gateway Timeout"))
                                .unwrap())
                        }
                        Err(_) => {
                            Ok(Response::builder()
                                .status(500)
                                .body(Body::from("Internal Server Error"))
                                .unwrap())
                        }
                    }
                }
            }))
        }
    });

    let addr = ([127, 0, 0, 1], 3000).into();
    let server = Server::bind(&addr).serve(make_svc);

    println!("Server running on http://{}", addr);
    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}

这个完整示例展示了:

  1. 使用TimeoutLayer设置1秒超时
  2. 结合TraceLayer进行请求追踪
  3. 自定义错误处理逻辑
  4. 对超时请求返回504状态码
  5. 对其他错误返回500状态码
回到顶部