Rust异步运行时共享库tokio-shared-rt的使用,实现多任务间高效共享Tokio运行时实例

Rust异步运行时共享库tokio-shared-rt的使用,实现多任务间高效共享Tokio运行时实例

tokio-shared-rt介绍

该库允许#[tokio::test]使用共享运行时,这样静态变量中连接到tokio运行时的内容在不同测试用例之间仍然有效。

例如,如果你有一个全局的、静态的数据库连接池,它内部持有一些绑定到创建它的运行时的TCP连接引用。如果你有两个都标记为#[tokio::test]并访问该db池的测试用例,很可能其中一个会失败并出现如下错误:

thread 't3' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "A Tokio 1.x context was found, but it is being shutdown." }'

这个crate提供了一个宏,使用共享的tokio运行时来运行测试用例以避免这个问题。只需将#[tokio::test]替换为#[tokio_shared_rt::test(shared)],现在测试就能通过了。

使用方法

#[tokio_shared_rt::test]
async fn t1() {
    db_pool().foo().await;
}

#[tokio_shared_rt::test(shared)]
async fn t2() {
    db_pool().foo().await;
}

#[tokio_shared_rt::test(shared = true)]
async fn t3() {
    db_pool().foo().await;
}

#[tokio_shared_rt::test(shared = false)]
async fn delicate_runtime_test() {
    db_pool().foo().await;
}

完整示例

下面是一个更完整的示例,展示如何在多个测试用例间共享Tokio运行时实例:

use once_cell::sync::Lazy;
use tokio_shared_rt as shared_rt;

// 模拟一个全局数据库连接池
static DB_POOL: Lazy<DbPool> = Lazy::new(|| {
    // 在初始化时创建运行时并连接数据库
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        DbPool::connect("postgres://user:pass@localhost/db").await.unwrap()
    })
});

// 使用共享运行时的测试1
#[shared_rt::test(shared)]
async fn test_user_query() {
    let result = DB_POOL.query("SELECT * FROM users WHERE id = 1").await;
    assert!(result.is_ok());
}

// 使用共享运行时的测试2
#[shared_rt::test(shared = true)]
async fn test_product_query() {
    let result = DB_POOL.query("SELECT * FROM products LIMIT 10").await;
    assert!(result.is_ok());
}

// 不使用共享运行时的隔离测试
#[shared_rt::test(shared = false)]
async fn isolated_connection_test() {
    // 创建独立的连接池
    let pool = DbPool::connect("postgres://user:pass@localhost/db").await.unwrap();
    let result = pool.query("SELECT NOW()").await;
    assert!(result.is_ok());
}

// 数据库连接池实现
struct DbPool {
    // 这里应该有实际的连接池实现
    // 为示例简化,我们只模拟行为
}

impl DbPool {
    async fn connect(_url: &str) -> Result<Self, String> {
        // 模拟连接延迟
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok(Self)
    }
    
    async fn query(&self, _sql: &str) -> Result<Vec<String>, String> {
        // 模拟查询延迟
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        Ok(vec!["mock_result".to_string()])
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    
    // 常规tokio测试无法共享运行时
    #[tokio::test]
    async fn regular_tokio_test() {
        let pool = DbPool::connect("temp_url").await.unwrap();
        assert!(pool.query("SELECT 1").await.is_ok());
    }
}

在这个完整示例中:

  1. 我们使用once_cell::Lazy来创建全局的数据库连接池
  2. test_user_querytest_product_query使用共享的Tokio运行时
  3. isolated_connection_test使用独立的运行时
  4. 还包含了一个常规的#[tokio::test]作为对比
  5. DbPool添加了更真实的模拟实现,包括连接延迟和查询延迟

要使用这个示例,你需要在Cargo.toml中添加依赖:

[dependencies]
once_cell = "1.18.0"
tokio = { version = "1.0", features = ["full"] }
tokio-shared-rt = "0.1"

这种模式特别适合测试中需要共享昂贵资源(如数据库连接池)的场景,可以显著提高测试执行效率。


1 回复

Rust异步运行时共享库tokio-shared-rt的使用指南

介绍

tokio-shared-rt是一个Rust库,它允许在多个任务或线程之间高效共享Tokio运行时实例。在需要跨多个组件或模块使用同一个Tokio运行时的场景下,这个库特别有用,避免了创建多个运行时实例带来的性能开销。

主要特性

  • 提供线程安全的Tokio运行时共享机制
  • 支持多线程环境下的运行时共享
  • 简化异步代码中运行时的管理
  • 兼容Tokio的各种功能特性

使用方法

基本安装

首先在Cargo.toml中添加依赖:

[dependencies]
tokio-shared-rt = "0.1"
tokio = { version = "1.0", features = ["full"] }

基本使用示例

use tokio_shared_rt::SharedRuntime;

#[tokio::main]
async fn main() {
    // 创建共享运行时
    let shared_rt = SharedRuntime::new();
    
    // 获取运行时引用
    let rt = shared_rt.get();
    
    // 使用运行时生成异步任务
    rt.spawn(async {
        println!("Task running on shared runtime");
    });
    
    // 等待任务完成
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

多线程共享示例

use std::sync::Arc;
use tokio_shared_rt::SharedRuntime;

#[tokio::main]
async fn main() {
    let shared_rt = Arc::new(SharedRuntime::new());
    
    let handles: Vec<_> = (0..4).map(|i| {
        let rt = shared_rt.clone();
        std::thread::spawn(move || {
            let rt = rt.get();
            rt.block_on(async {
                println!("Thread {} executing async task", i);
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                println!("Thread {} finished", i);
            });
        })
    }).collect();
    
    for handle in handles {
        handle.join().unwrap();
    }
}

与结构体结合使用

use tokio_shared_rt::SharedRuntime;

struct AsyncService {
    rt: SharedRuntime,
}

impl AsyncService {
    fn new() -> Self {
        Self {
            rt: SharedRuntime::new(),
        }
    }
    
    async fn perform_task(&self) {
        self.rt.get().spawn(async {
            println!("Performing background task");
        });
    }
}

#[tokio::main]
async fn main() {
    let service = AsyncService::new();
    service.perform_task().await;
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

高级用法

自定义运行时配置

use tokio_shared_rt::SharedRuntime;
use tokio::runtime::Builder;

let rt = Builder::new_multi_thread()
    .worker_threads(4)
    .enable_all()
    .build()
    .unwrap();

let shared_rt = SharedRuntime::from(rt);

跨模块共享

// 在lib.rs或main.rs中
pub static SHARED_RT: Lazy<SharedRuntime> = Lazy::new(SharedRuntime::new);

// 在其他模块中使用
use crate::SHARED_RT;

async fn some_function() {
    SHARED_RT.get().spawn(async {
        // 任务代码
    });
}

完整示例demo

下面是一个结合多个特性的完整示例:

use std::sync::Arc;
use tokio::time::{sleep, Duration};
use tokio_shared_rt::SharedRuntime;
use once_cell::sync::Lazy;

// 全局共享运行时
pub static GLOBAL_RT: Lazy<Arc<SharedRuntime>> = Lazy::new(|| {
    Arc::new(SharedRuntime::new())
});

struct TaskRunner {
    rt: Arc<SharedRuntime>,
    id: u32,
}

impl TaskRunner {
    fn new(id: u32) -> Self {
        Self {
            rt: GLOBAL_RT.clone(),
            id,
        }
    }

    async fn run(&self) {
        let rt = self.rt.get();
        rt.spawn(async move {
            println!("Task {} started", self.id);
            sleep(Duration::from_millis(200)).await;
            println!("Task {} completed", self.id);
        });
    }
}

#[tokio::main]
async fn main() {
    // 创建多个任务运行器
    let runners: Vec<_> = (0..5).map(|i| TaskRunner::new(i)).collect();
    
    // 并行运行所有任务
    let tasks: Vec<_> = runners.iter().map(|r| r.run()).collect();
    futures::future::join_all(tasks).await;
    
    // 等待所有任务完成
    sleep(Duration::from_secs(1)).await;
}

注意事项

  1. 确保不要在不同的线程中创建多个SharedRuntime实例,除非确实需要多个运行时
  2. 在大多数情况下,应用程序应该只有一个共享的Tokio运行时实例
  3. 使用Arc<SharedRuntime>来跨线程共享运行时引用
  4. 注意运行时的生命周期管理,避免在运行时被丢弃后继续使用

tokio-shared-rt简化了Tokio运行时在复杂应用中的管理,特别是在需要跨多个组件或线程共享运行时的场景下,能够显著提高代码的整洁性和性能。

回到顶部