Rust事件触发器库triggered的使用,实现高效异步事件驱动的编程模式

Rust事件触发器库triggered的使用,实现高效异步事件驱动的编程模式

概述

triggered是一个用于任务和线程之间一次性事件触发的机制。

该机制包含两种类型:TriggerListener。它们成对出现,类似于通道的发送器/接收器对。触发器部分有一个Trigger::trigger方法,该方法将使所有等待监听器的任务/线程继续执行。 监听器既有同步的Listener::wait方法,也实现了Future<Output = ()>以支持异步。

TriggerListener都可以被克隆。因此,任意数量的触发器实例可以触发任意数量的等待监听器。当属于该对的任何一个触发器实例被触发时,所有等待的监听器将被解除阻塞。等待已经触发的监听器将立即返回。因此,每个触发器/监听器对只能触发一次。

此crate不使用任何unsafe代码。

示例

显示基本用法的简单示例:

#[tokio::main]
async fn main() {
    let (trigger, listener) = triggered::trigger();

    let task = tokio::spawn(async {
        // 阻塞直到下面的`trigger.trigger()`
        listener.await;

        println!("Triggered async task");
    });

    // 这将使任何在`Listener::wait()`中阻塞的线程或等待监听器的异步任务继续执行
    trigger.trigger();

    let _ = task.await;
}

显示触发器/监听器对被用于在Ctrl-C事件上优雅关闭一些异步服务器实例的示例,其中只接受不可变的Fn闭包:

#[tokio::main]
async fn main() -> Result<(), Error> {
    let (shutdown_trigger, shutdown_signal1) = triggered::trigger();

    // 当用户按下Ctrl-C时,同步`Fn`闭包将触发触发器
    ctrlc::set_handler(move || {
        shutdown_trigger.trigger();
    }).expect("Error setting Ctrl-C handler");

    // 如果服务器库支持类似关闭信号的功能:
    let shutdown_signal2 = shutdown_signal1.clone();
    let server1_task = tokio::spawn(async move {
        SomeServer::new().serve_with_shutdown_signal(shutdown_signal1).await;
    });

    // 或者在长时间运行的future和信号之间进行选择以中止它
    tokio::select! {
        server_result = SomeServer::new().serve() => {
            eprintln!("Server error: {:?}", server_result);
        }
        _ = shutdown_signal2 => {}
    }

    let _ = server1_task.await;
    Ok(())
}

完整示例代码

use triggered;
use tokio;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // 创建触发器/监听器对
    let (trigger, listener) = triggered::trigger();
    
    // 克隆监听器用于多个任务
    let listener2 = listener.clone();

    // 启动第一个异步任务
    let task1 = tokio::spawn(async move {
        println!("Task 1 waiting for trigger...");
        listener.await;
        println!("Task 1 triggered!");
    });

    // 启动第二个异步任务
    let task2 = tokio::spawn(async move {
        println!("Task 2 waiting for trigger...");
        listener2.await;
        println!("Task 2 triggered!");
    });

    // 模拟一些工作
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    
    // 触发所有等待的监听器
    println!("Triggering events...");
    trigger.trigger();

    // 等待所有任务完成
    let _ = task1.await;
    let _ = task2.await;
    
    println!("All tasks completed successfully");
    Ok(())
}

与类似原语的比较

通道

此库中的事件触发原语与通道有些相似。主要区别以及我开发此库的原因是:

监听器有些类似于futures::channel::oneshot::Receiver<()>。但它:

  • 不可失败 - 实现Future<Output = ()>而不是Future<Output = Result<T, Canceled>>
  • 实现Clone - 任意数量的监听器可以等待同一事件
  • 具有同步的Listener::wait - 同步线程和异步任务可以同时等待

触发器与futures::channel::oneshot::Sender<()>相比,不同之处在于:

  • 不可失败 - 触发器不关心是否还有监听器存在
  • 发送时不消耗自身,而是采用&self - 因此可以在不拥有或不可变的情况下使用。例如在Drop实现中或限于FnFnMut的回调闭包中。

futures::future::Abortable

这些触发器的一个用例是在某些事件发生时中止future。请参见上面的示例。区别包括:

  • 单个句柄可以中止任意数量的future
  • 某些future在仅以Abortable的方式丢弃时无法正确清理。这些库有时允许使用触发干净中止的关闭信号来创建它们的future。类似于serve_with_shutdown(signal: impl Future<Output = ()>)

1 回复

Rust事件触发器库triggered的使用指南

介绍

triggered是一个轻量级的Rust事件触发器库,专门用于实现高效的异步事件驱动编程模式。该库提供了简单易用的API来创建和管理事件触发器,支持跨线程的事件通知,非常适合构建响应式系统和异步任务协调。

核心特性

  • 零成本抽象:高性能的事件触发机制
  • 线程安全:支持跨线程事件通知
  • 异步友好:与async/await完美集成
  • 轻量级:最小化依赖和运行时开销

安装方法

在Cargo.toml中添加依赖:

[dependencies]
triggered = "1.2"

基本使用方法

1. 创建事件触发器

use triggered::{Trigger, Listener};

// 创建触发器和监听器对
let (trigger, listener) = triggered::trigger();

2. 触发事件

// 在需要触发事件的地方
trigger.trigger();

3. 等待事件

// 异步等待事件触发
async {
    listener.await;
    println!("事件已触发!");
};

完整示例

示例1:基本事件触发

use triggered::{Trigger, Listener};
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    let (trigger, listener) = triggered::trigger();
    
    // 启动一个异步任务来触发事件
    tokio::spawn(async move {
        sleep(Duration::from_secs(2)).await;
        println!("准备触发事件...");
        trigger.trigger();
    });
    
    println!("等待事件触发...");
    listener.await;
    println!("事件已成功触发!");
}

示例2:多监听器场景

use triggered::Trigger;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let trigger = Arc::new(Trigger::new());
    
    // 创建多个监听器
    let listener1 = trigger.listen();
    let listener2 = trigger.listen();
    
    tokio::spawn(async move {
        println!("监听器1等待中...");
        listener1.await;
        println!("监听器1收到事件!");
    });
    
    tokio::spawn(async move {
        println!("监听器2等待中...");
        listener2.await;
        println!("监听器2收到事件!");
    });
    
    // 触发所有监听器
    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    trigger.trigger();
    
    // 等待所有任务完成
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}

示例3:超时处理

use triggered::{Trigger, Listener};
use tokio::time::{timeout, Duration};

#[tokio::main]
async fn main() {
    let (trigger, listener) = triggered::trigger();
    
    // 设置5秒超时
    match timeout(Duration::from_secs(5), listener).await {
        Ok(_) => println!("事件在超时前触发"),
        Err(_) => println!("等待超时,事件未触发"),
    }
    
    // 如果需要,仍然可以触发(虽然监听器已超时)
    trigger.trigger();
}

高级用法

一次性触发器

let trigger = Trigger::new();
let listener = trigger.listen();

// 触发后所有当前和未来的监听器都会立即收到通知
trigger.trigger();

// 新的监听器也会立即完成
let new_listener = trigger.listen();

与其他异步原语结合使用

use triggered::Trigger;
use futures::future::select;
use std::pin::Pin;

async fn wait_for_event_or_condition(trigger: Trigger, other_future: Pin<&mut dyn Future>) {
    let listener = trigger.listen();
    select(listener, other_future).await;
}

注意事项

  1. 触发器一旦触发,所有当前和未来的监听器都会立即完成
  2. 监听器的等待是异步的,不会阻塞线程
  3. 支持多个生产者多个消费者模式
  4. 内存占用极小,适合高性能场景

性能建议

  • 在热点路径中重用触发器实例
  • 避免频繁创建和销毁触发器
  • 考虑使用Arc来共享触发器引用

这个库为Rust异步编程提供了简单而强大的事件驱动模式支持。

完整示例demo

use triggered::{Trigger, Listener};
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{sleep, timeout};

#[tokio::main]
async fn main() {
    println!("=== 基本事件触发示例 ===");
    basic_trigger_example().await;
    
    println!("\n=== 多监听器场景示例 ===");
    multiple_listeners_example().await;
    
    println!("\n=== 超时处理示例 ===");
    timeout_example().await;
}

// 基本事件触发示例
async fn basic_trigger_example() {
    // 创建触发器和监听器对
    let (trigger, listener) = triggered::trigger();
    
    // 启动异步任务在2秒后触发事件
    tokio::spawn(async move {
        sleep(Duration::from_secs(2)).await;
        println!("准备触发事件...");
        trigger.trigger();
    });
    
    println!("等待事件触发...");
    listener.await;
    println!("事件已成功触发!");
}

// 多监听器场景示例
async fn multiple_listeners_example() {
    let trigger = Arc::new(Trigger::new());
    
    // 创建多个监听器
    let listener1 = trigger.listen();
    let listener2 = trigger.listen();
    let listener3 = trigger.listen();
    
    // 启动多个异步任务等待事件
    let task1 = tokio::spawn(async move {
        println!("监听器1等待中...");
        listener1.await;
        println!("监听器1收到事件!");
    });
    
    let task2 = tokio::spawn(async move {
        println!("监听器2等待中...");
        listener2.await;
        println!("监听器2收到事件!");
    });
    
    let task3 = tokio::spawn(async move {
        println!("监听器3等待中...");
        listener3.await;
        println!("监听器3收到事件!");
    });
    
    // 等待1秒后触发所有监听器
    sleep(Duration::from_secs(1)).await;
    println!("触发所有监听器...");
    trigger.trigger();
    
    // 等待所有任务完成
    let _ = tokio::join!(task1, task2, task3);
}

// 超时处理示例
async fn timeout_example() {
    let (trigger, listener) = triggered::trigger();
    
    // 设置3秒超时
    match timeout(Duration::from_secs(3), listener).await {
        Ok(_) => println!("事件在超时前触发"),
        Err(_) => println!("等待超时,事件未触发"),
    }
    
    // 超时后仍然可以触发(虽然监听器已超时)
    trigger.trigger();
    println!("事件已触发(在超时后)");
    
    // 创建新的监听器测试一次性触发特性
    let new_listener = trigger.listen();
    println!("创建新的监听器...");
    new_listener.await;
    println!("新的监听器立即完成!");
}
回到顶部