使用Golang生成包含map[string]interface{}的Protobuf数据

使用Golang生成包含map[string]interface{}的Protobuf数据 我正在尝试扩展 argo events 以支持 RabbitMQ 的仲裁队列:argoproj/argo-events - Issue 1479 - 允许在 AMQPQueueDeclare 中传递 AMQP 参数,而不是总是传递 nil(启用仲裁队列和其他功能)

这应该是一个相当简单的改动。Python 中的 pika 库已经可以发送可选参数,这些参数以字典形式发送给 Argo。这些可选参数用于 RabbitMQ 的插件和可选功能,并且 amqp 通道的事件源代码中,QueueDeclare 方法已经有一个用于“参数”的 Table 参数,但它总是 nil(新用户允许的最大链接数已满,但位置是 argoproj/argo-events 的 eventsources/sources/amqp/start.go#L257)。这个 Table 是一个 map[string]interface{},将其添加到类型文件中会引入对 streadway/amqp 的导入(在 GitHub 上,但作为新用户我只能链接两个地方),这在这里看起来不太合适,因为我认为该类型本意并非用作序列化格式。

除了整体概念外,我对 protobuf 并不十分熟悉,我已经阅读了基础知识。我创建了一个“问题”PR 来讨论类型。我尝试使用 Arguments []bytejson:“arguments,omitempty” protobuf:“bytes,6,rep,name=arguments”``,然后在实际的 AMQP 代码 start.go 中,我将字节序列化为 Table,在插件特定的导入中使用它更有意义。

在 protobuf 定义中,对于这种非结构化的 JSON,有没有更好的类型可以使用?


更多关于使用Golang生成包含map[string]interface{}的Protobuf数据的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于使用Golang生成包含map[string]interface{}的Protobuf数据的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在 Protobuf 中处理 map[string]interface{} 类型的数据,通常有以下几种方案:

方案一:使用 google.protobuf.Struct 类型(推荐)

这是处理动态 JSON 数据最标准的方式:

syntax = "proto3";

import "google/protobuf/struct.proto";

message AMQPQueueDeclare {
    string queue = 1;
    bool durable = 2;
    bool auto_delete = 3;
    bool exclusive = 4;
    bool no_wait = 5;
    google.protobuf.Struct arguments = 6;
}

对应的 Go 代码:

package main

import (
    "encoding/json"
    "fmt"
    
    "github.com/streadway/amqp"
    "google.golang.org/protobuf/encoding/protojson"
    "google.golang.org/protobuf/types/known/structpb"
)

// 从 Protobuf Struct 转换为 amqp.Table
func structToAMQPTable(pbStruct *structpb.Struct) (amqp.Table, error) {
    if pbStruct == nil {
        return nil, nil
    }
    
    // 将 Struct 转换为 map[string]interface{}
    data := pbStruct.AsMap()
    
    // 转换为 amqp.Table
    table := make(amqp.Table)
    for k, v := range data {
        table[k] = v
    }
    return table, nil
}

// 从 amqp.Table 转换为 Protobuf Struct
func amqpTableToStruct(table amqp.Table) (*structpb.Struct, error) {
    if table == nil {
        return nil, nil
    }
    
    // 直接转换,因为 amqp.Table 已经是 map[string]interface{}
    return structpb.NewStruct(table)
}

// 使用示例
func main() {
    // 模拟从外部接收到的参数
    args := map[string]interface{}{
        "x-queue-type": "quorum",
        "x-max-length": int64(10000),
        "x-overflow":   "reject-publish",
    }
    
    // 转换为 Struct
    pbStruct, err := structpb.NewStruct(args)
    if err != nil {
        panic(err)
    }
    
    // 序列化为 JSON bytes(用于传输)
    jsonBytes, err := protojson.Marshal(pbStruct)
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("JSON: %s\n", string(jsonBytes))
    
    // 反序列化
    var newStruct structpb.Struct
    if err := protojson.Unmarshal(jsonBytes, &newStruct); err != nil {
        panic(err)
    }
    
    // 转换为 amqp.Table
    table, err := structToAMQPTable(&newStruct)
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("AMQP Table: %v\n", table)
}

方案二:使用 bytes 字段存储 JSON

如果不想引入额外的依赖,可以使用 JSON bytes:

syntax = "proto3";

message AMQPQueueDeclare {
    string queue = 1;
    bool durable = 2;
    bool auto_delete = 3;
    bool exclusive = 4;
    bool no_wait = 5;
    bytes arguments = 6;  // JSON 格式的参数
}

对应的 Go 代码:

package main

import (
    "encoding/json"
    "fmt"
    
    "github.com/streadway/amqp"
)

// JSON bytes 转换为 amqp.Table
func jsonBytesToAMQPTable(jsonData []byte) (amqp.Table, error) {
    if len(jsonData) == 0 {
        return nil, nil
    }
    
    var args map[string]interface{}
    if err := json.Unmarshal(jsonData, &args); err != nil {
        return nil, err
    }
    
    table := make(amqp.Table)
    for k, v := range args {
        table[k] = v
    }
    return table, nil
}

// amqp.Table 转换为 JSON bytes
func amqpTableToJSONBytes(table amqp.Table) ([]byte, error) {
    if table == nil {
        return []byte("{}"), nil
    }
    
    return json.Marshal(table)
}

// 在事件源中的使用示例
func (e *EventSource) processQueueDeclare(req *AMQPQueueDeclare) error {
    var table amqp.Table
    
    if len(req.Arguments) > 0 {
        var err error
        table, err = jsonBytesToAMQPTable(req.Arguments)
        if err != nil {
            return fmt.Errorf("failed to parse arguments: %v", err)
        }
    }
    
    // 使用 table 调用 QueueDeclare
    _, err := e.channel.QueueDeclare(
        req.Queue,
        req.Durable,
        req.AutoDelete,
        req.Exclusive,
        req.NoWait,
        table,  // 传递参数
    )
    
    return err
}

方案三:使用 map<string, Value> 自定义消息

如果需要更精细的控制,可以定义自己的 Value 类型:

syntax = "proto3";

message AMQPQueueDeclare {
    string queue = 1;
    bool durable = 2;
    bool auto_delete = 3;
    bool exclusive = 4;
    bool no_wait = 5;
    map<string, AMQPValue> arguments = 6;
}

message AMQPValue {
    oneof value {
        string string_value = 1;
        int64 int_value = 2;
        bool bool_value = 3;
        double float_value = 4;
        bytes bytes_value = 5;
        AMQPTable table_value = 6;
        AMQPArray array_value = 7;
    }
}

message AMQPTable {
    map<string, AMQPValue> fields = 1;
}

message AMQPArray {
    repeated AMQPValue values = 1;
}

针对 Argo Events 的具体建议

对于你的 PR #1501,建议采用方案一(google.protobuf.Struct),因为:

  1. 类型安全Struct 类型专门设计用于处理动态 JSON 数据
  2. 标准兼容:这是 Google 官方推荐的方式
  3. 工具链完善:有成熟的序列化/反序列化工具
  4. 向后兼容:可以优雅地处理空值

修改示例:

// 在 start.go 中的 QueueDeclare 调用处
func (e *EventSource) declareQueue() error {
    var table amqp.Table
    
    if e.spec.Arguments != nil {
        // 将 Struct 转换为 amqp.Table
        argsMap := e.spec.Arguments.AsMap()
        table = make(amqp.Table)
        for k, v := range argsMap {
            table[k] = v
        }
    }
    
    _, err := e.channel.QueueDeclare(
        e.spec.QueueName,
        e.spec.Durable,
        e.spec.AutoDelete,
        e.spec.Exclusive,
        e.spec.NoWait,
        table,  // 传递参数
    )
    
    return err
}

这样既保持了类型安全,又能够灵活地支持 RabbitMQ 的各种插件参数。

回到顶部