Rust异步限流库async-throttle的使用:高效管理并发请求和控制速率
Rust异步限流库async-throttle的使用:高效管理并发请求和控制速率
安装
在项目目录中运行以下Cargo命令:
cargo add async-throttle
或者在Cargo.toml中添加:
async-throttle = "0.3.2"
基本使用示例
use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建一个速率限制器,每秒允许5个请求
let rate_limiter = RateLimiter::new(
NonZeroU32::new(5).unwrap(), // 每时间窗口的请求数
Duration::from_secs(1), // 时间窗口长度
);
// 模拟并发请求
for i in 0..10 {
let limiter = rate_limiter.clone();
tokio::spawn(async move {
// 等待获取许可
limiter.wait().await;
// 执行受限操作
println!("处理请求 {}", i);
// 这里可以替换为实际的API调用或其他异步操作
});
}
// 等待所有任务完成
tokio::time::sleep(Duration::from_secs(3)).await;
}
完整示例代码
下面是一个更完整的示例,展示了如何使用async-throttle来控制API请求速率:
use async_throttle::RateLimiter;
use reqwest::Client;
use std::num::NonZeroU32;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建速率限制器 - 每分钟60个请求
let rate_limiter = RateLimiter::new(
NonZeroU32::new(60).unwrap(), // 每分钟60个请求
Duration::from_secs(60), // 1分钟的时间窗口
);
let client = Client::new();
let urls = vec![
"https://api.example.com/endpoint1",
"https://api.example.com/endpoint2",
// 更多URL...
];
// 使用流处理并发请求
let tasks = urls.into_iter().map(|url| {
let limiter = rate_limiter.clone();
let client = client.clone();
tokio::spawn(async move {
// 等待获取许可
limiter.wait().await;
// 执行API请求
let response = client.get(url).send().await?;
let text = response.text().await?;
println!("从 {} 获取到响应: {}", url, text);
Ok::<_, reqwest::Error>(())
})
});
// 等待所有任务完成
for task in tasks {
task.await??;
}
Ok(())
}
高级功能
突发请求支持
use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 创建支持突发的速率限制器
// 允许每秒5个请求,但突发最多10个请求
let rate_limiter = RateLimiter::burst(
NonZeroU32::new(5).unwrap(), // 每秒5个请求
NonZeroU32::new(10).unwrap(), // 突发容量10个请求
Duration::from_secs(1), // 时间窗口1秒
);
// 模拟突发请求
for i in 0..15 {
let limiter = rate_limiter.clone();
tokio::spawn(async move {
limiter.wait().await;
println!("处理请求 {}", i);
});
}
tokio::time::sleep(Duration::from_secs(3)).await;
}
权重控制
use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;
#[tokio::main]
async fn main() {
let rate_limiter = RateLimiter::new(
NonZeroU32::new(100).unwrap(), // 每时间窗口100个"单位"
Duration::from_secs(1), // 时间窗口1秒
);
// 不同权重的请求
for i in 0..5 {
let limiter = rate_limiter.clone();
tokio::spawn(async move {
// 权重较大的请求消耗更多配额
let weight = if i % 2 == 0 { 10 } else { 1 };
limiter.wait_weight(weight).await;
println!("处理权重为{}的请求 {}", weight, i);
});
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
完整示例demo
下面是一个结合了多种功能的完整示例:
use async_throttle::RateLimiter;
use reqwest::Client;
use std::num::NonZeroU32;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建支持突发的速率限制器
// 基础速率:每秒2个请求,突发容量:5个请求
let rate_limiter = RateLimiter::burst(
NonZeroU32::new(2).unwrap(),
NonZeroU32::new(5).unwrap(),
Duration::from_secs(1),
);
let client = Client::new();
let api_endpoints = vec![
("/light-operation", 1), // 权重1的轻量操作
("/heavy-operation", 3), // 权重3的重量级操作
("/medium-operation", 2), // 权重2的中等操作
];
// 模拟不同类型的API请求
for (i, (endpoint, weight)) in api_endpoints.iter().enumerate().cycle().take(15) {
let limiter = rate_limiter.clone();
let client = client.clone();
let endpoint = endpoint.to_string();
tokio::spawn(async move {
// 根据权重等待许可
limiter.wait_weight(*weight).await;
println!("开始处理 {} (权重: {})", endpoint, weight);
// 模拟API请求延迟
tokio::time::sleep(Duration::from_millis(200)).await;
println!("完成处理 {}", endpoint);
});
}
// 等待足够时间观察限流效果
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(())
}
总结
async-throttle是一个简单而强大的Rust异步限流库,可以帮助你:
- 控制API请求速率,避免被限流或封禁
- 管理资源使用,防止系统过载
- 实现公平的资源分配策略
- 支持突发流量和加权请求
通过合理配置速率限制器参数,你可以在保证系统稳定性的同时最大化资源利用率。
1 回复
Rust异步限流库async-throttle使用指南
介绍
async-throttle
是一个Rust异步限流库,专门用于管理并发请求和控制请求速率。它特别适合需要限制API调用频率或控制资源消耗的异步应用场景。
主要特性:
- 精确控制请求速率(如每秒/每分钟最大请求数)
- 限制最大并发请求数
- 基于Tokio的异步实现
- 轻量级且易于集成
安装
在Cargo.toml中添加依赖:
[dependencies]
async-throttle = "0.3"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 速率限制(Rate Limiting)
限制每秒最大请求数:
use async_throttle::RateLimiter;
use std::time::Duration;
#[tokio::main]
async fn main() {
// 限制每秒最多5个请求
let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
for i in 0..10 {
rate_limiter.acquire().await;
println!("发送请求 {}", i);
// 这里替换为你的实际异步请求
// let _ = some_async_request().await;
}
}
2. 并发限制(Concurrency Limiting)
限制同时进行的最大请求数:
use async_throttle::ConcurrencyLimiter;
#[tokio::main]
async fn main() {
// 限制最大并发数为3
let concurrency_limiter = ConcurrencyLimiter::new(3);
let mut handles = vec![];
for i in 0..5 {
let limiter = concurrency_limiter.clone();
handles.push(tokio::spawn(async move {
let _guard = limiter.acquire().await;
println!("开始处理任务 {}", i);
tokio::time::sleep(Duration::from_secs(1)).await;
println!("完成任务 {}", i);
}));
}
for handle in handles {
handle.await.unwrap();
}
}
高级用法
组合速率和并发限制
use async_throttle::{RateLimiter, ConcurrencyLimiter};
#[tokio::main]
async fn main() {
// 每秒最多5个请求,同时最多3个并发
let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
let concurrency_limiter = ConcurrencyLimiter::new(3);
for i in 0..10 {
rate_limiter.acquire().await;
let _guard = concurrency_limiter.acquire().await;
println!("处理请求 {}", i);
// 执行你的异步操作
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
自定义错误处理
use async_throttle::RateLimiter;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
for i in 0..10 {
rate_limiter.acquire().await;
match some_fallible_operation(i).await {
Ok(_) => println!("成功处理 {}", i),
Err(e) => eprintln!("处理 {} 时出错: {}", i, e),
}
}
Ok(())
}
async fn some_fallible_operation(i: i32) -> Result<(), String> {
if i == 7 {
Err("模拟错误".to_string())
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
实际应用示例
限制API请求
use async_throttle::RateLimiter;
use reqwest;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 根据API限制设置速率(如GitHub API每分钟5000次)
let rate_limiter = RateLimiter::new(5000, Duration::from_secs(60));
let client = reqwest::Client::new();
let urls = vec![
"https://api.github.com/users/octocat",
"https://api.github.com/users/rust-lang",
];
for url in urls {
rate_limiter.acquire().await;
let response = client.get(url)
.header("User-Agent", "MyApp")
.send()
.await?;
println!("{} 状态: {}", url, response.status());
}
Ok(())
}
完整示例代码
下面是一个结合速率限制和并发限制的完整示例,模拟API请求场景:
use async_throttle::{RateLimiter, ConcurrencyLimiter};
use reqwest;
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 配置速率限制:每分钟最多100个请求
let rate_limiter = RateLimiter::new(100, Duration::from_secs(60));
// 配置并发限制:最多同时10个请求
let concurrency_limiter = ConcurrencyLimiter::new(10);
// 模拟API端点列表
let api_endpoints = (1..=150).map(|i| format!("https://api.example.com/v1/users/{}", i)).collect::<Vec<_>>();
let client = reqwest::Client::new();
let mut handles = vec![];
for (i, endpoint) in api_endpoints.into_iter().enumerate() {
let client = client.clone();
let rate_limiter = rate_limiter.clone();
let concurrency_limiter = concurrency_limiter.clone();
handles.push(tokio::spawn(async move {
// 等待速率限制许可
rate_limiter.acquire().await;
// 等待并发限制许可
let _guard = concurrency_limiter.acquire().await;
println!("[请求 {}] 开始处理: {}", i, endpoint);
match client.get(&endpoint)
.header("User-Agent", "AsyncThrottleDemo")
.send()
.await
{
Ok(response) => {
println!("[请求 {}] 成功 - 状态: {}", i, response.status());
// 处理响应数据...
}
Err(e) => {
eprintln!("[请求 {}] 失败: {}", i, e);
}
}
}));
}
// 等待所有任务完成
for handle in handles {
handle.await?;
}
Ok(())
}
注意事项
RateLimiter
和ConcurrencyLimiter
都是线程安全的,可以安全地跨线程共享- 对于长期运行的应用,考虑使用
RateLimiter::builder()
进行更精细的配置 - 在高并发场景下,限流器本身可能成为瓶颈,需要进行性能测试
async-throttle
提供了简单而强大的方式来管理异步应用的请求速率和并发量,是构建健壮、API友好的Rust应用的理想选择。