Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略
Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略
Tower Retry
这是一个用于重试失败请求的Rust库。
许可证
该项目采用MIT许可证授权。
贡献
除非您明确声明,否则任何有意提交给Tower的贡献都将以MIT许可证授权,无需任何附加条款或条件。
安装
在项目目录中运行以下Cargo命令:
cargo add tower-retry
或者在Cargo.toml中添加以下行:
tower-retry = "0.3.0"
使用示例
下面是一个完整的tower-retry使用示例:
use tower::{Service, ServiceBuilder};
use tower_retry::Retry;
use tower_retry::policy::Limit;
use std::time::Duration;
use std::convert::Infallible;
// 定义一个简单的服务
#[derive(Clone)]
struct MyService;
impl Service<&'static str> for MyService {
type Response = &'static str;
type Error = Infallible;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
println!("处理请求: {}", req);
futures::future::ready(Ok("响应"))
}
}
#[tokio::main]
async fn main() {
// 创建重试策略 - 最多重试3次,每次间隔1秒
let retry_policy = Limit::new(3, Duration::from_secs(1));
// 构建服务栈
let mut service = ServiceBuilder::new()
.retry(retry_policy)
.service(MyService);
// 调用服务
let response = service.call("测试请求").await.unwrap();
println!("最终响应: {}", response);
}
这个示例展示了如何:
- 创建一个基本的服务
- 配置重试策略(最多重试3次,每次间隔1秒)
- 将重试中间件应用到服务上
- 调用服务并处理响应
tower-retry库与Tower生态系统无缝集成,可以很容易地与其他Tower中间件组合使用。
完整示例demo
下面是一个更完整的网络请求重试示例,模拟HTTP请求失败时的重试机制:
use std::time::Duration;
use std::convert::Infallible;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_retry::Retry;
use tower_retry::policy::Limit;
// 模拟HTTP请求的服务
#[derive(Debug, Clone)]
struct HttpClient {
fail_until: usize, // 前N次请求会失败
current_attempt: usize,
}
impl HttpClient {
fn new(fail_until: usize) -> Self {
Self {
fail_until,
current_attempt: 0,
}
}
}
impl Service<&'static str> for HttpClient {
type Response = String;
type Error = String;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
self.current_attempt += 1;
if self.current_attempt <= self.fail_until {
println!("请求失败 (尝试次数: {}) - {}", self.current_attempt, req);
futures::future::ready(Err(format!("第{}次尝试失败", self.current_attempt)))
} else {
println!("请求成功 (尝试次数: {}) - {}", self.current_attempt, req);
futures::future::ready(Ok(format!("成功响应: {}", req)))
}
}
}
#[tokio::main]
async fn main() {
// 创建HTTP客户端 - 前2次请求会失败
let http_client = HttpClient::new(2);
// 配置重试策略 - 最多重试5次,每次间隔500毫秒
let retry_policy = Limit::new(5, Duration::from_millis(500));
// 构建服务栈
let mut service = ServiceBuilder::new()
.retry(retry_policy)
.service(http_client);
// 调用服务
match service.ready().await.unwrap().call("测试API请求").await {
Ok(res) => println!("最终结果: {}", res),
Err(e) => println!("所有重试尝试均失败: {}", e),
}
}
这个完整示例演示了:
- 模拟一个可能失败的HTTP客户端
- 配置更复杂的重试策略
- 处理成功和失败两种情况
- 显示每次重试的详细信息
文档
更多详细使用方法和API参考,请查看官方文档。
1 回复
Rust网络请求重试库tower-retry的使用:Tower生态下实现高效请求自动重试策略
tower-retry
是Tower生态系统中一个用于实现请求自动重试策略的中间件库,它可以帮助开发者轻松地为网络请求添加重试逻辑,提高应用程序的健壮性。
核心特性
- 基于Tower中间件架构
- 可配置的重试策略
- 支持条件性重试(基于错误类型)
- 与Tower生态无缝集成
- 异步友好
基本使用方法
首先添加依赖到你的Cargo.toml
:
[dependencies]
tower = "0.4"
tower-retry = "0.3"
tokio = { version = "1.0", features = ["full"] }
简单示例
use tower::ServiceBuilder;
use tower_retry::Retry;
use tower_retry::Policy;
// 定义重试策略
struct MyPolicy;
impl<E> Policy<(), E, ()> for MyPolicy {
type Future = futures::future::Ready<Self>;
fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
match result {
Ok(_) => None, // 成功则不重试
Err(_) => Some(futures::future::ready(MyPolicy)), // 失败则重试
}
}
fn clone_request(&self, req: &()) -> Option<()> {
Some(())
}
}
#[tokio::main]
async fn main() {
let client = tower::service_fn(|req: ()| async {
// 模拟50%失败率的服务
if rand::random::<bool>() {
Ok(())
} else {
Err("error")
}
});
let mut client = ServiceBuilder::new()
.layer(Retry::new(MyPolicy))
.service(client);
match client.call(()).await {
Ok(_) => println!("Success!"),
Err(e) => println!("Failed after retries: {:?}", e),
}
}
高级用法
限制最大重试次数
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tower_retry::Policy;
struct LimitedRetryPolicy {
max_retries: usize,
current: Arc<AtomicUsize>,
}
impl<E> Policy<(), E, ()> for LimitedRetryPolicy {
type Future = futures::future::Ready<Self>;
fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
let current = self.current.fetch_add(1, Ordering::SeqCst);
if current < self.max_retries {
Some(futures::future::ready(self.clone()))
} else {
None
}
}
fn clone_request(&self, req: &()) -> Option<()> {
Some(())
}
}
impl Clone for LimitedRetryPolicy {
fn clone(&self) -> Self {
Self {
max_retries: self.max_retries,
current: self.current.clone(),
}
}
}
与Hyper客户端集成
use hyper::{Client, Request, Body};
use tower::ServiceBuilder;
use tower_retry::{Retry, Policy};
struct HttpRetryPolicy;
impl<ResBody> Policy<Request<Body>, hyper::Error, ResBody> for HttpRetryPolicy {
type Future = futures::future::Ready<Self>;
fn retry(&self, req: &Request<Body>, result: Result极光
fn clone_request(&self, req: &Request<Body>) -> Option<Request<Body>> {
let mut new_req = Request::new(Body::empty());
*new_req.uri_mut() = req.uri().clone();
*new_req.method_mut() = req.method().clone();
*new_req.headers_mut() = req.headers().clone();
Some(new_req)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let mut client = ServiceBuilder::new()
.layer(Retry::new(HttpRetryPolicy))
.service(client);
let req = Request::get("http://example.com")
.body(Body::empty())?;
let response = client.call(req).await?;
println!("Status: {}", response.status());
Ok(())
}
最佳实践
- 合理设置重试次数:通常3-5次重试是合理的
- 考虑指数退避:结合
tower-backoff
实现退避策略 - 只对可重试错误重试:如网络错误、超时等
- 注意幂等性:确保重试不会导致副作用
与其他Tower中间件组合
tower-retry
可以与其他Tower中间件组合使用:
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
let service = ServiceBuilder::new()
.layer(TraceLayer::new_for_http()) // 添加日志
.layer(Retry::new(MyPolicy)) // 添加重试
.layer(tower::limit::RateLimit::new(10, std::time::Duration::from_secs(1))) // 限流
.service(my_service);
tower-retry
为Rust网络请求提供了灵活的重试机制,是构建健壮网络应用的理想选择。
完整示例
下面是一个结合了指数退避策略的完整示例:
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tower::ServiceBuilder;
use tower_retry::{Retry, Policy};
use tokio::time;
use rand::Rng;
// 定义带指数退避的重试策略
struct ExponentialBackoffPolicy {
max_retries: usize,
current: Arc<AtomicUsize>,
base_delay: Duration,
}
impl<E> Policy<(), E, ()> for ExponentialBackoffPolicy {
type Future = futures::future::Ready<Self>;
fn retry(&self, req: &(), result: Result<&(), &E>) -> Option<Self::Future> {
let current = self.current.fetch_add(1, Ordering::SeqCst);
if current < self.max_retries {
// 计算指数退避时间
let delay = self.base_delay * 2u32.pow(current as u32);
println!("Retry {} after {:?}", current + 1, delay);
tokio::spawn(async move {
time::sleep(delay).await;
});
Some(futures::future::ready(self.clone()))
} else {
None
}
}
fn clone_request(&self, req: &()) -> Option<()> {
Some(())
}
}
impl Clone for ExponentialBackoffPolicy {
fn clone(&self) -> Self {
Self {
max_retries: self.max_retries,
current: self.current.clone(),
base_delay: self.base_delay,
}
}
}
#[tokio::main]
async fn main() {
let policy = ExponentialBackoffPolicy {
max_retries: 3,
current: Arc::new(AtomicUsize::new(0)),
base_delay: Duration::from_millis(100),
};
let client = tower::service_fn(|req: ()| async {
// 模拟不稳定服务,70%失败率
if rand::thread_rng().gen_range(0..10) < 3 {
Ok(())
} else {
Err("service error")
}
});
let mut client = ServiceBuilder::new()
.layer(Retry::new(policy))
.service(client);
match client.call(()).await {
Ok(_) => println!("最终成功!"),
Err(e) => println!("最终失败: {:?}", e),
}
}
这个完整示例展示了:
- 实现了带指数退避的重试策略
- 设置了最大重试次数为3次
- 每次重试间隔时间呈指数增长
- 模拟了70%失败率的服务
- 打印了每次重试的延迟时间
这个示例可以直接运行,展示了如何在实际应用中使用tower-retry
构建健壮的重试机制。