Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)
Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用) 你好
我是Go语言的新手。我有一个包含多条消息(流)的protobuf二进制消息文件。在Python中可以轻松解码,如下所示,但在Go中找不到任何简单的方法来实现相同的功能。任何帮助都将不胜感激。
Python代码:
def read_pb_stream_from_file():
amw = my_stream_pb2.MyTestMessageWrapper()
count = 0
with open("stream.strmpb",'rb') as f:
data = f.read()
n = 0
while n < len(data):
msg_len,new_pos = _DecodeVarint32(data,0)
n = new_pos
msg_buf = data[n:n+msg_len]
n += msg_len
amw.ParseFromString(msg_buf)
data = data[n:]
print(count)
print(amw)
count+=1
更多关于Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang中如何从单个protobuf二进制文件解码多个指标(流式多路复用)的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go中处理protobuf流式数据,可以使用google.golang.org/protobuf/proto包的DecodeZigZag和Unmarshal方法。以下是完整的示例代码:
package main
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"google.golang.org/protobuf/proto"
"your_project/pb" // 替换为你的protobuf包路径
)
func decodeProtobufStream(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
count := 0
for {
// 读取消息长度(varint编码)
msgLen, err := binary.ReadUvarint(reader)
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("读取消息长度失败: %v", err)
}
// 读取消息体
msgBuf := make([]byte, msgLen)
_, err = io.ReadFull(reader, msgBuf)
if err != nil {
return fmt.Errorf("读取消息体失败: %v", err)
}
// 解码protobuf消息
msg := &pb.MyTestMessageWrapper{}
if err := proto.Unmarshal(msgBuf, msg); err != nil {
return fmt.Errorf("解码protobuf失败: %v", err)
}
// 处理消息
fmt.Printf("消息 #%d:\n", count)
fmt.Println(msg)
count++
}
fmt.Printf("总共解码了 %d 条消息\n", count)
return nil
}
// 如果使用gogo protobuf,可以使用以下优化版本
func decodeProtobufStreamOptimized(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
var buf [1024]byte
reader := bufio.NewReader(file)
count := 0
for {
// 使用更高效的方式读取varint
msgLen, err := binary.ReadUvarint(reader)
if err != nil {
if err == io.EOF {
break
}
return err
}
// 复用缓冲区
var msgBuf []byte
if int(msgLen) <= len(buf) {
msgBuf = buf[:msgLen]
} else {
msgBuf = make([]byte, msgLen)
}
_, err = io.ReadFull(reader, msgBuf)
if err != nil {
return err
}
msg := &pb.MyTestMessageWrapper{}
if err := proto.Unmarshal(msgBuf, msg); err != nil {
return err
}
fmt.Printf("消息 #%d:\n", count)
fmt.Println(msg)
count++
}
return nil
}
func main() {
if err := decodeProtobufStream("stream.strmpb"); err != nil {
fmt.Printf("错误: %v\n", err)
}
}
对于更复杂的场景,可以使用bufio.Reader配合自定义的流处理器:
type StreamDecoder struct {
reader *bufio.Reader
}
func NewStreamDecoder(r io.Reader) *StreamDecoder {
return &StreamDecoder{
reader: bufio.NewReader(r),
}
}
func (d *StreamDecoder) Next() (*pb.MyTestMessageWrapper, error) {
// 读取消息长度
msgLen, err := binary.ReadUvarint(d.reader)
if err != nil {
return nil, err
}
// 读取消息体
msgBuf := make([]byte, msgLen)
_, err = io.ReadFull(d.reader, msgBuf)
if err != nil {
return nil, err
}
// 解码
msg := &pb.MyTestMessageWrapper{}
if err := proto.Unmarshal(msgBuf, msg); err != nil {
return nil, err
}
return msg, nil
}
// 使用示例
func processStream(filePath string) error {
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
decoder := NewStreamDecoder(file)
count := 0
for {
msg, err := decoder.Next()
if err != nil {
if err == io.EOF {
break
}
return err
}
fmt.Printf("处理消息 #%d: %v\n", count, msg)
count++
}
return nil
}
关键点:
- 使用
binary.ReadUvarint()读取varint编码的消息长度 - 使用
io.ReadFull()确保读取完整的消息体 - 使用
proto.Unmarshal()解码protobuf消息 - 循环处理直到遇到EOF
这种方法与Python示例的功能完全一致,能够正确处理流式多路复用的protobuf数据。

