Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略

Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略

Tower Retry

这是一个用于重试失败请求的Rust库。

许可证

该项目采用MIT许可证授权。

贡献

除非您明确声明,否则任何有意提交给Tower的贡献都将以MIT许可证授权,无需任何附加条款或条件。

安装

在项目目录中运行以下Cargo命令:

cargo add tower-retry

或者在Cargo.toml中添加以下行:

tower-retry = "0.3.0"

使用示例

下面是一个完整的tower-retry使用示例:

use tower::{Service, ServiceBuilder};
use tower_retry::Retry;
use tower_retry::policy::Limit;
use std::time::Duration;
use std::convert::Infallible;

// 定义一个简单的服务
#[derive(Clone)]
struct MyService;

impl Service<&'static str> for MyService {
    type Response = &'static str;
    type Error = Infallible;
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        println!("处理请求: {}", req);
        futures::future::ready(Ok("响应"))
    }
}

#[tokio::main]
async fn main() {
    // 创建重试策略 - 最多重试3次,每次间隔1秒
    let retry_policy = Limit::new(3, Duration::from_secs(1));
    
    // 构建服务栈
    let mut service = ServiceBuilder::new()
        .retry(retry_policy)
        .service(MyService);
    
    // 调用服务
    let response = service.call("测试请求").await.unwrap();
    println!("最终响应: {}", response);
}

这个示例展示了如何:

  1. 创建一个基本的服务
  2. 配置重试策略(最多重试3次,每次间隔1秒)
  3. 将重试中间件应用到服务上
  4. 调用服务并处理响应

tower-retry库与Tower生态系统无缝集成,可以很容易地与其他Tower中间件组合使用。

完整示例demo

下面是一个更完整的网络请求重试示例,模拟HTTP请求失败时的重试机制:

use std::time::Duration;
use std::convert::Infallible;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_retry::Retry;
use tower_retry::policy::Limit;

// 模拟HTTP请求的服务
#[derive(Debug, Clone)]
struct HttpClient {
    fail_until: usize,  // 前N次请求会失败
    current_attempt: usize,
}

impl HttpClient {
    fn new(fail_until: usize) -> Self {
        Self {
            fail_until,
            current_attempt: 0,
        }
    }
}

impl Service<&'static str> for HttpClient {
    type Response = String;
    type Error = String;
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        self.current_attempt += 1;
        
        if self.current_attempt <= self.fail_until {
            println!("请求失败 (尝试次数: {}) - {}", self.current_attempt, req);
            futures::future::ready(Err(format!("第{}次尝试失败", self.current_attempt)))
        } else {
            println!("请求成功 (尝试次数: {}) - {}", self.current_attempt, req);
            futures::future::ready(Ok(format!("成功响应: {}", req)))
        }
    }
}

#[tokio::main]
async fn main() {
    // 创建HTTP客户端 - 前2次请求会失败
    let http_client = HttpClient::new(2);
    
    // 配置重试策略 - 最多重试5次,每次间隔500毫秒
    let retry_policy = Limit::new(5, Duration::from_millis(500));
    
    // 构建服务栈
    let mut service = ServiceBuilder::new()
        .retry(retry_policy)
        .service(http_client);
    
    // 调用服务
    match service.ready().await.unwrap().call("测试API请求").await {
        Ok(res) => println!("最终结果: {}", res),
        Err(e) => println!("所有重试尝试均失败: {}", e),
    }
}

这个完整示例演示了:

  1. 模拟一个可能失败的HTTP客户端
  2. 配置更复杂的重试策略
  3. 处理成功和失败两种情况
  4. 显示每次重试的详细信息

文档

更多详细使用方法和API参考,请查看官方文档。


1 回复

Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略

tower-retry是Tower生态系统中一个用于实现请求自动重试策略的中间件库,它可以帮助开发者轻松地为网络请求添加重试逻辑,提高应用程序的健壮性。

核心特性

  • 基于Tower中间件架构
  • 可配置的重试策略
  • 支持条件性重试(基于错误类型)
  • 与Tower生态无缝集成
  • 异步友好

基本使用方法

首先添加依赖到你的Cargo.toml

[dependencies]
tower = "0.4"
tower-retry = "0.3"
tokio = { version = "1.0", features = ["full"] }

简单示例

use tower::ServiceBuilder;
use tower_retry::Retry;
use tower_retry::Policy;

// 定义重试策略
struct MyPolicy;

impl<E> Policy<(), E, ()> for MyPolicy {
    type Future = futures::future::Ready<Self>;
    
    fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
        match result {
            Ok(_) => None, // 成功则不重试
            Err(_) => Some(futures::future::ready(MyPolicy)), // 失败则重试
        }
    }
    
    fn clone_request(&self, req: &()) -> Option<()> {
        Some(())
    }
}

#[tokio::main]
async fn main() {
    let client = tower::service_fn(|req: ()| async {
        // 模拟50%失败率的服务
        if rand::random::<bool>() {
            Ok(())
        } else {
            Err("error")
        }
    });
    
    let mut client = ServiceBuilder::new()
        .layer(Retry::new(MyPolicy))
        .service(client);
    
    match client.call(()).await {
        Ok(_) => println!("Success!"),
        Err(e) => println!("Failed after retries: {:?}", e),
    }
}

高级用法

限制最大重试次数

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tower_retry::Policy;

struct LimitedRetryPolicy {
    max_retries: usize,
    current: Arc<AtomicUsize>,
}

impl<E> Policy<(), E, ()> for LimitedRetryPolicy {
    type Future = futures::future::Ready<Self>;
    
    fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
        let current = self.current.fetch_add(1, Ordering::SeqCst);
        if current < self.max_retries {
            Some(futures::future::ready(self.clone()))
        } else {
            None
        }
    }
    
    fn clone_request(&self, req: &()) -> Option<()> {
        Some(())
    }
}

impl Clone for LimitedRetryPolicy {
    fn clone(&self) -> Self {
        Self {
            max_retries: self.max_retries,
            current: self.current.clone(),
        }
    }
}

与Hyper客户端集成

use hyper::{Client, Request, Body};
use tower::ServiceBuilder;
use tower_retry::{Retry, Policy};

struct HttpRetryPolicy;

impl<ResBody> Policy<Request<Body>, hyper::Error, ResBody> for HttpRetryPolicy {
    type Future = futures::future::Ready<Self>;
    
    fn retry(&self, req: &Request<Body>, result: Result极光

    fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
        let mut new_req = Request::new(Body::empty());
        *new_req.uri_mut() = req.uri().clone();
        *new_req.method_mut() = req.method().clone();
        *new_req.headers_mut() = req.headers().clone();
        Some(new_req)
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::new();
    
    let mut client = ServiceBuilder::new()
        .layer(Retry::new(HttpRetryPolicy))
        .service(client);
    
    let req = Request::get("http://example.com")
        .body(Body::empty())?;
    
    let response = client.call(req).await?;
    println!("Status: {}", response.status());
    
    Ok(())
}

最佳实践

  1. 合理设置重试次数:通常3-5次重试是合理的
  2. 考虑指数退避:结合tower-backoff实现退避策略
  3. 只对可重试错误重试:如网络错误、超时等
  4. 注意幂等性:确保重试不会导致副作用

与其他Tower中间件组合

tower-retry可以与其他Tower中间件组合使用:

use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;

let service = ServiceBuilder::new()
    .layer(TraceLayer::new_for_http())  // 添加日志
    .layer(Retry::new(MyPolicy))       // 添加重试
    .layer(tower::limit::RateLimit::new(10, std::time::Duration::from_secs(1))) // 限流
    .service(my_service);

tower-retry为Rust网络请求提供了灵活的重试机制,是构建健壮网络应用的理想选择。

完整示例

下面是一个结合了指数退避策略的完整示例:

use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tower::ServiceBuilder;
use tower_retry::{Retry, Policy};
use tokio::time;
use rand::Rng;

// 定义带指数退避的重试策略
struct ExponentialBackoffPolicy {
    max_retries: usize,
    current: Arc<AtomicUsize>,
    base_delay: Duration,
}

impl<E> Policy<(), E, ()> for ExponentialBackoffPolicy {
    type Future = futures::future::Ready<Self>;
    
    fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
        let current = self.current.fetch_add(1, Ordering::SeqCst);
        if current < self.max_retries {
            // 计算指数退避时间
            let delay = self.base_delay * 2u32.pow(current as u32);
            println!("Retry {} after {:?}", current + 1, delay);
            tokio::spawn(async move {
                time::sleep(delay).await;
            });
            Some(futures::future::ready(self.clone()))
        } else {
            None
        }
    }
    
    fn clone_request(&self, req: &()) -> Option<()> {
        Some(())
    }
}

impl Clone for ExponentialBackoffPolicy {
    fn clone(&self) -> Self {
        Self {
            max_retries: self.max_retries,
            current: self.current.clone(),
            base_delay: self.base_delay,
        }
    }
}

#[tokio::main]
async fn main() {
    let policy = ExponentialBackoffPolicy {
        max_retries: 3,
        current: Arc::new(AtomicUsize::new(0)),
        base_delay: Duration::from_millis(100),
    };

    let client = tower::service_fn(|req: ()| async {
        // 模拟不稳定服务,70%失败率
        if rand::thread_rng().gen_range(0..10) < 3 {
            Ok(())
        } else {
            Err("service error")
        }
    });
    
    let mut client = ServiceBuilder::new()
        .layer(Retry::new(policy))
        .service(client);
    
    match client.call(()).await {
        Ok(_) => println!("最终成功!"),
        Err(e) => println!("最终失败: {:?}", e),
    }
}

这个完整示例展示了:

  1. 实现了带指数退避的重试策略
  2. 设置了最大重试次数为3次
  3. 每次重试间隔时间呈指数增长
  4. 模拟了70%失败率的服务
  5. 打印了每次重试的延迟时间

这个示例可以直接运行,展示了如何在实际应用中使用tower-retry构建健壮的重试机制。

回到顶部