Rust实时数据流处理库eyeball的使用,轻量级高性能数据观察与事件驱动插件

Rust实时数据流处理库eyeball的使用,轻量级高性能数据观察与事件驱动插件

eyeball crate实现了观察者模式(Observer pattern)的基本形式。它提供了Observable<T>类型,半透明地包装一个内部值T,并将变化广播给任何关联的Subscriber<T>。目前Subscriber只能通过async/.await轮询更新,但未来可能会改变。

还有SharedObservable<T>作为另一种变体,它实现了Clone但不实现Deref。从代码中的多个位置更新值比将Observable放在Arc<RwLock<_>>中更符合人体工程学和高效。

以下是快速入门示例:

use eyeball::Observable;

let mut observable = Observable::new("A".to_owned());
// Observable没有自己的方法,因为这些可能与内部类型的方法冲突
// 它通过Deref引用内部类型
let mut subscriber1 = Observable::subscribe(&observable);
let mut subscriber2 = Observable::subscribe(&observable);

// 你可以直接从订阅者获取当前值而无需等待更新
assert_eq!(subscriber1.get(), "A");

Observable::set(&mut observable, "B".to_owned());
// `.next().await`将等待下一次更新,然后返回新值
assert_eq!(subscriber1.next().await, Some("B".to_owned()));

// 如果发生了多个更新而没有轮询订阅者
// 下一次轮询将跳过所有更新,只获取最新的
Observable::set(&mut observable, "C".to_owned());
assert_eq!(subscriber1.next().await, Some("C".to_owned()));
assert_eq!(subscriber2.next().await, Some("C".to_owned()));

// 你甚至可以通过使用`.read()`(无需等待)或
// `.next_ref().await`(等待下一次更新)来获取值而不克隆值
// 如果仅使用这些方法,你甚至可以在内部类型不实现`Clone` trait时使用`Observable`
// 但请注意,当由`.read()`或`.next_ref().await`返回的读保护存在时
// 更新observable会被阻塞
Observable::set(&mut observable, "D".to_owned());
{
    let guard = subscriber1.next_ref().await.unwrap();
    assert_eq!(*guard, "D");
}

// 当`Observable`被丢弃时,最新值由订阅者保持
drop(observable);
assert_eq!(subscriber1.get(), "D");
assert_eq!(*subscriber2.read(), "D");

该库目前针对少量(0-4)订阅者进行了优化。如果你关心几十个订阅者的性能,或者使用数百个订阅者,请打开一个问题进行讨论。

完整示例代码

use eyeball::Observable;
use futures::executor::block_on;

async fn demo() {
    // 创建一个新的Observable
    let mut observable = Observable::new("Initial value".to_owned());
    
    // 创建两个订阅者
    let mut subscriber1 = Observable::subscribe(&observable);
    let mut subscriber2 = Observable::subscribe(&observable);
    
    // 检查初始值
    assert_eq!(subscriber1.get(), "Initial value");
    assert_eq!(subscriber2.get(), "Initial value");
    
    // 更新Observable的值
    Observable::set(&mut observable, "First update".to_owned());
    
    // 等待并获取更新
    assert_eq!(subscriber1.next().await, Some("First update".to_owned()));
    assert_eq!(subscriber2.next().await, Some("First update".to_owned()));
    
    // 快速更新多次
    Observable::set(&mut observable, "Second update".to_owned());
    Observable::set(&mut observable, "Third update".to_owned());
    
    // 订阅者只会收到最新的更新
    assert_eq!(subscriber1.next().await, Some("Third update".to_owned()));
    
    // 使用read()方法获取引用而不克隆
    {
        let value_ref = subscriber2.read();
        assert_eq!(*value_ref, "Third update");
        // 在此作用域内,observable不能被更新
    }
    
    // 丢弃observable后,订阅者仍保持最后的值
    drop(observable);
    assert_eq!(subscriber1.get(), "Third update");
}

fn main() {
    block_on(demo());
}

要使用eyeball库,可以将以下内容添加到你的Cargo.toml中:

[dependencies]
eyeball = "0.8.8"
futures = "0.3"  # 用于async/await支持

这个示例展示了eyeball库的基本用法,包括创建Observable、订阅更改、处理更新以及在不同场景下获取值的方法。


1 回复

Rust实时数据流处理库eyeball的使用指南

简介

eyeball是一个轻量级高性能的Rust数据流处理库,专注于实时数据观察和事件驱动编程。它提供了简洁的API来处理数据流,支持观察数据变化并触发相应事件,非常适合需要实时数据处理的应用场景。

主要特性

  • 轻量级设计,性能优异
  • 简单易用的观察者模式实现
  • 支持异步数据流处理
  • 线程安全的数据共享
  • 低开销的事件通知机制

安装

在Cargo.toml中添加依赖:

[dependencies]
eyeball = "0.3"

基本用法

1. 创建可观察数据

use eyeball::{Observable, Subscriber};

let mut observable = Observable::new("initial value".to_string());
let mut subscriber = observable.subscribe();

2. 观察数据变化

// 更新数据
observable.set("new value".to_string());

// 检查是否有新数据
if subscriber.next().is_some() {
    println!("Data changed!");
}

// 获取最新值
let latest_value = subscriber.get();
println!("Current value: {}", latest_value);

3. 事件驱动处理

use eyeball::Observable;
use std::sync::Arc;
use tokio::sync::Mutex;

let data = Arc::new(Mutex::new(Observable::new(0)));

// 在异步上下文中观察变化
let data_clone = Arc::clone(&data);
tokio::spawn(async move {
    let mut subscriber = data_clone.lock().await.subscribe();
    loop {
        if subscriber.next().is_some() {
            println!("Received update: {}", subscriber.get());
        }
        tokio::task::yield_now().await;
    }
});

// 更新数据
*data.lock().await.set(42);

高级用法

1. 批量更新

use eyeball::{Observable, Subscriber};

let mut observable = Observable::new(vec![1, 2, 3]);
let mut subscriber = observable.subscribe();

observable.edit(|v| {
    v.push(4);
    v.push(5);
});

while subscriber.next().is_some() {
    println!("Vector changed: {:?}", subscriber.get());
}

2. 自定义比较函数

use eyeball::ObservableBuilder;

let mut observable = ObservableBuilder::new()
    .compare(|old: &i32, new: &i32| old.abs() != new.abs())
    .build(10);

let mut subscriber = observable.subscribe();

observable.set(-10); // 会触发通知
observable.set(10);  // 不会触发通知(绝对值相同)

3. 多观察者模式

let mut observable = Observable::new(0);

let mut sub1 = observable.subscribe();
let mut sub2 = observable.subscribe();

observable.set(1);

assert_eq!(sub1.next(), Some(()));
assert_eq!(sub2.next(), Some(()));

性能建议

  1. 对于高频更新的场景,考虑使用edit方法批量更新
  2. 在不需要历史数据时,使用set而非edit
  3. 对于大型数据结构,考虑使用Arc包装以减少克隆开销

示例应用:实时日志处理器

use eyeball::Observable;
use std::sync::Arc;
use tokio::sync::Mutex;

struct LogProcessor {
    logs: Arc<Mutex<Observable<Vec<String>>>>,
}

impl LogProcessor {
    async fn new() -> Self {
        Self {
            logs: Arc::new(Mutex::new(Observable::new(Vec::new()))),
        }
    }
    
    async fn add_log(&self, message: String) {
        self.logs.lock().await.edit(|logs| logs.push(message));
    }
    
    async fn start_consumer(&self) {
        let logs = Arc::clone(&self.logs);
        tokio::spawn(async move {
            let mut subscriber = logs.lock().await.subscribe();
            loop {
                if subscriber.next().is_some() {
                    for log in subscriber.get() {
                        println!("[LOG] {}", log);
                    }
                }
                tokio::task::yield_now().await;
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let processor = LogProcessor::new().await;
    processor.start_consumer().await;
    
    processor.add_log("System started".to_string()).await;
    processor.add_log("Processing data...".to_string()).await;
}

eyeball库非常适合构建需要实时数据处理的应用程序,如GUI更新、网络监控、游戏状态同步等场景。其轻量级设计确保了高性能,同时简单的API使得集成非常方便。

完整示例:实时股票价格监控系统

use eyeball::{Observable, Subscriber};
use std::sync::Arc;
use tokio::sync::Mutex;
use rand::Rng;
use std::time::Duration;

// 股票价格结构体
#[derive(Debug, Clone)]
struct StockPrice {
    symbol: String,
    price: f64,
    change: f64,
}

// 股票监控系统
struct StockMonitor {
    prices: Arc<Mutex<Observable<Vec<StockPrice>>>>,
}

impl StockMonitor {
    async fn new() -> Self {
        Self {
            prices: Arc::new(Mutex::new(Observable::new(Vec::new()))),
        }
    }
    
    // 添加股票
    async fn add_stock(&self, symbol: String, initial_price: f64) {
        self.prices.lock().await.edit(|prices| {
            prices.push(StockPrice {
                symbol,
                price: initial_price,
                change: 0.0,
            });
        });
    }
    
    // 启动价格更新模拟器
    async fn start_price_simulator(&self) {
        let prices = Arc::clone(&self.prices);
        tokio::spawn(async move {
            let mut rng = rand::thread_rng();
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
                
                prices.lock().await.edit(|stocks| {
                    for stock in stocks.iter_mut() {
                        let change = rng.gen_range(-5.0..5.0);
                        stock.price += change;
                        stock.change = change;
                    }
                });
            }
        });
    }
    
    // 启动价格显示器
    async fn start_price_display(&self) {
        let prices = Arc::clone(&self.prices);
        tokio::spawn(async move {
            let mut subscriber = prices.lock().await.subscribe();
            loop {
                if subscriber.next().is_some() {
                    println!("\n=== 最新股票价格 ===");
                    for stock in subscriber.get() {
                        let trend = if stock.change >= 0.0 { "↑" } else { "↓" };
                        println!("{}: {:.2} {} {:.2}", 
                            stock.symbol, stock.price, trend, stock.change.abs());
                    }
                }
                tokio::task::yield_now().await;
            }
        });
    }
}

#[tokio::main]
async fn main() {
    let monitor = StockMonitor::new().await;
    
    // 添加几只股票
    monitor.add_stock("AAPL".to_string(), 150.0).await;
    monitor.add_stock("GOOGL".to_string(), 2700.0).await;
    monitor.add_stock("TSLA".to_string(), 700.0).await;
    
    // 启动监控和模拟器
    monitor.start_price_display().await;
    monitor.start_price_simulator().await;
    
    // 让程序运行一段时间
    tokio::time::sleep(Duration::from_secs(30)).await;
}

这个完整示例展示了如何使用eyeball构建一个实时股票价格监控系统:

  1. 创建可观察的股票价格列表
  2. 模拟股票价格变化
  3. 实时显示价格变动
  4. 使用异步任务处理数据流

系统会每秒更新股票价格,并通过eyeball的观察者机制实时通知显示组件更新界面。这个示例展示了eyeball在金融数据实时处理中的典型应用场景。

回到顶部