Rust实时通信插件库zenoh-macros的使用:高效异步消息处理的宏工具集

Rust实时通信插件库zenoh-macros的使用:高效异步消息处理的宏工具集

⚠️ 警告 ⚠️

这个crate是为Zenoh内部使用而设计的。 不保证API在任何版本中保持不变,包括补丁更新。 强烈建议仅依赖zenoh和zenoh-ext crates,并使用它们的公共API。

元数据

  • 版本: v1.75.0
  • 许可证: EPL-2.0 或 Apache-2.0
  • 大小: 12.6 KiB

安装

在项目目录中运行以下Cargo命令:

cargo add zenoh-macros

或者在Cargo.toml中添加以下行:

zenoh-macros = "1.5.0"

类别

  • 开发工具::过程宏助手

完整示例代码

以下是一个使用zenoh-macros进行实时通信的完整示例:

use zenoh::prelude::*;
use zenoh_macros::*;

// 定义一个异步处理消息的宏
#[zenoh_macros::queryable]
async fn handle_message(query: Query) -> ZResult<()> {
    println!("Received query: {}", query.selector());
    query.reply(Ok(Sample::new(
        query.key_expr().clone(),
        "Hello from zenoh-macros!".into(),
    )))
    .await
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建zenoh会话
    let session = zenoh::open(zenoh::config::default()).res().await?;
    
    // 使用宏注册查询处理程序
    let _sub = session.declare_queryable("greeting")
        .callback(handle_message)
        .res()
        .await?;

    println!("Queryable declared on 'greeting'. Press CTRL-C to quit...");
    tokio::signal::ctrl_c().await?;
    Ok(())
}

这个示例展示了如何使用zenoh-macros中的queryable宏来简化异步消息处理:

  1. 定义了一个异步函数handle_message,使用#[zenoh_macros::queryable]宏标记
  2. 函数接收一个Query对象并返回ZResult
  3. 在main函数中创建zenoh会话
  4. 使用宏处理的消息回调注册查询处理程序
  5. 当收到查询时会自动调用handle_message函数

这个宏工具集可以显著简化异步消息处理代码,提高开发效率。

扩展完整示例

以下是一个更完整的zenoh-macros使用示例,展示了发布/订阅模式:

use zenoh::prelude::*;
use zenoh_macros::*;
use std::time::Duration;

// 定义一个查询处理器
#[zenoh_macros::queryable]
async fn query_handler(query: Query) -> ZResult<()> {
    println!("[Query Handler] Received query on {}", query.key_expr());
    query.reply(Ok(Sample::new(
        query.key_expr().clone(),
        format!("Response to {}", query.selector()).into(),
    )))
    .await
}

// 定义一个订阅处理器
#[zenoh_macros::subscriber]
async fn sub_handler(sample: Sample) {
    println!("[Sub Handler] Received: {} = {}", sample.key_expr.as_str(), sample.value);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建zenoh会话
    let session = zenoh::open(zenoh::config::default()).res().await?;
    
    // 注册查询处理器
    let _queryable = session
        .declare_queryable("demo/query")
        .callback(query_handler)
        .res()
        .await?;
    
    // 注册订阅处理器
    let _subscriber = session
        .declare_subscriber("demo/sub/*")
        .callback(sub_handler)
        .res()
        .await?;
    
    // 发布一些消息
    let publisher = session.declare_publisher("demo/sub/test").res().await?;
    
    for i in 0..5 {
        publisher.put(format!("Message {}", i)).res().await?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    
    println!("Demo completed. Press CTRL-C to exit...");
    tokio::signal::ctrl_c().await?;
    Ok(())
}

这个扩展示例展示了:

  1. 同时使用queryablesubscriber两种宏
  2. 实现了完整的发布/订阅模式
  3. 包含了周期性的消息发布
  4. 展示了如何同时处理查询和订阅消息

1 回复

Rust实时通信插件库zenoh-macros使用指南

简介

zenoh-macros是zenoh实时通信框架提供的一组过程宏,用于简化异步消息处理开发。这些宏可以帮助开发者更高效地构建基于zenoh的分布式系统,减少样板代码,提高代码可读性。

核心宏介绍

1. #[zsubscribe]

用于声明一个zenoh订阅处理函数,自动处理消息接收和反序列化。

use zenoh_macros::zsubscribe;

#[zsubscribe("topic/example")]
async fn handle_message(data: Vec<u8>) {
    println!("Received message: {:?}", data);
    // 处理消息逻辑
}

2. #[zpublish]

简化消息发布流程,自动处理序列化和异步发布。

use zenoh_macros::zpublish;

#[zpublish("topic/example")]
async fn publish_message() -> Vec<u8> {
    // 返回要发布的数据
    b"Hello, Zenoh!".to_vec()
}

3. #[zqueryable]

声明一个可查询的端点,自动处理查询请求和响应。

use zenoh_macros::zqueryable;

#[zqueryable("query/example")]
async fn handle_query(_input: Vec<u8>) -> Vec<u8> {
    // 处理查询并返回响应
    b"Query response".to_vec()
}

综合使用示例

use zenoh_macros::{zsubscribe, zpublish, zqueryable};
use zenoh::prelude::sync::*;

// 初始化zenoh会话
let session = zenoh::open(zenoh::config::default()).res().unwrap().into_arc();

#[zsubscribe("sensor/data", session = session)]
async fn handle_sensor_data(data: Vec<u8>) {
    println!("Received sensor data: {:?}", data);
    // 处理传感器数据...
}

#[zpublish("control/command", session = session)]
async fn publish_command() -> Vec<u8> {
    // 生成控制命令
    b"START".to_vec()
}

#[zqueryable("system/status", session = session)]
async fn get_system_status(_: Vec<u8>) -> Vec<u8> {
    // 返回系统状态
    b"SYSTEM_OK".to_vec()
}

高级特性

自定义序列化

use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct SensorData {
    temperature: f32,
    humidity: f32,
}

#[zsubscribe("sensor/json", deserialize = "json")]
async fn handle_json_data(data: SensorData) {
    println!("Temperature: {}, Humidity: {}", data.temperature, data.humidity);
}

#[zpublish("sensor/json", serialize = "json")]
async fn generate_sensor_data() -> SensorData {
    SensorData {
        temperature: 23.5,
        humidity: 45.0,
    }
}

错误处理

use zenoh_macros::{zqueryable, ZenohError};

#[zqueryable("data/process")]
async fn process_data(input: Vec<u8>) -> Result<Vec<u8>, ZenohError> {
    if input.is_empty() {
        return Err(ZenohError::Custom("Empty input".into()));
    }
    // 处理数据...
    Ok(input.iter().map(|b| b + 1).collect())
}

配置选项

大多数宏支持以下配置参数:

  • session: 指定zenoh会话变量
  • serialize/deserialize: 指定序列化格式(json, cbor等)
  • priority: 设置消息优先级
  • congestion_control: 设置拥塞控制策略
  • reliability: 设置可靠性模式

性能建议

  1. 对于高频消息,使用二进制序列化而非文本格式
  2. 合理设置QoS参数以平衡延迟和可靠性
  3. 考虑使用#[zsubscribe(threaded)]将高负载处理程序分配到独立线程

zenoh-macros通过简化常见模式,让开发者能更专注于业务逻辑而非通信细节,是构建高效实时系统的有力工具。

完整示例demo

下面是一个完整的zenoh-macros使用示例,展示了订阅、发布和查询功能的综合应用:

use zenoh_macros::{zsubscribe, zpublish, zqueryable};
use zenoh::prelude::sync::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tokio::time::sleep;

// 定义数据结构
#[derive(Serialize, Deserialize, Debug)]
struct DeviceStatus {
    id: String,
    online: bool,
    load: f32,
}

#[tokio::main]
async fn main() {
    // 初始化zenoh会话
    let session = zenoh::open(zenoh::config::default())
        .res()
        .unwrap()
        .into_arc();

    // 订阅设备状态消息
    #[zsubscribe("device/status", session = session, deserialize = "json")]
    async fn handle_device_status(status: DeviceStatus) {
        println!("Received device status: {:?}", status);
        // 在这里添加业务逻辑处理
    }

    // 发布控制命令
    #[zpublish("device/control", session = session, serialize = "json")]
    async fn publish_control_command() -> Vec<u8> {
        // 模拟生成控制命令
        b"REBOOT".to_vec()
    }

    // 查询设备信息
    #[zqueryable("device/info", session = session)]
    async fn query_device_info(_: Vec<u8>) -> Result<Vec<u8>, ZenohError> {
        // 模拟查询响应
        Ok(b"DEVICE_INFO".to_vec())
    }

    // 启动发布任务
    tokio::spawn(async move {
        loop {
            // 每隔5秒发布一次控制命令
            publish_control_command().await;
            sleep(Duration::from_secs(5)).await;
        }
    });

    // 保持主线程运行
    loop {
        sleep(Duration::from_secs(1)).await;
    }
}

这个完整示例展示了:

  1. 使用#[zsubscribe]宏订阅设备状态消息并自动反序列化JSON数据
  2. 使用#[zpublish]宏定期发布控制命令
  3. 使用#[zqueryable]宏处理设备信息查询请求
  4. 结合tokio运行时实现异步处理
  5. 展示了自定义数据结构的序列化/反序列化

要运行此示例,需要在Cargo.toml中添加以下依赖:

[dependencies]
zenoh = "0.7"
zenoh-macros = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] }
回到顶部