Rust网络通信库zenoh-link-commons的使用,高性能跨平台数据传输与链接管理组件

Rust网络通信库zenoh-link-commons的使用,高性能跨平台数据传输与链接管理组件

⚠️ 警告 ⚠️

这个crate是为Zenoh内部使用而设计的。
不能保证API在任何版本(包括补丁更新)中保持不变。
强烈建议仅依赖zenoh和zenoh-ext crates,并使用它们的公共API。

元数据

  • 版本: v1.5.0
  • 发布时间: 23天前
  • 大小: 29.2 KiB
  • 许可证: EPL-2.0 OR Apache-2.0
  • 分类: 网络编程

安装

在项目目录中运行以下Cargo命令:

cargo add zenoh-link-commons

或者在Cargo.toml中添加:

zenoh-link-commons = "1.5.0"

示例代码

以下是使用zenoh-link-commons的基本示例:

use zenoh_link_commons::{LinkManager, LinkManagerConfig};
use async_std::task;

fn main() {
    task::block_on(async {
        // 创建链接管理器配置
        let config = LinkManagerConfig::default();
        
        // 初始化链接管理器
        let manager = LinkManager::new(config).await.unwrap();
        
        // 创建TCP监听器
        let listener = manager
            .new_listener("tcp/0.0.0.0:7447")
            .await
            .unwrap();
            
        println!("Listening on: {}", listener.get_locators().await.unwrap());
        
        // 接受新连接
        let connection = listener.accept().await.unwrap();
        println!("Accepted new connection from: {}", connection.get_remote_address().unwrap());
        
        // 发送数据
        connection.write_all(b"Hello from zenoh-link-commons!").await.unwrap();
        
        // 接收数据
        let mut buf = [0u8; 1024];
        let n = connection.read(&mut buf).await.unwrap();
        println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
    });
}

完整示例demo

以下是一个更完整的示例,展示了客户端和服务器的双向通信:

use zenoh_link_commons::{LinkManager, LinkManagerConfig};
use async_std::task;
use async_std::net::TcpStream;
use std::time::Duration;

// 服务器端代码
async fn server() {
    let config = LinkManagerConfig::default();
    let manager = LinkManager::new(config).await.unwrap();
    
    // 创建TCP监听器
    let listener = manager
        .new_listener("tcp/0.0.0.0:7447")
        .await
        .unwrap();
    
    println!("[Server] Listening on: {}", listener.get_locators().await.unwrap());
    
    // 接受新连接
    let connection = listener.accept().await.unwrap();
    println!("[Server] Accepted connection from: {}", connection.get_remote_address().unwrap());
    
    // 接收数据
    let mut buf = [0u8; 1024];
    let n = connection.read(&mut buf).await.unwrap();
    println!("[Server] Received: {}", String::from_utf8_lossy(&buf[..n]));
    
    // 发送响应
    connection.write_all(b"Hello from server!").await.unwrap();
}

// 客户端代码
async fn client() {
    task::sleep(Duration::from_secs(1)).await; // 等待服务器启动
    
    let config = LinkManagerConfig::default();
    let manager = LinkManager::new(config).await.unwrap();
    
    // 连接到服务器
    let connection = manager
        .new_link("tcp/127.0.0.1:7447")
        .await
        .unwrap();
    
    println!("[Client] Connected to server");
    
    // 发送数据
    connection.write_all(b"Hello from client!").await.unwrap();
    
    // 接收响应
    let mut buf = [0u8; 1024];
    let n = connection.read(&mut buf).await.unwrap();
    println!("[Client] Received: {}", String::from_utf8_lossy(&buf[..n]));
}

fn main() {
    task::block_on(async {
        // 启动服务器
        let server_handle = task::spawn(server());
        
        // 启动客户端
        client().await;
        
        // 等待服务器完成
        server_handle.await;
    });
}

功能说明

  1. 高性能网络通信: 提供低延迟、高吞吐量的网络传输能力
  2. 跨平台支持: 支持多种操作系统和网络协议
  3. 链接管理: 统一管理各种类型的网络链接
  4. 异步IO: 基于async/await的异步编程模型

所有者

  • Julien Enoch
  • eclipse-zenoh-bot
  • Luca Cominardi
  • OlivierHecart

1 回复

Rust网络通信库zenoh-link-commons使用指南

简介

zenoh-link-commons是Zenoh项目中的一个核心组件,专注于提供高性能、跨平台的网络通信和链接管理功能。它为Zenoh的分布式通信系统提供了底层传输抽象,支持多种传输协议和平台。

主要特性

  • 跨平台支持(Linux, Windows, macOS等)
  • 高性能数据传输
  • 多协议支持(TCP, UDP, WebSocket等)
  • 链接生命周期管理
  • 异步I/O支持
  • 可扩展的传输插件架构

使用方法

添加依赖

首先在Cargo.toml中添加依赖:

[dependencies]
zenoh-link-commons = "0.7"

基本使用示例

use zenoh_link_commons::{LinkManager, Locator};
use async_std::task;

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    // 创建链接管理器
    let manager = LinkManager::default();
    
    // 定义连接地址
    let locator = Locator::new("tcp/127.0.0.1:7447")?;
    
    // 建立连接
    let link = manager.new_link(&locator).await?;
    
    // 发送数据
    let data = b"Hello, zenoh-link-commons!";
    link.write_all(data).await?;
    
    // 接收数据
    let mut buffer = [0u8; 1024];
    let size = link.read(&mut buffer).await?;
    println!("Received: {}", String::from_utf8_lossy(&buffer[..size]));
    
    Ok(())
}

fn main() {
    task::block_on(example()).unwrap();
}

多链接管理示例

use zenoh_link_commons::{LinkManager, Locator};
use async_std::task;
use std::collections::HashMap;

async fn manage_multiple_connections() -> Result<(), Box<dyn std::error::Error>> {
    let manager = LinkManager::default();
    let mut connections = HashMap::new();
    
    // 添加多个连接
    let endpoints = [
        "tcp/127.0.0.1:7447",
        "udp/127.0.0.1:7448",
        "ws/127.0.0.1:7449"
    ];
    
    for endpoint in endpoints.iter() {
        let locator = Locator::new(endpoint)?;
        let link = manager.new_link(&locator).await?;
        connections.insert(endpoint.to_string(), link);
    }
    
    // 广播消息
    let message = b"Broadcast message";
    for (_, link) in connections.iter_mut() {
        link.write_all(message).await?;
    }
    
    Ok(())
}

自定义传输配置

use zenoh_link_commons::{LinkManager, Locator, LinkConfig};
use async_std::task;

async fn custom_config() -> Result<(), Box<dyn std::error::Error>> {
    let manager = LinkManager::default();
    let locator = Locator::new("tcp/127.0.0.1:7447")?;
    
    // 自定义配置
    let config = LinkConfig {
        keep_alive: Some(60),  // 60秒保活
        no_delay: true,        // 启用TCP_NODELAY
        ..Default::default()
    };
    
    let link = manager.new_link_with_config(&locator, &config).await?;
    
    // 使用连接...
    
    Ok(())
}

高级功能

自定义传输协议

zenoh-link-commons支持通过插件方式添加自定义传输协议:

use zenoh_link_commons::{LinkManager, TransportPeer, TransportTrait};

struct MyCustomTransport;

#[async_trait::async_trait]
impl TransportTrait for MyCustomTransport {
    // 实现必要的方法...
}

// 注册自定义传输
LinkManager::register_transport("myproto", Box::new(MyCustomTransport));

性能优化技巧

  1. 使用批处理发送数据:
link.write_all(&[b"Hello", b" ", b"World"]).await?;
  1. 启用零拷贝接收:
let buffer = link.read_zero_copy().await?;
  1. 调整缓冲区大小:
let config = LinkConfig {
    rx_buffer_size: Some(65536),
    tx_buffer_size: Some(65536),
    ..Default::default()
};

完整示例代码

下面是一个完整的zenoh-link-commons使用示例,包含TCP客户端和服务端实现:

// 服务端代码
use zenoh_link_commons::{LinkManager, Locator};
use async_std::{task, net::{TcpListener, TcpStream}};

async fn server() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:7447").await?;
    println!("Server listening on 127.0.0.1:7447");
    
    let manager = LinkManager::default();
    
    while let Ok((stream, _)) = listener.accept().await {
        task::spawn(async move {
            let locator = Locator::new("tcp/127.0.0.1:7447").unwrap();
            let link = manager.new_link_from_stream(&locator, stream).await.unwrap();
            
            let mut buffer = [0u8; 1024];
            loop {
                match link.read(&mut buffer).await {
                    Ok(size) => {
                        println!("Received: {}", String::from_utf8_lossy(&buffer[..size]));
                        link.write_all(&buffer[..size]).await.unwrap();
                    }
                    Err(_) => break,
                }
            }
        });
    }
    
    Ok(())
}

// 客户端代码
async fn client() -> Result<(), Box<dyn std::error::Error>> {
    let manager = LinkManager::default();
    let locator = Locator::new("tcp/127.0.0.1:7447")?;
    
    // 建立连接
    let link = manager.new_link(&locator).await?;
    
    // 发送数据
    let data = b"Hello from client!";
    link.write_all(data).await?;
    
    // 接收回显数据
    let mut buffer = [0u8; 1024];
    let size = link.read(&mut buffer).await?;
    println!("Echo: {}", String::from_utf8_lossy(&buffer[..size]));
    
    Ok(())
}

// 主函数
fn main() {
    // 启动服务端
    let server_handle = task::spawn(server());
    
    // 等待服务端启动
    task::block_on(task::sleep(std::time::Duration::from_secs(1)));
    
    // 运行客户端
    task::block_on(client()).unwrap();
    
    // 等待服务端完成
    task::block_on(server_handle);
}

注意事项

  1. 所有操作都是异步的,需要在async上下文中使用
  2. 链接需要手动关闭或由Drop trait自动关闭
  3. 不同传输协议的特性可能有所不同
  4. 生产环境中应考虑添加重连机制和错误处理

zenoh-link-commons为构建高性能分布式系统提供了强大的底层通信能力,通过合理的配置和使用可以满足各种网络通信需求。

回到顶部