Rust网络通信库zenoh-transport的使用:高效、可扩展的分布式数据传输与消息传递框架

Rust网络通信库zenoh-transport的使用:高效、可扩展的分布式数据传输与消息传递框架

⚠️ 警告 ⚠️

这个crate是为Zenoh内部使用而设计的。不能保证API在任何版本中保持不变,包括补丁更新。强烈建议仅依赖zenoh和zenoh-ext crate,并使用它们的公共API。

元数据

  • 版本: 1.5.0
  • Rust版本: v1.75.0
  • 许可证: EPL-2.0 OR Apache-2.0
  • 大小: 163 KiB
  • 更新时间: 23天前

安装

在项目目录中运行以下Cargo命令:

cargo add zenoh-transport

或者在Cargo.toml中添加以下行:

zenoh-transport = "1.5.0"

所有者

  • Julien Enoch
  • eclipse-zenoh-bot
  • Luca Cominardi
  • OlivierHecart

分类

  • 网络编程

完整示例代码

以下是一个使用zenoh-transport进行基本发布/订阅通信的示例:

use zenoh::prelude::r#async::*;

#[async_std::main]
async fn main() {
    // 创建zenoh会话
    let session = zenoh::open(config::default())
        .res()
        .await
        .unwrap();
    
    // 发布者示例
    let publisher = session
        .declare_publisher("example/topic")
        .res()
        .await
        .unwrap();
    
    // 订阅者示例
    let subscriber = session
        .declare_subscriber("example/topic")
        .callback(|sample| {
            println!(
                "Received {} on {}: {}",
                sample.kind,
                sample.key_expr.as_str(),
                sample.value
            );
        })
        .res()
        .await
        .unwrap();
    
    // 发布消息
    publisher.put("Hello, Zenoh!").res().await.unwrap();
    
    // 等待一段时间以接收消息
    async_std::task::sleep(std::time::Duration::from_secs(1)).await;
    
    // 清理资源
    publisher.undeclare().res().await.unwrap();
    subscriber.undeclare().res().await.unwrap();
    session.close().res().await.unwrap();
}

这个示例展示了:

  1. 如何创建zenoh会话
  2. 如何声明发布者和订阅者
  3. 如何发布消息
  4. 如何接收和处理消息
  5. 如何清理资源

注意:实际使用时应该处理所有Result类型,而不是直接unwrap()。

扩展示例代码

以下是一个更完整的zenoh-transport使用示例,包含错误处理和更多功能:

use zenoh::prelude::r#async::*;
use zenoh::config::Config;
use async_std::task;
use std::time::Duration;

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 创建自定义配置
    let config = Config::default()
        .set_mode(Some(WhatAmI::Peer))?  // 设置为Peer模式
        .add_plugin("tcp".to_string())?;  // 启用TCP传输
    
    // 创建zenoh会话并处理错误
    let session = zenoh::open(config)
        .res()
        .await
        .map_err(|e| format!("Failed to open session: {}", e))?;
    
    // 声明发布者
    let publisher = session
        .declare_publisher("example/advanced")
        .congestion_control(CongestionControl::Block)  // 设置拥塞控制
        .priority(Priority::RealTime)  // 设置优先级
        .res()
        .await
        .map_err(|e| format!("Failed to create publisher: {}", e))?;
    
    // 声明订阅者
    let subscriber = session
        .declare_subscriber("example/advanced")
        .reliability(Reliability::Reliable)  // 设置可靠性
        .callback(|sample| {
            println!(
                "[Subscriber] Received {:?} on {}: {:?}",
                sample.kind,
                sample.key_expr.as_str(),
                sample.value
            );
        })
        .res()
        .await
        .map_err(|e| format!("Failed to create subscriber: {}", e))?;
    
    // 异步发布多条消息
    for i in 0..5 {
        let msg = format!("Message {}", i);
        publisher
            .put(msg)
            .res()
            .await
            .map_err(|e| format!("Failed to publish message: {}", e))?;
        task::sleep(Duration::from_millis(200)).await;
    }
    
    // 等待所有消息被接收
    task::sleep(Duration::from_secs(2)).await;
    
    // 清理资源
    publisher.undeclare()
        .res()
        .await
        .map_err(|e| format!("Failed to undeclare publisher: {}", e))?;
    
    subscriber.undeclare()
        .res()
        .await
        .map_err(|e| format!("Failed to undeclare subscriber: {}", e))?;
    
    session.close()
        .res()
        .await
        .map_err(|e| format!("Failed to close session: {}", e))?;
    
    Ok(())
}

这个扩展示例展示了:

  1. 自定义配置zenoh会话
  2. 更完善的错误处理
  3. 设置发布者和订阅者的高级选项
  4. 批量发布消息
  5. 更合理的资源清理

注意:实际使用时应根据具体需求调整配置参数和错误处理逻辑。


1 回复

Rust网络通信库zenoh-transport的使用指南

简介

zenoh-transport是zenoh项目中的网络通信组件,是一个高效、可扩展的分布式数据传输与消息传递框架。它提供了低延迟、高吞吐量的通信能力,特别适合物联网(IoT)、边缘计算和分布式系统场景。

zenoh-transport的主要特点:

  • 支持多种传输协议(包括TCP、UDP、QUIC等)
  • 内置负载均衡和故障转移机制
  • 提供发布/订阅和请求/响应两种通信模式
  • 支持点对点和中转路由通信
  • 高度可扩展的架构设计

完整示例代码

下面是一个完整的发布/订阅模式示例,包含发布者和订阅者的实现:

use zenoh_transport::*;
use tokio::time::{sleep, Duration};

// 创建传输层
async fn create_transport() -> zresult::ZResult<TransportUnicast> {
    // 使用默认配置
    let config = TransportManager::config_default()?;
    let manager = TransportManager::new(config);
    manager.open_transport_unicast().await
}

// 发布者
async fn publisher() -> zresult::ZResult<()> {
    let transport = create_transport().await?;
    let key = "/demo/example".into();
    
    for i in 0..10 {
        let value = format!("Message {}", i);
        transport.put(&key, value.into()).await?;
        sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}

// 订阅者 
async fn subscriber() -> zresult::ZResult<()> {
    let transport = create_transport().await?;
    let key = "/demo/example".into();
    
    let sub = transport.declare_subscriber(&key).await?;
    while let Ok(sample) = sub.recv().await {
        println!("Received: {:?}", sample);
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    // 启动订阅者
    tokio::spawn(async {
        subscriber().await.unwrap();
    });
    
    // 启动发布者
    publisher().await.unwrap();
}

请求/响应模式完整示例

use zenoh_transport::*;

// 服务端
async fn server() -> zresult::ZResult<()> {
    let transport = create_transport().await?;
    let key = "/demo/service".into();
    
    let queryable = transport.declare_queryable(&key).await?;
    while let Ok(query) = queryable.recv().await {
        println!("Received query: {:?}", query);
        query.reply(Ok("Server response".into())).await?;
    }
    Ok(())
}

// 客户端
async fn client() -> zresult::ZResult<()> {
    let transport = create_transport().await?;
    let key = "/demo/service".into();
    
    let response = transport.get(&key).await?;
    println!("Response: {:?}", response);
    Ok(())
}

#[tokio::main] 
async fn main() {
    // 启动服务端
    tokio::spawn(async {
        server().await.unwrap();
    });
    
    // 启动客户端
    client().await.unwrap();
}

自定义配置示例

use zenoh_transport::*;
use zenoh_config::*;

async fn custom_transport() -> zresult::ZResult<()> {
    // 创建自定义配置
    let mut config = TransportManager::config_default()?;
    
    // 设置TCP协议
    config.transport.unicast.set_protocol(Protocol::Tcp);
    
    // 设置监听端口
    config.transport.unicast.set_listen("0.0.0.0:8080".parse()?);
    
    // 设置连接超时
    config.transport.unicast.set_connect_timeout(Some(Duration::from_secs(3)));
    
    // 创建传输管理器
    let manager = TransportManager::new(config);
    
    // 打开传输
    let transport = manager.open_transport_unicast().await?;
    
    // 使用传输...
    Ok(())
}

多播传输示例

use zenoh_transport::*;

async fn multicast_demo() -> zresult::ZResult<()> {
    let config = TransportManager::config_default()?;
    let manager = TransportManager::new(config);
    
    // 打开多播传输
    let transport = manager.open_transport_multicast().await?;
    
    // 发布消息
    transport.put("/group/chat".into(), "Hello multicast!".into()).await?;
    
    Ok(())
}

错误处理最佳实践

use zenoh_transport::*;

async fn safe_operation() -> zresult::ZResult<()> {
    let transport = create_transport().await?;
    let key = "/safe/operation".into();
    
    // 带错误处理的发布操作
    match transport.put(&key, "data".into()).await {
        Ok(_) => println!("Success"),
        Err(e) => {
            match e.kind() {
                zerror::ZErrorKind::Timeout => {
                    println!("Timeout occurred, retrying...");
                    // 重试逻辑
                }
                zerror::ZErrorKind::ConnectionClosed => {
                    println!("Connection closed, reconnecting...");
                    // 重新连接逻辑
                }
                _ => println!("Error: {}", e),
            }
        }
    }
    Ok(())
}
回到顶部