Rust实时数据流处理库eyeball-im的使用,高性能消息传递与状态同步解决方案
Rust实时数据流处理库eyeball-im的使用,高性能消息传递与状态同步解决方案
eyeball-im是一个Rust库,用于实现实时数据流处理、高性能消息传递和状态同步。它特别适合GUI应用和异步编程场景。
安装
在项目中添加依赖:
eyeball-im = "0.7.0"
或者运行:
cargo add eyeball-im
示例代码
以下是使用eyeball-im进行实时数据流处理的完整示例:
use eyeball_im::{ObservableVector, VectorDiff};
use std::sync::Arc;
fn main() {
// 创建一个可观察的向量
let mut vec = ObservableVector::new();
// 订阅向量变化
let subscriber = vec.subscribe();
// 在另一个线程中修改向量
std::thread::spawn(move || {
vec.push_back("Hello".to_string());
vec.push_back("World".to_string());
});
// 在主线程中处理变化
while let Some(diff) = subscriber.next() {
match diff {
VectorDiff::PushBack { value } => {
println!("新元素添加到末尾: {}", value);
}
VectorDiff::Insert { index, value } => {
println!("在位置 {} 插入元素: {}", index, value);
}
_ => {} // 处理其他类型的变更
}
}
}
高级用法示例
use eyeball_im::{ObservableVector, VectorDiff};
use futures::StreamExt;
#[tokio::main]
async fn main() {
// 创建可观察向量
let mut vec = ObservableVector::new();
// 创建异步流订阅者
let mut stream = vec.subscribe().into_stream();
// 在另一个任务中修改向量
tokio::spawn(async move {
vec.push_back("Item 1".to_string());
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
vec.push_back("Item 2".to_string());
});
// 处理异步流
while let Some(diff) = stream.next().await {
match diff {
VectorDiff::PushBack { value } => {
println!("异步接收到新元素: {}", value);
}
_ => {}
}
}
}
完整示例demo
下面是一个结合GUI应用场景的完整示例,展示如何使用eyeball-im进行状态管理:
use eyeball_im::{ObservableVector, VectorDiff};
use tokio::sync::mpsc;
use std::sync::Arc;
#[tokio::main]
async fn main() {
// 创建可观察向量来存储聊天消息
let mut messages = ObservableVector::new();
// 创建消息通道用于UI线程和后台线程通信
let (ui_tx, mut ui_rx) = mpsc::channel(32);
// 克隆消息向量用于后台任务
let messages_for_background = messages.clone();
// 启动后台任务模拟接收消息
tokio::spawn(async move {
for i in 1..=5 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
messages_for_background.push_back(format!("消息 {}", i));
// 通知UI线程更新
ui_tx.send(format!("已接收消息 {}", i)).await.unwrap();
}
});
// 订阅向量变化
let mut subscriber = messages.subscribe();
// 模拟UI线程循环
loop {
tokio::select! {
// 处理新消息通知
Some(diff) = subscriber.next() => {
match diff {
VectorDiff::PushBack { value } => {
println!("[UI] 新消息: {}", value);
}
_ => {}
}
}
// 处理后台任务通知
Some(msg) = ui_rx.recv() => {
println!("[UI] 通知: {}", msg);
}
// 5秒后退出
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
println!("[UI] 退出");
break;
}
}
}
}
主要功能
- 可观察的数据结构:提供可观察的向量和地图等数据结构
- 高效变更传播:使用差分算法最小化变更通知的开销
- 线程安全:适合在多线程环境中使用
- 异步支持:可以与异步生态系统良好集成
适用场景
- GUI应用的状态管理
- 实时数据同步
- 需要观察数据变化的任何场景
该库遵循MPL-2.0许可证,由Jonas Platte和Ivan Enderlin维护。
1 回复
eyeball-im: Rust实时数据流处理与高性能消息传递库
介绍
eyeball-im 是一个专注于实时数据流处理和高性能消息传递的Rust库,特别适合需要状态同步和实时更新的应用场景。它提供了轻量级的消息传递机制和高效的状态管理能力,适用于即时通讯、实时协作、游戏状态同步等应用。
主要特性
- 高性能的消息传递机制
- 实时数据流处理能力
- 状态同步解决方案
- 线程安全的设计
- 低延迟的更新通知
使用方法
添加依赖
首先在Cargo.toml中添加依赖:
[dependencies]
eyeball-im = "0.1"
基本消息传递示例
use eyeball_im::{Observable, Observer};
fn main() {
// 创建一个可观察的字符串
let mut observable = Observable::new("初始值".to_string());
// 创建观察者
let observer = observable.subscribe();
// 更新值并通知观察者
observable.set("新值".to_string());
// 从观察者获取最新值
println!("当前值: {}", observer.get());
}
实时数据流处理
use eyeball_im::{ObservableVector, VectorObserver};
fn main() {
// 创建一个可观察的向量
let mut vec = ObservableVector::new();
let observer = vec.subscribe();
// 添加元素
vec.push_back("第一个元素");
vec.push_back("第二个元素");
// 获取向量快照
let snapshot = observer.snapshot();
println!("向量内容: {:?}", snapshot);
// 监听变化
for change in observer.changes() {
println!("检测到变化: {:?}", change);
}
}
高性能状态同步
use eyeball_im::{SharedObservable, SharedObserver};
use std::sync::Arc;
use std::thread;
fn main() {
// 创建共享可观察状态
let state = Arc::new(SharedObservable::new(0));
// 在多个线程中共享和修改状态
let handles: Vec<_> = (0..5).map(|i| {
let state = Arc::clone(&state);
thread::spawn(move || {
state.update(|value| *value += i);
})
}).collect();
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 获取最终状态
let observer = state.subscribe();
println!("最终状态值: {}", observer.get());
}
高级用法
自定义观察者行为
use eyeball_im::{Observable, Observer, Subscription};
struct CustomObserver {
inner: Observer<String>,
count: usize,
}
impl CustomObserver {
fn new(observable: &mut Observable<String>) -> Self {
Self {
inner: observable.subscribe(),
count: 0,
}
}
fn check_update(&mut self) -> Option<String> {
if self.inner.has_update() {
self.count += 1;
Some(self.inner.get().clone())
} else {
None
}
}
}
fn main() {
let mut observable = Observable::new("初始值".to_string());
let mut custom_observer = CustomObserver::new(&mut observable);
observable.set("更新1".to_string());
observable.set("更新2".to_string());
while let Some(value) = custom_observer.check_update() {
println!("收到更新 {}: {}", custom_observer.count, value);
}
}
过滤和转换数据流
use eyeball_im::{ObservableVector, VectorDiff, VectorObserver};
use std::convert::Infallible;
fn main() {
let mut vec = ObservableVector::new();
let observer = vec.subscribe();
// 添加过滤转换层
let filtered = observer.filter_map(|diff| {
match diff {
VectorDiff::PushBack { value } if value % 2 == 0 => {
Some(Ok(VectorDiff::PushBack { value: value * 2 }))
}
_ => None,
}
});
vec.push_back(1); // 会被过滤掉
vec.push_back(2); // 会被转换为4
for change in filtered.changes() {
println!("过滤后的变化: {:?}", change);
}
}
性能提示
- 对于高频更新场景,考虑批量更新而不是单次小更新
- 使用
SharedObservable
进行跨线程状态共享 - 合理使用
subscribe()
和unsubscribe()
管理观察者生命周期 - 对于大型数据结构,考虑使用不可变更新模式
eyeball-im 通过精心设计的数据结构和算法,在保证线程安全的同时提供了高性能的实时数据处理能力,是构建响应式应用的理想选择。
完整示例Demo
下面是一个结合了消息传递和状态同步的完整示例:
use eyeball_im::{Observable, SharedObservable, Observer, SharedObserver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// 定义应用状态结构
#[derive(Debug, Clone)]
struct AppState {
user_count: usize,
last_message: String,
}
fn main() {
// 创建共享应用状态
let shared_state = Arc::new(SharedObservable::new(AppState {
user_count: 0,
last_message: "初始消息".to_string(),
}));
// 创建消息通道
let mut message_observable = Observable::new("系统: 欢迎".to_string());
let message_observer = message_observable.subscribe();
// 启动状态更新线程
let state_clone = Arc::clone(&shared_state);
thread::spawn(move || {
for i in 1..=5 {
thread::sleep(Duration::from_secs(1));
// 更新状态
state_clone.update(|state| {
state.user_count += 1;
state.last_message = format!("用户{}加入了聊天", i);
});
// 发送消息
message_observable.set(format!("系统: 新用户{}加入", i));
}
});
// 主线程监听变化
let state_observer = shared_state.subscribe();
// 打印初始状态
println!("初始状态: {:?}", state_observer.get());
println!("初始消息: {}", message_observer.get());
// 监听状态变化
for _ in 0..5 {
while !state_observer.has_update() {
thread::sleep(Duration::from_millis(100));
}
println!("状态更新: {:?}", state_observer.get());
if message_observer.has_update() {
println!("新消息: {}", message_observer.get());
}
}
}
这个完整示例展示了:
- 使用
SharedObservable
进行跨线程状态共享 - 使用
Observable
进行实时消息传递 - 多线程环境下的状态更新和消息通知
- 观察者模式监听状态变化
运行结果会显示:
- 初始状态和消息
- 每秒更新一次用户数量和最后消息
- 同时接收系统消息通知