Rust异步文件监控库async-watch的使用,高效监听文件系统变更并实时响应
Rust异步文件监控库async-watch的使用,高效监听文件系统变更并实时响应
async-watch 是一个单生产者、多消费者的通道,仅保留最后发送的值。该库是从 Tokio 的 tokio::sync::watch
实现中提取出来的,由 Carl Lerche 编写。
安装
在你的项目目录中运行以下 Cargo 命令:
cargo add async-watch
或者在你的 Cargo.toml 中添加以下行:
async-watch = "0.3.1"
基本使用示例
use async_watch::channel;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::main]
async fn main() {
// 创建一个 watch 通道,初始值为 0
let (sender, mut receiver) = channel(0);
// 启动一个任务来监听变化
let handle = tokio::spawn(async move {
while receiver.changed().await.is_ok() {
println!("Received value: {}", *receiver.borrow());
}
});
// 主线程发送更新
for i in 1..=5 {
sleep(Duration::from_secs(1)).await;
sender.send(i).unwrap();
}
// 等待监听任务完成
handle.await.unwrap();
}
文件监控完整示例
下面是一个使用 async-watch 监控文件变化的完整示例:
use async_watch::channel;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建 watch 通道来传递文件变化事件
let (sender, mut receiver) = channel(None);
// 创建文件系统watcher
let (tx, rx) = mpsc::channel(32);
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
tx.blocking_send(res).unwrap();
})?;
// 监控当前目录
watcher.watch(Path::new("."), RecursiveMode::Recursive)?;
// 启动任务处理文件变化事件
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if let Ok(event) = event {
sender.send(Some(event)).unwrap();
}
}
});
// 主循环处理接收到的文件变化事件
while receiver.changed().await.is_ok() {
if let Some(event) = *receiver.borrow() {
println!("File system event: {:?}", event);
}
}
Ok(())
}
代码说明
-
首先我们创建了一个 async-watch 通道,用于在文件系统监视器和主程序之间传递事件。
-
使用
notify
库创建一个文件系统监视器,它会将事件发送到一个 MPSC 通道。 -
启动一个异步任务将 MPSC 通道中的事件转发到 async-watch 通道中。
-
主循环通过
receiver.changed().await
等待新的事件,并打印出来。
依赖
要运行此示例,你需要在 Cargo.toml 中添加以下依赖:
[dependencies]
async-watch = "0.3"
tokio = { version = "1.0", features = ["full"] }
notify = "5.0"
许可证
async-watch 采用 MIT 许可证和 Apache 许可证(2.0 版本)双重许可。
1 回复
以下是基于您提供的async-watch
库使用指南的完整示例代码,包含所有基础功能和高级用法:
// 完整示例:async-watch文件监控系统
use async_watch::{EventKind, RecursiveMode, Watcher};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
#[tokio::main]
async fn main() -> std::io::Result<()> {
// 1. 初始化监控器
let mut watcher = Watcher::new()?;
// 2. 添加监控路径(演示所有用法)
// 递归监控src目录
watcher.watch("./src", RecursiveMode::Recursive)?;
// 非递归监控单个文件
watcher.watch("./config.toml", RecursiveMode::NonRecursive)?;
// 递归监控logs目录(用于性能优化演示)
watcher.watch("./logs", RecursiveMode::Recursive)?;
// 3. 创建事件通道(用于高级处理)
let (tx, mut rx) = mpsc::channel(100);
// 4. 启动独立任务处理事件
tokio::spawn(async move {
let mut last_processed = Instant::now();
let debounce_duration = Duration::from_millis(300);
let mut batch_events = Vec::new();
while let Some(event) = rx.recv().await {
// 性能优化:批量处理+防抖
batch_events.push(event);
if last_processed.elapsed() > debounce_duration && !batch_events.is_empty() {
println!("\n--- 处理批量事件(共{}个)---", batch_events.len());
for event in batch_events.drain(..) {
match event.kind {
EventKind::Create(_) => {
println!("[创建] {:?}", event.paths)
}
EventKind::Modify(_) => {
println!("[修改] {:?}", event.paths)
}
EventKind::Remove(_) => {
println!("[删除] {:?}", event.paths)
}
_ => println!("[其他事件] {:?}", event),
}
}
last_processed = Instant::now();
}
}
});
// 5. 主事件循环
println!("开始监控文件系统... (Ctrl+C退出)");
loop {
match watcher.recv().await {
Ok(event) => {
// 将事件发送到处理通道
if let Err(e) = tx.send(event).await {
eprintln!("事件发送失败: {}", e);
}
}
Err(e) => eprintln!("监控错误: {}", e),
}
}
}
代码说明:
-
多路径监控:同时监控了
./src
目录(递归)、./config.toml
文件(非递归)和./logs
目录(递归) -
高级事件处理:
- 使用Tokio的MPSC通道进行事件传递
- 在工作线程中实现批量处理+防抖逻辑
- 对事件类型进行分类处理
-
性能优化:
- 300毫秒的防抖间隔(可调整)
- 批量收集事件后统一处理
- 避免阻塞主监控循环
-
错误处理:
- 主循环和处理线程都有错误处理
- 使用
?
操作符处理初始化错误
-
扩展性:
- 可以轻松添加更多监控路径
- 处理逻辑可以独立修改而不影响监控主循环
使用建议:
- 将此代码保存为
main.rs
- 确保
Cargo.toml
包含正确依赖:
[dependencies]
async-watch = "0.3"
tokio = { version = "1.0", features = ["full", "rt-multi-thread"] }
- 在项目目录中创建测试用的
src
和logs
目录,以及config.toml
文件 - 运行程序后尝试在这些路径中创建/修改/删除文件观察输出