Rust消息处理插件库janus_messages的使用,高效实现跨平台通信与数据序列化

janus_messages

Build Status latest version docs badge

janus_messages crate 提供了表示由分布式聚合协议定义的各种消息的结构。

安装

在项目目录中运行以下 Cargo 命令:

cargo add janus_messages

或者在 Cargo.toml 中添加以下行:

janus_messages = "0.7.78"

使用示例

下面是一个使用 janus_messages 进行跨平台通信和数据序列化的完整示例:

use janus_messages::{TaskId, HpkeConfig, HpkeCiphertext};
use serde::{Serialize, Deserialize};

// 定义一个简单的消息结构
#[derive(Debug, Serialize, Deserialize)]
struct CustomMessage {
    task_id: TaskId,
    payload: Vec<u8>,
    ciphertext: HpkeCiphertext,
}

fn main() {
    // 创建一个任务ID
    let task_id = TaskId::from([0u8; 32]);
    
    // 创建HPKE配置
    let hpke_config = HpkeConfig::new(
        0x01,  // 配置ID
        0x0010,  // KEM ID
        0x0001,  // KDF ID
        0x0001,  // AEAD ID
        vec![0x01, 0x02, 0x03],  // 公钥
    );
    
    // 创建加密文本
    let ciphertext = HpkeCiphertext::new(
        hpke_config.config_id(),
        vec![0x01, 0x02, 0x03],  // 加密数据
        vec![0x04, 0x05, 0x06],  // 关联数据
    );
    
    // 创建自定义消息
    let message = CustomMessage {
        task_id,
        payload: vec![0x07, 0x08, 0x09],
        ciphertext,
    };
    
    // 序列化消息
    let serialized = serde_json::to_string(&message).unwrap();
    println!("Serialized message: {}", serialized);
    
    // 反序列化消息
    let deserialized: CustomMessage = serde_json::from_str(&serialized).unwrap();
    println!("Deserialized message: {:?}", deserialized);
}

完整示例代码

基于上述示例,这里提供一个更完整的示例,展示了如何使用 janus_messages 进行实际的加密通信:

use janus_messages::{TaskId, HpkeConfig, HpkeCiphertext, Duration};
use serde::{Serialize, Deserialize};
use std::time::{SystemTime, UNIX_EPOCH};

// 定义聚合请求消息
#[derive(Debug, Serialize, Deserialize)]
struct AggregateRequest {
    task_id: TaskId,
    batch_id: Vec<u8>,
    aggregation_param: Vec<u8>,
    report_count: u64,
    checksum: Vec<u8>,
    expiry: Duration,
    encrypted_reports: Vec<HpkeCiphertext>,
}

fn main() {
    // 创建任务ID
    let task_id = TaskId::from([0x11; 32]);
    
    // 创建HPKE配置列表
    let hpke_configs = vec![
        HpkeConfig::new(
            0x01,  // 配置ID
            0x0010,  // KEM ID: DHKEM(X25519, HKDF-SHA256)
            0x0001,  // KDF ID: HKDF-SHA256
            0x0001,  // AEAD ID: AES-128-GCM
            vec![0x21, 0x22, 0x23, 0x24],  // 公钥
        ),
        HpkeConfig::new(
            0x02,  // 配置ID
            0x0010,  // KEM ID
            0x0002,  // KDF ID: HKDF-SHA384
            0x0002,  // AEAD ID: AES-256-GCM
            vec![0x31, 0x32, 0x33, 0x34],  // 公钥
        )
    ];
    
    // 创建加密报告
    let reports = vec![
        HpkeCiphertext::new(
            0x01,  // 使用第一个HPKE配置
            vec![0x41, 0x42, 0x43],  // 加密数据
            vec![0x44, 0x45, 0x46],  // 关联数据
        ),
        HpkeCiphertext::new(
            0x02,  // 使用第二个HPKE配置
            vec![0x51, 0x52, 0x53],  // 加密数据
            vec![0x54, 0x55, 0x56],  // 关联数据
        )
    ];
    
    // 计算过期时间 (当前时间 + 1小时)
    let expiry = Duration::from_secs(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() + 3600
    );
    
    // 创建聚合请求
    let request = AggregateRequest {
        task_id,
        batch_id: vec![0x61, 0x62, 0x63],
        aggregation_param: vec![0x71, 0x72, 0x73],
        report_count: 2,
        checksum: vec![0x81, 0x82, 0x83],
        expiry,
        encrypted_reports: reports,
    };
    
    // 序列化请求
    let serialized = serde_json::to_string(&request).unwrap();
    println!("Serialized aggregate request:\n{}", serialized);
    
    // 反序列化请求
    let deserialized: AggregateRequest = serde_json::from_str(&serialized).unwrap();
    println!("Deserialized request: {:?}", deserialized);
    
    // 验证任务ID
    println!("Task ID matches: {}", deserialized.task_id == task_id);
    
    // 验证报告数量
    println!("Report count correct: {}", 
        deserialized.encrypted_reports.len() == deserialized.report_count as usize);
}

主要功能

  1. 消息结构:提供表示分布式聚合协议消息的结构
  2. 序列化/反序列化:支持通过 serde 进行消息的序列化和反序列化
  3. 加密支持:包含 HPKE 相关结构,用于安全通信
  4. 跨平台兼容:设计用于在不同平台间交换数据

许可证

MPL-2.0


1 回复

Rust消息处理插件库janus_messages的使用指南

概述

janus_messages是一个高效的Rust消息处理插件库,专注于跨平台通信和数据序列化。它提供了简洁的API来处理消息的发送、接收和序列化,特别适合需要高性能通信的分布式系统或微服务架构。

主要特性

  • 跨平台支持(Windows/Linux/macOS)
  • 高性能消息序列化
  • 多线程安全的消息处理
  • 灵活的插件架构
  • 低延迟通信

安装

在Cargo.toml中添加依赖:

[dependencies]
janus_messages = "0.3.0"

基本使用方法

1. 定义消息

use janus_messages::{Message, MessageBuilder};

// 定义一个简单的消息
let message = MessageBuilder::new("example.topic")
    .payload("Hello, Janus!".as_bytes())
    .build();

2. 发送消息

use janus_messages::Producer;

let producer = Producer::new("tcp://127.0.0.1:5000").unwrap();
producer.send(&message).unwrap();

3. 接收消息

use janus_messages::Consumer;

let consumer = Consumer::new("tcp://127.0.0.1:5000", "example.topic").unwrap();

for received in consumer {
    println!("Received: {:?}", received.payload());
}

高级功能

自定义序列化

use janus_messages::{Message, Serializer, Deserializer};
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize, Debug)]
struct CustomData {
    id: u32,
    name: String,
}

// 序列化
let data = CustomData { id: 42, name: "Janus".into() };
let serialized = janus_messages::to_bytes(&data).unwrap();

// 反序列化
let deserialized: CustomData = janus_messages::from_bytes(&serialized).unwrap();

插件系统使用

use janus_messages::{Plugin, PluginManager};

struct MyPlugin;
impl Plugin for MyPlugin {
    fn process(&self, msg: &Message) -> Option<Message> {
        println!("Processing message: {:?}", msg.topic());
        None
    }
}

let mut manager = PluginManager::new();
manager.register(Box::new(MyPlugin));

let message = MessageBuilder::new("test.topic").build();
manager process(&message);

性能优化建议

  1. 重用Producer和Consumer实例
  2. 对大消息使用零拷贝技术
  3. 批量发送消息
  4. 选择合适的序列化格式(JSON/MessagePack/Bincode)

示例:完整的客户端-服务器通信

服务器端代码:

use janus_messages::{Producer, Consumer, MessageBuilder};
use std::thread;

fn server() {
    let producer = Producer::new("tcp://127.0.0.1:5001").unwrap();
    let consumer = Consumer::new("tcp://127.0.0.1:5000", "server.topic").unwrap();
    
    for msg in consumer {
        println!("Server received: {:?}", String::from_utf8_lossy(msg.payload()));
        
        let response = MessageBuilder::new("client.topic")
            .payload(b"Message received by server")
            .build();
            
        producer.send(&response).unwrap();
    }
}

客户端代码:

use janus_messages::{Producer, Consumer, MessageBuilder};

fn client() {
    let producer = Producer::new("tcp://127.0.0.1:5000").unwrap();
    let consumer = Consumer::new("tcp://127.0.0.1:5001", "client.topic").unwrap();
    
    let message = MessageBuilder::new("server.topic")
        .payload(b"Hello from client")
        .build();
        
    producer.send(&message).unwrap();
    
    for msg in consumer.take(1) {
        println!("Client received: {:?}", String::from_utf8_lossy(msg.payload()));
    }
}

fn main() {
    thread::spawn(server);
    std::thread::sleep(std::time::Duration::from_secs(1));
    client();
}

错误处理

janus_messages使用Rust的标准Result类型进行错误处理:

match producer.send(&message) {
    Ok(_) => println!("Message sent successfully"),
    Err(e) => eprintln!("Failed to send message: {}", e),
}

完整示例代码

下面是一个完整的消息处理示例,展示了如何使用janus_messages库实现基本的消息收发:

use janus_messages::{Message, MessageBuilder, Producer, Consumer};
use std::thread;
use std::time::Duration;

// 自定义消息结构体
#[derive(Debug)]
struct AppMessage {
    id: u64,
    content: String,
    timestamp: i64,
}

fn main() {
    // 启动服务器线程
    thread::spawn(|| {
        server();
    });
    
    // 等待服务器启动
    thread::sleep(Duration::from_secs(1));
    
    // 运行客户端
    client();
}

fn server() {
    // 创建服务器端的生产者和消费者
    let producer = Producer::new("tcp://127.0.0.1:5001").expect("Failed to create producer");
    let consumer = Consumer::new("tcp://127.0.0.1:5000", "server.topic")
        .expect("Failed to create consumer");
    
    println!("Server started, waiting for messages...");
    
    // 处理接收到的消息
    for msg in consumer {
        let received = String::from_utf8_lossy(msg.payload());
        println!("Server received: {}", received);
        
        // 创建响应消息
        let response = MessageBuilder::new("client.topic")
            .payload(b"ACK: Message processed by server")
            .build();
            
        // 发送响应
        producer.send(&response).expect("Failed to send response");
    }
}

fn client() {
    // 创建客户端生产者和消费者
    let producer = Producer::new("tcp://127.0.0.1:5000").expect("Failed to create producer");
    let consumer = Consumer::new("tcp://127.0.0.1:5001", "client.topic")
        .expect("Failed to create consumer");
    
    // 创建要发送的消息
    let message = AppMessage {
        id: 12345,
        content: "Hello from client".to_string(),
        timestamp: chrono::Utc::now().timestamp(),
    };
    
    // 序列化消息
    let serialized = bincode::serialize(&message).expect("Serialization failed");
    
    // 构建并发送消息
    let msg = MessageBuilder::new("server.topic")
        .payload(&serialized)
        .build();
        
    producer.send(&msg).expect("Failed to send message");
    println!("Client sent message with ID: {}", message.id);
    
    // 等待并处理响应
    for response in consumer.take(1) {
        let content = String::from_utf8_lossy(response.payload());
        println!("Client received response: {}", content);
    }
}

总结

janus_messages库为Rust开发者提供了一套完整的工具来处理跨平台消息通信,其高性能的序列化和插件系统使其成为构建分布式系统的理想选择。通过简单的API,开发者可以快速实现复杂的通信模式,而无需担心底层细节。

回到顶部