Rust异步工具库tokio-utils的使用,tokio-utils为Tokio生态提供高效实用的异步编程辅助工具

Rust异步工具库tokio-utils的使用,tokio-utils为Tokio生态提供高效实用的异步编程辅助工具

功能概述

tokio-utils为Tokio异步运行时提供了一系列实用工具:

资源池

  • Pool - 共享资源池

速率限制

  • RateLimiter - 速率限制器
  • MultiRateLimiter - 基于键的速率限制器

优雅关闭

  • ShutdownController - 优雅关闭控制器
  • ShutdownMonitor - 优雅关闭监视器

标准输入

  • recv_from_stdin - 从stdin接收数据的通道

安装

在Cargo.toml中添加依赖:

[dependencies]
tokio-utils = "0.1.2"

示例代码

资源池(Pool)示例

use tokio_utils::Pool;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // 创建包含5个字符串的资源池
    let pool = Pool::new(
        || async { Arc::new("resource".to_string()) },
        5
    );

    // 从池中获取资源
    let resource = pool.get().await;
    println!("Got resource: {}", resource);
    
    // 使用后资源会自动返回池中
}

速率限制器(RateLimiter)示例

use tokio_utils::RateLimiter;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 创建每秒最多处理2个请求的限流器
    let limiter = RateLimiter::new(2, Duration::from_secs(1));

    for i in 0..5 {
        // 等待允许执行
        limiter.wait().await;
        println!("Processing request {}", i);
    }
}

优雅关闭示例

use tokio_utils::{ShutdownController, ShutdownMonitor};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (controller, monitor) = ShutdownController::new();
    
    tokio::spawn(async move {
        // 模拟工作
        loop {
            if monitor.is_shutdown() {
                println!("Worker received shutdown signal");
                break;
            }
            println!("Working...");
            sleep(Duration::from_secs(1)).await;
        }
    });

    // 5秒后触发关闭
    sleep(Duration::from_secs(5)).await;
    controller.shutdown().await;
    println!("Shutdown complete");
}

从标准输入读取示例

use tokio_utils::recv_from_stdin;

#[tokio::main]
async fn main() {
    // 创建从stdin接收数据的通道
    let mut rx = recv_from_stdin();
    
    println!("Type something and press Enter:");
    
    while let Some(input) = rx.recv().await {
        println!("You typed: {}", input);
        if input.trim() == "exit" {
            break;
        }
    }
}

完整示例代码

资源池完整示例

use tokio_utils::Pool;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // 创建包含3个连接资源的池
    let pool = Pool::new(
        || async { 
            Arc::new(format!("Connection-{}", rand::random::<u16>())) 
        },
        3
    );

    // 模拟多个任务并发获取资源
    let mut handles = vec![];
    for i in 0..5 {
        let pool = pool.clone();
        handles.push(tokio::spawn(async move {
            // 获取资源
            let resource = pool.get().await;
            println!("Task {} got resource: {}", i, resource);
            
            // 模拟资源使用时间
            sleep(Duration::from_millis(500)).await;
            
            // 资源会在作用域结束时自动返回池中
        }));
    }

    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }
}

速率限制器完整示例

use tokio_utils::RateLimiter;
use std::time::{Duration, Instant};
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // 创建每200毫秒允许1个操作的限流器
    let limiter = RateLimiter::new(1, Duration::from_millis(200));
    
    let start = Instant::now();
    
    // 模拟10个需要限流的操作
    for i in 0..10 {
        // 等待限流器允许
        limiter.wait().await;
        
        let elapsed = start.elapsed().as_secs_f32();
        println!("[t={:.2}s] Operation {} completed", elapsed, i);
        
        // 模拟操作处理时间
        sleep(Duration::from_millis(50)).await;
    }
}

优雅关闭完整示例

use tokio_utils::{ShutdownController, ShutdownMonitor};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // 创建关闭控制器和监视器
    let (controller, monitor) = ShutdownController::new();
    
    // 启动多个工作线程
    for i in 0..3 {
        let monitor = monitor.clone();
        tokio::spawn(async move {
            while !monitor.is_shutdown() {
                println!("Worker {} doing work...", i);
                sleep(Duration::from_millis(300)).await;
            }
            println!("Worker {} shutting down cleanly", i);
        });
    }
    
    // 模拟运行一段时间后触发关闭
    sleep(Duration::from_secs(2)).await;
    println!("Initiating graceful shutdown...");
    
    // 触发关闭并等待所有任务完成
    controller.shutdown().await;
    println!("All workers shut down successfully");
}

标准输入交互完整示例

use tokio_utils::recv_from_stdin;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    // 创建标准输入通道
    let mut rx = recv_from_stdin();
    
    println!("Chat server started. Type 'exit' to quit.");
    
    // 同时处理其他异步任务
    tokio::spawn(async move {
        let mut count = 0;
        loop {
            sleep(Duration::from_secs(1)).await;
            println!("[System] Server has been running for {} seconds", count);
            count += 1;
        }
    });
    
    // 处理用户输入
    while let Some(input) = rx.recv().await {
        let input = input.trim();
        if input.is_empty() {
            continue;
        }
        
        println!("[User] {}", input);
        
        if input == "exit" {
            println!("Goodbye!");
            break;
        }
    }
}

这些完整示例展示了tokio-utils库在实际应用场景中的使用方法,可以帮助开发者更好地理解如何将这些工具集成到自己的异步应用程序中。


1 回复

Rust异步工具库tokio-utils的使用指南

tokio-utils 是 Tokio 生态系统中的一个实用工具库,为异步编程提供了许多高效实用的辅助功能。这个库包含了许多在异步编程中常见问题的解决方案,可以帮助开发者编写更简洁、更高效的异步代码。

主要功能

tokio-utils 提供了以下几类实用工具:

  1. 异步原语:扩展了 Tokio 的基础异步原语
  2. Future 工具:提供了各种 Future 组合器和实用程序
  3. Stream 工具:增强了 Stream 的处理能力
  4. IO 工具:提供了各种 IO 相关的实用功能

安装

Cargo.toml 中添加依赖:

[dependencies]
tokio-utils = "0.7"
tokio = { version = "1.0", features = ["full"] }

核心功能示例

1. 定时器工具

use tokio_utils::time::{delay_for, Delay};

#[tokio::main]
async fn main() {
    // 延迟500毫秒
    delay_for(std::time::Duration::from_millis(500)).await;
    println!("500ms后执行");
    
    // 创建可取消的延迟
    let delay = Delay::new(std::time::Duration::from_secs(1));
    tokio::select! {
        _ = delay => println!("1秒后执行"),
        _ = tokio::signal::ctrl_c() => println!("取消延迟"),
    }
}

2. Future 工具

use tokio_utils::future::{FutureExt, OptionFuture};

#[tokio::main]
async fn main() {
    // 将Option转换为Future
    let some_future = OptionFuture::from(Some(async { 42 }));
    let none_future = OptionFuture::<i32>::from(None);
    
    assert_eq!(some_future.await, Some(42));
    assert_eq!(none_future.await, None);
    
    // Future的扩展方法
    let future = async { 1 };
    let result = future.map(|x| x + 1).await;
    assert_eq!(result, 2);
}

3. Stream 工具

use tokio_utils::stream::{StreamExt, MergeStreams};
use tokio::stream::{self, Stream};

#[tokio::main]
async fn main() {
    // 合并多个Stream
    let s1 = stream::iter(vec![1, 2, 3]);
    let s2 = stream::iter(vec![4, 5, 6]);
    
    let merged = MergeStreams::new(vec![s1, s2]);
    let results: Vec<_> = merged.collect().await;
    assert!(results.contains(&1) && results.contains(&6));
    
    // Stream扩展方法
    let s = stream::iter(1..=3);
    let doubled: Vec<_> = s.map(|x| x * 2). collect().await;
    assert_eq!(doubled, vec![2, 4, 6]);
}

4. 同步工具

use tokio_utils::sync::{CancellationToken, Semaphore};

#[tokio::main]
async fn main() {
    // 取消令牌
    let token = CancellationToken::new();
    let child_token = token.child_token();
    
    tokio::spawn(async move {
        tokio::select! {
            _ = child_token.cancelled() => println!("任务被取消"),
            _ = tokio::time::delay_for(std::time::Duration::from_secs(1)) => {
                println!("任务完成")
            }
        }
    });
    
    // 模拟取消
    token.cancel();
    
    // 信号量
    let semaphore = Semaphore::new(3);
    for i in 0..5 {
        let permit = semaphore.acquire().await.unwrap();
        tokio::spawn(async move {
            println!("任务{}开始", i);
            tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
            drop(permit); // 释放许可
            println!("任务{}完成", i);
        });
    }
}

高级用法

自定义 Future 组合器

use tokio_utils::future::FutureExt;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let future = async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        42
    };
    
    // 添加超时
    match future.timeout(Duration::from_millis(500)).await {
        Ok(result) => println!("成功: {}", result),
        Err(_) => println!("超时"),
    }
}

批量处理 Stream

use tokio_utils::stream::StreamExt;
use tokio::stream;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10);
    
    // 每3个元素一批处理
    let batches = stream.chunks(3);
    
    tokio::pin!(batches);
    while let Some(batch) = batches.next().await {
        println!("批处理: {:?}", batch);
    }
}

完整示例

下面是一个整合了tokio-utils多个功能的完整示例:

use tokio_utils::{
    future::FutureExt,
    stream::StreamExt,
    sync::{CancellationToken, Semaphore},
    time::{delay_for, Delay}
};
use tokio::stream::{self, Stream};
use std::time::Duration;

#[tokio::main]
async fn main() {
    // 1. 定时器示例
    println!("--- 定时器示例 ---");
    delay_for(Duration::from_millis(300)).await;
    println!("300ms延迟后执行");
    
    let delay = Delay::new(Duration::from_secs(1));
    tokio::select! {
        _ = delay => println!("1秒延迟完成"),
        _ = tokio::signal::ctrl_c() => println!("取消了延迟"),
    }

    // 2. Future工具示例
    println!("\n--- Future工具示例 ---");
    let future = async {
        delay_for(Duration::from_millis(800)).await;
        100
    };
    
    match future.timeout(Duration::from_millis(500)).await {
        Ok(val) => println!("在超时前完成: {}", val),
        Err(_) => println!("操作超时"),
    }

    // 3. Stream工具示例
    println!("\n--- Stream工具示例 ---");
    let s1 = stream::iter(vec!["a", "b", "c"]);
    let s2 = stream::iter(vec![1, 2, 3]);
    
    let merged = s1.zip(s2);
    let results: Vec<_> = merged.collect().await;
    println!("合并后的结果: {:?}", results);

    // 4. 同步工具示例
    println!("\n--- 同步工具示例 ---");
    let token = CancellationToken::new();
    let child = token.child_token();
    
    let task = tokio::spawn(async move {
        tokio::select! {
            _ = child.cancelled() => println!("子任务被取消"),
            _ = delay_for(Duration::from_secs(2)) => println!("子任务完成"),
        }
    });
    
    delay_for(Duration::from_millis(500)).await;
    token.cancel();
    task.await.unwrap();
    
    let sem = Semaphore::new(2);
    for i in 0..4 {
        let permit = sem.acquire().await.unwrap();
        tokio::spawn(async move {
            println!("任务 {} 开始", i);
            delay_for(Duration::from_secs(1)).await;
            drop(permit);
            println!("任务 {} 完成", i);
        });
    }
    
    delay_for(Duration::from_secs(2)).await;
}

最佳实践

  1. 合理使用取消令牌:对于需要取消的长时运行任务,使用 CancellationToken 可以更优雅地处理取消逻辑

  2. 控制并发:使用 Semaphore 限制资源密集型操作的并发数量

  3. 组合Future:利用提供的组合器减少样板代码,使异步逻辑更清晰

  4. 处理超时:为所有可能长时间运行的操作添加超时处理

tokio-utils 通过提供这些实用工具,大大简化了 Tokio 异步编程中的常见模式实现,让开发者能更专注于业务逻辑而非基础设施代码。

回到顶部