Rust实时数据流处理库zenoh-core的使用,高效实现分布式通信与消息传递

Rust实时数据流处理库zenoh-core的使用,高效实现分布式通信与消息传递

⚠️ 警告 ⚠️
这个crate是为Zenoh内部使用而设计的。
不能保证在任何版本(包括补丁更新)中API保持不变。
强烈建议仅依赖zenoh和zenoh-ext crates并使用它们的公共API。

元数据

  • 版本: v1.5.0
  • 许可证: EPL-2.0 OR Apache-2.0
  • 大小: 8.17 KiB
  • 分类: 网络编程

安装

在项目目录中运行以下Cargo命令:

cargo add zenoh-core

或者在Cargo.toml中添加以下行:

zenoh-core = "1.5.0"

基本使用示例

以下是一个使用zenoh-core进行基本发布/订阅通信的示例:

use zenoh_core::sync::SyncResolve;
use zenoh_core::zconfig::Config;
use zenoh_core::Session;

#[tokio::main]
async fn main() {
    // 创建Zenoh配置
    let config = Config::default();
    
    // 打开Zenoh会话
    let session = Session::open(config)
        .res()
        .await
        .unwrap();
    
    // 声明一个发布者
    let publisher = session
        .declare_publisher("example/topic")
        .res()
        .await
        .unwrap();
    
    // 声明一个订阅者
    let subscriber = session
        .declare_subscriber("example/topic")
        .callback(|sample| {
            println!(
                "Received {} on {}: {}",
                sample.kind,
                sample.key_expr.as_str(),
                String::from_utf8_lossy(&sample.value.payload.contiguous().into_owned())
            );
        })
        .res()
        .await
        .unwrap();
    
    // 发布消息
    publisher
        .put("Hello, Zenoh!")
        .res()
        .await
        .unwrap();
    
    // 等待一段时间让消息传递
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    
    // 清理资源
    subscriber.undeclare().res().await.unwrap();
    publisher.undeclare().res().await.unwrap();
    session.close().res().await.unwrap();
}

完整分布式通信示例

下面是一个更完整的示例,展示如何在分布式环境中使用zenoh-core进行通信:

use zenoh_core::sync::SyncResolve;
use zenoh_core::zconfig::Config;
use zenoh_core::Session;
use std::time::Duration;

// 发布者节点
async fn publisher_node() {
    let config = Config::default();
    let session = Session::open(config)
        .res()
        .await
        .unwrap();
    
    let publisher = session
        .declare_publisher("distributed/data")
        .res()
        .await
        .unwrap();
    
    for i in 0..10 {
        let message = format!("Message {}", i);
        println!("Publishing: {}", message);
        publisher
            .put(message)
            .res()
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    
    publisher.undeclare().res().await.unwrap();
    session.close().res().await.unwrap();
}

// 订阅者节点
async fn subscriber_node() {
    let config = Config::default();
    let session = Session::open(config)
        .res()
        .await
        .unwrap();
    
    let _subscriber = session
        .declare_subscriber("distributed/data")
        .callback(|sample| {
            println!(
                "Subscriber received: {}",
                String::from_utf8_lossy(&sample.value.payload.contiguous().into_owned())
            );
        })
        .res()
        .await
        .unwrap();
    
    // 保持订阅者运行
    tokio::time::sleep(Duration::from_secs(15)).await;
    
    session.close().res().await.unwrap();
}

#[tokio::main]
async fn main() {
    // 在实际分布式环境中,这两个函数会在不同的节点上运行
    tokio::join!(
        publisher_node(),
        subscriber_node()
    );
}

注意事项

  1. zenoh-core主要设计用于Zenoh内部使用,API可能会发生变化
  2. 生产环境中建议使用zenoh或zenoh-ext crate提供的稳定API
  3. 示例中使用了简单的配置,实际应用中可能需要更复杂的配置

这个库提供了高效的分布式通信能力,特别适合实时数据流处理场景。通过发布/订阅模式,可以轻松实现节点间的消息传递。


1 回复

Rust实时数据流处理库zenoh-core的使用指南

简介

zenoh-core是一个高性能的Rust库,专为实时数据流处理和分布式通信而设计。它提供了低延迟、高吞吐量的消息传递能力,非常适合物联网(IoT)、边缘计算和分布式系统场景。

主要特性

  • 发布/订阅模式
  • 请求/响应模式
  • 点对点和多播通信
  • 内存高效的消息处理
  • 支持多种传输协议(包括TCP、UDP、共享内存等)
  • 内置数据序列化

安装方法

在Cargo.toml中添加依赖:

[dependencies]
zenoh-core = "0.7"

完整示例代码

下面是一个完整的发布/订阅和请求/响应模式的示例:

use zenoh_core::prelude::*;
use zenoh_core::sync::SyncResolve;
use std::time::Duration;
use tokio::task;

#[tokio::main]
async fn main() {
    // 启动订阅者
    let sub_task = task::spawn(async {
        subscriber().await;
    });

    // 启动响应者
    let resp_task = task::spawn(async {
        responder().await;
    });

    // 给订阅者和响应者一些时间启动
    tokio::time::sleep(Duration::from_secs(1)).await;

    // 运行发布者
    publisher().await;
    
    // 运行请求者
    requester().await;

    // 等待任务完成
    let _ = tokio::join!(sub_task, resp_task);
}

// 发布者函数
async fn publisher() {
    // 使用默认配置打开zenoh会话
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    let key_expr = "demo/example";
    
    // 发布10条消息,每秒一条
    for i in 0..10 {
        let value = format!("Hello, zenoh! ({})", i);
        println!("Publishing: {}", value);
        session.put(key_expr, value).res().await.unwrap();
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

// 订阅者函数
async fn subscriber() {
    // 使用默认配置打开zenoh会话
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    let key_expr = "demo/example";
    
    // 声明订阅者
    let subscriber = session.declare_subscriber(key_expr).res().await.unwrap();
    println!("Subscriber started, waiting for messages...");
    
    // 接收并打印消息
    while let Ok(sample) = subscriber.recv_async().await {
        println!("Received: {:?}", sample.value);
    }
}

// 响应者函数
async fn responder() {
    // 使用默认配置打开zenoh会话
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    let key_expr = "demo/query";
    
    // 声明可查询端点
    let _replier = session
        .declare_queryable(key_expr)
        .res()
        .await
        .unwrap();
    
    println!("Responder ready, waiting for queries...");
    
    // 处理请求并响应
    while let Ok(query) = _replier.recv_async().await {
        println!("Received query from: {:?}", query.selector);
        let reply_value = "This is the response!";
        query.reply(Ok(reply_value.into())).res().await.unwrap();
    }
}

// 请求者函数
async fn requester() {
    // 使用默认配置打开zenoh会话
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    let key_expr = "demo/query";
    
    println!("Sending query...");
    
    // 发送查询并获取响应
    let response = session
        .get(key_expr)
        .res()
        .await
        .unwrap();
    
    println!("Response: {:?}", response);
}

高级功能示例

自定义配置示例

use zenoh_core::config::Config;

async fn custom_config_example() {
    // 创建自定义配置
    let config = Config::builder()
        .add_plugin("tcp")  // 添加TCP插件
        .listen("tcp/0.0.0.0:7447")  // 监听端口
        .build()
        .unwrap();

    // 使用自定义配置打开会话
    let session = zenoh_core::open(config).res().await.unwrap();
    println!("Session opened with custom configuration");
}

带选择器的订阅示例

async fn selector_subscription() {
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    
    // 使用通配符订阅多个主题
    let subscriber = session
        .declare_subscriber("demo/sensors/*")  // 订阅所有sensors下的消息
        .res()
        .await
        .unwrap();
    
    println!("Subscribed to demo/sensors/*");
    
    while let Ok(sample) = subscriber.recv_async().await {
        println!("Received from {}: {:?}", sample.key_expr, sample.value);
    }
}

数据序列化示例

#[derive(serde::Serialize, serde::Deserialize, Debug)]
struct SensorData {
    temperature: f32,
    humidity: f32,
    timestamp: u64,
}

async fn serialization_example() {
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    
    // 创建传感器数据
    let data = SensorData {
        temperature: 23.5,
        humidity: 45.0,
        timestamp: 1625097600,
    };
    
    // 序列化并发布数据
    session.put("demo/sensor/1", serde_json::to_string(&data).unwrap())
        .res()
        .await
        .unwrap();
    println!("Published sensor data");
    
    // 订阅并反序列化数据
    let subscriber = session
        .declare_subscriber("demo/sensor/1")
        .res()
        .await
        .unwrap();
    
    if let Ok(sample) = subscriber.recv_async().await {
        if let Ok(deserialized) = serde_json::from_str::<SensorData>(&sample.value.to_string()) {
            println!("Deserialized data: {:?}", deserialized);
        }
    }
}

性能优化示例

async fn performance_optimization() {
    let session = zenoh_core::open(zenoh_core::config::default()).res().await.unwrap();
    
    // 使用put_owned优化高频消息
    for i in 0..100 {
        let value = format!("High frequency message {}", i);
        session.put_owned("demo/high_freq", value).res().await.unwrap();
    }
    
    // 使用批处理
    let mut batch = session.batch();
    for i in 0..100 {
        batch.put("demo/batch", format!("Batch message {}", i));
    }
    batch.res().await.unwrap();
}

错误处理示例

async fn error_handling_example() {
    let session = match zenoh_core::open(zenoh_core::config::default()).res().await {
        Ok(s) => s,
        Err(e) => {
            eprintln!("Failed to open session: {}", e);
            return;
        }
    };
    
    match session.put("demo/error_test", "test value").res().await {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => eprintln!("Failed to send message: {}", e),
    }
    
    match session.get("demo/non_existent").res().await {
        Ok(response) => println!("Got response: {:?}", response),
        Err(e) => eprintln!("Query failed: {}", e),
    }
}

总结

zenoh-core为Rust开发者提供了强大的实时数据流处理能力,特别适合构建分布式系统和IoT应用。通过其简洁的API和高效的实现,可以轻松实现跨网络的消息传递和数据共享。以上示例展示了zenoh-core的主要功能和使用方法,您可以根据实际需求进行调整和扩展。

回到顶部