Rust DDS中间件库rustdds的使用,实现高效数据分发服务的实时通信框架
Rust DDS中间件库rustdds的使用,实现高效数据分发服务的实时通信框架
RustDDS简介
RustDDS是一个纯Rust实现的数据分发服务(DDS)中间件库。该库由Atostek Oy开发,Atostek提供与DDS、ROS2和机器人软件相关的支持和软件开发服务。
RustDDS尝试将DDS的关键思想转换为Rust概念,同时也遵循Rust的惯例。因此,API并不完全按照DDS规范编写,而是使用Rust概念和惯例的功能等效近似。
数据分发服务(DDS)
实时系统的数据分发服务(DDS)是对象管理组织(OMG)的机器对机器连接框架,旨在使用发布-订阅模式实现可扩展、实时、可靠、高性能和可互操作的数据交换。DDS满足了空中交通管制、智能电网管理、自动驾驶汽车、机器人、运输系统、发电、医疗设备、仿真和测试、航空航天和国防以及其他需要实时数据交换的应用的需求。
当前实现状态
目前,该实现已经足够完整,可以与ROS2软件进行数据交换。
功能状态
- 发现 ✅
- 可靠性QoS:可靠和尽力而为 ✅
- 历史QoS ✅
- 基于UDP的RTPS ✅
- 广播UDP ✅
- 非阻塞I/O ✅
- 主题类型:with_key和no_key ✅
- 零拷贝接收路径 ✅
- 零拷贝发送路径
- 主题创建 ✅
- 主题查找 ✅
- 分区QoS
- 基于时间的过滤QoS
- 所有权QoS
- 展示QoS:连贯/原子样本集和排序
- 截止时间和延迟预算QoS
- 样本分片(大对象交换) ✅
wait_for_acknowledgments
✅- 用于DomainParticipant的监听器(或等效物) ✅
- 用于主题的监听器
- 使用Rust
async
任务的替代API ✅ - 本地连接的共享内存传输
数据序列化和键控
一些现有的DDS实现使用代码生成来为每个有效载荷类型实现DataReader和DataWriter类。
我们不依赖代码生成,而是使用Rust泛型编程:有一个通用的DataReader和DataWriter,参数化为有效载荷类型D和序列化器适配器类型SA。Serde库用于有效载荷数据的序列化/反序列化。
当与DataWriter一起使用时,有效载荷类型D需要实现serde::Serialize
,而与DataReader一起使用时需要实现serde::DeserializeOwned
。许多现有的Rust类型和库已经支持Serde,因此它们可以直接使用。
在DDS中,WITH_KEY主题包含多个不同的实例,这些实例由键区分。键必须以某种方式嵌入到数据样本中。在我们的实现中,如果有效载荷类型D在WITH_KEY主题中通信,则D还需要实现Keyed
特征。
Keyed
特征需要一个方法:key(&self) -> Self::K
,用于从D
中提取关联类型K
的键。键类型K
必须实现Key
特征,它是预先存在的特征Eq + PartialEq + PartialOrd + Ord + Hash + Clone + Serialize + DeserializeOwned
的组合,没有额外的方法。
示例代码
以下是一个使用RustDDS的完整示例,展示如何创建一个简单的发布者和订阅者:
use rustdds::*;
use serde::{Serialize, Deserialize};
// 定义一个简单的消息类型
#[derive(Serialize, Deserialize, Debug)]
struct Point {
x: f64,
y: f64,
}
// 实现Keyed特征以便在WITH_KEY主题中使用
impl Keyed for Point {
type K = String;
fn key(&self) -> Self::K {
format!("Point_{}_{}", self.x as i32, self.y as i32)
}
}
fn main() {
// 创建DDS参与者
let domain_participant = DomainParticipant::new(0).unwrap();
// 创建主题
let topic = domain_participant.create_topic(
"Points",
"PointType",
TopicKind::WithKey,
QosPolicyBuilder::new().build(),
).unwrap();
// 创建发布者
let publisher = domain_participant.create_publisher(
QosPolicyBuilder::new().build(),
).unwrap();
// 创建数据写入器
let writer = publisher.create_datawriter::<Point>(
&topic,
QosPolicyBuilder::new()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.build(),
).unwrap();
// 创建订阅者
let subscriber = domain_participant.create_subscriber(
QosPolicyBuilder::新().build(),
).unwrap();
// 创建数据读取器
let reader = subscriber.create_datareader::<Point>(
&topic,
QosPolicyBuilder::new()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.build(),
).unwrap();
// 发布一些数据
for i in 0..5 {
let point = Point {
x: i as f64,
y: (i * 2) as f64,
};
writer.write(point, None).unwrap();
println!("Published point");
}
// 读取数据
loop {
if let Some(sample) = reader.take_next_sample().unwrap() {
println!("Received: {:?}", sample);
} else {
break;
}
}
}
与DDS规范的故意偏差
基本原理
DDS 1.4规范指定了一个对象模型和一组API,这些API构成了DDS规范。这些API的设计(例如命名约定和内存管理语义)并不完全适合Rust世界。我们尝试创建一个设计,其中重要的DDS想法被保留并以适合Rust的方式实现。这些设计妥协应该只出现在DDS的应用面向API上。网络端仍然旨在与现有的DDS实现完全互操作。
类层次结构
DDS指定了一个类层次结构,它是API的一部分。不一定要遵循该层次结构,因为Rust不使用继承和派生类,不像例如C++。
命名约定
我们尝试遵循Rust命名约定。
数据监听器和WaitSets
DDS提供了两种等待到达数据的替代方法,即WaitSets和Listeners。我们选择用mio crate中的非阻塞IO API替换这些。DDS DataReader对象可以直接与mio Poll
接口一起使用。应该可以在其上实现其他API,例如async API。
实例句柄
DDS使用"实例句柄",其行为类似于指向DDS实现管理的对象的指针。这似乎与Rust内存处理不太融合,因此我们选择不实现这些。
实例句柄可用于引用具有特定键的数据值(样本)。我们编写了API直接使用键,因为这看起来语义等效。
安装
要在项目中使用RustDDS,请将以下内容添加到您的Cargo.toml中:
[dependencies]
rustdds = "0.11.5"
或者运行以下Cargo命令:
cargo add rustdds
RustDDS仍在积极开发中,最新版本和文档可以在其GitHub仓库中找到。
完整示例代码
以下是一个更完整的RustDDS使用示例,展示了分布式系统中的发布-订阅通信:
use rustdds::*;
use serde::{Serialize, Deserialize};
use std::time::Duration;
// 定义更复杂的数据结构
#[derive(Serialize, Deserialize, Debug, Clone)]
struct SensorData {
id: String, // 传感器ID
timestamp: u64, // 时间戳
values: Vec<f32>, // 传感器读数
status: SensorStatus // 传感器状态
}
// 传感器状态枚举
#[derive(Serialize, Deserialize, Debug, Clone)]
enum SensorStatus {
Normal,
Warning,
Critical,
Offline
}
// 实现Keyed特征以便在WITH_KEY主题中使用
impl Keyed for SensorData {
type K = String;
fn key(&self) -> Self::K {
self.id.clone()
}
}
// 发布者函数
async fn publisher_task(domain_id: u32) -> Result<(), dds::Error> {
// 创建DDS参与者
let domain_participant = DomainParticipant::new(domain_id)?;
// 创建主题
let topic = domain_participant.create_topic(
"SensorDataTopic",
"SensorDataType",
TopicKind::WithKey,
QosPolicyBuilder::new().build(),
)?;
// 创建发布者
let publisher = domain_participant.create_publisher(
QosPolicyBuilder::new().build(),
)?;
// 创建数据写入器
let writer = publisher.create_datawriter::<SensorData>(
&topic,
QosPolicyBuilder::new()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.history(History::KeepLast { depth: 10 })
.build(),
)?;
// 模拟发布传感器数据
for i in 0..10 {
let sensor_data = SensorData {
id: format!("sensor_{}", i % 3), // 3个不同的传感器ID
timestamp: chrono::Utc::now().timestamp_millis() as u64,
values: vec![i as f32, (i * 2) as f32, (i * 3) as f32],
status: if i % 5 == 0 { SensorStatus::Warning } else { SensorStatus::Normal }
};
writer.write(sensor_data, None)?;
println!("Published sensor data {}", i);
tokio::time::sleep(Duration::from_secs(1)).await;
}
Ok(())
}
// 订阅者函数
async fn subscriber_task(domain_id: u32) -> Result<(), dds::Error> {
// 创建DDS参与者
let domain_participant = DomainParticipant::new(domain_id)?;
// 创建主题
let topic = domain_participant.create_topic(
"SensorDataTopic",
"SensorDataType",
TopicKind::WithKey,
QosPolicyBuilder::new().build(),
)?;
// 创建订阅者
let subscriber = domain_participant.create_subscriber(
QosPolicyBuilder::new().build(),
)?;
// 创建数据读取器
let reader = subscriber.create_datareader::<SensorData>(
&topic,
QosPolicyBuilder::new()
.reliability(Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
})
.history(History::KeepLast { depth: 10 })
.build(),
)?;
println!("Subscriber ready, waiting for data...");
// 持续接收数据
loop {
if let Some(sample) = reader.take_next_sample()? {
println!("Received sensor data: {:?}", sample);
} else {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
#[tokio::main]
async fn main() {
let domain_id = 0;
// 在单独的线程中运行发布者和订阅者
let publisher_handle = tokio::spawn(async move {
publisher_task(domain_id).await
});
let subscriber_handle = tokio::spawn(async move {
subscriber_task(domain_id).await
});
// 等待任务完成
let _ = tokio::join!(publisher_handle, subscriber_handle);
}
这个完整示例展示了:
- 定义更复杂的数据结构
SensorData
,包含多个字段和枚举 - 实现
Keyed
特征以便在WITH_KEY主题中使用 - 使用tokio异步运行时
- 更完整的QoS配置,包括历史记录策略
- 发布者和订阅者作为异步任务运行
- 更健壮的错误处理
- 模拟真实场景中的传感器数据发布
要运行此示例,需要在Cargo.toml中添加以下依赖项:
[dependencies]
rustdds = "0.11.5"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] }
chrono = "0.4"
Rust DDS中间件库rustdds的使用 - 实现高效数据分发服务的实时通信框架
介绍
rustdds是一个纯Rust实现的DDS(Data Distribution Service)中间件库,它实现了OMG DDS规范,为分布式系统提供高效的实时数据通信能力。DDS是一种以数据为中心的发布-订阅通信模型,特别适合需要高吞吐量、低延迟和可靠性的分布式系统。
rustdds的主要特点:
- 完全用Rust编写,无外部依赖
- 支持实时发布-订阅模式
- 提供QoS(Quality of Service)策略配置
- 支持动态发现和类型系统
- 线程安全设计
安装方法
在Cargo.toml中添加依赖:
[dependencies]
rustdds = "0.8"
serde = { version = "1.0", features = ["derive"] } # 用于序列化
完整示例代码
下面是一个完整的rustdds使用示例,展示了一个简单的发布-订阅通信模型:
use rustdds::*;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::sync::Arc;
use std::thread;
use rand::Rng;
// 1. 定义数据结构
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
struct SensorData {
id: u32,
value: f64,
timestamp: u64,
}
fn main() {
// 2. 创建DomainParticipant
let domain_id = 0;
let participant = DomainParticipant::new(domain_id)
.expect("创建DomainParticipant失败");
// 3. 创建Topic
let topic = participant
.create_topic(
"SensorDataTopic",
"SensorData".to_string(),
&QosPolicyBuilder::new().build(),
TopicKind::NoKey,
)
.expect("创建Topic失败");
// 4. 创建Publisher和DataWriter
let publisher = participant
.create_publisher(&QosPolicyBuilder::new().build())
.expect("创建Publisher失败");
let writer_qos = QosPolicyBuilder::new()
.reliability()
.reliable()
.max_blocking_time(Duration::from_millis(100))
.history()
.keep_last(10)
.build();
let writer = publisher
.create_data_writer_no_key::<SensorData>(&topic, &writer_qos)
.expect("创建DataWriter失败");
// 5. 创建Subscriber和DataReader
let subscriber = participant
.create_subscriber(&QosPolicyBuilder::new().build())
.expect("创建Subscriber失败");
let reader_qos = QosPolicyBuilder::new()
.reliability()
.reliable()
.durability()
.transient_local()
.build();
let reader = subscriber
.create_data_reader_no_key::<SensorData>(&topic, &reader_qos)
.expect("创建DataReader失败");
// 6. 创建发布者线程
let publisher_thread = thread::spawn(move || {
let mut rng = rand::thread_rng();
for i in 0..10 {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let data = SensorData {
id: i,
value: rng.gen::<f64>() * 100.0,
timestamp,
};
println!("发布数据: {:?}", data);
writer.write(data, None).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 7. 创建订阅者线程
let reader_arc = Arc::new(reader);
let subscriber_thread = thread::spawn(move || {
// 等待历史数据
reader_arc.wait_for_historical_data(Duration::from_secs(5)).unwrap();
loop {
if let Some(sample) = reader_arc.take_next_sample().unwrap() {
let data: SensorData = sample.into_value();
println!("接收到数据: {:?}", data);
}
}
});
// 等待线程结束
publisher_thread.join().unwrap();
subscriber_thread.join().unwrap();
}
代码说明
-
数据结构定义:
- 使用
#[derive]
宏为SensorData
结构体自动实现序列化、反序列化和克隆等特性
- 使用
-
DomainParticipant创建:
- 每个应用程序通常创建一个DomainParticipant
- 指定domain ID(0)来标识通信域
-
Topic创建:
- 定义Topic名称和类型名称
- 使用默认QoS策略
-
Publisher和DataWriter:
- 配置可靠传输策略
- 设置历史缓存大小
- 创建用于发布数据的DataWriter
-
Subscriber和DataReader:
- 配置可靠传输和持久性策略
- 创建用于接收数据的DataReader
-
发布者线程:
- 生成随机传感器数据
- 每秒发布一条数据
- 使用DataWriter的write方法发送数据
-
订阅者线程:
- 使用Arc共享DataReader
- 先等待历史数据
- 循环接收并处理数据
注意事项
-
确保添加
rand
依赖到Cargo.toml以生成随机数据:[dependencies] rand = "0.8"
-
在实际应用中,应该添加适当的错误处理和线程终止条件
-
根据应用需求调整QoS策略参数
-
对于生产环境,考虑使用更高效的序列化方式