Rust实时数据流处理库eyeball-im的使用,高性能消息传递与状态同步解决方案

Rust实时数据流处理库eyeball-im的使用,高性能消息传递与状态同步解决方案

eyeball-im是一个Rust库,用于实现实时数据流处理、高性能消息传递和状态同步。它特别适合GUI应用和异步编程场景。

安装

在项目中添加依赖:

eyeball-im = "0.7.0"

或者运行:

cargo add eyeball-im

示例代码

以下是使用eyeball-im进行实时数据流处理的完整示例:

use eyeball_im::{ObservableVector, VectorDiff};
use std::sync::Arc;

fn main() {
    // 创建一个可观察的向量
    let mut vec = ObservableVector::new();
    
    // 订阅向量变化
    let subscriber = vec.subscribe();
    
    // 在另一个线程中修改向量
    std::thread::spawn(move || {
        vec.push_back("Hello".to_string());
        vec.push_back("World".to_string());
    });
    
    // 在主线程中处理变化
    while let Some(diff) = subscriber.next() {
        match diff {
            VectorDiff::PushBack { value } => {
                println!("新元素添加到末尾: {}", value);
            }
            VectorDiff::Insert { index, value } => {
                println!("在位置 {} 插入元素: {}", index, value);
            }
            _ => {} // 处理其他类型的变更
        }
    }
}

高级用法示例

use eyeball_im::{ObservableVector, VectorDiff};
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // 创建可观察向量
    let mut vec = ObservableVector::new();
    
    // 创建异步流订阅者
    let mut stream = vec.subscribe().into_stream();
    
    // 在另一个任务中修改向量
    tokio::spawn(async move {
        vec.push_back("Item 1".to_string());
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        vec.push_back("Item 2".to_string());
    });
    
    // 处理异步流
    while let Some(diff) = stream.next().await {
        match diff {
            VectorDiff::PushBack { value } => {
                println!("异步接收到新元素: {}", value);
            }
            _ => {}
        }
    }
}

完整示例demo

下面是一个结合GUI应用场景的完整示例,展示如何使用eyeball-im进行状态管理:

use eyeball_im::{ObservableVector, VectorDiff};
use tokio::sync::mpsc;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // 创建可观察向量来存储聊天消息
    let mut messages = ObservableVector::new();
    
    // 创建消息通道用于UI线程和后台线程通信
    let (ui_tx, mut ui_rx) = mpsc::channel(32);
    
    // 克隆消息向量用于后台任务
    let messages_for_background = messages.clone();
    
    // 启动后台任务模拟接收消息
    tokio::spawn(async move {
        for i in 1..=5 {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            messages_for_background.push_back(format!("消息 {}", i));
            
            // 通知UI线程更新
            ui_tx.send(format!("已接收消息 {}", i)).await.unwrap();
        }
    });
    
    // 订阅向量变化
    let mut subscriber = messages.subscribe();
    
    // 模拟UI线程循环
    loop {
        tokio::select! {
            // 处理新消息通知
            Some(diff) = subscriber.next() => {
                match diff {
                    VectorDiff::PushBack { value } => {
                        println!("[UI] 新消息: {}", value);
                    }
                    _ => {}
                }
            }
            // 处理后台任务通知
            Some(msg) = ui_rx.recv() => {
                println!("[UI] 通知: {}", msg);
            }
            // 5秒后退出
            _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
                println!("[UI] 退出");
                break;
            }
        }
    }
}

主要功能

  1. 可观察的数据结构:提供可观察的向量和地图等数据结构
  2. 高效变更传播:使用差分算法最小化变更通知的开销
  3. 线程安全:适合在多线程环境中使用
  4. 异步支持:可以与异步生态系统良好集成

适用场景

  • GUI应用的状态管理
  • 实时数据同步
  • 需要观察数据变化的任何场景

该库遵循MPL-2.0许可证,由Jonas Platte和Ivan Enderlin维护。


1 回复

eyeball-im: Rust实时数据流处理与高性能消息传递库

介绍

eyeball-im 是一个专注于实时数据流处理和高性能消息传递的Rust库,特别适合需要状态同步和实时更新的应用场景。它提供了轻量级的消息传递机制和高效的状态管理能力,适用于即时通讯、实时协作、游戏状态同步等应用。

主要特性

  • 高性能的消息传递机制
  • 实时数据流处理能力
  • 状态同步解决方案
  • 线程安全的设计
  • 低延迟的更新通知

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
eyeball-im = "0.1"

基本消息传递示例

use eyeball_im::{Observable, Observer};

fn main() {
    // 创建一个可观察的字符串
    let mut observable = Observable::new("初始值".to_string());
    
    // 创建观察者
    let observer = observable.subscribe();
    
    // 更新值并通知观察者
    observable.set("新值".to_string());
    
    // 从观察者获取最新值
    println!("当前值: {}", observer.get());
}

实时数据流处理

use eyeball_im::{ObservableVector, VectorObserver};

fn main() {
    // 创建一个可观察的向量
    let mut vec = ObservableVector::new();
    let observer = vec.subscribe();
    
    // 添加元素
    vec.push_back("第一个元素");
    vec.push_back("第二个元素");
    
    // 获取向量快照
    let snapshot = observer.snapshot();
    println!("向量内容: {:?}", snapshot);
    
    // 监听变化
    for change in observer.changes() {
        println!("检测到变化: {:?}", change);
    }
}

高性能状态同步

use eyeball_im::{SharedObservable, SharedObserver};
use std::sync::Arc;
use std::thread;

fn main() {
    // 创建共享可观察状态
    let state = Arc::new(SharedObservable::new(0));
    
    // 在多个线程中共享和修改状态
    let handles: Vec<_> = (0..5).map(|i| {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            state.update(|value| *value += i);
        })
    }).collect();
    
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    
    // 获取最终状态
    let observer = state.subscribe();
    println!("最终状态值: {}", observer.get());
}

高级用法

自定义观察者行为

use eyeball_im::{Observable, Observer, Subscription};

struct CustomObserver {
    inner: Observer<String>,
    count: usize,
}

impl CustomObserver {
    fn new(observable: &mut Observable<String>) -> Self {
        Self {
            inner: observable.subscribe(),
            count: 0,
        }
    }
    
    fn check_update(&mut self) -> Option<String> {
        if self.inner.has_update() {
            self.count += 1;
            Some(self.inner.get().clone())
        } else {
            None
        }
    }
}

fn main() {
    let mut observable = Observable::new("初始值".to_string());
    let mut custom_observer = CustomObserver::new(&mut observable);
    
    observable.set("更新1".to_string());
    observable.set("更新2".to_string());
    
    while let Some(value) = custom_observer.check_update() {
        println!("收到更新 {}: {}", custom_observer.count, value);
    }
}

过滤和转换数据流

use eyeball_im::{ObservableVector, VectorDiff, VectorObserver};
use std::convert::Infallible;

fn main() {
    let mut vec = ObservableVector::new();
    let observer = vec.subscribe();
    
    // 添加过滤转换层
    let filtered = observer.filter_map(|diff| {
        match diff {
            VectorDiff::PushBack { value } if value % 2 == 0 => {
                Some(Ok(VectorDiff::PushBack { value: value * 2 }))
            }
            _ => None,
        }
    });
    
    vec.push_back(1); // 会被过滤掉
    vec.push_back(2); // 会被转换为4
    
    for change in filtered.changes() {
        println!("过滤后的变化: {:?}", change);
    }
}

性能提示

  1. 对于高频更新场景,考虑批量更新而不是单次小更新
  2. 使用SharedObservable进行跨线程状态共享
  3. 合理使用subscribe()unsubscribe()管理观察者生命周期
  4. 对于大型数据结构,考虑使用不可变更新模式

eyeball-im 通过精心设计的数据结构和算法,在保证线程安全的同时提供了高性能的实时数据处理能力,是构建响应式应用的理想选择。

完整示例Demo

下面是一个结合了消息传递和状态同步的完整示例:

use eyeball_im::{Observable, SharedObservable, Observer, SharedObserver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// 定义应用状态结构
#[derive(Debug, Clone)]
struct AppState {
    user_count: usize,
    last_message: String,
}

fn main() {
    // 创建共享应用状态
    let shared_state = Arc::new(SharedObservable::new(AppState {
        user_count: 0,
        last_message: "初始消息".to_string(),
    }));

    // 创建消息通道
    let mut message_observable = Observable::new("系统: 欢迎".to_string());
    let message_observer = message_observable.subscribe();

    // 启动状态更新线程
    let state_clone = Arc::clone(&shared_state);
    thread::spawn(move || {
        for i in 1..=5 {
            thread::sleep(Duration::from_secs(1));
            
            // 更新状态
            state_clone.update(|state| {
                state.user_count += 1;
                state.last_message = format!("用户{}加入了聊天", i);
            });
            
            // 发送消息
            message_observable.set(format!("系统: 新用户{}加入", i));
        }
    });

    // 主线程监听变化
    let state_observer = shared_state.subscribe();
    
    // 打印初始状态
    println!("初始状态: {:?}", state_observer.get());
    println!("初始消息: {}", message_observer.get());

    // 监听状态变化
    for _ in 0..5 {
        while !state_observer.has_update() {
            thread::sleep(Duration::from_millis(100));
        }
        println!("状态更新: {:?}", state_observer.get());
        
        if message_observer.has_update() {
            println!("新消息: {}", message_observer.get());
        }
    }
}

这个完整示例展示了:

  1. 使用SharedObservable进行跨线程状态共享
  2. 使用Observable进行实时消息传递
  3. 多线程环境下的状态更新和消息通知
  4. 观察者模式监听状态变化

运行结果会显示:

  • 初始状态和消息
  • 每秒更新一次用户数量和最后消息
  • 同时接收系统消息通知
回到顶部