使用Golang通过AMQP v1+发送消息到RabbitMQ
使用Golang通过AMQP v1+发送消息到RabbitMQ
嘿!
有一个RabbitMQ,它不仅能配置队列,还能配置交换机。在其AMQP协议版本0.9(streadway/amqp)上一切运行良好。
为此,我们有一个特殊的方法 channel.ExchangeDeclare()。
但现在我需要使用AMQP版本1.0+(Azure/go-amqp)。
此外,我需要使用CloudEvents。为此我使用了这个库:github.com/cloudevents/sdk-go
但我找不到如何在设置协议时,不仅仅是在RabbitMQ队列中发送和接收消息,而是在RabbitMQ交换机中发送和接收消息。
我使用了这个示例:
cloudevents/sdk-go
Go SDK for CloudEvents. Contribute to cloudevents/sdk-go development by creating an account on GitHub.
有人能帮忙理解如何做到这一点吗?如何设置交换机?
更多关于使用Golang通过AMQP v1+发送消息到RabbitMQ的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于使用Golang通过AMQP v1+发送消息到RabbitMQ的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在AMQP 1.0协议中,交换机的概念与AMQP 0.9.1不同。AMQP 1.0使用更通用的消息路由模型,RabbitMQ通过插件支持AMQP 1.0时会自动处理交换逻辑。
对于CloudEvents + AMQP 1.0,你需要通过消息的properties或application-properties来指定路由信息。以下是具体实现:
package main
import (
"context"
"fmt"
"time"
"github.com/Azure/go-amqp"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol/amqp"
)
// 发送到交换机
func sendToExchange() error {
// 创建AMQP连接
conn, err := amqp.Dial("amqp://localhost:5672", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("guest", "guest"),
})
if err != nil {
return fmt.Errorf("连接失败: %w", err)
}
defer conn.Close()
// 创建会话
session, err := conn.NewSession(context.Background(), nil)
if err != nil {
return fmt.Errorf("创建会话失败: %w", err)
}
// 创建发送链接,目标地址格式为"exchange-name/routing-key"
sender, err := session.NewSender(context.Background(),
"my-exchange/my.routing.key", // 交换机和路由键
&amqp.SenderOptions{
TargetAddress: "my-exchange",
})
if err != nil {
return fmt.Errorf("创建发送者失败: %w", err)
}
defer sender.Close(context.Background())
// 创建CloudEvent
event := cloudevents.NewEvent()
event.SetID("12345")
event.SetSource("my-app")
event.SetType("com.example.sample")
event.SetTime(time.Now())
_ = event.SetData(cloudevents.ApplicationJSON, map[string]string{
"message": "Hello RabbitMQ Exchange",
})
// 转换为AMQP消息
msg := amqp.NewMessage([]byte(event.String()))
msg.Properties = &amqp.MessageProperties{
To: "my-exchange/my.routing.key", // 目标地址
}
msg.ApplicationProperties = map[string]any{
"cloudEvents:specversion": "1.0",
"cloudEvents:id": event.ID(),
"cloudEvents:source": event.Source(),
"cloudEvents:type": event.Type(),
"cloudEvents:time": event.Time().Format(time.RFC3339),
}
// 发送消息
err = sender.Send(context.Background(), msg, nil)
if err != nil {
return fmt.Errorf("发送失败: %w", err)
}
fmt.Println("消息已发送到交换机")
return nil
}
// 使用CloudEvents SDK发送
func sendWithCloudEvents() error {
// 创建AMQP协议
p, err := amqp.NewProtocol(
"amqp://guest:guest@localhost:5672",
"my-exchange/my.routing.key", // 目标地址
amqp.ConnOpts{},
amqp.SenderOpts{},
amqp.ReceiverOpts{},
)
if err != nil {
return fmt.Errorf("创建协议失败: %w", err)
}
// 创建CloudEvents客户端
c, err := cloudevents.NewClient(p)
if err != nil {
return fmt.Errorf("创建客户端失败: %w", err)
}
// 创建事件
event := cloudevents.NewEvent()
event.SetID("67890")
event.SetSource("my-cloud-events-app")
event.SetType("com.example.cloud")
event.SetTime(time.Now())
_ = event.SetData(cloudevents.ApplicationJSON, map[string]string{
"data": "CloudEvents to Exchange",
})
// 发送事件
ctx := cloudevents.ContextWithTarget(context.Background(),
"amqp://guest:guest@localhost:5672/my-exchange/my.routing.key")
if result := c.Send(ctx, event); cloudevents.IsUndelivered(result) {
return fmt.Errorf("发送失败: %v", result)
}
fmt.Println("CloudEvent已发送到交换机")
return nil
}
// 从交换机接收消息
func receiveFromExchange() error {
conn, err := amqp.Dial("amqp://localhost:5672", &amqp.ConnOptions{
SASLType: amqp.SASLTypePlain("guest", "guest"),
})
if err != nil {
return fmt.Errorf("连接失败: %w", err)
}
defer conn.Close()
session, err := conn.NewSession(context.Background(), nil)
if err != nil {
return fmt.Errorf("创建会话失败: %w", err)
}
// 创建接收链接,使用队列名(绑定到交换机的队列)
receiver, err := session.NewReceiver(context.Background(),
"my-queue", // 绑定到交换机的队列
&amqp.ReceiverOptions{
SourceAddress: "my-exchange",
})
if err != nil {
return fmt.Errorf("创建接收者失败: %w", err)
}
defer receiver.Close(context.Background())
// 接收消息
msg, err := receiver.Receive(context.Background(), nil)
if err != nil {
return fmt.Errorf("接收失败: %w", err)
}
// 接受消息
receiver.AcceptMessage(context.Background(), msg)
fmt.Printf("收到消息: %s\n", string(msg.GetData()))
// 如果是CloudEvents消息
if specVersion, ok := msg.ApplicationProperties["cloudEvents:specversion"]; ok {
fmt.Printf("CloudEvent specversion: %v\n", specVersion)
}
return nil
}
func main() {
// 发送消息到交换机
if err := sendToExchange(); err != nil {
fmt.Printf("发送错误: %v\n", err)
}
// 使用CloudEvents SDK发送
if err := sendWithCloudEvents(); err != nil {
fmt.Printf("CloudEvents发送错误: %v\n", err)
}
// 从交换机接收消息
if err := receiveFromExchange(); err != nil {
fmt.Printf("接收错误: %v\n", err)
}
}
关键点说明:
- AMQP 1.0地址格式:使用
"exchange-name/routing-key"格式指定目标 - RabbitMQ配置:确保启用AMQP 1.0插件并正确配置交换机:
rabbitmq-plugins enable rabbitmq_amqp1_0 - 绑定队列:需要在RabbitMQ中预先将队列绑定到交换机
- CloudEvents属性:通过
ApplicationProperties传递CloudEvents元数据
对于直接使用AMQP 1.0协议,交换机的声明和绑定通常在RabbitMQ管理界面或使用AMQP 0.9.1客户端预先配置,因为AMQP 1.0协议本身不包含声明交换机的操作。

