Rust实时数据流处理库eyeball的使用,轻量级高性能数据观察与事件驱动插件
Rust实时数据流处理库eyeball的使用,轻量级高性能数据观察与事件驱动插件
eyeball crate实现了观察者模式(Observer pattern)的基本形式。它提供了Observable<T>
类型,半透明地包装一个内部值T
,并将变化广播给任何关联的Subscriber<T>
。目前Subscriber
只能通过async
/.await
轮询更新,但未来可能会改变。
还有SharedObservable<T>
作为另一种变体,它实现了Clone
但不实现Deref
。从代码中的多个位置更新值比将Observable
放在Arc<RwLock<_>>
中更符合人体工程学和高效。
以下是快速入门示例:
use eyeball::Observable;
let mut observable = Observable::new("A".to_owned());
// Observable没有自己的方法,因为这些可能与内部类型的方法冲突
// 它通过Deref引用内部类型
let mut subscriber1 = Observable::subscribe(&observable);
let mut subscriber2 = Observable::subscribe(&observable);
// 你可以直接从订阅者获取当前值而无需等待更新
assert_eq!(subscriber1.get(), "A");
Observable::set(&mut observable, "B".to_owned());
// `.next().await`将等待下一次更新,然后返回新值
assert_eq!(subscriber1.next().await, Some("B".to_owned()));
// 如果发生了多个更新而没有轮询订阅者
// 下一次轮询将跳过所有更新,只获取最新的
Observable::set(&mut observable, "C".to_owned());
assert_eq!(subscriber1.next().await, Some("C".to_owned()));
assert_eq!(subscriber2.next().await, Some("C".to_owned()));
// 你甚至可以通过使用`.read()`(无需等待)或
// `.next_ref().await`(等待下一次更新)来获取值而不克隆值
// 如果仅使用这些方法,你甚至可以在内部类型不实现`Clone` trait时使用`Observable`
// 但请注意,当由`.read()`或`.next_ref().await`返回的读保护存在时
// 更新observable会被阻塞
Observable::set(&mut observable, "D".to_owned());
{
let guard = subscriber1.next_ref().await.unwrap();
assert_eq!(*guard, "D");
}
// 当`Observable`被丢弃时,最新值由订阅者保持
drop(observable);
assert_eq!(subscriber1.get(), "D");
assert_eq!(*subscriber2.read(), "D");
该库目前针对少量(0-4)订阅者进行了优化。如果你关心几十个订阅者的性能,或者使用数百个订阅者,请打开一个问题进行讨论。
完整示例代码
use eyeball::Observable;
use futures::executor::block_on;
async fn demo() {
// 创建一个新的Observable
let mut observable = Observable::new("Initial value".to_owned());
// 创建两个订阅者
let mut subscriber1 = Observable::subscribe(&observable);
let mut subscriber2 = Observable::subscribe(&observable);
// 检查初始值
assert_eq!(subscriber1.get(), "Initial value");
assert_eq!(subscriber2.get(), "Initial value");
// 更新Observable的值
Observable::set(&mut observable, "First update".to_owned());
// 等待并获取更新
assert_eq!(subscriber1.next().await, Some("First update".to_owned()));
assert_eq!(subscriber2.next().await, Some("First update".to_owned()));
// 快速更新多次
Observable::set(&mut observable, "Second update".to_owned());
Observable::set(&mut observable, "Third update".to_owned());
// 订阅者只会收到最新的更新
assert_eq!(subscriber1.next().await, Some("Third update".to_owned()));
// 使用read()方法获取引用而不克隆
{
let value_ref = subscriber2.read();
assert_eq!(*value_ref, "Third update");
// 在此作用域内,observable不能被更新
}
// 丢弃observable后,订阅者仍保持最后的值
drop(observable);
assert_eq!(subscriber1.get(), "Third update");
}
fn main() {
block_on(demo());
}
要使用eyeball库,可以将以下内容添加到你的Cargo.toml中:
[dependencies]
eyeball = "0.8.8"
futures = "0.3" # 用于async/await支持
这个示例展示了eyeball库的基本用法,包括创建Observable、订阅更改、处理更新以及在不同场景下获取值的方法。
Rust实时数据流处理库eyeball的使用指南
简介
eyeball是一个轻量级高性能的Rust数据流处理库,专注于实时数据观察和事件驱动编程。它提供了简洁的API来处理数据流,支持观察数据变化并触发相应事件,非常适合需要实时数据处理的应用场景。
主要特性
- 轻量级设计,性能优异
- 简单易用的观察者模式实现
- 支持异步数据流处理
- 线程安全的数据共享
- 低开销的事件通知机制
安装
在Cargo.toml中添加依赖:
[dependencies]
eyeball = "0.3"
基本用法
1. 创建可观察数据
use eyeball::{Observable, Subscriber};
let mut observable = Observable::new("initial value".to_string());
let mut subscriber = observable.subscribe();
2. 观察数据变化
// 更新数据
observable.set("new value".to_string());
// 检查是否有新数据
if subscriber.next().is_some() {
println!("Data changed!");
}
// 获取最新值
let latest_value = subscriber.get();
println!("Current value: {}", latest_value);
3. 事件驱动处理
use eyeball::Observable;
use std::sync::Arc;
use tokio::sync::Mutex;
let data = Arc::new(Mutex::new(Observable::new(0)));
// 在异步上下文中观察变化
let data_clone = Arc::clone(&data);
tokio::spawn(async move {
let mut subscriber = data_clone.lock().await.subscribe();
loop {
if subscriber.next().is_some() {
println!("Received update: {}", subscriber.get());
}
tokio::task::yield_now().await;
}
});
// 更新数据
*data.lock().await.set(42);
高级用法
1. 批量更新
use eyeball::{Observable, Subscriber};
let mut observable = Observable::new(vec![1, 2, 3]);
let mut subscriber = observable.subscribe();
observable.edit(|v| {
v.push(4);
v.push(5);
});
while subscriber.next().is_some() {
println!("Vector changed: {:?}", subscriber.get());
}
2. 自定义比较函数
use eyeball::ObservableBuilder;
let mut observable = ObservableBuilder::new()
.compare(|old: &i32, new: &i32| old.abs() != new.abs())
.build(10);
let mut subscriber = observable.subscribe();
observable.set(-10); // 会触发通知
observable.set(10); // 不会触发通知(绝对值相同)
3. 多观察者模式
let mut observable = Observable::new(0);
let mut sub1 = observable.subscribe();
let mut sub2 = observable.subscribe();
observable.set(1);
assert_eq!(sub1.next(), Some(()));
assert_eq!(sub2.next(), Some(()));
性能建议
- 对于高频更新的场景,考虑使用
edit
方法批量更新 - 在不需要历史数据时,使用
set
而非edit
- 对于大型数据结构,考虑使用
Arc
包装以减少克隆开销
示例应用:实时日志处理器
use eyeball::Observable;
use std::sync::Arc;
use tokio::sync::Mutex;
struct LogProcessor {
logs: Arc<Mutex<Observable<Vec<String>>>>,
}
impl LogProcessor {
async fn new() -> Self {
Self {
logs: Arc::new(Mutex::new(Observable::new(Vec::new()))),
}
}
async fn add_log(&self, message: String) {
self.logs.lock().await.edit(|logs| logs.push(message));
}
async fn start_consumer(&self) {
let logs = Arc::clone(&self.logs);
tokio::spawn(async move {
let mut subscriber = logs.lock().await.subscribe();
loop {
if subscriber.next().is_some() {
for log in subscriber.get() {
println!("[LOG] {}", log);
}
}
tokio::task::yield_now().await;
}
});
}
}
#[tokio::main]
async fn main() {
let processor = LogProcessor::new().await;
processor.start_consumer().await;
processor.add_log("System started".to_string()).await;
processor.add_log("Processing data...".to_string()).await;
}
eyeball库非常适合构建需要实时数据处理的应用程序,如GUI更新、网络监控、游戏状态同步等场景。其轻量级设计确保了高性能,同时简单的API使得集成非常方便。
完整示例:实时股票价格监控系统
use eyeball::{Observable, Subscriber};
use std::sync::Arc;
use tokio::sync::Mutex;
use rand::Rng;
use std::time::Duration;
// 股票价格结构体
#[derive(Debug, Clone)]
struct StockPrice {
symbol: String,
price: f64,
change: f64,
}
// 股票监控系统
struct StockMonitor {
prices: Arc<Mutex<Observable<Vec<StockPrice>>>>,
}
impl StockMonitor {
async fn new() -> Self {
Self {
prices: Arc::new(Mutex::new(Observable::new(Vec::new()))),
}
}
// 添加股票
async fn add_stock(&self, symbol: String, initial_price: f64) {
self.prices.lock().await.edit(|prices| {
prices.push(StockPrice {
symbol,
price: initial_price,
change: 0.0,
});
});
}
// 启动价格更新模拟器
async fn start_price_simulator(&self) {
let prices = Arc::clone(&self.prices);
tokio::spawn(async move {
let mut rng = rand::thread_rng();
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
prices.lock().await.edit(|stocks| {
for stock in stocks.iter_mut() {
let change = rng.gen_range(-5.0..5.0);
stock.price += change;
stock.change = change;
}
});
}
});
}
// 启动价格显示器
async fn start_price_display(&self) {
let prices = Arc::clone(&self.prices);
tokio::spawn(async move {
let mut subscriber = prices.lock().await.subscribe();
loop {
if subscriber.next().is_some() {
println!("\n=== 最新股票价格 ===");
for stock in subscriber.get() {
let trend = if stock.change >= 0.0 { "↑" } else { "↓" };
println!("{}: {:.2} {} {:.2}",
stock.symbol, stock.price, trend, stock.change.abs());
}
}
tokio::task::yield_now().await;
}
});
}
}
#[tokio::main]
async fn main() {
let monitor = StockMonitor::new().await;
// 添加几只股票
monitor.add_stock("AAPL".to_string(), 150.0).await;
monitor.add_stock("GOOGL".to_string(), 2700.0).await;
monitor.add_stock("TSLA".to_string(), 700.0).await;
// 启动监控和模拟器
monitor.start_price_display().await;
monitor.start_price_simulator().await;
// 让程序运行一段时间
tokio::time::sleep(Duration::from_secs(30)).await;
}
这个完整示例展示了如何使用eyeball构建一个实时股票价格监控系统:
- 创建可观察的股票价格列表
- 模拟股票价格变化
- 实时显示价格变动
- 使用异步任务处理数据流
系统会每秒更新股票价格,并通过eyeball的观察者机制实时通知显示组件更新界面。这个示例展示了eyeball在金融数据实时处理中的典型应用场景。