使用Golang生成包含map[string]interface{}的Protobuf数据
使用Golang生成包含map[string]interface{}的Protobuf数据 我正在尝试扩展 argo events 以支持 RabbitMQ 的仲裁队列:argoproj/argo-events - Issue 1479 - 允许在 AMQPQueueDeclare 中传递 AMQP 参数,而不是总是传递 nil(启用仲裁队列和其他功能)
这应该是一个相当简单的改动。Python 中的 pika 库已经可以发送可选参数,这些参数以字典形式发送给 Argo。这些可选参数用于 RabbitMQ 的插件和可选功能,并且 amqp 通道的事件源代码中,QueueDeclare 方法已经有一个用于“参数”的 Table 参数,但它总是 nil(新用户允许的最大链接数已满,但位置是 argoproj/argo-events 的 eventsources/sources/amqp/start.go#L257)。这个 Table 是一个 map[string]interface{},将其添加到类型文件中会引入对 streadway/amqp 的导入(在 GitHub 上,但作为新用户我只能链接两个地方),这在这里看起来不太合适,因为我认为该类型本意并非用作序列化格式。
除了整体概念外,我对 protobuf 并不十分熟悉,我已经阅读了基础知识。我创建了一个“问题”PR 来讨论类型。我尝试使用 Arguments []bytejson:“arguments,omitempty” protobuf:“bytes,6,rep,name=arguments”``,然后在实际的 AMQP 代码 start.go 中,我将字节序列化为 Table,在插件特定的导入中使用它更有意义。
在 protobuf 定义中,对于这种非结构化的 JSON,有没有更好的类型可以使用?
更多关于使用Golang生成包含map[string]interface{}的Protobuf数据的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于使用Golang生成包含map[string]interface{}的Protobuf数据的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在 Protobuf 中处理 map[string]interface{} 类型的数据,通常有以下几种方案:
方案一:使用 google.protobuf.Struct 类型(推荐)
这是处理动态 JSON 数据最标准的方式:
syntax = "proto3";
import "google/protobuf/struct.proto";
message AMQPQueueDeclare {
string queue = 1;
bool durable = 2;
bool auto_delete = 3;
bool exclusive = 4;
bool no_wait = 5;
google.protobuf.Struct arguments = 6;
}
对应的 Go 代码:
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/structpb"
)
// 从 Protobuf Struct 转换为 amqp.Table
func structToAMQPTable(pbStruct *structpb.Struct) (amqp.Table, error) {
if pbStruct == nil {
return nil, nil
}
// 将 Struct 转换为 map[string]interface{}
data := pbStruct.AsMap()
// 转换为 amqp.Table
table := make(amqp.Table)
for k, v := range data {
table[k] = v
}
return table, nil
}
// 从 amqp.Table 转换为 Protobuf Struct
func amqpTableToStruct(table amqp.Table) (*structpb.Struct, error) {
if table == nil {
return nil, nil
}
// 直接转换,因为 amqp.Table 已经是 map[string]interface{}
return structpb.NewStruct(table)
}
// 使用示例
func main() {
// 模拟从外部接收到的参数
args := map[string]interface{}{
"x-queue-type": "quorum",
"x-max-length": int64(10000),
"x-overflow": "reject-publish",
}
// 转换为 Struct
pbStruct, err := structpb.NewStruct(args)
if err != nil {
panic(err)
}
// 序列化为 JSON bytes(用于传输)
jsonBytes, err := protojson.Marshal(pbStruct)
if err != nil {
panic(err)
}
fmt.Printf("JSON: %s\n", string(jsonBytes))
// 反序列化
var newStruct structpb.Struct
if err := protojson.Unmarshal(jsonBytes, &newStruct); err != nil {
panic(err)
}
// 转换为 amqp.Table
table, err := structToAMQPTable(&newStruct)
if err != nil {
panic(err)
}
fmt.Printf("AMQP Table: %v\n", table)
}
方案二:使用 bytes 字段存储 JSON
如果不想引入额外的依赖,可以使用 JSON bytes:
syntax = "proto3";
message AMQPQueueDeclare {
string queue = 1;
bool durable = 2;
bool auto_delete = 3;
bool exclusive = 4;
bool no_wait = 5;
bytes arguments = 6; // JSON 格式的参数
}
对应的 Go 代码:
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
)
// JSON bytes 转换为 amqp.Table
func jsonBytesToAMQPTable(jsonData []byte) (amqp.Table, error) {
if len(jsonData) == 0 {
return nil, nil
}
var args map[string]interface{}
if err := json.Unmarshal(jsonData, &args); err != nil {
return nil, err
}
table := make(amqp.Table)
for k, v := range args {
table[k] = v
}
return table, nil
}
// amqp.Table 转换为 JSON bytes
func amqpTableToJSONBytes(table amqp.Table) ([]byte, error) {
if table == nil {
return []byte("{}"), nil
}
return json.Marshal(table)
}
// 在事件源中的使用示例
func (e *EventSource) processQueueDeclare(req *AMQPQueueDeclare) error {
var table amqp.Table
if len(req.Arguments) > 0 {
var err error
table, err = jsonBytesToAMQPTable(req.Arguments)
if err != nil {
return fmt.Errorf("failed to parse arguments: %v", err)
}
}
// 使用 table 调用 QueueDeclare
_, err := e.channel.QueueDeclare(
req.Queue,
req.Durable,
req.AutoDelete,
req.Exclusive,
req.NoWait,
table, // 传递参数
)
return err
}
方案三:使用 map<string, Value> 自定义消息
如果需要更精细的控制,可以定义自己的 Value 类型:
syntax = "proto3";
message AMQPQueueDeclare {
string queue = 1;
bool durable = 2;
bool auto_delete = 3;
bool exclusive = 4;
bool no_wait = 5;
map<string, AMQPValue> arguments = 6;
}
message AMQPValue {
oneof value {
string string_value = 1;
int64 int_value = 2;
bool bool_value = 3;
double float_value = 4;
bytes bytes_value = 5;
AMQPTable table_value = 6;
AMQPArray array_value = 7;
}
}
message AMQPTable {
map<string, AMQPValue> fields = 1;
}
message AMQPArray {
repeated AMQPValue values = 1;
}
针对 Argo Events 的具体建议
对于你的 PR #1501,建议采用方案一(google.protobuf.Struct),因为:
- 类型安全:
Struct类型专门设计用于处理动态 JSON 数据 - 标准兼容:这是 Google 官方推荐的方式
- 工具链完善:有成熟的序列化/反序列化工具
- 向后兼容:可以优雅地处理空值
修改示例:
// 在 start.go 中的 QueueDeclare 调用处
func (e *EventSource) declareQueue() error {
var table amqp.Table
if e.spec.Arguments != nil {
// 将 Struct 转换为 amqp.Table
argsMap := e.spec.Arguments.AsMap()
table = make(amqp.Table)
for k, v := range argsMap {
table[k] = v
}
}
_, err := e.channel.QueueDeclare(
e.spec.QueueName,
e.spec.Durable,
e.spec.AutoDelete,
e.spec.Exclusive,
e.spec.NoWait,
table, // 传递参数
)
return err
}
这样既保持了类型安全,又能够灵活地支持 RabbitMQ 的各种插件参数。

