Rust断路器模式库recloser的使用,实现弹性容错和自动恢复的Rust微服务保护机制

Rust断路器模式库recloser的使用,实现弹性容错和自动恢复的Rust微服务保护机制

recloser是一个基于环形缓冲区实现的并发断路器模式库。Recloser结构体提供了call(...)方法来包装可能失败的函数调用,当达到某些failure_rate时会主动拒绝调用,并在一定时间后重新允许调用。通过AsyncRecloser包装器还提供了支持Future的异步版本call(...)方法。

使用说明

Recloser可以处于三种状态:

  • State::Closed(RingBuffer(len)):初始状态。至少执行len次调用后才会计算failure_rate,根据该比率可能转换到State::Open(_)状态。
  • State::Open(duration):所有调用都会返回Err(Error::Rejected),直到duration时间过去,然后转换到State::HalfOpen(_)状态。
  • State::HalfOpen(RingBuffer(len)):至少执行len次调用后才会计算failure_rate,根据该比率可能转换到State::Closed(_)State::Open(_)状态。

状态转换设置可以如下自定义:

use std::time::Duration;
use recloser::Recloser;

// 等同于Recloser::default()
let recloser = Recloser::custom()
   .error_rate(0.5)
   .closed_len(100)
   .half_open_len(10)
   .open_wait(Duration::from_secs(30))
   .build();

包装危险函数调用以控制故障传播:

use recloser::{Recloser, Error};

// 执行1次调用后计算failure_rate
let recloser = Recloser::custom().closed_len(1).build();

let f1 = || Err::<(), usize>(1);

// 第一次调用,仅记录为错误
let res = recloser.call(f1);
assert!(matches!(res, Err(Error::Inner(1))));

// 现在也计算failure_rate,这里是100%
// 之后会转换到State::Open状态
let res = recloser.call(f1);
assert!(matches!(res, Err(Error::Inner(1)));

let f2 = || Err::<(), i64>(-1);

// 所有调用都被拒绝(处于State::Open状态)
let res = recloser.call(f2);
assert!(matches!(res, Err(Error::Rejected)));

还可以基于每次调用来丢弃某些错误。这种行为由ErrorPredicate<E> trait控制,该trait已经为所有Fn(&E) -> bool实现。

use recloser::{Recloser, Error};

let recloser = Recloser::default();

let f = || Err::<(), usize>(1);

// 自定义谓词,不将usize值视为错误
let p = |_: &usize| false;

// 不会将结果Err(1)记录为错误
let res = recloser.call_with(p, f);
assert!(matches!(res, Err(Error::Inner(1))));

包装返回Future的函数需要使用AsyncRecloser,它只是包装了一个常规的Recloser

use std::future;
use recloser::{Recloser, Error, AsyncRecloser};

let recloser = AsyncRecloser::from(Recloser::default());

let future = future::ready::<Result<(), usize>>(Err(1));
let future = recloser.call(future);

完整示例

下面是一个完整的微服务保护机制示例,展示了如何使用recloser实现弹性容错和自动恢复:

use std::time::Duration;
use recloser::{Recloser, Error};
use reqwest::Client;

// 微服务客户端
struct MicroserviceClient {
    client: Client,
    recloser: Recloser,
}

impl MicroserviceClient {
    fn new() -> Self {
        // 创建断路器配置:
        // - 错误率超过50%时触发
        // - 关闭状态下至少收集10个样本
        // - 半开状态下至少收集5个样本 
        // - 打开状态下等待30秒
        let recloser = Recloser::custom()
            .error_rate(0.5)
            .closed_len(10)
            .half_open_len(5)
            .open_wait(Duration::from_secs(30))
            .build();
            
        MicroserviceClient {
            client: Client::new(),
            recloser,
        }
    }

    // 调用微服务API
    async fn call_api(&self, endpoint: &str) -> Result<String, Error<reqwest::Error>> {
        let client = self.client.clone();
        let endpoint = endpoint.to_string();
        
        // 使用断路器包装API调用
        self.recloser.call(move || {
            let fut = async {
                let response = client.get(&endpoint).send().await?;
                response.text().await
            };
            
            // 将Future转换为立即执行的Result
            tokio::runtime::Runtime::new()
                .unwrap()
                .block_on(fut)
        })
    }
}

#[tokio::main]
async fn main() {
    let client = MicroserviceClient::new();
    
    // 模拟连续调用
    for i in 0..20 {
        match client.call_api("http://example.com/api").await {
            Ok(response) => {
                println!("Call {}: Success - {}", i, response);
            }
            Err(Error::Inner(e)) => {
                println!("Call {}: Service error - {}", i, e);
            }
            Err(Error::Rejected)) => {
                println!("Call {}: Request rejected by circuit breaker", i);
            }
        }
        
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

性能

Recloserfailsafe::CircuitBreaker的基准测试:

  • 单线程工作负载:性能相同
  • 多线程工作负载:Recloser性能好10倍
recloser_simple         time:   [355.17 us 358.67 us 362.52 us]
failsafe_simple         time:   [403.47 us 406.90 us 410.29 us]
recloser_concurrent     time:   [668.44 us 674.26 us 680.48 us]
failsafe_concurrent     time:   [11.523 ms 11.613 ms 11.694 ms]

这些基准测试是在Intel Core i7-6700HQ @ 8x 3.5GHz CPU上运行的。


1 回复

Rust断路器模式库recloser的使用

介绍

recloser是一个Rust实现的断路器(Circuit Breaker)模式库,用于构建具有弹性容错能力的微服务系统。断路器模式是微服务架构中重要的稳定性模式,可以在依赖服务出现问题时快速失败,防止级联故障,并在适当时候尝试恢复。

主要特性:

  • 基于错误率和超时的断路器触发机制
  • 自动恢复功能
  • 可配置的断路器参数
  • 异步/同步两种使用方式
  • 轻量级且无外部依赖

完整示例代码

以下是结合HTTP请求的完整示例,包含同步和异步两种使用方式:

use recloser::{Recloser, RecloserBuilder};
use std::time::Duration;
use std::sync::Arc;
use lazy_static::lazy_static;

// 共享断路器实例
lazy_static! {
    static ref HTTP_RECLOSER: Arc<Recloser> = Arc::new(
        Recloser::custom()
            .error_rate(0.3)
            .min_requests(5)
            .reset_timeout(Duration::from_secs(10))
            .build()
    );
}

// 同步HTTP请求示例
fn sync_http_request(url: &str) -> Result<String, Box<dyn std::error::Error>> {
    let result = HTTP_RECLOSER.call(|| {
        let response = reqwest::blocking::get(url)?;
        if response.status().is_success() {
            Ok(response.text()?)
        } else {
            Err("HTTP request failed".into())
        }
    });
    
    result
}

// 异步HTTP请求示例
#[tokio::main]
async fn async_http_request(url: &str) -> Result<String, Box<dyn std::error::Error>> {
    let result = HTTP_RECLOSER.call_async(|| async {
        let response = reqwest::get(url).await?;
        if response.status().is_success() {
            Ok(response.text().await?)
        } else {
            Err("HTTP request failed".into())
        }
    }).await;
    
    result
}

// 监控断路器状态
fn monitor_recloser(recloser: &Recloser) {
    println!("当前断路器状态: {:?}", recloser.state());
    println!("当前错误率: {:.2}%", recloser.error_rate() * 100.0);
    println!("当前请求数: {}", recloser.requests());
}

fn main() {
    // 同步调用示例
    match sync_http_request("https://api.example.com/data") {
        Ok(data) => println!("获取数据成功: {}", data),
        Err(e) => println!("请求失败: {}", e),
    }
    
    // 异步调用示例
    let async_result = async_http_request("https://api.example.com/data");
    match async_result {
        Ok(data) => println!("异步获取数据成功: {}", data),
        Err(e) => println!("异步请求失败: {}", e),
    }
    
    // 监控断路器状态
    monitor_recloser(&HTTP_RECLOSER);
}

代码说明

  1. 共享断路器实例

    • 使用lazy_static创建全局共享的断路器实例
    • 配置错误率阈值为30%,最少需要5个请求才触发断路器
    • 重置超时时间为10秒
  2. 同步HTTP请求

    • 使用call方法包裹可能失败的HTTP请求
    • 当请求失败时会自动计入断路器统计
    • 当断路器打开时会直接返回错误而不执行请求
  3. 异步HTTP请求

    • 使用call_async方法包裹异步HTTP请求
    • 同样遵循断路器的状态规则
    • 需要配合async/await语法使用
  4. 状态监控

    • 可以随时检查断路器的当前状态
    • 获取当前错误率和请求数统计
    • 用于系统监控和告警

最佳实践建议

  1. 配置参数

    • 对于关键服务,建议设置较低的错误率阈值(如20-30%)
    • 根据服务响应时间设置合理的重置超时
    • 对于高频服务,可以增大滑动窗口大小
  2. 错误处理

    • 区分业务错误和系统错误,只有系统错误应触发断路器
    • 记录断路器触发日志,便于问题排查
  3. 系统集成

    • 将断路器与监控系统集成
    • 在断路器打开时提供降级方案
    • 考虑结合重试机制,但要有最大重试次数限制

这个完整示例展示了如何在生产环境中使用recloser库保护HTTP请求,包含了同步和异步两种方式,以及状态监控功能。

回到顶部