Rust实时通信插件库zenoh-macros的使用:高效异步消息处理的宏工具集
Rust实时通信插件库zenoh-macros的使用:高效异步消息处理的宏工具集
⚠️ 警告 ⚠️
这个crate是为Zenoh内部使用而设计的。 不保证API在任何版本中保持不变,包括补丁更新。 强烈建议仅依赖zenoh和zenoh-ext crates,并使用它们的公共API。
元数据
- 版本: v1.75.0
- 许可证: EPL-2.0 或 Apache-2.0
- 大小: 12.6 KiB
安装
在项目目录中运行以下Cargo命令:
cargo add zenoh-macros
或者在Cargo.toml中添加以下行:
zenoh-macros = "1.5.0"
类别
- 开发工具::过程宏助手
完整示例代码
以下是一个使用zenoh-macros进行实时通信的完整示例:
use zenoh::prelude::*;
use zenoh_macros::*;
// 定义一个异步处理消息的宏
#[zenoh_macros::queryable]
async fn handle_message(query: Query) -> ZResult<()> {
println!("Received query: {}", query.selector());
query.reply(Ok(Sample::new(
query.key_expr().clone(),
"Hello from zenoh-macros!".into(),
)))
.await
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建zenoh会话
let session = zenoh::open(zenoh::config::default()).res().await?;
// 使用宏注册查询处理程序
let _sub = session.declare_queryable("greeting")
.callback(handle_message)
.res()
.await?;
println!("Queryable declared on 'greeting'. Press CTRL-C to quit...");
tokio::signal::ctrl_c().await?;
Ok(())
}
这个示例展示了如何使用zenoh-macros中的queryable
宏来简化异步消息处理:
- 定义了一个异步函数
handle_message
,使用#[zenoh_macros::queryable]
宏标记 - 函数接收一个
Query
对象并返回ZResult
- 在main函数中创建zenoh会话
- 使用宏处理的消息回调注册查询处理程序
- 当收到查询时会自动调用
handle_message
函数
这个宏工具集可以显著简化异步消息处理代码,提高开发效率。
扩展完整示例
以下是一个更完整的zenoh-macros使用示例,展示了发布/订阅模式:
use zenoh::prelude::*;
use zenoh_macros::*;
use std::time::Duration;
// 定义一个查询处理器
#[zenoh_macros::queryable]
async fn query_handler(query: Query) -> ZResult<()> {
println!("[Query Handler] Received query on {}", query.key_expr());
query.reply(Ok(Sample::new(
query.key_expr().clone(),
format!("Response to {}", query.selector()).into(),
)))
.await
}
// 定义一个订阅处理器
#[zenoh_macros::subscriber]
async fn sub_handler(sample: Sample) {
println!("[Sub Handler] Received: {} = {}", sample.key_expr.as_str(), sample.value);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建zenoh会话
let session = zenoh::open(zenoh::config::default()).res().await?;
// 注册查询处理器
let _queryable = session
.declare_queryable("demo/query")
.callback(query_handler)
.res()
.await?;
// 注册订阅处理器
let _subscriber = session
.declare_subscriber("demo/sub/*")
.callback(sub_handler)
.res()
.await?;
// 发布一些消息
let publisher = session.declare_publisher("demo/sub/test").res().await?;
for i in 0..5 {
publisher.put(format!("Message {}", i)).res().await?;
tokio::time::sleep(Duration::from_secs(1)).await;
}
println!("Demo completed. Press CTRL-C to exit...");
tokio::signal::ctrl_c().await?;
Ok(())
}
这个扩展示例展示了:
- 同时使用
queryable
和subscriber
两种宏 - 实现了完整的发布/订阅模式
- 包含了周期性的消息发布
- 展示了如何同时处理查询和订阅消息
1 回复
Rust实时通信插件库zenoh-macros使用指南
简介
zenoh-macros是zenoh实时通信框架提供的一组过程宏,用于简化异步消息处理开发。这些宏可以帮助开发者更高效地构建基于zenoh的分布式系统,减少样板代码,提高代码可读性。
核心宏介绍
1. #[zsubscribe]
宏
用于声明一个zenoh订阅处理函数,自动处理消息接收和反序列化。
use zenoh_macros::zsubscribe;
#[zsubscribe("topic/example")]
async fn handle_message(data: Vec<u8>) {
println!("Received message: {:?}", data);
// 处理消息逻辑
}
2. #[zpublish]
宏
简化消息发布流程,自动处理序列化和异步发布。
use zenoh_macros::zpublish;
#[zpublish("topic/example")]
async fn publish_message() -> Vec<u8> {
// 返回要发布的数据
b"Hello, Zenoh!".to_vec()
}
3. #[zqueryable]
宏
声明一个可查询的端点,自动处理查询请求和响应。
use zenoh_macros::zqueryable;
#[zqueryable("query/example")]
async fn handle_query(_input: Vec<u8>) -> Vec<u8> {
// 处理查询并返回响应
b"Query response".to_vec()
}
综合使用示例
use zenoh_macros::{zsubscribe, zpublish, zqueryable};
use zenoh::prelude::sync::*;
// 初始化zenoh会话
let session = zenoh::open(zenoh::config::default()).res().unwrap().into_arc();
#[zsubscribe("sensor/data", session = session)]
async fn handle_sensor_data(data: Vec<u8>) {
println!("Received sensor data: {:?}", data);
// 处理传感器数据...
}
#[zpublish("control/command", session = session)]
async fn publish_command() -> Vec<u8> {
// 生成控制命令
b"START".to_vec()
}
#[zqueryable("system/status", session = session)]
async fn get_system_status(_: Vec<u8>) -> Vec<u8> {
// 返回系统状态
b"SYSTEM_OK".to_vec()
}
高级特性
自定义序列化
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize)]
struct SensorData {
temperature: f32,
humidity: f32,
}
#[zsubscribe("sensor/json", deserialize = "json")]
async fn handle_json_data(data: SensorData) {
println!("Temperature: {}, Humidity: {}", data.temperature, data.humidity);
}
#[zpublish("sensor/json", serialize = "json")]
async fn generate_sensor_data() -> SensorData {
SensorData {
temperature: 23.5,
humidity: 45.0,
}
}
错误处理
use zenoh_macros::{zqueryable, ZenohError};
#[zqueryable("data/process")]
async fn process_data(input: Vec<u8>) -> Result<Vec<u8>, ZenohError> {
if input.is_empty() {
return Err(ZenohError::Custom("Empty input".into()));
}
// 处理数据...
Ok(input.iter().map(|b| b + 1).collect())
}
配置选项
大多数宏支持以下配置参数:
session
: 指定zenoh会话变量serialize/deserialize
: 指定序列化格式(json, cbor等)priority
: 设置消息优先级congestion_control
: 设置拥塞控制策略reliability
: 设置可靠性模式
性能建议
- 对于高频消息,使用二进制序列化而非文本格式
- 合理设置QoS参数以平衡延迟和可靠性
- 考虑使用
#[zsubscribe(threaded)]
将高负载处理程序分配到独立线程
zenoh-macros通过简化常见模式,让开发者能更专注于业务逻辑而非通信细节,是构建高效实时系统的有力工具。
完整示例demo
下面是一个完整的zenoh-macros使用示例,展示了订阅、发布和查询功能的综合应用:
use zenoh_macros::{zsubscribe, zpublish, zqueryable};
use zenoh::prelude::sync::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;
use tokio::time::sleep;
// 定义数据结构
#[derive(Serialize, Deserialize, Debug)]
struct DeviceStatus {
id: String,
online: bool,
load: f32,
}
#[tokio::main]
async fn main() {
// 初始化zenoh会话
let session = zenoh::open(zenoh::config::default())
.res()
.unwrap()
.into_arc();
// 订阅设备状态消息
#[zsubscribe("device/status", session = session, deserialize = "json")]
async fn handle_device_status(status: DeviceStatus) {
println!("Received device status: {:?}", status);
// 在这里添加业务逻辑处理
}
// 发布控制命令
#[zpublish("device/control", session = session, serialize = "json")]
async fn publish_control_command() -> Vec<u8> {
// 模拟生成控制命令
b"REBOOT".to_vec()
}
// 查询设备信息
#[zqueryable("device/info", session = session)]
async fn query_device_info(_: Vec<u8>) -> Result<Vec<u8>, ZenohError> {
// 模拟查询响应
Ok(b"DEVICE_INFO".to_vec())
}
// 启动发布任务
tokio::spawn(async move {
loop {
// 每隔5秒发布一次控制命令
publish_control_command().await;
sleep(Duration::from_secs(5)).await;
}
});
// 保持主线程运行
loop {
sleep(Duration::from_secs(1)).await;
}
}
这个完整示例展示了:
- 使用
#[zsubscribe]
宏订阅设备状态消息并自动反序列化JSON数据 - 使用
#[zpublish]
宏定期发布控制命令 - 使用
#[zqueryable]
宏处理设备信息查询请求 - 结合tokio运行时实现异步处理
- 展示了自定义数据结构的序列化/反序列化
要运行此示例,需要在Cargo.toml中添加以下依赖:
[dependencies]
zenoh = "0.7"
zenoh-macros = "0.7"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] }