Rust AMQP协议代码生成库amq-protocol-codegen的使用,高效实现RabbitMQ消息队列协议自动化生成
Rust AMQP协议代码生成库amq-protocol-codegen的使用,高效实现RabbitMQ消息队列协议自动化生成
安装
在您的项目目录中运行以下Cargo命令:
cargo add amq-protocol-codegen
或者在您的Cargo.toml中添加以下行:
amq-protocol-codegen = "8.1.2"
示例代码
下面是一个使用amq-protocol-codegen库的完整示例,展示如何自动生成AMQP协议代码并与RabbitMQ交互:
// 在build.rs中使用amq-protocol-codegen生成AMQP协议代码
fn main() {
amq_protocol_codegen::build("amqp0-9.1.stripped.extended.xml").unwrap();
}
// 在main.rs中使用生成的代码
use amq_protocol::frame::{AMQPFrame, AMQPContentHeader, AMQPContentBody};
use amq_protocol::types::*;
use amq_protocol::protocol::*;
async fn connect_to_rabbitmq() -> Result<(), Box<dyn std::error::Error>> {
// 建立TCP连接
let stream = TcpStream::connect("localhost:5672").await?;
// 创建AMQP连接
let mut connection = Connection::start(
stream,
ConnectionProperties::default()
.with_client_properties(FieldTable::default())
.with_channel_max(0)
.with_frame_max(0)
.with_heartbeat(0)
).await?;
// 打开通道
let channel = connection.open_channel(1).await?;
// 声明队列
let queue_declare = Queue::Declare {
ticket: 0,
queue: "my_queue".into(),
passive: false,
durable: false,
exclusive: false,
auto_delete: false,
nowait: false,
arguments: FieldTable::default(),
};
channel.send_method(queue_declare).await?;
// 发送消息
let basic_publish = Basic::Publish {
ticket: 0,
exchange: "".into(),
routing_key: "my_queue".into(),
mandatory: false,
immediate: false,
};
channel.send_method(basic_publish).await?;
let content_header = AMQPContentHeader {
class_id: 60,
weight: 0,
body_size: 11,
properties: BasicProperties::default()
.with_content_type("text/plain".into()),
};
channel.send_content_header(content_header).await?;
let content_body = AMQPContentBody {
data: "Hello world".as_bytes().to_vec(),
};
channel.send_content_body(content_body).await?;
Ok(())
}
#[tokio::main]
async fn main() {
connect_to_rabbitmq().await.unwrap();
}
完整示例代码
// build.rs - 生成AMQP协议代码
fn main() {
// 从AMQP协议XML文件生成Rust代码
amq_protocol_codegen::build("amqp0-9.1.stripped.extended.xml").unwrap();
}
// main.rs - 完整的RabbitMQ生产者和消费者示例
use amq_protocol::frame::{AMQPFrame, AMQPContentHeader, AMQPContentBody};
use amq_protocol::types::*;
use amq_protocol::protocol::*;
use tokio::net::TcpStream;
async fn setup_rabbitmq() -> Result<(), Box<dyn std::error::Error>> {
// 1. 建立TCP连接
let stream = TcpStream::connect("localhost:5672").await?;
// 2. 创建AMQP连接
let mut connection = Connection::start(
stream,
ConnectionProperties::default()
.with_client_properties(FieldTable::default())
.with_channel_max(0)
.with_frame_max(0)
.with_heartbeat(0)
).await?;
// 3. 打开通道
let channel = connection.open_channel(1).await?;
// 4. 声明队列
let queue_declare = Queue::Declare {
ticket: 0,
queue: "rust_demo_queue".into(),
passive: false,
durable: true, // 持久化队列
exclusive: false,
auto_delete: false,
nowait: false,
arguments: FieldTable::default(),
};
let queue_info = channel.send_method(queue_declare).await?;
println!("Queue declared: {:?}", queue_info);
// 5. 发送消息
let basic_publish = Basic::Publish {
ticket: 0,
exchange: "".into(), // 默认交换器
routing_key: "rust_demo_queue".into(),
mandatory: false,
immediate: false,
};
channel.send_method(basic_publish).await?;
// 消息头
let content_header = AMQPContentHeader {
class_id: 60, // Basic类
weight: 0,
body_size: 11, // 消息体大小
properties: BasicProperties::default()
.with_content_type("text/plain".into())
.with_delivery_mode(2), // 持久化消息
};
channel.send_content_header(content_header).await?;
// 消息体
let content_body = AMQPContentBody {
data: "Hello AMQP".as_bytes().to_vec(),
};
channel.send_content_body(content_body).await?;
// 6. 消费消息
let basic_consume = Basic::Consume {
ticket: 0,
queue: "rust_demo_queue".into(),
consumer_tag: "rust_consumer".into(),
no_local: false,
no_ack: false,
exclusive: false,
nowait: false,
arguments: FieldTable::default(),
};
channel.send_method(basic_consume).await?;
// 处理收到的消息
while let Some(frame) = connection.next_frame().await? {
match frame {
AMQPFrame::Method(channel_id, method) => {
println!("Received method on channel {}: {:?}", channel_id, method);
}
AMQPFrame::Header(channel_id, header) => {
println!("Received header on channel {}: {:?}", channel_id, header);
}
AMQPFrame::Body(channel_id, body) => {
println!("Received body on channel {}: {:?}", channel_id, String::from_utf8_lossy(&body.data));
// 确认消息
let basic_ack = Basic::Ack {
delivery_tag: 1,
multiple: false,
};
channel.send_method(basic_ack).await?;
}
_ => {}
}
}
Ok(())
}
#[tokio::main]
async fn main() {
setup_rabbitmq().await.expect("RabbitMQ operation failed");
}
关键特性
- 协议代码自动生成:从AMQP规范XML文件自动生成Rust代码
- 类型安全:所有AMQP方法和字段都转换为类型安全的Rust结构体
- 异步支持:与async/await完美配合,适合现代Rust网络编程
- 完整协议覆盖:支持AMQP 0-9-1协议的全部特性
许可证
该项目使用BSD-2-Clause许可证。
所有者
- Marc-Antoine Perennou
分类
- API绑定
- 网络编程
1 回复