Rust实时数据流处理库zenoh的使用,zenoh实现高效低延迟的发布/订阅和存储/查询功能
Rust实时数据流处理库zenoh的使用,zenoh实现高效低延迟的发布/订阅和存储/查询功能
Eclipse Zenoh是一个零开销的发布/订阅、存储/查询和计算框架。它统一了动态数据、静态数据和计算,将传统的发布/订阅与地理分布式存储、查询和计算精心融合,同时保持了远超主流技术栈的时间和空间效率。
安装
在Cargo.toml中添加依赖:
zenoh = "1.5.0"
或者运行:
cargo add zenoh
基本使用示例
发布/订阅模式
订阅者示例 (z_sub.rs):
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
// 创建Zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 订阅主题
let subscriber = session
.declare_subscriber("demo/example/**")
.res()
.await
.unwrap();
println!("等待数据...");
while let Ok(sample) = subscriber.recv_async().await {
println!("收到: {} = {}", sample.key_expr, sample.value);
}
}
发布者示例 (z_put.rs):
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
// 创建Zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 发布数据
session
.put("demo/example/test", "Hello World!")
.res()
.await
.unwrap();
}
存储/查询功能
查询示例 (z_get.rs):
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
// 创建Zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 查询数据
let replies = session
.get("demo/example/**")
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!("收到: {} = {}", sample.key_expr, sample.value),
Err(err) => println!("错误: {}", err),
}
}
}
可查询示例 (z_queryable.rs):
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
// 创建Zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 注册可查询
let _queryable = session
.declare_queryable("demo/example/test")
.with_callback(|query| {
println!("收到查询: {}", query.selector());
query.reply(Ok(Sample::new(
query.key_expr().clone(),
"Hello from Queryable!",
)))
.res()
})
.res()
.await
.unwrap();
println!("等待查询...按Ctrl+C退出");
loop {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
}
}
完整示例
下面是一个完整的发布/订阅和存储/查询功能的示例:
use zenoh::prelude::*;
use async_std::task;
async fn run_publisher() {
let session = zenoh::open(config::peer()).res().await.unwrap();
for i in 0..5 {
let value = format!("Message {}", i);
println!("发布: demo/example/test = {}", value);
session.put("demo/example/test", value).res().await.unwrap();
task::sleep(std::time::Duration::from_secs(1)).await;
}
}
async fn run_subscriber() {
let session = zenoh::open(config::peer()).res().await.unwrap();
let subscriber = session
.declare_subscriber("demo/example/**")
.res()
.await
.unwrap();
println!("订阅者启动...");
while let Ok(sample) = subscriber.recv_async().await {
println!("订阅者收到: {} = {}", sample.key_expr, sample.value);
}
}
async fn run_storage() {
let session = zenoh::open(config::peer()).res().await.unwrap();
// 查询存储的数据
println!("查询存储的数据...");
let replies = session.get("demo/example/**").res().await.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!("存储查询结果: {} = {}", sample.key_expr, sample.value),
Err(err) => println!("查询错误: {}", err),
}
}
}
#[async_std::main]
async fn main() {
// 启动订阅者
let subscriber_handle = task::spawn(run_subscriber());
// 等待订阅者准备就绪
task::sleep(std::time::Duration::from_secs(1)).await;
// 运行发布者
run_publisher().await;
// 等待消息处理
task::sleep(std::time::Duration::from_secs(1)).await;
// 运行存储查询
run_storage().await;
// 等待订阅者完成
subscriber_handle.await;
}
特性
- 高效低延迟:Zenoh专为高效通信设计,提供极低的延迟
- 统一模型:将发布/订阅与存储/查询统一在一个框架中
- 地理分布式:支持跨地理位置的数据分发
- 灵活部署:支持点对点和路由模式
- 多种协议:支持TCP、UDP、TLS和QUIC等多种传输协议
许可证
Zenoh采用EPL-2.0或Apache-2.0双重许可证。
1 回复
Rust实时数据流处理库zenoh使用指南
zenoh简介
zenoh是一个高性能的实时数据流处理库,专为高效低延迟的发布/订阅和存储/查询功能而设计。它提供了统一的API来处理实时数据和历史数据,非常适合物联网(IoT)、机器人、自动驾驶和分布式系统等场景。
zenoh的主要特点:
- 极低延迟的消息传递
- 支持发布/订阅模式
- 内置存储和查询功能
- 支持点对点和路由网络
- 跨平台支持
安装方法
在Cargo.toml中添加依赖:
[dependencies]
zenoh = "0.7"
基本使用方法
1. 发布/订阅模式
订阅者示例
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
// 创建zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 订阅主题
let subscriber = session.declare_subscriber("example/topic")
.res()
.await
.unwrap();
// 处理接收到的消息
while let Ok(sample) = subscriber.recv_async().await {
println!(
"Received {} on {}: {}",
sample.kind,
sample.key_expr.as_str(),
sample.value,
);
}
}
发布者示例
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
// 创建zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 发布消息
session.put("example/topic", "Hello, zenoh!")
.res()
.await
.unwrap();
}
2. 存储/查询功能
存储数据示例
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
let session = zenoh::open(config::peer()).res().await.unwrap();
// 在键"example/key"下存储值
session.put("example/key", "Stored value")
.res()
.await
.unwrap();
}
查询数据示例
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
let session = zenoh::open(config::peer()).res().await.unwrap();
// 查询存储在"example/key"的值
let replies = session.get("example/key")
.res()
.await
.unwrap();
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(
"Received {} ('{}': '{}')",
reply.key_expr.as_str(),
sample.key_expr.as_str(),
sample.value
),
Err(err) => println!("Received error: {}", err),
}
}
}
高级功能
1. 使用选择器进行复杂查询
let replies = session.get("example/**")
.res()
.await
.unwrap();
2. 配置zenoh会话
use zenoh::config::Config;
let config = Config::default()
.set_connect(Connect {
endpoints: vec!["tcp/192.168.1.1:7447".parse().unwrap()],
..Default::default()
})
.unwrap();
let session = zenoh::open(config).res().await.unwrap();
3. 使用自定义序列化
#[derive(Serialize, Deserialize)]
struct CustomData {
id: u32,
value: f64,
}
let data = CustomData { id: 42, value: 3.14 };
session.put("example/custom", serde_json::to_string(&data).unwrap())
.res()
.await
.unwrap();
性能优化建议
- 对于高频数据,使用二进制格式而非文本
- 考虑使用zenoh的路由功能减少网络跳数
- 合理设置zenoh缓存大小
- 在局域网内使用多播可以减少带宽使用
错误处理
match session.put("example/topic", "data").res().await {
Ok(_) => println!("Successfully published"),
Err(e) => eprintln!("Error publishing: {}", e),
}
完整示例demo
下面是一个完整的发布/订阅示例,包含发布者和订阅者两个部分:
发布者完整代码
use zenoh::prelude::*;
use std::time::Duration;
use async_std::task;
#[async_std::main]
async fn main() {
// 创建zenoh会话,使用默认peer配置
let session = zenoh::open(config::peer()).res().await.unwrap();
// 每隔1秒发布一条消息
let mut count = 0;
loop {
let message = format!("Message {}", count);
println!("Publishing: {}", message);
// 发布消息到example/topic主题
match session.put("example/topic", &message).res().await {
Ok(_) => println!("Successfully published"),
Err(e) => eprintln!("Error publishing: {}", e),
}
count += 1;
task::sleep(Duration::from_secs(1)).await;
}
}
订阅者完整代码
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
// 创建zenoh会话,使用默认peer配置
let session = zenoh::open(config::peer()).res().await.unwrap();
println!("Subscribing to example/topic...");
// 声明订阅者,订阅example/topic主题
let subscriber = session.declare_subscriber("example/topic")
.res()
.await
.unwrap();
// 持续接收并处理消息
while let Ok(sample) = subscriber.recv_async().await {
println!(
"Received message on {}: {}",
sample.key_expr.as_str(),
sample.value
);
}
}
存储/查询完整示例
use zenoh::prelude::*;
#[async_std::main]
async fn main() {
// 创建zenoh会话
let session = zenoh::open(config::peer()).res().await.unwrap();
// 存储数据
println!("Storing data...");
session.put("example/database/key1", "Value 1")
.res()
.await
.unwrap();
session.put("example/database/key2", "Value 2")
.res()
.await
.unwrap();
// 查询所有数据
println!("Querying stored data...");
let replies = session.get("example/database/**")
.res()
.await
.unwrap();
// 处理查询结果
while let Ok(reply) = replies.recv_async().await {
match reply.sample {
Ok(sample) => println!(
"Found data at {}: {}",
sample.key_expr.as_str(),
sample.value
),
Err(err) => println!("Error in reply: {}", err),
}
}
}
zenoh提供了强大的实时数据流处理能力,通过合理使用其发布/订阅和存储/查询功能,可以构建高效低延迟的分布式应用系统。