Rust事件触发器库triggered的使用,实现高效异步事件驱动的编程模式
Rust事件触发器库triggered的使用,实现高效异步事件驱动的编程模式
概述
triggered是一个用于任务和线程之间一次性事件触发的机制。
该机制包含两种类型:Trigger
和Listener
。它们成对出现,类似于通道的发送器/接收器对。触发器部分有一个Trigger::trigger
方法,该方法将使所有等待监听器的任务/线程继续执行。
监听器既有同步的Listener::wait
方法,也实现了Future<Output = ()>
以支持异步。
Trigger
和Listener
都可以被克隆。因此,任意数量的触发器实例可以触发任意数量的等待监听器。当属于该对的任何一个触发器实例被触发时,所有等待的监听器将被解除阻塞。等待已经触发的监听器将立即返回。因此,每个触发器/监听器对只能触发一次。
此crate不使用任何unsafe
代码。
示例
显示基本用法的简单示例:
#[tokio::main]
async fn main() {
let (trigger, listener) = triggered::trigger();
let task = tokio::spawn(async {
// 阻塞直到下面的`trigger.trigger()`
listener.await;
println!("Triggered async task");
});
// 这将使任何在`Listener::wait()`中阻塞的线程或等待监听器的异步任务继续执行
trigger.trigger();
let _ = task.await;
}
显示触发器/监听器对被用于在Ctrl-C事件上优雅关闭一些异步服务器实例的示例,其中只接受不可变的Fn
闭包:
#[tokio::main]
async fn main() -> Result<(), Error> {
let (shutdown_trigger, shutdown_signal1) = triggered::trigger();
// 当用户按下Ctrl-C时,同步`Fn`闭包将触发触发器
ctrlc::set_handler(move || {
shutdown_trigger.trigger();
}).expect("Error setting Ctrl-C handler");
// 如果服务器库支持类似关闭信号的功能:
let shutdown_signal2 = shutdown_signal1.clone();
let server1_task = tokio::spawn(async move {
SomeServer::new().serve_with_shutdown_signal(shutdown_signal1).await;
});
// 或者在长时间运行的future和信号之间进行选择以中止它
tokio::select! {
server_result = SomeServer::new().serve() => {
eprintln!("Server error: {:?}", server_result);
}
_ = shutdown_signal2 => {}
}
let _ = server1_task.await;
Ok(())
}
完整示例代码
use triggered;
use tokio;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 创建触发器/监听器对
let (trigger, listener) = triggered::trigger();
// 克隆监听器用于多个任务
let listener2 = listener.clone();
// 启动第一个异步任务
let task1 = tokio::spawn(async move {
println!("Task 1 waiting for trigger...");
listener.await;
println!("Task 1 triggered!");
});
// 启动第二个异步任务
let task2 = tokio::spawn(async move {
println!("Task 2 waiting for trigger...");
listener2.await;
println!("Task 2 triggered!");
});
// 模拟一些工作
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
// 触发所有等待的监听器
println!("Triggering events...");
trigger.trigger();
// 等待所有任务完成
let _ = task1.await;
let _ = task2.await;
println!("All tasks completed successfully");
Ok(())
}
与类似原语的比较
通道
此库中的事件触发原语与通道有些相似。主要区别以及我开发此库的原因是:
监听器有些类似于futures::channel::oneshot::Receiver<()>
。但它:
- 不可失败 - 实现
Future<Output = ()>
而不是Future<Output = Result<T, Canceled>>
- 实现
Clone
- 任意数量的监听器可以等待同一事件 - 具有同步的
Listener::wait
- 同步线程和异步任务可以同时等待
触发器与futures::channel::oneshot::Sender<()>
相比,不同之处在于:
- 不可失败 - 触发器不关心是否还有监听器存在
- 发送时不消耗自身,而是采用
&self
- 因此可以在不拥有或不可变的情况下使用。例如在Drop
实现中或限于Fn
或FnMut
的回调闭包中。
futures::future::Abortable
这些触发器的一个用例是在某些事件发生时中止future。请参见上面的示例。区别包括:
- 单个句柄可以中止任意数量的future
- 某些future在仅以
Abortable
的方式丢弃时无法正确清理。这些库有时允许使用触发干净中止的关闭信号来创建它们的future。类似于serve_with_shutdown(signal: impl Future<Output = ()>)
。
Rust事件触发器库triggered的使用指南
介绍
triggered是一个轻量级的Rust事件触发器库,专门用于实现高效的异步事件驱动编程模式。该库提供了简单易用的API来创建和管理事件触发器,支持跨线程的事件通知,非常适合构建响应式系统和异步任务协调。
核心特性
- 零成本抽象:高性能的事件触发机制
- 线程安全:支持跨线程事件通知
- 异步友好:与async/await完美集成
- 轻量级:最小化依赖和运行时开销
安装方法
在Cargo.toml中添加依赖:
[dependencies]
triggered = "1.2"
基本使用方法
1. 创建事件触发器
use triggered::{Trigger, Listener};
// 创建触发器和监听器对
let (trigger, listener) = triggered::trigger();
2. 触发事件
// 在需要触发事件的地方
trigger.trigger();
3. 等待事件
// 异步等待事件触发
async {
listener.await;
println!("事件已触发!");
};
完整示例
示例1:基本事件触发
use triggered::{Trigger, Listener};
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
let (trigger, listener) = triggered::trigger();
// 启动一个异步任务来触发事件
tokio::spawn(async move {
sleep(Duration::from_secs(2)).await;
println!("准备触发事件...");
trigger.trigger();
});
println!("等待事件触发...");
listener.await;
println!("事件已成功触发!");
}
示例2:多监听器场景
use triggered::Trigger;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let trigger = Arc::new(Trigger::new());
// 创建多个监听器
let listener1 = trigger.listen();
let listener2 = trigger.listen();
tokio::spawn(async move {
println!("监听器1等待中...");
listener1.await;
println!("监听器1收到事件!");
});
tokio::spawn(async move {
println!("监听器2等待中...");
listener2.await;
println!("监听器2收到事件!");
});
// 触发所有监听器
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
trigger.trigger();
// 等待所有任务完成
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
示例3:超时处理
use triggered::{Trigger, Listener};
use tokio::time::{timeout, Duration};
#[tokio::main]
async fn main() {
let (trigger, listener) = triggered::trigger();
// 设置5秒超时
match timeout(Duration::from_secs(5), listener).await {
Ok(_) => println!("事件在超时前触发"),
Err(_) => println!("等待超时,事件未触发"),
}
// 如果需要,仍然可以触发(虽然监听器已超时)
trigger.trigger();
}
高级用法
一次性触发器
let trigger = Trigger::new();
let listener = trigger.listen();
// 触发后所有当前和未来的监听器都会立即收到通知
trigger.trigger();
// 新的监听器也会立即完成
let new_listener = trigger.listen();
与其他异步原语结合使用
use triggered::Trigger;
use futures::future::select;
use std::pin::Pin;
async fn wait_for_event_or_condition(trigger: Trigger, other_future: Pin<&mut dyn Future>) {
let listener = trigger.listen();
select(listener, other_future).await;
}
注意事项
- 触发器一旦触发,所有当前和未来的监听器都会立即完成
- 监听器的等待是异步的,不会阻塞线程
- 支持多个生产者多个消费者模式
- 内存占用极小,适合高性能场景
性能建议
- 在热点路径中重用触发器实例
- 避免频繁创建和销毁触发器
- 考虑使用Arc来共享触发器引用
这个库为Rust异步编程提供了简单而强大的事件驱动模式支持。
完整示例demo
use triggered::{Trigger, Listener};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};
#[tokio::main]
async fn main() {
println!("=== 基本事件触发示例 ===");
basic_trigger_example().await;
println!("\n=== 多监听器场景示例 ===");
multiple_listeners_example().await;
println!("\n=== 超时处理示例 ===");
timeout_example().await;
}
// 基本事件触发示例
async fn basic_trigger_example() {
// 创建触发器和监听器对
let (trigger, listener) = triggered::trigger();
// 启动异步任务在2秒后触发事件
tokio::spawn(async move {
sleep(Duration::from_secs(2)).await;
println!("准备触发事件...");
trigger.trigger();
});
println!("等待事件触发...");
listener.await;
println!("事件已成功触发!");
}
// 多监听器场景示例
async fn multiple_listeners_example() {
let trigger = Arc::new(Trigger::new());
// 创建多个监听器
let listener1 = trigger.listen();
let listener2 = trigger.listen();
let listener3 = trigger.listen();
// 启动多个异步任务等待事件
let task1 = tokio::spawn(async move {
println!("监听器1等待中...");
listener1.await;
println!("监听器1收到事件!");
});
let task2 = tokio::spawn(async move {
println!("监听器2等待中...");
listener2.await;
println!("监听器2收到事件!");
});
let task3 = tokio::spawn(async move {
println!("监听器3等待中...");
listener3.await;
println!("监听器3收到事件!");
});
// 等待1秒后触发所有监听器
sleep(Duration::from_secs(1)).await;
println!("触发所有监听器...");
trigger.trigger();
// 等待所有任务完成
let _ = tokio::join!(task1, task2, task3);
}
// 超时处理示例
async fn timeout_example() {
let (trigger, listener) = triggered::trigger();
// 设置3秒超时
match timeout(Duration::from_secs(3), listener).await {
Ok(_) => println!("事件在超时前触发"),
Err(_) => println!("等待超时,事件未触发"),
}
// 超时后仍然可以触发(虽然监听器已超时)
trigger.trigger();
println!("事件已触发(在超时后)");
// 创建新的监听器测试一次性触发特性
let new_listener = trigger.listen();
println!("创建新的监听器...");
new_listener.await;
println!("新的监听器立即完成!");
}