Rust异步限流库async-throttle的使用:高效管理并发请求和控制速率

Rust异步限流库async-throttle的使用:高效管理并发请求和控制速率

安装

在项目目录中运行以下Cargo命令:

cargo add async-throttle

或者在Cargo.toml中添加:

async-throttle = "0.3.2"

基本使用示例

use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建一个速率限制器,每秒允许5个请求
    let rate_limiter = RateLimiter::new(
        NonZeroU32::new(5).unwrap(),  // 每时间窗口的请求数
        Duration::from_secs(1),        // 时间窗口长度
    );

    // 模拟并发请求
    for i in 0..10 {
        let limiter = rate_limiter.clone();
        tokio::spawn(async move {
            // 等待获取许可
            limiter.wait().await;
            
            // 执行受限操作
            println!("处理请求 {}", i);
            // 这里可以替换为实际的API调用或其他异步操作
        });
    }

    // 等待所有任务完成
    tokio::time::sleep(Duration::from_secs(3)).await;
}

完整示例代码

下面是一个更完整的示例,展示了如何使用async-throttle来控制API请求速率:

use async_throttle::RateLimiter;
use reqwest::Client;
use std::num::NonZeroU32;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建速率限制器 - 每分钟60个请求
    let rate_limiter = RateLimiter::new(
        NonZeroU32::new(60).unwrap(),  // 每分钟60个请求
        Duration::from_secs(60),       // 1分钟的时间窗口
    );

    let client = Client::new();
    let urls = vec![
        "https://api.example.com/endpoint1",
        "https://api.example.com/endpoint2",
        // 更多URL...
    ];

    // 使用流处理并发请求
    let tasks = urls.into_iter().map(|url| {
        let limiter = rate_limiter.clone();
        let client = client.clone();
        
        tokio::spawn(async move {
            // 等待获取许可
            limiter.wait().await;
            
            // 执行API请求
            let response = client.get(url).send().await?;
            let text = response.text().await?;
            
            println!("从 {} 获取到响应: {}", url, text);
            Ok::<_, reqwest::Error>(())
        })
    });

    // 等待所有任务完成
    for task in tasks {
        task.await??;
    }

    Ok(())
}

高级功能

突发请求支持

use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建支持突发的速率限制器
    // 允许每秒5个请求,但突发最多10个请求
    let rate_limiter = RateLimiter::burst(
        NonZeroU32::new(5).unwrap(),  // 每秒5个请求
        NonZeroU32::new(10).unwrap(), // 突发容量10个请求
        Duration::from_secs(1),       // 时间窗口1秒
    );

    // 模拟突发请求
    for i in 0..15 {
        let limiter = rate_limiter.clone();
        tokio::spawn(async move {
            limiter.wait().await;
            println!("处理请求 {}", i);
        });
    }

    tokio::time::sleep(Duration::from_secs(3)).await;
}

权重控制

use async_throttle::RateLimiter;
use std::num::NonZeroU32;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let rate_limiter = RateLimiter::new(
        NonZeroU32::new(100).unwrap(), // 每时间窗口100个"单位"
        Duration::from_secs(1),         // 时间窗口1秒
    );

    // 不同权重的请求
    for i in 0..5 {
        let limiter = rate_limiter.clone();
        tokio::spawn(async move {
            // 权重较大的请求消耗更多配额
            let weight = if i % 2 == 0 { 10 } else { 1 };
            limiter.wait_weight(weight).await;
            
            println!("处理权重为{}的请求 {}", weight, i);
        });
    }

    tokio::time::sleep(Duration::from_secs(2)).await;
}

完整示例demo

下面是一个结合了多种功能的完整示例:

use async_throttle::RateLimiter;
use reqwest::Client;
use std::num::NonZeroU32;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建支持突发的速率限制器
    // 基础速率:每秒2个请求,突发容量:5个请求
    let rate_limiter = RateLimiter::burst(
        NonZeroU32::new(2).unwrap(),
        NonZeroU32::new(5).unwrap(),
        Duration::from_secs(1),
    );

    let client = Client::new();
    let api_endpoints = vec![
        ("/light-operation", 1),    // 权重1的轻量操作
        ("/heavy-operation", 3),    // 权重3的重量级操作
        ("/medium-operation", 2),   // 权重2的中等操作
    ];

    // 模拟不同类型的API请求
    for (i, (endpoint, weight)) in api_endpoints.iter().enumerate().cycle().take(15) {
        let limiter = rate_limiter.clone();
        let client = client.clone();
        let endpoint = endpoint.to_string();
        
        tokio::spawn(async move {
            // 根据权重等待许可
            limiter.wait_weight(*weight).await;
            
            println!("开始处理 {} (权重: {})", endpoint, weight);
            
            // 模拟API请求延迟
            tokio::time::sleep(Duration::from_millis(200)).await;
            
            println!("完成处理 {}", endpoint);
        });
    }

    // 等待足够时间观察限流效果
    tokio::time::sleep(Duration::from_secs(10)).await;
    Ok(())
}

总结

async-throttle是一个简单而强大的Rust异步限流库,可以帮助你:

  • 控制API请求速率,避免被限流或封禁
  • 管理资源使用,防止系统过载
  • 实现公平的资源分配策略
  • 支持突发流量和加权请求

通过合理配置速率限制器参数,你可以在保证系统稳定性的同时最大化资源利用率。


1 回复

Rust异步限流库async-throttle使用指南

介绍

async-throttle是一个Rust异步限流库,专门用于管理并发请求和控制请求速率。它特别适合需要限制API调用频率或控制资源消耗的异步应用场景。

主要特性:

  • 精确控制请求速率(如每秒/每分钟最大请求数)
  • 限制最大并发请求数
  • 基于Tokio的异步实现
  • 轻量级且易于集成

安装

在Cargo.toml中添加依赖:

[dependencies]
async-throttle = "0.3"
tokio = { version = "1.0", features = ["full"] }

基本使用方法

1. 速率限制(Rate Limiting)

限制每秒最大请求数:

use async_throttle::RateLimiter;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 限制每秒最多5个请求
    let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
    
    for i in 0..10 {
        rate_limiter.acquire().await;
        println!("发送请求 {}", i);
        // 这里替换为你的实际异步请求
        // let _ = some_async_request().await;
    }
}

2. 并发限制(Concurrency Limiting)

限制同时进行的最大请求数:

use async_throttle::ConcurrencyLimiter;

#[tokio::main]
async fn main() {
    // 限制最大并发数为3
    let concurrency_limiter = ConcurrencyLimiter::new(3);
    
    let mut handles = vec![];
    
    for i in 0..5 {
        let limiter = concurrency_limiter.clone();
        handles.push(tokio::spawn(async move {
            let _guard = limiter.acquire().await;
            println!("开始处理任务 {}", i);
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("完成任务 {}", i);
        }));
    }
    
    for handle in handles {
        handle.await.unwrap();
    }
}

高级用法

组合速率和并发限制

use async_throttle::{RateLimiter, ConcurrencyLimiter};

#[tokio::main]
async fn main() {
    // 每秒最多5个请求,同时最多3个并发
    let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
    let concurrency_limiter = ConcurrencyLimiter::new(3);
    
    for i in 0..10 {
        rate_limiter.acquire().await;
        let _guard = concurrency_limiter.acquire().await;
        
        println!("处理请求 {}", i);
        // 执行你的异步操作
        tokio::time::sleep(Duration::from_millis(200)).await;
    }
}

自定义错误处理

use async_throttle::RateLimiter;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rate_limiter = RateLimiter::new(5, Duration::from_secs(1));
    
    for i in 0..10 {
        rate_limiter.acquire().await;
        
        match some_fallible_operation(i).await {
            Ok(_) => println!("成功处理 {}", i),
            Err(e) => eprintln!("处理 {} 时出错: {}", i, e),
        }
    }
    
    Ok(())
}

async fn some_fallible_operation(i: i32) -> Result<(), String> {
    if i == 7 {
        Err("模拟错误".to_string())
    } else {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(())
    }
}

实际应用示例

限制API请求

use async_throttle::RateLimiter;
use reqwest;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 根据API限制设置速率(如GitHub API每分钟5000次)
    let rate_limiter = RateLimiter::new(5000, Duration::from_secs(60));
    let client = reqwest::Client::new();
    
    let urls = vec![
        "https://api.github.com/users/octocat",
        "https://api.github.com/users/rust-lang",
    ];
    
    for url in urls {
        rate_limiter.acquire().await;
        
        let response = client.get(url)
            .header("User-Agent", "MyApp")
            .send()
            .await?;
            
        println!("{} 状态: {}", url, response.status());
    }
    
    Ok(())
}

完整示例代码

下面是一个结合速率限制和并发限制的完整示例,模拟API请求场景:

use async_throttle::{RateLimiter, ConcurrencyLimiter};
use reqwest;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 配置速率限制:每分钟最多100个请求
    let rate_limiter = RateLimiter::new(100, Duration::from_secs(60));
    
    // 配置并发限制:最多同时10个请求
    let concurrency_limiter = ConcurrencyLimiter::new(10);
    
    // 模拟API端点列表
    let api_endpoints = (1..=150).map(|i| format!("https://api.example.com/v1/users/{}", i)).collect::<Vec<_>>();
    
    let client = reqwest::Client::new();
    let mut handles = vec![];
    
    for (i, endpoint) in api_endpoints.into_iter().enumerate() {
        let client = client.clone();
        let rate_limiter = rate_limiter.clone();
        let concurrency_limiter = concurrency_limiter.clone();
        
        handles.push(tokio::spawn(async move {
            // 等待速率限制许可
            rate_limiter.acquire().await;
            
            // 等待并发限制许可
            let _guard = concurrency_limiter.acquire().await;
            
            println!("[请求 {}] 开始处理: {}", i, endpoint);
            
            match client.get(&endpoint)
                .header("User-Agent", "AsyncThrottleDemo")
                .send()
                .await 
            {
                Ok(response) => {
                    println!("[请求 {}] 成功 - 状态: {}", i, response.status());
                    // 处理响应数据...
                }
                Err(e) => {
                    eprintln!("[请求 {}] 失败: {}", i, e);
                }
            }
        }));
    }
    
    // 等待所有任务完成
    for handle in handles {
        handle.await?;
    }
    
    Ok(())
}

注意事项

  1. RateLimiterConcurrencyLimiter都是线程安全的,可以安全地跨线程共享
  2. 对于长期运行的应用,考虑使用RateLimiter::builder()进行更精细的配置
  3. 在高并发场景下,限流器本身可能成为瓶颈,需要进行性能测试

async-throttle提供了简单而强大的方式来管理异步应用的请求速率和并发量,是构建健壮、API友好的Rust应用的理想选择。

回到顶部