Rust异步运行时共享库tokio-shared-rt的使用,实现多任务间高效共享Tokio运行时实例
Rust异步运行时共享库tokio-shared-rt的使用,实现多任务间高效共享Tokio运行时实例
tokio-shared-rt介绍
该库允许#[tokio::test]
使用共享运行时,这样静态变量中连接到tokio运行时的内容在不同测试用例之间仍然有效。
例如,如果你有一个全局的、静态的数据库连接池,它内部持有一些绑定到创建它的运行时的TCP连接引用。如果你有两个都标记为#[tokio::test]
并访问该db池的测试用例,很可能其中一个会失败并出现如下错误:
thread 't3' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "A Tokio 1.x context was found, but it is being shutdown." }'
这个crate提供了一个宏,使用共享的tokio运行时来运行测试用例以避免这个问题。只需将#[tokio::test]
替换为#[tokio_shared_rt::test(shared)]
,现在测试就能通过了。
使用方法
#[tokio_shared_rt::test]
async fn t1() {
db_pool().foo().await;
}
#[tokio_shared_rt::test(shared)]
async fn t2() {
db_pool().foo().await;
}
#[tokio_shared_rt::test(shared = true)]
async fn t3() {
db_pool().foo().await;
}
#[tokio_shared_rt::test(shared = false)]
async fn delicate_runtime_test() {
db_pool().foo().await;
}
完整示例
下面是一个更完整的示例,展示如何在多个测试用例间共享Tokio运行时实例:
use once_cell::sync::Lazy;
use tokio_shared_rt as shared_rt;
// 模拟一个全局数据库连接池
static DB_POOL: Lazy<DbPool> = Lazy::new(|| {
// 在初始化时创建运行时并连接数据库
tokio::runtime::Runtime::new().unwrap().block_on(async {
DbPool::connect("postgres://user:pass@localhost/db").await.unwrap()
})
});
// 使用共享运行时的测试1
#[shared_rt::test(shared)]
async fn test_user_query() {
let result = DB_POOL.query("SELECT * FROM users WHERE id = 1").await;
assert!(result.is_ok());
}
// 使用共享运行时的测试2
#[shared_rt::test(shared = true)]
async fn test_product_query() {
let result = DB_POOL.query("SELECT * FROM products LIMIT 10").await;
assert!(result.is_ok());
}
// 不使用共享运行时的隔离测试
#[shared_rt::test(shared = false)]
async fn isolated_connection_test() {
// 创建独立的连接池
let pool = DbPool::connect("postgres://user:pass@localhost/db").await.unwrap();
let result = pool.query("SELECT NOW()").await;
assert!(result.is_ok());
}
// 数据库连接池实现
struct DbPool {
// 这里应该有实际的连接池实现
// 为示例简化,我们只模拟行为
}
impl DbPool {
async fn connect(_url: &str) -> Result<Self, String> {
// 模拟连接延迟
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok(Self)
}
async fn query(&self, _sql: &str) -> Result<Vec<String>, String> {
// 模拟查询延迟
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
Ok(vec!["mock_result".to_string()])
}
}
#[cfg(test)]
mod tests {
use super::*;
// 常规tokio测试无法共享运行时
#[tokio::test]
async fn regular_tokio_test() {
let pool = DbPool::connect("temp_url").await.unwrap();
assert!(pool.query("SELECT 1").await.is_ok());
}
}
在这个完整示例中:
- 我们使用
once_cell::Lazy
来创建全局的数据库连接池 test_user_query
和test_product_query
使用共享的Tokio运行时isolated_connection_test
使用独立的运行时- 还包含了一个常规的
#[tokio::test]
作为对比 - 为
DbPool
添加了更真实的模拟实现,包括连接延迟和查询延迟
要使用这个示例,你需要在Cargo.toml中添加依赖:
[dependencies]
once_cell = "1.18.0"
tokio = { version = "1.0", features = ["full"] }
tokio-shared-rt = "0.1"
这种模式特别适合测试中需要共享昂贵资源(如数据库连接池)的场景,可以显著提高测试执行效率。
1 回复
Rust异步运行时共享库tokio-shared-rt的使用指南
介绍
tokio-shared-rt
是一个Rust库,它允许在多个任务或线程之间高效共享Tokio运行时实例。在需要跨多个组件或模块使用同一个Tokio运行时的场景下,这个库特别有用,避免了创建多个运行时实例带来的性能开销。
主要特性
- 提供线程安全的Tokio运行时共享机制
- 支持多线程环境下的运行时共享
- 简化异步代码中运行时的管理
- 兼容Tokio的各种功能特性
使用方法
基本安装
首先在Cargo.toml中添加依赖:
[dependencies]
tokio-shared-rt = "0.1"
tokio = { version = "1.0", features = ["full"] }
基本使用示例
use tokio_shared_rt::SharedRuntime;
#[tokio::main]
async fn main() {
// 创建共享运行时
let shared_rt = SharedRuntime::new();
// 获取运行时引用
let rt = shared_rt.get();
// 使用运行时生成异步任务
rt.spawn(async {
println!("Task running on shared runtime");
});
// 等待任务完成
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
多线程共享示例
use std::sync::Arc;
use tokio_shared_rt::SharedRuntime;
#[tokio::main]
async fn main() {
let shared_rt = Arc::new(SharedRuntime::new());
let handles: Vec<_> = (0..4).map(|i| {
let rt = shared_rt.clone();
std::thread::spawn(move || {
let rt = rt.get();
rt.block_on(async {
println!("Thread {} executing async task", i);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("Thread {} finished", i);
});
})
}).collect();
for handle in handles {
handle.join().unwrap();
}
}
与结构体结合使用
use tokio_shared_rt::SharedRuntime;
struct AsyncService {
rt: SharedRuntime,
}
impl AsyncService {
fn new() -> Self {
Self {
rt: SharedRuntime::new(),
}
}
async fn perform_task(&self) {
self.rt.get().spawn(async {
println!("Performing background task");
});
}
}
#[tokio::main]
async fn main() {
let service = AsyncService::new();
service.perform_task().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
高级用法
自定义运行时配置
use tokio_shared_rt::SharedRuntime;
use tokio::runtime::Builder;
let rt = Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap();
let shared_rt = SharedRuntime::from(rt);
跨模块共享
// 在lib.rs或main.rs中
pub static SHARED_RT: Lazy<SharedRuntime> = Lazy::new(SharedRuntime::new);
// 在其他模块中使用
use crate::SHARED_RT;
async fn some_function() {
SHARED_RT.get().spawn(async {
// 任务代码
});
}
完整示例demo
下面是一个结合多个特性的完整示例:
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use tokio_shared_rt::SharedRuntime;
use once_cell::sync::Lazy;
// 全局共享运行时
pub static GLOBAL_RT: Lazy<Arc<SharedRuntime>> = Lazy::new(|| {
Arc::new(SharedRuntime::new())
});
struct TaskRunner {
rt: Arc<SharedRuntime>,
id: u32,
}
impl TaskRunner {
fn new(id: u32) -> Self {
Self {
rt: GLOBAL_RT.clone(),
id,
}
}
async fn run(&self) {
let rt = self.rt.get();
rt.spawn(async move {
println!("Task {} started", self.id);
sleep(Duration::from_millis(200)).await;
println!("Task {} completed", self.id);
});
}
}
#[tokio::main]
async fn main() {
// 创建多个任务运行器
let runners: Vec<_> = (0..5).map(|i| TaskRunner::new(i)).collect();
// 并行运行所有任务
let tasks: Vec<_> = runners.iter().map(|r| r.run()).collect();
futures::future::join_all(tasks).await;
// 等待所有任务完成
sleep(Duration::from_secs(1)).await;
}
注意事项
- 确保不要在不同的线程中创建多个
SharedRuntime
实例,除非确实需要多个运行时 - 在大多数情况下,应用程序应该只有一个共享的Tokio运行时实例
- 使用
Arc<SharedRuntime>
来跨线程共享运行时引用 - 注意运行时的生命周期管理,避免在运行时被丢弃后继续使用
tokio-shared-rt
简化了Tokio运行时在复杂应用中的管理,特别是在需要跨多个组件或线程共享运行时的场景下,能够显著提高代码的整洁性和性能。