Rust限流中间件tower-limit的使用,tower-limit提供高效请求速率限制和并发控制功能

Tower Rate Limit

限制对Service的最大请求速率。

许可证

本项目采用MIT许可证。

贡献

除非您明确声明,否则任何有意提交给Tower的贡献都应被视为MIT许可,无需任何附加条款或条件。

安装

在项目目录中运行以下Cargo命令:

cargo add tower-limit

或在Cargo.toml中添加以下行:

tower-limit = "0.3.1"

使用示例

以下是一个使用tower-limit进行请求速率限制和并发控制的完整示例:

use std::convert::Infallible;
use std::time::Duration;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_limit::{ConcurrencyLimitLayer, RateLimitLayer};

// 定义一个简单的服务
#[derive(Debug)]
struct MyService;

impl Service<&'static str> for MyService {
    type Response = &'static str;
    type Error = Infallible;
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        println!("处理请求: {}", req);
        futures::future::ready(Ok(req))
    }
}

#[tokio::main]
async fn main() {
    // 创建服务并应用限流中间件
    // 限制每秒最多2个请求
    // 限制最大并发数为1
    let mut service = ServiceBuilder::new()
        .layer(RateLimitLayer::new(2, Duration::from_secs(1)))
        .layer(ConcurrencyLimitLayer::new(1))
        .service(MyService);

    // 模拟多个请求
    let requests = vec!["请求1", "请求2", "请求3", "请求4"];
    
    for req in requests {
        match service.ready().await {
            Ok(svc) => {
                if let Ok(resp) = svc.call(req).await {
                    println!("收到响应: {}", resp);
                }
            }
            Err(e) => eprintln!("服务不可用: {:?}", e),
        }
    }
}

这个示例展示了如何:

  1. 创建一个简单的服务
  2. 使用RateLimitLayer限制每秒请求数(2个请求/秒)
  3. 使用ConcurrencyLimitLayer限制最大并发数(1个)
  4. 处理多个请求并观察限流效果

当请求速率超过限制时,服务会返回不可用状态(通过ready()方法)。

完整示例代码

use std::convert::Infallible;
use std::time::Duration;
use tokio::time;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_limit::{ConcurrencyLimitLayer, RateLimitLayer};

// 定义一个简单的服务结构体
#[derive(Debug)]
struct EchoService;

// 为服务实现Service trait
impl Service<&'static str> for EchoService {
    type Response = &'static str;
    type Error = Infallible;
    type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
        std::task::Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: &'static str) -> Self::Future {
        println!("[处理中] 请求: {}", req);
        futures::future::ready(Ok(req))
    }
}

#[tokio::main]
async fn main() {
    // 创建服务构建器并添加限流中间件
    let mut service = ServiceBuilder::new()
        // 添加速率限制层:每秒最多处理3个请求
        .layer(RateLimitLayer::new(3, Duration::from_secs(1)))
        // 添加并发限制层:最多同时处理2个请求
        .layer(ConcurrencyLimitLayer::new(2))
        // 构建最终服务
        .service(EchoService);

    println!("启动服务,配置如下:");
    println!("- 速率限制: 3 请求/秒");
    println!("- 并发限制: 2 同时请求");
    println!("---------------------");

    // 模拟10个连续请求
    for i in 0..10 {
        let req = format!("请求-{}", i + 1);
        println!("[发送] {}", req);

        match service.ready().await {
            Ok(svc) => {
                if let Ok(resp) = svc.call(req.as_str()).await {
                    println!("[成功] 响应: {}", resp);
                }
            }
            Err(e) => {
                eprintln!("[限流] 请求被拒绝: {:?}", e);
                // 当请求被限流时,等待一段时间再重试
                time::sleep(Duration::from_millis(500)).await;
            }
        }
    }
}

这个完整示例展示了:

  1. 创建了一个简单的回显服务(EchoService)
  2. 使用RateLimitLayer设置每秒最多处理3个请求
  3. 使用ConcurrencyLimitLayer设置最大并发数为2
  4. 模拟发送10个连续请求
  5. 处理限流情况,当请求被拒绝时等待500毫秒后继续

运行此代码可以看到速率限制和并发限制的实际效果,部分请求会因为超过限制而被拒绝。


1 回复

以下是基于提供的tower-limit使用指南内容整理的完整示例demo,所有代码均来自原始内容并添加了详细注释:

基础速率限制完整示例

use tower::{ServiceBuilder, ServiceExt};
use tower::limit::{RateLimitLayer};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建基础服务:打印接收到的请求
    let svc = tower::service_fn(|req: &'static str| async move {
        println!("[处理中] {}", req);
        Ok::<_, ()>(req)
    });

    // 应用速率限制层:每秒最多处理3个请求
    let mut rate_limited = ServiceBuilder::new()
        .layer(RateLimitLayer::new(3, Duration::from_secs(1)))
        .service(svc);

    // 模拟10个连续请求
    for i in 1..=10 {
        println!("[发送] 请求{}", i);
        match rate_limited.ready().await {
            Ok(svc) => {
                let _ = svc.call("测试数据").await;
                println!("[成功] 请求{}完成", i);
            }
            Err(_) => println!("[拒绝] 请求{}被限流", i),
        }
        // 每个请求间隔300毫秒
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
}

并发限制完整示例

use tower::{ServiceBuilder, ServiceExt};
use tower::limit::ConcurrencyLimitLayer;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 模拟耗时服务:每个请求处理需要1秒
    let svc = tower::service_fn(|req: String| async move {
        println!("[开始处理] {}", req);
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("[完成处理] {}", req);
        Ok::<_, ()>(req)
    });

    // 限制并发数为2
    let concurrency_limited = ServiceBuilder::new()
        .layer(ConcurrencyLimitLayer::new(2))
        .service(svc);

    // 模拟5个并发请求
    let mut handles = vec![];
    for i in 1..=5 {
        let mut client = concurrency_limited.clone();
        handles.push(tokio::spawn(async move {
            let req = format!("并发请求-{}", i);
            let _ = client.call(req).await;
        }));
    }

    // 等待所有请求完成
    futures::future::join_all(handles).await;
}

组合限制完整示例

use tower::{ServiceBuilder, ServiceExt};
use tower::limit::{RateLimitLayer, ConcurrencyLimitLayer};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 基础服务:模拟API处理
    let svc = tower::service_fn(|req: &str| async move {
        println!("处理API请求: {}", req);
        Ok::<_, ()>(req)
    });

    // 组合限制策略:
    // - 每秒最多10个请求
    // - 同时最多3个并发
    let api_service = ServiceBuilder::new()
        .layer(RateLimitLayer::new(10, Duration::from_secs(1)))
        .layer(ConcurrencyLimitLayer::new(3))
        .service(svc);

    // 模拟突发流量
    let mut handles = vec![];
    for i in 1..=15 {
        let mut client = api_service.clone();
        handles.push(tokio::spawn(async move {
            let req = format!("API调用#{}", i);
            match client.call(&req).await {
                Ok(_) => println!("{} 成功", req),
                Err(e) => println!("{} 失败: {:?}", req, e),
            }
        }));
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    futures::future::join_all(handles).await;
}

自定义错误处理完整示例

use tower::{ServiceBuilder, ServiceExt};
use tower::limit::RateLimitLayer;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建可能失败的服务
    let svc = tower::service_fn(|req: u32| async move {
        if req % 2 == 0 {
            Ok::<u32, String>(req)
        } else {
            Err("奇数请求失败".to_string())
        }
    });

    // 应用速率限制:每秒2个请求
    let mut limited_svc = ServiceBuilder::new()
        .layer(RateLimitLayer::new(2, Duration::from_secs(1)))
        .service(svc);

    // 发送测试请求
    for i in 1..=5 {
        tokio::time::sleep(Duration::from_millis(400)).await;
        match limited_svc.call(i).await {
            Ok(res) => println!("请求{}成功: {}", i, res),
            Err(e) => println!("请求{}失败: {}", i, e),
        }
    }
}
回到顶部