Rust消息处理插件库janus_messages的使用,高效实现跨平台通信与数据序列化
janus_messages
janus_messages
crate 提供了表示由分布式聚合协议定义的各种消息的结构。
安装
在项目目录中运行以下 Cargo 命令:
cargo add janus_messages
或者在 Cargo.toml 中添加以下行:
janus_messages = "0.7.78"
使用示例
下面是一个使用 janus_messages
进行跨平台通信和数据序列化的完整示例:
use janus_messages::{TaskId, HpkeConfig, HpkeCiphertext};
use serde::{Serialize, Deserialize};
// 定义一个简单的消息结构
#[derive(Debug, Serialize, Deserialize)]
struct CustomMessage {
task_id: TaskId,
payload: Vec<u8>,
ciphertext: HpkeCiphertext,
}
fn main() {
// 创建一个任务ID
let task_id = TaskId::from([0u8; 32]);
// 创建HPKE配置
let hpke_config = HpkeConfig::new(
0x01, // 配置ID
0x0010, // KEM ID
0x0001, // KDF ID
0x0001, // AEAD ID
vec![0x01, 0x02, 0x03], // 公钥
);
// 创建加密文本
let ciphertext = HpkeCiphertext::new(
hpke_config.config_id(),
vec![0x01, 0x02, 0x03], // 加密数据
vec![0x04, 0x05, 0x06], // 关联数据
);
// 创建自定义消息
let message = CustomMessage {
task_id,
payload: vec![0x07, 0x08, 0x09],
ciphertext,
};
// 序列化消息
let serialized = serde_json::to_string(&message).unwrap();
println!("Serialized message: {}", serialized);
// 反序列化消息
let deserialized: CustomMessage = serde_json::from_str(&serialized).unwrap();
println!("Deserialized message: {:?}", deserialized);
}
完整示例代码
基于上述示例,这里提供一个更完整的示例,展示了如何使用 janus_messages 进行实际的加密通信:
use janus_messages::{TaskId, HpkeConfig, HpkeCiphertext, Duration};
use serde::{Serialize, Deserialize};
use std::time::{SystemTime, UNIX_EPOCH};
// 定义聚合请求消息
#[derive(Debug, Serialize, Deserialize)]
struct AggregateRequest {
task_id: TaskId,
batch_id: Vec<u8>,
aggregation_param: Vec<u8>,
report_count: u64,
checksum: Vec<u8>,
expiry: Duration,
encrypted_reports: Vec<HpkeCiphertext>,
}
fn main() {
// 创建任务ID
let task_id = TaskId::from([0x11; 32]);
// 创建HPKE配置列表
let hpke_configs = vec![
HpkeConfig::new(
0x01, // 配置ID
0x0010, // KEM ID: DHKEM(X25519, HKDF-SHA256)
0x0001, // KDF ID: HKDF-SHA256
0x0001, // AEAD ID: AES-128-GCM
vec![0x21, 0x22, 0x23, 0x24], // 公钥
),
HpkeConfig::new(
0x02, // 配置ID
0x0010, // KEM ID
0x0002, // KDF ID: HKDF-SHA384
0x0002, // AEAD ID: AES-256-GCM
vec![0x31, 0x32, 0x33, 0x34], // 公钥
)
];
// 创建加密报告
let reports = vec![
HpkeCiphertext::new(
0x01, // 使用第一个HPKE配置
vec![0x41, 0x42, 0x43], // 加密数据
vec![0x44, 0x45, 0x46], // 关联数据
),
HpkeCiphertext::new(
0x02, // 使用第二个HPKE配置
vec![0x51, 0x52, 0x53], // 加密数据
vec![0x54, 0x55, 0x56], // 关联数据
)
];
// 计算过期时间 (当前时间 + 1小时)
let expiry = Duration::from_secs(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() + 3600
);
// 创建聚合请求
let request = AggregateRequest {
task_id,
batch_id: vec![0x61, 0x62, 0x63],
aggregation_param: vec![0x71, 0x72, 0x73],
report_count: 2,
checksum: vec![0x81, 0x82, 0x83],
expiry,
encrypted_reports: reports,
};
// 序列化请求
let serialized = serde_json::to_string(&request).unwrap();
println!("Serialized aggregate request:\n{}", serialized);
// 反序列化请求
let deserialized: AggregateRequest = serde_json::from_str(&serialized).unwrap();
println!("Deserialized request: {:?}", deserialized);
// 验证任务ID
println!("Task ID matches: {}", deserialized.task_id == task_id);
// 验证报告数量
println!("Report count correct: {}",
deserialized.encrypted_reports.len() == deserialized.report_count as usize);
}
主要功能
- 消息结构:提供表示分布式聚合协议消息的结构
- 序列化/反序列化:支持通过 serde 进行消息的序列化和反序列化
- 加密支持:包含 HPKE 相关结构,用于安全通信
- 跨平台兼容:设计用于在不同平台间交换数据
许可证
MPL-2.0
1 回复
Rust消息处理插件库janus_messages的使用指南
概述
janus_messages是一个高效的Rust消息处理插件库,专注于跨平台通信和数据序列化。它提供了简洁的API来处理消息的发送、接收和序列化,特别适合需要高性能通信的分布式系统或微服务架构。
主要特性
- 跨平台支持(Windows/Linux/macOS)
- 高性能消息序列化
- 多线程安全的消息处理
- 灵活的插件架构
- 低延迟通信
安装
在Cargo.toml中添加依赖:
[dependencies]
janus_messages = "0.3.0"
基本使用方法
1. 定义消息
use janus_messages::{Message, MessageBuilder};
// 定义一个简单的消息
let message = MessageBuilder::new("example.topic")
.payload("Hello, Janus!".as_bytes())
.build();
2. 发送消息
use janus_messages::Producer;
let producer = Producer::new("tcp://127.0.0.1:5000").unwrap();
producer.send(&message).unwrap();
3. 接收消息
use janus_messages::Consumer;
let consumer = Consumer::new("tcp://127.0.0.1:5000", "example.topic").unwrap();
for received in consumer {
println!("Received: {:?}", received.payload());
}
高级功能
自定义序列化
use janus_messages::{Message, Serializer, Deserializer};
use serde::{Serialize, Deserialize};
#[derive(Serialize, Deserialize, Debug)]
struct CustomData {
id: u32,
name: String,
}
// 序列化
let data = CustomData { id: 42, name: "Janus".into() };
let serialized = janus_messages::to_bytes(&data).unwrap();
// 反序列化
let deserialized: CustomData = janus_messages::from_bytes(&serialized).unwrap();
插件系统使用
use janus_messages::{Plugin, PluginManager};
struct MyPlugin;
impl Plugin for MyPlugin {
fn process(&self, msg: &Message) -> Option<Message> {
println!("Processing message: {:?}", msg.topic());
None
}
}
let mut manager = PluginManager::new();
manager.register(Box::new(MyPlugin));
let message = MessageBuilder::new("test.topic").build();
manager process(&message);
性能优化建议
- 重用Producer和Consumer实例
- 对大消息使用零拷贝技术
- 批量发送消息
- 选择合适的序列化格式(JSON/MessagePack/Bincode)
示例:完整的客户端-服务器通信
服务器端代码:
use janus_messages::{Producer, Consumer, MessageBuilder};
use std::thread;
fn server() {
let producer = Producer::new("tcp://127.0.0.1:5001").unwrap();
let consumer = Consumer::new("tcp://127.0.0.1:5000", "server.topic").unwrap();
for msg in consumer {
println!("Server received: {:?}", String::from_utf8_lossy(msg.payload()));
let response = MessageBuilder::new("client.topic")
.payload(b"Message received by server")
.build();
producer.send(&response).unwrap();
}
}
客户端代码:
use janus_messages::{Producer, Consumer, MessageBuilder};
fn client() {
let producer = Producer::new("tcp://127.0.0.1:5000").unwrap();
let consumer = Consumer::new("tcp://127.0.0.1:5001", "client.topic").unwrap();
let message = MessageBuilder::new("server.topic")
.payload(b"Hello from client")
.build();
producer.send(&message).unwrap();
for msg in consumer.take(1) {
println!("Client received: {:?}", String::from_utf8_lossy(msg.payload()));
}
}
fn main() {
thread::spawn(server);
std::thread::sleep(std::time::Duration::from_secs(1));
client();
}
错误处理
janus_messages使用Rust的标准Result类型进行错误处理:
match producer.send(&message) {
Ok(_) => println!("Message sent successfully"),
Err(e) => eprintln!("Failed to send message: {}", e),
}
完整示例代码
下面是一个完整的消息处理示例,展示了如何使用janus_messages库实现基本的消息收发:
use janus_messages::{Message, MessageBuilder, Producer, Consumer};
use std::thread;
use std::time::Duration;
// 自定义消息结构体
#[derive(Debug)]
struct AppMessage {
id: u64,
content: String,
timestamp: i64,
}
fn main() {
// 启动服务器线程
thread::spawn(|| {
server();
});
// 等待服务器启动
thread::sleep(Duration::from_secs(1));
// 运行客户端
client();
}
fn server() {
// 创建服务器端的生产者和消费者
let producer = Producer::new("tcp://127.0.0.1:5001").expect("Failed to create producer");
let consumer = Consumer::new("tcp://127.0.0.1:5000", "server.topic")
.expect("Failed to create consumer");
println!("Server started, waiting for messages...");
// 处理接收到的消息
for msg in consumer {
let received = String::from_utf8_lossy(msg.payload());
println!("Server received: {}", received);
// 创建响应消息
let response = MessageBuilder::new("client.topic")
.payload(b"ACK: Message processed by server")
.build();
// 发送响应
producer.send(&response).expect("Failed to send response");
}
}
fn client() {
// 创建客户端生产者和消费者
let producer = Producer::new("tcp://127.0.0.1:5000").expect("Failed to create producer");
let consumer = Consumer::new("tcp://127.0.0.1:5001", "client.topic")
.expect("Failed to create consumer");
// 创建要发送的消息
let message = AppMessage {
id: 12345,
content: "Hello from client".to_string(),
timestamp: chrono::Utc::now().timestamp(),
};
// 序列化消息
let serialized = bincode::serialize(&message).expect("Serialization failed");
// 构建并发送消息
let msg = MessageBuilder::new("server.topic")
.payload(&serialized)
.build();
producer.send(&msg).expect("Failed to send message");
println!("Client sent message with ID: {}", message.id);
// 等待并处理响应
for response in consumer.take(1) {
let content = String::from_utf8_lossy(response.payload());
println!("Client received response: {}", content);
}
}
总结
janus_messages库为Rust开发者提供了一套完整的工具来处理跨平台消息通信,其高性能的序列化和插件系统使其成为构建分布式系统的理想选择。通过简单的API,开发者可以快速实现复杂的通信模式,而无需担心底层细节。