Rust异步文件监控库async-watch的使用,高效监听文件系统变更并实时响应

Rust异步文件监控库async-watch的使用,高效监听文件系统变更并实时响应

async-watch 是一个单生产者、多消费者的通道,仅保留最后发送的值。该库是从 Tokio 的 tokio::sync::watch 实现中提取出来的,由 Carl Lerche 编写。

安装

在你的项目目录中运行以下 Cargo 命令:

cargo add async-watch

或者在你的 Cargo.toml 中添加以下行:

async-watch = "0.3.1"

基本使用示例

use async_watch::channel;
use std::time::Duration;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // 创建一个 watch 通道,初始值为 0
    let (sender, mut receiver) = channel(0);
    
    // 启动一个任务来监听变化
    let handle = tokio::spawn(async move {
        while receiver.changed().await.is_ok() {
            println!("Received value: {}", *receiver.borrow());
        }
    });
    
    // 主线程发送更新
    for i in 1..=5 {
        sleep(Duration::from_secs(1)).await;
        sender.send(i).unwrap();
    }
    
    // 等待监听任务完成
    handle.await.unwrap();
}

文件监控完整示例

下面是一个使用 async-watch 监控文件变化的完整示例:

use async_watch::channel;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建 watch 通道来传递文件变化事件
    let (sender, mut receiver) = channel(None);
    
    // 创建文件系统watcher
    let (tx, rx) = mpsc::channel(32);
    let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
        tx.blocking_send(res).unwrap();
    })?;
    
    // 监控当前目录
    watcher.watch(Path::new("."), RecursiveMode::Recursive)?;
    
    // 启动任务处理文件变化事件
    tokio::spawn(async move {
        while let Ok(event) = rx.recv().await {
            if let Ok(event) = event {
                sender.send(Some(event)).unwrap();
            }
        }
    });
    
    // 主循环处理接收到的文件变化事件
    while receiver.changed().await.is_ok() {
        if let Some(event) = *receiver.borrow() {
            println!("File system event: {:?}", event);
        }
    }
    
    Ok(())
}

代码说明

  1. 首先我们创建了一个 async-watch 通道,用于在文件系统监视器和主程序之间传递事件。

  2. 使用 notify 库创建一个文件系统监视器,它会将事件发送到一个 MPSC 通道。

  3. 启动一个异步任务将 MPSC 通道中的事件转发到 async-watch 通道中。

  4. 主循环通过 receiver.changed().await 等待新的事件,并打印出来。

依赖

要运行此示例,你需要在 Cargo.toml 中添加以下依赖:

[dependencies]
async-watch = "0.3"
tokio = { version = "1.0", features = ["full"] }
notify = "5.0"

许可证

async-watch 采用 MIT 许可证和 Apache 许可证(2.0 版本)双重许可。


1 回复

以下是基于您提供的async-watch库使用指南的完整示例代码,包含所有基础功能和高级用法:

// 完整示例:async-watch文件监控系统
use async_watch::{EventKind, RecursiveMode, Watcher};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // 1. 初始化监控器
    let mut watcher = Watcher::new()?;
    
    // 2. 添加监控路径(演示所有用法)
    // 递归监控src目录
    watcher.watch("./src", RecursiveMode::Recursive)?;
    // 非递归监控单个文件
    watcher.watch("./config.toml", RecursiveMode::NonRecursive)?;
    // 递归监控logs目录(用于性能优化演示)
    watcher.watch("./logs", RecursiveMode::Recursive)?;

    // 3. 创建事件通道(用于高级处理)
    let (tx, mut rx) = mpsc::channel(100);

    // 4. 启动独立任务处理事件
    tokio::spawn(async move {
        let mut last_processed = Instant::now();
        let debounce_duration = Duration::from_millis(300);
        let mut batch_events = Vec::new();

        while let Some(event) = rx.recv().await {
            // 性能优化:批量处理+防抖
            batch_events.push(event);
            
            if last_processed.elapsed() > debounce_duration && !batch_events.is_empty() {
                println!("\n--- 处理批量事件(共{}个)---", batch_events.len());
                
                for event in batch_events.drain(..) {
                    match event.kind {
                        EventKind::Create(_) => {
                            println!("[创建] {:?}", event.paths)
                        }
                        EventKind::Modify(_) => {
                            println!("[修改] {:?}", event.paths)
                        }
                        EventKind::Remove(_) => {
                            println!("[删除] {:?}", event.paths)
                        }
                        _ => println!("[其他事件] {:?}", event),
                    }
                }
                last_processed = Instant::now();
            }
        }
    });

    // 5. 主事件循环
    println!("开始监控文件系统... (Ctrl+C退出)");
    loop {
        match watcher.recv().await {
            Ok(event) => {
                // 将事件发送到处理通道
                if let Err(e) = tx.send(event).await {
                    eprintln!("事件发送失败: {}", e);
                }
            }
            Err(e) => eprintln!("监控错误: {}", e),
        }
    }
}

代码说明:

  1. 多路径监控:同时监控了./src目录(递归)、./config.toml文件(非递归)和./logs目录(递归)

  2. 高级事件处理

    • 使用Tokio的MPSC通道进行事件传递
    • 在工作线程中实现批量处理+防抖逻辑
    • 对事件类型进行分类处理
  3. 性能优化

    • 300毫秒的防抖间隔(可调整)
    • 批量收集事件后统一处理
    • 避免阻塞主监控循环
  4. 错误处理

    • 主循环和处理线程都有错误处理
    • 使用?操作符处理初始化错误
  5. 扩展性

    • 可以轻松添加更多监控路径
    • 处理逻辑可以独立修改而不影响监控主循环

使用建议:

  1. 将此代码保存为main.rs
  2. 确保Cargo.toml包含正确依赖:
[dependencies]
async-watch = "0.3"
tokio = { version = "1.0", features = ["full", "rt-multi-thread"] }
  1. 在项目目录中创建测试用的srclogs目录,以及config.toml文件
  2. 运行程序后尝试在这些路径中创建/修改/删除文件观察输出
回到顶部