Rust DDS中间件库rustdds的使用,实现高效数据分发服务的实时通信框架

Rust DDS中间件库rustdds的使用,实现高效数据分发服务的实时通信框架

RustDDS简介

RustDDS是一个纯Rust实现的数据分发服务(DDS)中间件库。该库由Atostek Oy开发,Atostek提供与DDS、ROS2和机器人软件相关的支持和软件开发服务。

RustDDS尝试将DDS的关键思想转换为Rust概念,同时也遵循Rust的惯例。因此,API并不完全按照DDS规范编写,而是使用Rust概念和惯例的功能等效近似。

数据分发服务(DDS)

实时系统的数据分发服务(DDS)是对象管理组织(OMG)的机器对机器连接框架,旨在使用发布-订阅模式实现可扩展、实时、可靠、高性能和可互操作的数据交换。DDS满足了空中交通管制、智能电网管理、自动驾驶汽车、机器人、运输系统、发电、医疗设备、仿真和测试、航空航天和国防以及其他需要实时数据交换的应用的需求。

当前实现状态

目前,该实现已经足够完整,可以与ROS2软件进行数据交换。

功能状态

  • 发现 ✅
  • 可靠性QoS:可靠和尽力而为 ✅
  • 历史QoS ✅
  • 基于UDP的RTPS ✅
  • 广播UDP ✅
  • 非阻塞I/O ✅
  • 主题类型:with_key和no_key ✅
  • 零拷贝接收路径 ✅
  • 零拷贝发送路径
  • 主题创建 ✅
  • 主题查找 ✅
  • 分区QoS
  • 基于时间的过滤QoS
  • 所有权QoS
  • 展示QoS:连贯/原子样本集和排序
  • 截止时间和延迟预算QoS
  • 样本分片(大对象交换) ✅
  • wait_for_acknowledgments
  • 用于DomainParticipant的监听器(或等效物) ✅
  • 用于主题的监听器
  • 使用Rust async任务的替代API ✅
  • 本地连接的共享内存传输

数据序列化和键控

一些现有的DDS实现使用代码生成来为每个有效载荷类型实现DataReader和DataWriter类。

我们不依赖代码生成,而是使用Rust泛型编程:有一个通用的DataReader和DataWriter,参数化为有效载荷类型D和序列化器适配器类型SA。Serde库用于有效载荷数据的序列化/反序列化。

当与DataWriter一起使用时,有效载荷类型D需要实现serde::Serialize,而与DataReader一起使用时需要实现serde::DeserializeOwned。许多现有的Rust类型和库已经支持Serde,因此它们可以直接使用。

在DDS中,WITH_KEY主题包含多个不同的实例,这些实例由键区分。键必须以某种方式嵌入到数据样本中。在我们的实现中,如果有效载荷类型D在WITH_KEY主题中通信,则D还需要实现Keyed特征。

Keyed特征需要一个方法:key(&self) -> Self::K,用于从D中提取关联类型K的键。键类型K必须实现Key特征,它是预先存在的特征Eq + PartialEq + PartialOrd + Ord + Hash + Clone + Serialize + DeserializeOwned的组合,没有额外的方法。

示例代码

以下是一个使用RustDDS的完整示例,展示如何创建一个简单的发布者和订阅者:

use rustdds::*;
use serde::{Serialize, Deserialize};

// 定义一个简单的消息类型
#[derive(Serialize, Deserialize, Debug)]
struct Point {
    x: f64,
    y: f64,
}

// 实现Keyed特征以便在WITH_KEY主题中使用
impl Keyed for Point {
    type K = String;
    
    fn key(&self) -> Self::K {
        format!("Point_{}_{}", self.x as i32, self.y as i32)
    }
}

fn main() {
    // 创建DDS参与者
    let domain_participant = DomainParticipant::new(0).unwrap();
    
    // 创建主题
    let topic = domain_participant.create_topic(
        "Points",
        "PointType",
        TopicKind::WithKey,
        QosPolicyBuilder::new().build(),
    ).unwrap();
    
    // 创建发布者
    let publisher = domain_participant.create_publisher(
        QosPolicyBuilder::new().build(),
    ).unwrap();
    
    // 创建数据写入器
    let writer = publisher.create_datawriter::<Point>(
        &topic,
        QosPolicyBuilder::new()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_millis(100),
            })
            .build(),
    ).unwrap();
    
    // 创建订阅者
    let subscriber = domain_participant.create_subscriber(
        QosPolicyBuilder::新().build(),
    ).unwrap();
    
    // 创建数据读取器
    let reader = subscriber.create_datareader::<Point>(
        &topic,
        QosPolicyBuilder::new()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_millis(100),
            })
            .build(),
    ).unwrap();
    
    // 发布一些数据
    for i in 0..5 {
        let point = Point {
            x: i as f64,
            y: (i * 2) as f64,
        };
        writer.write(point, None).unwrap();
        println!("Published point");
    }
    
    // 读取数据
    loop {
        if let Some(sample) = reader.take_next_sample().unwrap() {
            println!("Received: {:?}", sample);
        } else {
            break;
        }
    }
}

与DDS规范的故意偏差

基本原理

DDS 1.4规范指定了一个对象模型和一组API,这些API构成了DDS规范。这些API的设计(例如命名约定和内存管理语义)并不完全适合Rust世界。我们尝试创建一个设计,其中重要的DDS想法被保留并以适合Rust的方式实现。这些设计妥协应该只出现在DDS的应用面向API上。网络端仍然旨在与现有的DDS实现完全互操作。

类层次结构

DDS指定了一个类层次结构,它是API的一部分。不一定要遵循该层次结构,因为Rust不使用继承和派生类,不像例如C++。

命名约定

我们尝试遵循Rust命名约定。

数据监听器和WaitSets

DDS提供了两种等待到达数据的替代方法,即WaitSets和Listeners。我们选择用mio crate中的非阻塞IO API替换这些。DDS DataReader对象可以直接与mio Poll接口一起使用。应该可以在其上实现其他API,例如async API。

实例句柄

DDS使用"实例句柄",其行为类似于指向DDS实现管理的对象的指针。这似乎与Rust内存处理不太融合,因此我们选择不实现这些。

实例句柄可用于引用具有特定键的数据值(样本)。我们编写了API直接使用键,因为这看起来语义等效。

安装

要在项目中使用RustDDS,请将以下内容添加到您的Cargo.toml中:

[dependencies]
rustdds = "0.11.5"

或者运行以下Cargo命令:

cargo add rustdds

RustDDS仍在积极开发中,最新版本和文档可以在其GitHub仓库中找到。

完整示例代码

以下是一个更完整的RustDDS使用示例,展示了分布式系统中的发布-订阅通信:

use rustdds::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;

// 定义更复杂的数据结构
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SensorData {
    id: String,          // 传感器ID
    timestamp: u64,      // 时间戳
    values: Vec<f32>,    // 传感器读数
    status: SensorStatus // 传感器状态
}

// 传感器状态枚举
#[derive(Serialize, Deserialize, Debug, Clone)]
enum SensorStatus {
    Normal,
    Warning,
    Critical,
    Offline
}

// 实现Keyed特征以便在WITH_KEY主题中使用
impl Keyed for SensorData {
    type K = String;
    
    fn key(&self) -> Self::K {
        self.id.clone()
    }
}

// 发布者函数
async fn publisher_task(domain_id: u32) -> Result<(), dds::Error> {
    // 创建DDS参与者
    let domain_participant = DomainParticipant::new(domain_id)?;
    
    // 创建主题
    let topic = domain_participant.create_topic(
        "SensorDataTopic",
        "SensorDataType",
        TopicKind::WithKey,
        QosPolicyBuilder::new().build(),
    )?;
    
    // 创建发布者
    let publisher = domain_participant.create_publisher(
        QosPolicyBuilder::new().build(),
    )?;
    
    // 创建数据写入器
    let writer = publisher.create_datawriter::<SensorData>(
        &topic,
        QosPolicyBuilder::new()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_millis(100),
            })
            .history(History::KeepLast { depth: 10 })
            .build(),
    )?;
    
    // 模拟发布传感器数据
    for i in 0..10 {
        let sensor_data = SensorData {
            id: format!("sensor_{}", i % 3),  // 3个不同的传感器ID
            timestamp: chrono::Utc::now().timestamp_millis() as u64,
            values: vec![i as f32, (i * 2) as f32, (i * 3) as f32],
            status: if i % 5 == 0 { SensorStatus::Warning } else { SensorStatus::Normal }
        };
        
        writer.write(sensor_data, None)?;
        println!("Published sensor data {}", i);
        
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    
    Ok(())
}

// 订阅者函数
async fn subscriber_task(domain_id: u32) -> Result<(), dds::Error> {
    // 创建DDS参与者
    let domain_participant = DomainParticipant::new(domain_id)?;
    
    // 创建主题
    let topic = domain_participant.create_topic(
        "SensorDataTopic",
        "SensorDataType",
        TopicKind::WithKey,
        QosPolicyBuilder::new().build(),
    )?;
    
    // 创建订阅者
    let subscriber = domain_participant.create_subscriber(
        QosPolicyBuilder::new().build(),
    )?;
    
    // 创建数据读取器
    let reader = subscriber.create_datareader::<SensorData>(
        &topic,
        QosPolicyBuilder::new()
            .reliability(Reliability::Reliable {
                max_blocking_time: Duration::from_millis(100),
            })
            .history(History::KeepLast { depth: 10 })
            .build(),
    )?;
    
    println!("Subscriber ready, waiting for data...");
    
    // 持续接收数据
    loop {
        if let Some(sample) = reader.take_next_sample()? {
            println!("Received sensor data: {:?}", sample);
        } else {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let domain_id = 0;
    
    // 在单独的线程中运行发布者和订阅者
    let publisher_handle = tokio::spawn(async move {
        publisher_task(domain_id).await
    });
    
    let subscriber_handle = tokio::spawn(async move {
        subscriber_task(domain_id).await
    });
    
    // 等待任务完成
    let _ = tokio::join!(publisher_handle, subscriber_handle);
}

这个完整示例展示了:

  1. 定义更复杂的数据结构SensorData,包含多个字段和枚举
  2. 实现Keyed特征以便在WITH_KEY主题中使用
  3. 使用tokio异步运行时
  4. 更完整的QoS配置,包括历史记录策略
  5. 发布者和订阅者作为异步任务运行
  6. 更健壮的错误处理
  7. 模拟真实场景中的传感器数据发布

要运行此示例,需要在Cargo.toml中添加以下依赖项:

[dependencies]
rustdds = "0.11.5"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4"

1 回复

Rust DDS中间件库rustdds的使用 - 实现高效数据分发服务的实时通信框架

介绍

rustdds是一个纯Rust实现的DDS(Data Distribution Service)中间件库,它实现了OMG DDS规范,为分布式系统提供高效的实时数据通信能力。DDS是一种以数据为中心的发布-订阅通信模型,特别适合需要高吞吐量、低延迟和可靠性的分布式系统。

rustdds的主要特点:

  • 完全用Rust编写,无外部依赖
  • 支持实时发布-订阅模式
  • 提供QoS(Quality of Service)策略配置
  • 支持动态发现和类型系统
  • 线程安全设计

安装方法

在Cargo.toml中添加依赖:

[dependencies]
rustdds = "0.8"
serde = { version = "1.0", features = ["derive"] } # 用于序列化

完整示例代码

下面是一个完整的rustdds使用示例,展示了一个简单的发布-订阅通信模型:

use rustdds::*;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::Arc;
use std::thread;
use rand::Rng;

// 1. 定义数据结构
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct SensorData {
    id: u32,
    value: f64,
    timestamp: u64,
}

fn main() {
    // 2. 创建DomainParticipant
    let domain_id = 0;
    let participant = DomainParticipant::new(domain_id)
        .expect("创建DomainParticipant失败");

    // 3. 创建Topic
    let topic = participant
        .create_topic(
            "SensorDataTopic",
            "SensorData".to_string(),
            &QosPolicyBuilder::new().build(),
            TopicKind::NoKey,
        )
        .expect("创建Topic失败");

    // 4. 创建Publisher和DataWriter
    let publisher = participant
        .create_publisher(&QosPolicyBuilder::new().build())
        .expect("创建Publisher失败");

    let writer_qos = QosPolicyBuilder::new()
        .reliability()
            .reliable()
            .max_blocking_time(Duration::from_millis(100))
        .history()
            .keep_last(10)
        .build();

    let writer = publisher
        .create_data_writer_no_key::<SensorData>(&topic, &writer_qos)
        .expect("创建DataWriter失败");

    // 5. 创建Subscriber和DataReader
    let subscriber = participant
        .create_subscriber(&QosPolicyBuilder::new().build())
        .expect("创建Subscriber失败");

    let reader_qos = QosPolicyBuilder::new()
        .reliability()
            .reliable()
        .durability()
            .transient_local()
        .build();

    let reader = subscriber
        .create_data_reader_no_key::<SensorData>(&topic, &reader_qos)
        .expect("创建DataReader失败");

    // 6. 创建发布者线程
    let publisher_thread = thread::spawn(move || {
        let mut rng = rand::thread_rng();
        
        for i in 0..10 {
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();
            
            let data = SensorData {
                id: i,
                value: rng.gen::<f64>() * 100.0,
                timestamp,
            };
            
            println!("发布数据: {:?}", data);
            writer.write(data, None).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 7. 创建订阅者线程
    let reader_arc = Arc::new(reader);
    let subscriber_thread = thread::spawn(move || {
        // 等待历史数据
        reader_arc.wait_for_historical_data(Duration::from_secs(5)).unwrap();

        loop {
            if let Some(sample) = reader_arc.take_next_sample().unwrap() {
                let data: SensorData = sample.into_value();
                println!("接收到数据: {:?}", data);
            }
        }
    });

    // 等待线程结束
    publisher_thread.join().unwrap();
    subscriber_thread.join().unwrap();
}

代码说明

  1. 数据结构定义

    • 使用#[derive]宏为SensorData结构体自动实现序列化、反序列化和克隆等特性
  2. DomainParticipant创建

    • 每个应用程序通常创建一个DomainParticipant
    • 指定domain ID(0)来标识通信域
  3. Topic创建

    • 定义Topic名称和类型名称
    • 使用默认QoS策略
  4. Publisher和DataWriter

    • 配置可靠传输策略
    • 设置历史缓存大小
    • 创建用于发布数据的DataWriter
  5. Subscriber和DataReader

    • 配置可靠传输和持久性策略
    • 创建用于接收数据的DataReader
  6. 发布者线程

    • 生成随机传感器数据
    • 每秒发布一条数据
    • 使用DataWriter的write方法发送数据
  7. 订阅者线程

    • 使用Arc共享DataReader
    • 先等待历史数据
    • 循环接收并处理数据

注意事项

  1. 确保添加rand依赖到Cargo.toml以生成随机数据:

    [dependencies]
    rand = "0.8"
    
  2. 在实际应用中,应该添加适当的错误处理和线程终止条件

  3. 根据应用需求调整QoS策略参数

  4. 对于生产环境,考虑使用更高效的序列化方式

回到顶部