Rust网络通信库zenoh-transport的使用:高效、可扩展的分布式数据传输与消息传递框架
Rust网络通信库zenoh-transport的使用:高效、可扩展的分布式数据传输与消息传递框架
⚠️ 警告 ⚠️
这个crate是为Zenoh内部使用而设计的。不能保证API在任何版本中保持不变,包括补丁更新。强烈建议仅依赖zenoh和zenoh-ext crate,并使用它们的公共API。
元数据
- 版本: 1.5.0
- Rust版本: v1.75.0
- 许可证: EPL-2.0 OR Apache-2.0
- 大小: 163 KiB
- 更新时间: 23天前
安装
在项目目录中运行以下Cargo命令:
cargo add zenoh-transport
或者在Cargo.toml中添加以下行:
zenoh-transport = "1.5.0"
所有者
- Julien Enoch
- eclipse-zenoh-bot
- Luca Cominardi
- OlivierHecart
分类
- 网络编程
完整示例代码
以下是一个使用zenoh-transport进行基本发布/订阅通信的示例:
use zenoh::prelude::r#async::*;
#[async_std::main]
async fn main() {
// 创建zenoh会话
let session = zenoh::open(config::default())
.res()
.await
.unwrap();
// 发布者示例
let publisher = session
.declare_publisher("example/topic")
.res()
.await
.unwrap();
// 订阅者示例
let subscriber = session
.declare_subscriber("example/topic")
.callback(|sample| {
println!(
"Received {} on {}: {}",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
})
.res()
.await
.unwrap();
// 发布消息
publisher.put("Hello, Zenoh!").res().await.unwrap();
// 等待一段时间以接收消息
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
// 清理资源
publisher.undeclare().res().await.unwrap();
subscriber.undeclare().res().await.unwrap();
session.close().res().await.unwrap();
}
这个示例展示了:
- 如何创建zenoh会话
- 如何声明发布者和订阅者
- 如何发布消息
- 如何接收和处理消息
- 如何清理资源
注意:实际使用时应该处理所有Result类型,而不是直接unwrap()。
扩展示例代码
以下是一个更完整的zenoh-transport使用示例,包含错误处理和更多功能:
use zenoh::prelude::r#async::*;
use zenoh::config::Config;
use async_std::task;
use std::time::Duration;
#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建自定义配置
let config = Config::default()
.set_mode(Some(WhatAmI::Peer))? // 设置为Peer模式
.add_plugin("tcp".to_string())?; // 启用TCP传输
// 创建zenoh会话并处理错误
let session = zenoh::open(config)
.res()
.await
.map_err(|e| format!("Failed to open session: {}", e))?;
// 声明发布者
let publisher = session
.declare_publisher("example/advanced")
.congestion_control(CongestionControl::Block) // 设置拥塞控制
.priority(Priority::RealTime) // 设置优先级
.res()
.await
.map_err(|e| format!("Failed to create publisher: {}", e))?;
// 声明订阅者
let subscriber = session
.declare_subscriber("example/advanced")
.reliability(Reliability::Reliable) // 设置可靠性
.callback(|sample| {
println!(
"[Subscriber] Received {:?} on {}: {:?}",
sample.kind,
sample.key_expr.as_str(),
sample.value
);
})
.res()
.await
.map_err(|e| format!("Failed to create subscriber: {}", e))?;
// 异步发布多条消息
for i in 0..5 {
let msg = format!("Message {}", i);
publisher
.put(msg)
.res()
.await
.map_err(|e| format!("Failed to publish message: {}", e))?;
task::sleep(Duration::from_millis(200)).await;
}
// 等待所有消息被接收
task::sleep(Duration::from_secs(2)).await;
// 清理资源
publisher.undeclare()
.res()
.await
.map_err(|e| format!("Failed to undeclare publisher: {}", e))?;
subscriber.undeclare()
.res()
.await
.map_err(|e| format!("Failed to undeclare subscriber: {}", e))?;
session.close()
.res()
.await
.map_err(|e| format!("Failed to close session: {}", e))?;
Ok(())
}
这个扩展示例展示了:
- 自定义配置zenoh会话
- 更完善的错误处理
- 设置发布者和订阅者的高级选项
- 批量发布消息
- 更合理的资源清理
注意:实际使用时应根据具体需求调整配置参数和错误处理逻辑。
1 回复
Rust网络通信库zenoh-transport的使用指南
简介
zenoh-transport是zenoh项目中的网络通信组件,是一个高效、可扩展的分布式数据传输与消息传递框架。它提供了低延迟、高吞吐量的通信能力,特别适合物联网(IoT)、边缘计算和分布式系统场景。
zenoh-transport的主要特点:
- 支持多种传输协议(包括TCP、UDP、QUIC等)
- 内置负载均衡和故障转移机制
- 提供发布/订阅和请求/响应两种通信模式
- 支持点对点和中转路由通信
- 高度可扩展的架构设计
完整示例代码
下面是一个完整的发布/订阅模式示例,包含发布者和订阅者的实现:
use zenoh_transport::*;
use tokio::time::{sleep, Duration};
// 创建传输层
async fn create_transport() -> zresult::ZResult<TransportUnicast> {
// 使用默认配置
let config = TransportManager::config_default()?;
let manager = TransportManager::new(config);
manager.open_transport_unicast().await
}
// 发布者
async fn publisher() -> zresult::ZResult<()> {
let transport = create_transport().await?;
let key = "/demo/example".into();
for i in 0..10 {
let value = format!("Message {}", i);
transport.put(&key, value.into()).await?;
sleep(Duration::from_secs(1)).await;
}
Ok(())
}
// 订阅者
async fn subscriber() -> zresult::ZResult<()> {
let transport = create_transport().await?;
let key = "/demo/example".into();
let sub = transport.declare_subscriber(&key).await?;
while let Ok(sample) = sub.recv().await {
println!("Received: {:?}", sample);
}
Ok(())
}
#[tokio::main]
async fn main() {
// 启动订阅者
tokio::spawn(async {
subscriber().await.unwrap();
});
// 启动发布者
publisher().await.unwrap();
}
请求/响应模式完整示例
use zenoh_transport::*;
// 服务端
async fn server() -> zresult::ZResult<()> {
let transport = create_transport().await?;
let key = "/demo/service".into();
let queryable = transport.declare_queryable(&key).await?;
while let Ok(query) = queryable.recv().await {
println!("Received query: {:?}", query);
query.reply(Ok("Server response".into())).await?;
}
Ok(())
}
// 客户端
async fn client() -> zresult::ZResult<()> {
let transport = create_transport().await?;
let key = "/demo/service".into();
let response = transport.get(&key).await?;
println!("Response: {:?}", response);
Ok(())
}
#[tokio::main]
async fn main() {
// 启动服务端
tokio::spawn(async {
server().await.unwrap();
});
// 启动客户端
client().await.unwrap();
}
自定义配置示例
use zenoh_transport::*;
use zenoh_config::*;
async fn custom_transport() -> zresult::ZResult<()> {
// 创建自定义配置
let mut config = TransportManager::config_default()?;
// 设置TCP协议
config.transport.unicast.set_protocol(Protocol::Tcp);
// 设置监听端口
config.transport.unicast.set_listen("0.0.0.0:8080".parse()?);
// 设置连接超时
config.transport.unicast.set_connect_timeout(Some(Duration::from_secs(3)));
// 创建传输管理器
let manager = TransportManager::new(config);
// 打开传输
let transport = manager.open_transport_unicast().await?;
// 使用传输...
Ok(())
}
多播传输示例
use zenoh_transport::*;
async fn multicast_demo() -> zresult::ZResult<()> {
let config = TransportManager::config_default()?;
let manager = TransportManager::new(config);
// 打开多播传输
let transport = manager.open_transport_multicast().await?;
// 发布消息
transport.put("/group/chat".into(), "Hello multicast!".into()).await?;
Ok(())
}
错误处理最佳实践
use zenoh_transport::*;
async fn safe_operation() -> zresult::ZResult<()> {
let transport = create_transport().await?;
let key = "/safe/operation".into();
// 带错误处理的发布操作
match transport.put(&key, "data".into()).await {
Ok(_) => println!("Success"),
Err(e) => {
match e.kind() {
zerror::ZErrorKind::Timeout => {
println!("Timeout occurred, retrying...");
// 重试逻辑
}
zerror::ZErrorKind::ConnectionClosed => {
println!("Connection closed, reconnecting...");
// 重新连接逻辑
}
_ => println!("Error: {}", e),
}
}
}
Ok(())
}