使用Golang通过AMQP v1+发送消息到RabbitMQ

使用Golang通过AMQP v1+发送消息到RabbitMQ 嘿! 有一个RabbitMQ,它不仅能配置队列,还能配置交换机。在其AMQP协议版本0.9(streadway/amqp)上一切运行良好。 为此,我们有一个特殊的方法 channel.ExchangeDeclare()

但现在我需要使用AMQP版本1.0+(Azure/go-amqp)。

此外,我需要使用CloudEvents。为此我使用了这个库:github.com/cloudevents/sdk-go

但我找不到如何在设置协议时,不仅仅是在RabbitMQ队列中发送和接收消息,而是在RabbitMQ交换机中发送和接收消息。

我使用了这个示例:

GitHub

cloudevents/sdk-go

Go SDK for CloudEvents. Contribute to cloudevents/sdk-go development by creating an account on GitHub.

有人能帮忙理解如何做到这一点吗?如何设置交换机?


更多关于使用Golang通过AMQP v1+发送消息到RabbitMQ的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于使用Golang通过AMQP v1+发送消息到RabbitMQ的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在AMQP 1.0协议中,交换机的概念与AMQP 0.9.1不同。AMQP 1.0使用更通用的消息路由模型,RabbitMQ通过插件支持AMQP 1.0时会自动处理交换逻辑。

对于CloudEvents + AMQP 1.0,你需要通过消息的propertiesapplication-properties来指定路由信息。以下是具体实现:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/Azure/go-amqp"
    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/cloudevents/sdk-go/v2/protocol/amqp"
)

// 发送到交换机
func sendToExchange() error {
    // 创建AMQP连接
    conn, err := amqp.Dial("amqp://localhost:5672", &amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain("guest", "guest"),
    })
    if err != nil {
        return fmt.Errorf("连接失败: %w", err)
    }
    defer conn.Close()

    // 创建会话
    session, err := conn.NewSession(context.Background(), nil)
    if err != nil {
        return fmt.Errorf("创建会话失败: %w", err)
    }

    // 创建发送链接,目标地址格式为"exchange-name/routing-key"
    sender, err := session.NewSender(context.Background(), 
        "my-exchange/my.routing.key", // 交换机和路由键
        &amqp.SenderOptions{
            TargetAddress: "my-exchange",
        })
    if err != nil {
        return fmt.Errorf("创建发送者失败: %w", err)
    }
    defer sender.Close(context.Background())

    // 创建CloudEvent
    event := cloudevents.NewEvent()
    event.SetID("12345")
    event.SetSource("my-app")
    event.SetType("com.example.sample")
    event.SetTime(time.Now())
    _ = event.SetData(cloudevents.ApplicationJSON, map[string]string{
        "message": "Hello RabbitMQ Exchange",
    })

    // 转换为AMQP消息
    msg := amqp.NewMessage([]byte(event.String()))
    msg.Properties = &amqp.MessageProperties{
        To: "my-exchange/my.routing.key", // 目标地址
    }
    msg.ApplicationProperties = map[string]any{
        "cloudEvents:specversion": "1.0",
        "cloudEvents:id":          event.ID(),
        "cloudEvents:source":      event.Source(),
        "cloudEvents:type":        event.Type(),
        "cloudEvents:time":        event.Time().Format(time.RFC3339),
    }

    // 发送消息
    err = sender.Send(context.Background(), msg, nil)
    if err != nil {
        return fmt.Errorf("发送失败: %w", err)
    }

    fmt.Println("消息已发送到交换机")
    return nil
}

// 使用CloudEvents SDK发送
func sendWithCloudEvents() error {
    // 创建AMQP协议
    p, err := amqp.NewProtocol(
        "amqp://guest:guest@localhost:5672",
        "my-exchange/my.routing.key", // 目标地址
        amqp.ConnOpts{},
        amqp.SenderOpts{},
        amqp.ReceiverOpts{},
    )
    if err != nil {
        return fmt.Errorf("创建协议失败: %w", err)
    }

    // 创建CloudEvents客户端
    c, err := cloudevents.NewClient(p)
    if err != nil {
        return fmt.Errorf("创建客户端失败: %w", err)
    }

    // 创建事件
    event := cloudevents.NewEvent()
    event.SetID("67890")
    event.SetSource("my-cloud-events-app")
    event.SetType("com.example.cloud")
    event.SetTime(time.Now())
    _ = event.SetData(cloudevents.ApplicationJSON, map[string]string{
        "data": "CloudEvents to Exchange",
    })

    // 发送事件
    ctx := cloudevents.ContextWithTarget(context.Background(), 
        "amqp://guest:guest@localhost:5672/my-exchange/my.routing.key")
    
    if result := c.Send(ctx, event); cloudevents.IsUndelivered(result) {
        return fmt.Errorf("发送失败: %v", result)
    }

    fmt.Println("CloudEvent已发送到交换机")
    return nil
}

// 从交换机接收消息
func receiveFromExchange() error {
    conn, err := amqp.Dial("amqp://localhost:5672", &amqp.ConnOptions{
        SASLType: amqp.SASLTypePlain("guest", "guest"),
    })
    if err != nil {
        return fmt.Errorf("连接失败: %w", err)
    }
    defer conn.Close()

    session, err := conn.NewSession(context.Background(), nil)
    if err != nil {
        return fmt.Errorf("创建会话失败: %w", err)
    }

    // 创建接收链接,使用队列名(绑定到交换机的队列)
    receiver, err := session.NewReceiver(context.Background(), 
        "my-queue", // 绑定到交换机的队列
        &amqp.ReceiverOptions{
            SourceAddress: "my-exchange",
        })
    if err != nil {
        return fmt.Errorf("创建接收者失败: %w", err)
    }
    defer receiver.Close(context.Background())

    // 接收消息
    msg, err := receiver.Receive(context.Background(), nil)
    if err != nil {
        return fmt.Errorf("接收失败: %w", err)
    }

    // 接受消息
    receiver.AcceptMessage(context.Background(), msg)

    fmt.Printf("收到消息: %s\n", string(msg.GetData()))
    
    // 如果是CloudEvents消息
    if specVersion, ok := msg.ApplicationProperties["cloudEvents:specversion"]; ok {
        fmt.Printf("CloudEvent specversion: %v\n", specVersion)
    }

    return nil
}

func main() {
    // 发送消息到交换机
    if err := sendToExchange(); err != nil {
        fmt.Printf("发送错误: %v\n", err)
    }

    // 使用CloudEvents SDK发送
    if err := sendWithCloudEvents(); err != nil {
        fmt.Printf("CloudEvents发送错误: %v\n", err)
    }

    // 从交换机接收消息
    if err := receiveFromExchange(); err != nil {
        fmt.Printf("接收错误: %v\n", err)
    }
}

关键点说明:

  1. AMQP 1.0地址格式:使用"exchange-name/routing-key"格式指定目标
  2. RabbitMQ配置:确保启用AMQP 1.0插件并正确配置交换机:
    rabbitmq-plugins enable rabbitmq_amqp1_0
    
  3. 绑定队列:需要在RabbitMQ中预先将队列绑定到交换机
  4. CloudEvents属性:通过ApplicationProperties传递CloudEvents元数据

对于直接使用AMQP 1.0协议,交换机的声明和绑定通常在RabbitMQ管理界面或使用AMQP 0.9.1客户端预先配置,因为AMQP 1.0协议本身不包含声明交换机的操作。

回到顶部