Rust实时数据流处理库zenoh的使用,zenoh实现高效低延迟的发布/订阅和存储/查询功能

Rust实时数据流处理库zenoh的使用,zenoh实现高效低延迟的发布/订阅和存储/查询功能

Zenoh Dragon Logo

Eclipse Zenoh是一个零开销的发布/订阅、存储/查询和计算框架。它统一了动态数据、静态数据和计算,将传统的发布/订阅与地理分布式存储、查询和计算精心融合,同时保持了远超主流技术栈的时间和空间效率。

安装

在Cargo.toml中添加依赖:

zenoh = "1.5.0"

或者运行:

cargo add zenoh

基本使用示例

发布/订阅模式

订阅者示例 (z_sub.rs):

use zenoh::prelude::r#async::*;

#[async_std::main]
async fn main() {
    // 创建Zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 订阅主题
    let subscriber = session
        .declare_subscriber("demo/example/**")
        .res()
        .await
        .unwrap();

    println!("等待数据...");
    while let Ok(sample) = subscriber.recv_async().await {
        println!("收到: {} = {}", sample.key_expr, sample.value);
    }
}

发布者示例 (z_put.rs):

use zenoh::prelude::r#async::*;

#[async_std::main]
async fn main() {
    // 创建Zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 发布数据
    session
        .put("demo/example/test", "Hello World!")
        .res()
        .await
        .unwrap();
}

存储/查询功能

查询示例 (z_get.rs):

use zenoh::prelude::r#async::*;

#[async_std::main]
async fn main() {
    // 创建Zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 查询数据
    let replies = session
        .get("demo/example/**")
        .res()
        .await
        .unwrap();
    
    while let Ok(reply) = replies.recv_async().await {
        match reply.sample {
            Ok(sample) => println!("收到: {} = {}", sample.key_expr, sample.value),
            Err(err) => println!("错误: {}", err),
        }
    }
}

可查询示例 (z_queryable.rs):

use zenoh::prelude::r#async::*;

#[async_std::main]
async fn main() {
    // 创建Zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 注册可查询
    let _queryable = session
        .declare_queryable("demo/example/test")
        .with_callback(|query| {
            println!("收到查询: {}", query.selector());
            query.reply(Ok(Sample::new(
                query.key_expr().clone(),
                "Hello from Queryable!",
            )))
            .res()
        })
        .res()
        .await
        .unwrap();

    println!("等待查询...按Ctrl+C退出");
    loop {
        async_std::task::sleep(std::time::Duration::from_secs(1)).await;
    }
}

完整示例

下面是一个完整的发布/订阅和存储/查询功能的示例:

use zenoh::prelude::*;
use async_std::task;

async fn run_publisher() {
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    for i in 0..5 {
        let value = format!("Message {}", i);
        println!("发布: demo/example/test = {}", value);
        session.put("demo/example/test", value).res().await.unwrap();
        task::sleep(std::time::Duration::from_secs(1)).await;
    }
}

async fn run_subscriber() {
    let session = zenoh::open(config::peer()).res().await.unwrap();
    let subscriber = session
        .declare_subscriber("demo/example/**")
        .res()
        .await
        .unwrap();

    println!("订阅者启动...");
    while let Ok(sample) = subscriber.recv_async().await {
        println!("订阅者收到: {} = {}", sample.key_expr, sample.value);
    }
}

async fn run_storage() {
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 查询存储的数据
    println!("查询存储的数据...");
    let replies = session.get("demo/example/**").res().await.unwrap();
    
    while let Ok(reply) = replies.recv_async().await {
        match reply.sample {
            Ok(sample) => println!("存储查询结果: {} = {}", sample.key_expr, sample.value),
            Err(err) => println!("查询错误: {}", err),
        }
    }
}

#[async_std::main]
async fn main() {
    // 启动订阅者
    let subscriber_handle = task::spawn(run_subscriber());
    
    // 等待订阅者准备就绪
    task::sleep(std::time::Duration::from_secs(1)).await;
    
    // 运行发布者
    run_publisher().await;
    
    // 等待消息处理
    task::sleep(std::time::Duration::from_secs(1)).await;
    
    // 运行存储查询
    run_storage().await;
    
    // 等待订阅者完成
    subscriber_handle.await;
}

特性

  1. 高效低延迟:Zenoh专为高效通信设计,提供极低的延迟
  2. 统一模型:将发布/订阅与存储/查询统一在一个框架中
  3. 地理分布式:支持跨地理位置的数据分发
  4. 灵活部署:支持点对点和路由模式
  5. 多种协议:支持TCP、UDP、TLS和QUIC等多种传输协议

许可证

Zenoh采用EPL-2.0或Apache-2.0双重许可证。


1 回复

Rust实时数据流处理库zenoh使用指南

zenoh简介

zenoh是一个高性能的实时数据流处理库,专为高效低延迟的发布/订阅和存储/查询功能而设计。它提供了统一的API来处理实时数据和历史数据,非常适合物联网(IoT)、机器人、自动驾驶和分布式系统等场景。

zenoh的主要特点:

  • 极低延迟的消息传递
  • 支持发布/订阅模式
  • 内置存储和查询功能
  • 支持点对点和路由网络
  • 跨平台支持

安装方法

在Cargo.toml中添加依赖:

[dependencies]
zenoh = "0.7"

基本使用方法

1. 发布/订阅模式

订阅者示例

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    // 创建zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 订阅主题
    let subscriber = session.declare_subscriber("example/topic")
        .res()
        .await
        .unwrap();
    
    // 处理接收到的消息
    while let Ok(sample) = subscriber.recv_async().await {
        println!(
            "Received {} on {}: {}",
            sample.kind,
            sample.key_expr.as_str(),
            sample.value,
        );
    }
}

发布者示例

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    // 创建zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 发布消息
    session.put("example/topic", "Hello, zenoh!")
        .res()
        .await
        .unwrap();
}

2. 存储/查询功能

存储数据示例

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 在键"example/key"下存储值
    session.put("example/key", "Stored value")
        .res()
        .await
        .unwrap();
}

查询数据示例

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 查询存储在"example/key"的值
    let replies = session.get("example/key")
        .res()
        .await
        .unwrap();
    
    while let Ok(reply) = replies.recv_async().await {
        match reply.sample {
            Ok(sample) => println!(
                "Received {} ('{}': '{}')",
                reply.key_expr.as_str(),
                sample.key_expr.as_str(),
                sample.value
            ),
            Err(err) => println!("Received error: {}", err),
        }
    }
}

高级功能

1. 使用选择器进行复杂查询

let replies = session.get("example/**")
    .res()
    .await
    .unwrap();

2. 配置zenoh会话

use zenoh::config::Config;

let config = Config::default()
    .set_connect(Connect {
        endpoints: vec!["tcp/192.168.1.1:7447".parse().unwrap()],
        ..Default::default()
    })
    .unwrap();

let session = zenoh::open(config).res().await.unwrap();

3. 使用自定义序列化

#[derive(Serialize, Deserialize)]
struct CustomData {
    id: u32,
    value: f64,
}

let data = CustomData { id: 42, value: 3.14 };
session.put("example/custom", serde_json::to_string(&data).unwrap())
    .res()
    .await
    .unwrap();

性能优化建议

  1. 对于高频数据,使用二进制格式而非文本
  2. 考虑使用zenoh的路由功能减少网络跳数
  3. 合理设置zenoh缓存大小
  4. 在局域网内使用多播可以减少带宽使用

错误处理

match session.put("example/topic", "data").res().await {
    Ok(_) => println!("Successfully published"),
    Err(e) => eprintln!("Error publishing: {}", e),
}

完整示例demo

下面是一个完整的发布/订阅示例,包含发布者和订阅者两个部分:

发布者完整代码

use zenoh::prelude::*;
use std::time::Duration;
use async_std::task;

#[async_std::main]
async fn main() {
    // 创建zenoh会话,使用默认peer配置
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 每隔1秒发布一条消息
    let mut count = 0;
    loop {
        let message = format!("Message {}", count);
        println!("Publishing: {}", message);
        
        // 发布消息到example/topic主题
        match session.put("example/topic", &message).res().await {
            Ok(_) => println!("Successfully published"),
            Err(e) => eprintln!("Error publishing: {}", e),
        }
        
        count += 1;
        task::sleep(Duration::from_secs(1)).await;
    }
}

订阅者完整代码

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    // 创建zenoh会话,使用默认peer配置
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    println!("Subscribing to example/topic...");
    
    // 声明订阅者,订阅example/topic主题
    let subscriber = session.declare_subscriber("example/topic")
        .res()
        .await
        .unwrap();
    
    // 持续接收并处理消息
    while let Ok(sample) = subscriber.recv_async().await {
        println!(
            "Received message on {}: {}",
            sample.key_expr.as_str(),
            sample.value
        );
    }
}

存储/查询完整示例

use zenoh::prelude::*;

#[async_std::main]
async fn main() {
    // 创建zenoh会话
    let session = zenoh::open(config::peer()).res().await.unwrap();
    
    // 存储数据
    println!("Storing data...");
    session.put("example/database/key1", "Value 1")
        .res()
        .await
        .unwrap();
    session.put("example/database/key2", "Value 2")
        .res()
        .await
        .unwrap();
    
    // 查询所有数据
    println!("Querying stored data...");
    let replies = session.get("example/database/**")
        .res()
        .await
        .unwrap();
    
    // 处理查询结果
    while let Ok(reply) = replies.recv_async().await {
        match reply.sample {
            Ok(sample) => println!(
                "Found data at {}: {}",
                sample.key_expr.as_str(),
                sample.value
            ),
            Err(err) => println!("Error in reply: {}", err),
        }
    }
}

zenoh提供了强大的实时数据流处理能力,通过合理使用其发布/订阅和存储/查询功能,可以构建高效低延迟的分布式应用系统。

回到顶部