Rust断路器模式库recloser的使用,实现弹性容错和自动恢复的Rust微服务保护机制
Rust断路器模式库recloser的使用,实现弹性容错和自动恢复的Rust微服务保护机制
recloser是一个基于环形缓冲区实现的并发断路器模式库。Recloser
结构体提供了call(...)
方法来包装可能失败的函数调用,当达到某些failure_rate
时会主动拒绝调用,并在一定时间后重新允许调用。通过AsyncRecloser
包装器还提供了支持Future的异步版本call(...)
方法。
使用说明
Recloser
可以处于三种状态:
State::Closed(RingBuffer(len))
:初始状态。至少执行len
次调用后才会计算failure_rate
,根据该比率可能转换到State::Open(_)
状态。State::Open(duration)
:所有调用都会返回Err(Error::Rejected)
,直到duration
时间过去,然后转换到State::HalfOpen(_)
状态。State::HalfOpen(RingBuffer(len))
:至少执行len
次调用后才会计算failure_rate
,根据该比率可能转换到State::Closed(_)
或State::Open(_)
状态。
状态转换设置可以如下自定义:
use std::time::Duration;
use recloser::Recloser;
// 等同于Recloser::default()
let recloser = Recloser::custom()
.error_rate(0.5)
.closed_len(100)
.half_open_len(10)
.open_wait(Duration::from_secs(30))
.build();
包装危险函数调用以控制故障传播:
use recloser::{Recloser, Error};
// 执行1次调用后计算failure_rate
let recloser = Recloser::custom().closed_len(1).build();
let f1 = || Err::<(), usize>(1);
// 第一次调用,仅记录为错误
let res = recloser.call(f1);
assert!(matches!(res, Err(Error::Inner(1))));
// 现在也计算failure_rate,这里是100%
// 之后会转换到State::Open状态
let res = recloser.call(f1);
assert!(matches!(res, Err(Error::Inner(1)));
let f2 = || Err::<(), i64>(-1);
// 所有调用都被拒绝(处于State::Open状态)
let res = recloser.call(f2);
assert!(matches!(res, Err(Error::Rejected)));
还可以基于每次调用来丢弃某些错误。这种行为由ErrorPredicate<E>
trait控制,该trait已经为所有Fn(&E) -> bool
实现。
use recloser::{Recloser, Error};
let recloser = Recloser::default();
let f = || Err::<(), usize>(1);
// 自定义谓词,不将usize值视为错误
let p = |_: &usize| false;
// 不会将结果Err(1)记录为错误
let res = recloser.call_with(p, f);
assert!(matches!(res, Err(Error::Inner(1))));
包装返回Future
的函数需要使用AsyncRecloser
,它只是包装了一个常规的Recloser
。
use std::future;
use recloser::{Recloser, Error, AsyncRecloser};
let recloser = AsyncRecloser::from(Recloser::default());
let future = future::ready::<Result<(), usize>>(Err(1));
let future = recloser.call(future);
完整示例
下面是一个完整的微服务保护机制示例,展示了如何使用recloser实现弹性容错和自动恢复:
use std::time::Duration;
use recloser::{Recloser, Error};
use reqwest::Client;
// 微服务客户端
struct MicroserviceClient {
client: Client,
recloser: Recloser,
}
impl MicroserviceClient {
fn new() -> Self {
// 创建断路器配置:
// - 错误率超过50%时触发
// - 关闭状态下至少收集10个样本
// - 半开状态下至少收集5个样本
// - 打开状态下等待30秒
let recloser = Recloser::custom()
.error_rate(0.5)
.closed_len(10)
.half_open_len(5)
.open_wait(Duration::from_secs(30))
.build();
MicroserviceClient {
client: Client::new(),
recloser,
}
}
// 调用微服务API
async fn call_api(&self, endpoint: &str) -> Result<String, Error<reqwest::Error>> {
let client = self.client.clone();
let endpoint = endpoint.to_string();
// 使用断路器包装API调用
self.recloser.call(move || {
let fut = async {
let response = client.get(&endpoint).send().await?;
response.text().await
};
// 将Future转换为立即执行的Result
tokio::runtime::Runtime::new()
.unwrap()
.block_on(fut)
})
}
}
#[tokio::main]
async fn main() {
let client = MicroserviceClient::new();
// 模拟连续调用
for i in 0..20 {
match client.call_api("http://example.com/api").await {
Ok(response) => {
println!("Call {}: Success - {}", i, response);
}
Err(Error::Inner(e)) => {
println!("Call {}: Service error - {}", i, e);
}
Err(Error::Rejected)) => {
println!("Call {}: Request rejected by circuit breaker", i);
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
性能
Recloser
和failsafe::CircuitBreaker
的基准测试:
- 单线程工作负载:性能相同
- 多线程工作负载:
Recloser
性能好10倍
recloser_simple time: [355.17 us 358.67 us 362.52 us]
failsafe_simple time: [403.47 us 406.90 us 410.29 us]
recloser_concurrent time: [668.44 us 674.26 us 680.48 us]
failsafe_concurrent time: [11.523 ms 11.613 ms 11.694 ms]
这些基准测试是在Intel Core i7-6700HQ @ 8x 3.5GHz
CPU上运行的。
Rust断路器模式库recloser的使用
介绍
recloser是一个Rust实现的断路器(Circuit Breaker)模式库,用于构建具有弹性容错能力的微服务系统。断路器模式是微服务架构中重要的稳定性模式,可以在依赖服务出现问题时快速失败,防止级联故障,并在适当时候尝试恢复。
主要特性:
- 基于错误率和超时的断路器触发机制
- 自动恢复功能
- 可配置的断路器参数
- 异步/同步两种使用方式
- 轻量级且无外部依赖
完整示例代码
以下是结合HTTP请求的完整示例,包含同步和异步两种使用方式:
use recloser::{Recloser, RecloserBuilder};
use std::time::Duration;
use std::sync::Arc;
use lazy_static::lazy_static;
// 共享断路器实例
lazy_static! {
static ref HTTP_RECLOSER: Arc<Recloser> = Arc::new(
Recloser::custom()
.error_rate(0.3)
.min_requests(5)
.reset_timeout(Duration::from_secs(10))
.build()
);
}
// 同步HTTP请求示例
fn sync_http_request(url: &str) -> Result<String, Box<dyn std::error::Error>> {
let result = HTTP_RECLOSER.call(|| {
let response = reqwest::blocking::get(url)?;
if response.status().is_success() {
Ok(response.text()?)
} else {
Err("HTTP request failed".into())
}
});
result
}
// 异步HTTP请求示例
#[tokio::main]
async fn async_http_request(url: &str) -> Result<String, Box<dyn std::error::Error>> {
let result = HTTP_RECLOSER.call_async(|| async {
let response = reqwest::get(url).await?;
if response.status().is_success() {
Ok(response.text().await?)
} else {
Err("HTTP request failed".into())
}
}).await;
result
}
// 监控断路器状态
fn monitor_recloser(recloser: &Recloser) {
println!("当前断路器状态: {:?}", recloser.state());
println!("当前错误率: {:.2}%", recloser.error_rate() * 100.0);
println!("当前请求数: {}", recloser.requests());
}
fn main() {
// 同步调用示例
match sync_http_request("https://api.example.com/data") {
Ok(data) => println!("获取数据成功: {}", data),
Err(e) => println!("请求失败: {}", e),
}
// 异步调用示例
let async_result = async_http_request("https://api.example.com/data");
match async_result {
Ok(data) => println!("异步获取数据成功: {}", data),
Err(e) => println!("异步请求失败: {}", e),
}
// 监控断路器状态
monitor_recloser(&HTTP_RECLOSER);
}
代码说明
-
共享断路器实例:
- 使用
lazy_static
创建全局共享的断路器实例 - 配置错误率阈值为30%,最少需要5个请求才触发断路器
- 重置超时时间为10秒
- 使用
-
同步HTTP请求:
- 使用
call
方法包裹可能失败的HTTP请求 - 当请求失败时会自动计入断路器统计
- 当断路器打开时会直接返回错误而不执行请求
- 使用
-
异步HTTP请求:
- 使用
call_async
方法包裹异步HTTP请求 - 同样遵循断路器的状态规则
- 需要配合async/await语法使用
- 使用
-
状态监控:
- 可以随时检查断路器的当前状态
- 获取当前错误率和请求数统计
- 用于系统监控和告警
最佳实践建议
-
配置参数:
- 对于关键服务,建议设置较低的错误率阈值(如20-30%)
- 根据服务响应时间设置合理的重置超时
- 对于高频服务,可以增大滑动窗口大小
-
错误处理:
- 区分业务错误和系统错误,只有系统错误应触发断路器
- 记录断路器触发日志,便于问题排查
-
系统集成:
- 将断路器与监控系统集成
- 在断路器打开时提供降级方案
- 考虑结合重试机制,但要有最大重试次数限制
这个完整示例展示了如何在生产环境中使用recloser库保护HTTP请求,包含了同步和异步两种方式,以及状态监控功能。