Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)

Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用) 你好

我是Go语言的新手。我有一个包含多条消息(流)的protobuf二进制消息文件。在Python中可以轻松解码,如下所示,但在Go中找不到任何简单的方法来实现相同的功能。任何帮助都将不胜感激。

Python代码:

def read_pb_stream_from_file():
    amw = my_stream_pb2.MyTestMessageWrapper()
    count = 0
    with open("stream.strmpb",'rb') as f:
        data = f.read()
        n = 0

        while n < len(data):
            msg_len,new_pos = _DecodeVarint32(data,0)
            n = new_pos
            msg_buf = data[n:n+msg_len]
            n += msg_len
            amw.ParseFromString(msg_buf)
            data = data[n:]
            print(count)
            print(amw)
            count+=1

更多关于Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go中处理protobuf流式数据,可以使用google.golang.org/protobuf/proto包的DecodeZigZagUnmarshal方法。以下是完整的示例代码:

package main

import (
    "bufio"
    "encoding/binary"
    "fmt"
    "io"
    "os"

    "google.golang.org/protobuf/proto"
    "your_project/pb" // 替换为你的protobuf包路径
)

func decodeProtobufStream(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)
    count := 0

    for {
        // 读取消息长度(varint编码)
        msgLen, err := binary.ReadUvarint(reader)
        if err != nil {
            if err == io.EOF {
                break
            }
            return fmt.Errorf("读取消息长度失败: %v", err)
        }

        // 读取消息体
        msgBuf := make([]byte, msgLen)
        _, err = io.ReadFull(reader, msgBuf)
        if err != nil {
            return fmt.Errorf("读取消息体失败: %v", err)
        }

        // 解码protobuf消息
        msg := &pb.MyTestMessageWrapper{}
        if err := proto.Unmarshal(msgBuf, msg); err != nil {
            return fmt.Errorf("解码protobuf失败: %v", err)
        }

        // 处理消息
        fmt.Printf("消息 #%d:\n", count)
        fmt.Println(msg)
        count++
    }

    fmt.Printf("总共解码了 %d 条消息\n", count)
    return nil
}

// 如果使用gogo protobuf,可以使用以下优化版本
func decodeProtobufStreamOptimized(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    var buf [1024]byte
    reader := bufio.NewReader(file)
    count := 0

    for {
        // 使用更高效的方式读取varint
        msgLen, err := binary.ReadUvarint(reader)
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }

        // 复用缓冲区
        var msgBuf []byte
        if int(msgLen) <= len(buf) {
            msgBuf = buf[:msgLen]
        } else {
            msgBuf = make([]byte, msgLen)
        }

        _, err = io.ReadFull(reader, msgBuf)
        if err != nil {
            return err
        }

        msg := &pb.MyTestMessageWrapper{}
        if err := proto.Unmarshal(msgBuf, msg); err != nil {
            return err
        }

        fmt.Printf("消息 #%d:\n", count)
        fmt.Println(msg)
        count++
    }

    return nil
}

func main() {
    if err := decodeProtobufStream("stream.strmpb"); err != nil {
        fmt.Printf("错误: %v\n", err)
    }
}

对于更复杂的场景,可以使用bufio.Reader配合自定义的流处理器:

type StreamDecoder struct {
    reader *bufio.Reader
}

func NewStreamDecoder(r io.Reader) *StreamDecoder {
    return &StreamDecoder{
        reader: bufio.NewReader(r),
    }
}

func (d *StreamDecoder) Next() (*pb.MyTestMessageWrapper, error) {
    // 读取消息长度
    msgLen, err := binary.ReadUvarint(d.reader)
    if err != nil {
        return nil, err
    }

    // 读取消息体
    msgBuf := make([]byte, msgLen)
    _, err = io.ReadFull(d.reader, msgBuf)
    if err != nil {
        return nil, err
    }

    // 解码
    msg := &pb.MyTestMessageWrapper{}
    if err := proto.Unmarshal(msgBuf, msg); err != nil {
        return nil, err
    }

    return msg, nil
}

// 使用示例
func processStream(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    decoder := NewStreamDecoder(file)
    count := 0

    for {
        msg, err := decoder.Next()
        if err != nil {
            if err == io.EOF {
                break
            }
            return err
        }

        fmt.Printf("处理消息 #%d: %v\n", count, msg)
        count++
    }

    return nil
}

关键点:

  1. 使用binary.ReadUvarint()读取varint编码的消息长度
  2. 使用io.ReadFull()确保读取完整的消息体
  3. 使用proto.Unmarshal()解码protobuf消息
  4. 循环处理直到遇到EOF

这种方法与Python示例的功能完全一致,能够正确处理流式多路复用的protobuf数据。

回到顶部