Rust AMQP客户端库fe2o3-amqp的使用,实现高效消息队列通信与异步任务处理
fe2o3-amqp
一个基于serde和tokio的AMQP 1.0协议的Rust实现。
- 快速开始
- 文档
- 变更日志
- 示例
特性标志
default = []
特性 | 描述 |
---|---|
"rustls" |
启用与tokio-rustls 和rustls 的TLS集成 |
"native-tls" |
启用与tokio-native-tls 和native-tls 的TLS集成 |
"acceptor" |
启用ConnectionAcceptor 、SessionAcceptor 和LinkAcceptor |
"transaction" |
启用Controller 、Transaction 、OwnedTransaction 和control_link_acceptor |
"scram" |
启用SCRAM认证 |
"tracing" |
启用使用tracing 的日志记录 |
"log" |
启用使用log 的日志记录 |
快速开始
- 客户端
- 监听器
- WebSocket绑定
更多示例包括与Azure Service Bus一起使用的示例可以在GitHub仓库中找到。
客户端
以下是一个使用本地代理的示例,该代理在本地主机上监听。代理使用以下命令执行:
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
以下代码需要在依赖项中添加[tokio
]异步运行时。
use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;
#[tokio::main]
async fn main() {
// 打开连接
let mut connection = Connection::open(
"connection-1", // 容器ID
"amqp://guest:guest@localhost:5672" // URL
).await.unwrap();
// 开始会话
let mut session = Session::begin(&mut connection).await.unwrap();
// 创建发送者
let mut sender = Sender::attach(
&mut session, // 会话
"rust-sender-link-1", // 链接名称
"q1" // 目标地址
).await.unwrap();
// 创建接收者
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1", // 链接名称
"q1" // 源地址
).await.unwrap();
// 发送消息到代理并等待结果(处置)
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap(); // 处理传递结果
// 发送一个可批处理字段设置为true的消息
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap(); // 等待结果(处置)
outcome.accepted_or_else(|state| state).unwrap(); // 处理传递结果
// 从代理接收消息
let delivery = receiver.recv::<String>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
sender.close().await.unwrap(); // 使用关闭分离执行分离发送者
receiver.close().await.unwrap(); // 使用关闭分离执行分离接收者
session.end().await.unwrap(); // 结束会话
connection.close().await.unwrap(); // 关闭连接
}
监听器
use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
#[tokio::main]
async fn main() {
let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
let connection_acceptor = ConnectionAcceptor::new("example-listener");
while let Ok((stream, addr)) = tcp_listener.accept().await {
let mut connection = connection_acceptor.accept(stream).await.unwrap();
let handle = tokio::spawn(async move {
let session_acceptor = SessionAcceptor::new();
while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
let handle = tokio::spawn(async move {
let link_acceptor = LinkAcceptor::new();
match link_acceptor.accept(&mut session).await.unwrap() {
LinkEndpoint::Sender(sender) => { },
LinkEndpoint::Receiver(recver) => { },
}
});
}
});
}
}
WebSocket
WebSocket绑定需要[fe2o3-amqp-ws
]
use fe2o3_amqp::{
types::{messaging::Outcome, primitives::Value},
Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;
#[tokio::main]
async fn main() {
let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
.await
.unwrap();
let mut connection = Connection::builder()
.container_id("connection-1")
.open_with_stream(ws_stream)
.await
.unwrap();
connection.close().await.unwrap();
}
更多示例
更多发送和接收的示例可以在GitHub仓库中找到。请注意,大多数示例需要运行本地代理。Windows上可以使用的一个代理是TestAmqpBroker。
WebAssembly支持
自"0.8.11"版本起,实验性支持wasm32-unknown-unknown
目标,需要使用fe2o3-amqp-ws
与代理建立WebSocket连接。在浏览器标签页中发送和接收消息的示例可以在examples/wasm32-in-browser中找到。
组件
名称 | 描述 |
---|---|
serde_amqp_derive |
自定义派生宏,用于AMQP1.0协议中定义的描述类型 |
serde_amqp |
AMQP1.0序列化器和反序列化器以及原始类型 |
fe2o3-amqp-types |
AMQP1.0数据类型 |
fe2o3-amqp |
AMQP1.0 Connection 、Session 和Link 的实现 |
fe2o3-amqp-ext |
扩展类型和实现 |
fe2o3-amqp-ws |
fe2o3-amqp 传输的WebSocket绑定 |
fe2o3-amqp-management |
AMQP1.0管理的实验性实现 |
fe2o3-amqp-cbs |
AMQP1.0 CBS的实验性实现 |
支持的最低Rust版本
1.75.0
许可证:MIT/Apache-2.0
完整示例代码
use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;
#[tokio::main]
async fn main() {
// 建立AMQP连接
let mut connection = Connection::open(
"connection-1",
"amqp://guest:guest@localhost:5672"
).await.unwrap();
// 创建会话
let mut session = Session::begin(&mut connection).await.unwrap();
// 附加发送者链接
let mut sender = Sender::attach(
&mut session,
"rust-sender-link-1",
"q1"
).await.unwrap();
// 附加接收者链接
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1",
"q1"
).await.unwrap();
// 发送消息并等待确认
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap();
// 发送可批处理消息
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap();
outcome.accepted_or_else(|state| state).unwrap();
// 接收消息
let delivery = receiver.recv::<String>().await.unwrap();
println!("Received: {}", delivery.body());
receiver.accept(&delivery).await.unwrap();
// 清理资源
sender.close().await.unwrap();
receiver.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
}
1 回复
Rust AMQP客户端库fe2o3-amqp的使用指南
概述
fe2o3-amqp是一个基于Rust语言的AMQP 1.0协议客户端库,专门为高性能消息队列通信和异步任务处理而设计。该库提供了完整的AMQP协议实现,支持消息的发布/订阅、请求/响应等模式,并充分利用Rust的异步特性实现高效并发处理。
主要特性
- 完整的AMQP 1.0协议支持
- 基于async/await的异步API
- 连接池和会话管理
- 消息确认机制
- TLS/SSL安全连接
- 高性能消息序列化
安装方法
在Cargo.toml中添加依赖:
[dependencies]
fe2o3-amqp = "0.8"
tokio = { version = "1.0", features = ["full"] }
基本使用方法
1. 建立连接和会话
use fe2o3_amqp::{
Connection, Session, Sender, Receiver,
types::primitives::Value,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建连接
let mut connection = Connection::open(
"my-connection",
"amqp://localhost:5672"
).await?;
// 创建会话
let mut session = Session::begin(&mut connection).await?;
Ok(())
}
2. 消息发送示例
// 创建发送者
let mut sender = Sender::attach(
&mut session,
"my-sender",
"my-queue"
).await?;
// 发送消息
let message = fe2o3_amqp::Message::builder()
.body("Hello AMQP!")
.build();
sender.send(message).await?;
// 关闭发送者
sender.close().await?;
3. 消息接收示例
// 创建接收者
let mut receiver = Receiver::attach(
&mut session,
"my-receiver",
"my-queue"
).await?;
// 接收消息
let message = receiver.recv::<String>().await?;
println!("Received: {}", message.body());
// 确认消息
receiver.accept(&message).await?;
// 关闭接收者
receiver.close().await?;
4. 异步任务处理
use fe2o3_amqp::types::primitives::Value;
async fn process_messages(mut receiver: Receiver) {
while let Ok(message) = receiver.recv::<Value>().await {
println!("Processing message: {:?}", message.body());
// 模拟异步处理
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
receiver.accept(&message).await.unwrap();
}
}
5. 错误处理和重连机制
use fe2o3_amqp::error::Error;
async fn create_connection_with_retry() -> Result<Connection, Error> {
let mut attempts = 0;
loop {
match Connection::open("retry-connection", "amqp://localhost:5672").await {
Ok(conn) => return Ok(conn),
Err(e) => {
attempts += 1;
if attempts > 5 {
return Err(e);
}
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}
高级特性
消息属性设置
let message = fe2o3_amqp::Message::builder()
.body("Important message")
.message_id("12345")
.content_type("text/plain")
.subject("notification")
.reply_to("response-queue")
.build();
批量消息处理
// 批量发送
let messages = vec![
Message::builder().body("msg1").build(),
Message::builder().body("msg2").build(),
];
for message in messages {
sender.send(message).await?;
}
// 批量接收
let batch = receiver.recv_batch::<String>(10).await?;
for message in batch {
println!("Batch message: {}", message.body());
receiver.accept(&message).await?;
}
完整示例demo
use fe2o3_amqp::{
Connection, Session, Sender, Receiver,
types::primitives::Value,
error::Error,
};
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 创建带重试机制的连接
let mut connection = create_connection_with_retry().await?;
// 创建会话
let mut session = Session::begin(&mut connection).await?;
// 创建发送者
let mut sender = Sender::attach(
&mut session,
"demo-sender",
"demo-queue"
).await?;
// 发送带属性的消息
let message = fe2o3_amqp::Message::builder()
.body("Hello AMQP from demo!")
.message_id("demo-001")
.content_type("text/plain")
.subject("demo-message")
.build();
sender.send(message).await?;
println!("Message sent successfully");
// 创建接收者
let mut receiver = Receiver::attach(
&mut session,
"demo-receiver",
"demo-queue"
).await?;
// 异步处理消息
let handle = tokio::spawn(process_messages(receiver));
// 发送批量消息
let messages = vec![
fe2o3_amqp::Message::builder().body("batch-msg-1").build(),
fe2o3_amqp::Message::builder().body("batch-msg-2").build(),
fe2o3_amqp::Message::builder().body("batch-msg-3").build(),
];
for message in messages {
sender.send(message).await?;
}
println!("Batch messages sent");
// 等待消息处理完成
handle.await?;
// 清理资源
sender.close().await?;
session.end().await?;
connection.close().await?;
Ok(())
}
async fn process_messages(mut receiver: Receiver) {
while let Ok(message) = receiver.recv::<String>().await {
println!("Processing message: {}", message.body());
// 模拟异步处理任务
sleep(Duration::from_secs(1)).await;
// 确认消息处理完成
receiver.accept(&message).await.unwrap();
println!("Message processed and accepted");
}
}
async fn create_connection_with_retry() -> Result<Connection, Error> {
let mut attempts = 0;
loop {
match Connection::open("demo-connection", "amqp://localhost:5672").await {
Ok(conn) => {
println!("Connection established successfully");
return Ok(conn);
},
Err(e) => {
attempts += 1;
println!("Connection attempt {} failed: {}", attempts, e);
if attempts > 3 {
return Err(e);
}
sleep(Duration::from_secs(2)).await;
}
}
}
}
性能优化建议
- 连接池管理:重用连接和会话以减少开销
- 批量操作:使用批量发送和接收提高吞吐量
- 适当的预取设置:调整预取计数以平衡延迟和吞吐量
- 异步处理:使用tokio的spawn来并行处理消息
注意事项
- 确保AMQP broker(如RabbitMQ或ActiveMQ)已正确配置
- 处理网络中断和重连逻辑
- 适当处理消息确认和拒绝
- 监控内存使用,特别是在处理大量消息时
这个库为Rust开发者提供了强大的AMQP消息队列功能,特别适合需要高性能异步消息处理的应用程序。