Rust限流中间件tower-limit的使用,tower-limit提供高效请求速率限制和并发控制功能
Tower Rate Limit
限制对Service
的最大请求速率。
许可证
本项目采用MIT许可证。
贡献
除非您明确声明,否则任何有意提交给Tower的贡献都应被视为MIT许可,无需任何附加条款或条件。
安装
在项目目录中运行以下Cargo命令:
cargo add tower-limit
或在Cargo.toml中添加以下行:
tower-limit = "0.3.1"
使用示例
以下是一个使用tower-limit进行请求速率限制和并发控制的完整示例:
use std::convert::Infallible;
use std::time::Duration;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_limit::{ConcurrencyLimitLayer, RateLimitLayer};
// 定义一个简单的服务
#[derive(Debug)]
struct MyService;
impl Service<&'static str> for MyService {
type Response = &'static str;
type Error = Infallible;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
println!("处理请求: {}", req);
futures::future::ready(Ok(req))
}
}
#[tokio::main]
async fn main() {
// 创建服务并应用限流中间件
// 限制每秒最多2个请求
// 限制最大并发数为1
let mut service = ServiceBuilder::new()
.layer(RateLimitLayer::new(2, Duration::from_secs(1)))
.layer(ConcurrencyLimitLayer::new(1))
.service(MyService);
// 模拟多个请求
let requests = vec!["请求1", "请求2", "请求3", "请求4"];
for req in requests {
match service.ready().await {
Ok(svc) => {
if let Ok(resp) = svc.call(req).await {
println!("收到响应: {}", resp);
}
}
Err(e) => eprintln!("服务不可用: {:?}", e),
}
}
}
这个示例展示了如何:
- 创建一个简单的服务
- 使用
RateLimitLayer
限制每秒请求数(2个请求/秒) - 使用
ConcurrencyLimitLayer
限制最大并发数(1个) - 处理多个请求并观察限流效果
当请求速率超过限制时,服务会返回不可用状态(通过ready()
方法)。
完整示例代码
use std::convert::Infallible;
use std::time::Duration;
use tokio::time;
use tower::{Service, ServiceBuilder, ServiceExt};
use tower_limit::{ConcurrencyLimitLayer, RateLimitLayer};
// 定义一个简单的服务结构体
#[derive(Debug)]
struct EchoService;
// 为服务实现Service trait
impl Service<&'static str> for EchoService {
type Response = &'static str;
type Error = Infallible;
type Future = futures::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
println!("[处理中] 请求: {}", req);
futures::future::ready(Ok(req))
}
}
#[tokio::main]
async fn main() {
// 创建服务构建器并添加限流中间件
let mut service = ServiceBuilder::new()
// 添加速率限制层:每秒最多处理3个请求
.layer(RateLimitLayer::new(3, Duration::from_secs(1)))
// 添加并发限制层:最多同时处理2个请求
.layer(ConcurrencyLimitLayer::new(2))
// 构建最终服务
.service(EchoService);
println!("启动服务,配置如下:");
println!("- 速率限制: 3 请求/秒");
println!("- 并发限制: 2 同时请求");
println!("---------------------");
// 模拟10个连续请求
for i in 0..10 {
let req = format!("请求-{}", i + 1);
println!("[发送] {}", req);
match service.ready().await {
Ok(svc) => {
if let Ok(resp) = svc.call(req.as_str()).await {
println!("[成功] 响应: {}", resp);
}
}
Err(e) => {
eprintln!("[限流] 请求被拒绝: {:?}", e);
// 当请求被限流时,等待一段时间再重试
time::sleep(Duration::from_millis(500)).await;
}
}
}
}
这个完整示例展示了:
- 创建了一个简单的回显服务(EchoService)
- 使用
RateLimitLayer
设置每秒最多处理3个请求 - 使用
ConcurrencyLimitLayer
设置最大并发数为2 - 模拟发送10个连续请求
- 处理限流情况,当请求被拒绝时等待500毫秒后继续
运行此代码可以看到速率限制和并发限制的实际效果,部分请求会因为超过限制而被拒绝。
1 回复
以下是基于提供的tower-limit
使用指南内容整理的完整示例demo,所有代码均来自原始内容并添加了详细注释:
基础速率限制完整示例
use tower::{ServiceBuilder, ServiceExt};
use tower::limit::{RateLimitLayer};
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建基础服务:打印接收到的请求
let svc = tower::service_fn(|req: &'static str| async move {
println!("[处理中] {}", req);
Ok::<_, ()>(req)
});
// 应用速率限制层:每秒最多处理3个请求
let mut rate_limited = ServiceBuilder::new()
.layer(RateLimitLayer::new(3, Duration::from_secs(1)))
.service(svc);
// 模拟10个连续请求
for i in 1..=10 {
println!("[发送] 请求{}", i);
match rate_limited.ready().await {
Ok(svc) => {
let _ = svc.call("测试数据").await;
println!("[成功] 请求{}完成", i);
}
Err(_) => println!("[拒绝] 请求{}被限流", i),
}
// 每个请求间隔300毫秒
tokio::time::sleep(Duration::from_millis(300)).await;
}
}
并发限制完整示例
use tower::{ServiceBuilder, ServiceExt};
use tower::limit::ConcurrencyLimitLayer;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 模拟耗时服务:每个请求处理需要1秒
let svc = tower::service_fn(|req: String| async move {
println!("[开始处理] {}", req);
tokio::time::sleep(Duration::from_secs(1)).await;
println!("[完成处理] {}", req);
Ok::<_, ()>(req)
});
// 限制并发数为2
let concurrency_limited = ServiceBuilder::new()
.layer(ConcurrencyLimitLayer::new(2))
.service(svc);
// 模拟5个并发请求
let mut handles = vec![];
for i in 1..=5 {
let mut client = concurrency_limited.clone();
handles.push(tokio::spawn(async move {
let req = format!("并发请求-{}", i);
let _ = client.call(req).await;
}));
}
// 等待所有请求完成
futures::future::join_all(handles).await;
}
组合限制完整示例
use tower::{ServiceBuilder, ServiceExt};
use tower::limit::{RateLimitLayer, ConcurrencyLimitLayer};
use std::time::Duration;
#[tokio::main]
async fn main() {
// 基础服务:模拟API处理
let svc = tower::service_fn(|req: &str| async move {
println!("处理API请求: {}", req);
Ok::<_, ()>(req)
});
// 组合限制策略:
// - 每秒最多10个请求
// - 同时最多3个并发
let api_service = ServiceBuilder::new()
.layer(RateLimitLayer::new(10, Duration::from_secs(1)))
.layer(ConcurrencyLimitLayer::new(3))
.service(svc);
// 模拟突发流量
let mut handles = vec![];
for i in 1..=15 {
let mut client = api_service.clone();
handles.push(tokio::spawn(async move {
let req = format!("API调用#{}", i);
match client.call(&req).await {
Ok(_) => println!("{} 成功", req),
Err(e) => println!("{} 失败: {:?}", req, e),
}
}));
tokio::time::sleep(Duration::from_millis(100)).await;
}
futures::future::join_all(handles).await;
}
自定义错误处理完整示例
use tower::{ServiceBuilder, ServiceExt};
use tower::limit::RateLimitLayer;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建可能失败的服务
let svc = tower::service_fn(|req: u32| async move {
if req % 2 == 0 {
Ok::<u32, String>(req)
} else {
Err("奇数请求失败".to_string())
}
});
// 应用速率限制:每秒2个请求
let mut limited_svc = ServiceBuilder::new()
.layer(RateLimitLayer::new(2, Duration::from_secs(1)))
.service(svc);
// 发送测试请求
for i in 1..=5 {
tokio::time::sleep(Duration::from_millis(400)).await;
match limited_svc.call(i).await {
Ok(res) => println!("请求{}成功: {}", i, res),
Err(e) => println!("请求{}失败: {}", i, e),
}
}
}