golang高效Avro数据编码与解码插件库goavro的使用
Golang高效Avro数据编码与解码插件库goavro的使用
重要说明
在LinkedIn内部,大多数团队已经转向使用hamba/avro进行Avro序列化/反序列化需求,因为在大规模场景下发现其性能显著更高。goavro目前没有积极开发。
goavro简介
Goavro是一个用于编码和解码Avro数据的库。
功能特性
- 支持二进制和文本JSON格式的Avro数据编码与解码
Codec
是无状态的,可以安全地被多个goroutine并发使用
依赖说明
所有对gopkg.in
的使用已被移除,转而支持Go模块。请将导入路径更新为github.com/linkedin/goavro/v2
。
对于v1用户,可以通过在go.mod
或Gopkg.toml
文件中添加约束来继续使用旧版本的goavro:
require (
github.com/linkedin/goavro v1.0.5
)
使用示例
基本使用示例
package main
import (
"fmt"
"github.com/linkedin/goavro/v2"
)
func main() {
// 创建Avro编解码器
codec, err := goavro.NewCodec(`
{
"type": "record",
"name": "LongList",
"fields" : [
{"name": "next", "type": ["null", "LongList"], "default": null}
]
}`)
if err != nil {
fmt.Println(err)
}
// 注意:使用默认值时可以省略字段
textual := []byte(`{"next":{"LongList":{}}}`)
// 将文本Avro数据(Avro JSON格式)转换为原生Go格式
native, _, err := codec.NativeFromTextual(textual)
if err != nil {
fmt.Println(err)
}
// 将原生Go格式转换为二进制Avro数据
binary, err := codec.BinaryFromNative(nil, native)
if err != nil {
fmt.Println(err)
}
// 将二进制Avro数据转换回原生Go格式
native, _, err = codec.NativeFromBinary(binary)
if err != nil {
fmt.Println(err)
}
// 将原生Go格式转换为文本Avro数据
textual, err = codec.TextualFromNative(nil, native)
if err != nil {
fmt.Println(err)
}
// 注意:文本编码将显示所有字段,包括那些值与默认值匹配的字段
fmt.Println(string(textual))
// 输出: {"next":{"LongList":{"next":null}}}
}
OCF文件读写示例
package main
import (
"bytes"
"fmt"
"strings"
"github.com/linkedin/goavro/v2"
)
func main() {
avroSchema := `
{
"type": "record",
"name": "test_schema",
"fields": [
{
"name": "time",
"type": "long"
},
{
"name": "customer",
"type": "string"
}
]
}`
// 写入OCF数据
var ocfFileContents bytes.Buffer
writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: &ocfFileContents,
Schema: avroSchema,
})
if err != nil {
fmt.Println(err)
}
err = writer.Append([]map[string]interface{}{
{
"time": 1617104831727,
"customer": "customer1",
},
{
"time": 1717104831727,
"customer": "customer2",
},
})
fmt.Println("ocfFileContents", ocfFileContents.String())
// 读取OCF数据
ocfReader, err := goavro.NewOCFReader(strings.NewReader(ocfFileContents.String()))
if err != nil {
fmt.Println(err)
}
fmt.Println("Records in OCF File")
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil {
fmt.Println(err)
}
fmt.Println("record", record)
}
}
数据类型转换
Avro到Go数据转换
Avro类型 | Go类型 |
---|---|
null |
nil |
boolean |
bool |
bytes |
[]byte |
float |
float32 |
double |
float64 |
long |
int64 |
int |
int32 |
string |
string |
array |
[]interface{} |
enum |
string |
fixed |
[]byte |
map 和record |
map[string]interface{} |
union |
见下文 |
对于联合类型(union),当值为null
时返回简单的Go nil
;当值为非nil
时,返回一个带有单个键的Go map[string]interface{}
。
Go到Avro数据转换
func ExampleUnion() {
codec, err := goavro.NewCodec(`["null","string","int"]`)
if err != nil {
fmt.Println(err)
}
buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
if err != nil {
fmt.Println(err)
}
fmt.Println(string(buf))
// 输出: {"string":"some string"}
}
工具程序
ab2t
: 将Avro OCF文件转换为Avro JSON编码arw
: Avro重写程序,可重写Avro OCF文件avroheader
: 打印OCF文件的各种头信息splice
: 将Avro模式文件和原始Avro二进制数据文件拼接成OCF文件
局限性
- 别名支持不完整
- Kafka流支持暂未实现
- 默认最大块计数和块大小限制
- 模式演化存在限制
许可证
Goavro许可证
Apache License 2.0
Google Snappy许可证
BSD许可证
更多关于golang高效Avro数据编码与解码插件库goavro的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高效Avro数据编码与解码插件库goavro的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
GoAvro:高效的Avro数据编码与解码库
GoAvro是一个用于Go语言的Avro数据序列化/反序列化库,它提供了高效的二进制编码和解码功能,支持Avro协议的所有特性。
安装
go get github.com/linkedin/goavro
基本使用
1. 编码数据
package main
import (
"fmt"
"github.com/linkedin/goavro"
)
func main() {
// 定义Avro schema
schema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "emails", "type": {"type": "array", "items": "string"}},
{"name": "active", "type": "boolean"}
]
}`
// 创建Codec
codec, err := goavro.NewCodec(schema)
if err != nil {
panic(err)
}
// 准备数据
data := map[string]interface{}{
"name": "John Doe",
"age": 30,
"emails": []string{"john@example.com", "jdoe@example.com"},
"active": true,
}
// 编码为二进制
binary, err := codec.BinaryFromNative(nil, data)
if err != nil {
panic(err)
}
fmt.Printf("Encoded binary: %x\n", binary)
}
2. 解码数据
func decodeExample() {
// 假设这是之前编码的二进制数据
binary := []byte{0x0c, 0x4a, 0x6f, 0x68, 0x6e, 0x20, 0x44, 0x6f, 0x65, 0x3c, 0x04, 0x0c, 0x6a, 0x6f, 0x68, 0x6e, 0x40, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x0c, 0x6a, 0x64, 0x6f, 0x65, 0x40, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x00, 0x01}
// 使用相同的schema创建Codec
schema := `{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"},
{"name": "emails", "type": {"type": "array", "items": "string"}},
{"name": "active", "type": "boolean"}
]
}`
codec, err := goavro.NewCodec(schema)
if err != nil {
panic(err)
}
// 解码二进制数据
native, _, err := codec.NativeFromBinary(binary)
if err != nil {
panic(err)
}
// 类型断言为map[string]interface{}
decoded := native.(map[string]interface{})
fmt.Printf("Decoded data: %+v\n", decoded)
}
高级特性
1. 使用OCF (Object Container File) 格式
func ocfExample() {
// 创建OCF写入器
schema := `{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}`
// 创建文件
f, err := os.Create("users.avro")
if err != nil {
panic(err)
}
defer f.Close()
// 创建OCF写入器
ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: f,
Schema: schema,
})
if err != nil {
panic(err)
}
// 写入数据
err = ocfWriter.Append([]map[string]interface{}{
{"name": "Alice"},
{"name": "Bob"},
})
if err != nil {
panic(err)
}
// 读取OCF文件
f, err = os.Open("users.avro")
if err != nil {
panic(err)
}
defer f.Close()
ocfReader, err := goavro.NewOCFReader(f)
if err != nil {
panic(err)
}
for ocfReader.Scan() {
record, err := ocfReader.Read()
if err != nil {
panic(err)
}
fmt.Printf("Read record: %v\n", record)
}
}
2. 性能优化技巧
- 重用Codec:Codec创建开销较大,应该重用
- 批量处理:使用Append方法批量写入数据
- 缓冲IO:使用bufio包装文件IO
func performanceExample() {
schema := `{"type": "record", "name": "Log", "fields": [{"name": "message", "type": "string"}]}`
codec, _ := goavro.NewCodec(schema)
// 批量编码
messages := []map[string]interface{}{
{"message": "log entry 1"},
{"message": "log entry 2"},
// ... 更多日志
}
// 使用缓冲写入
file, _ := os.Create("logs.avro")
buffered := bufio.NewWriter(file)
defer func() {
buffered.Flush()
file.Close()
}()
// 创建OCF写入器
ocfWriter, _ := goavro.NewOCFWriter(goavro.OCFConfig{
W: buffered,
Schema: schema,
Codec: codec, // 重用codec
})
// 批量写入
_ = ocfWriter.Append(messages)
}
最佳实践
- Schema管理:将schema存储在单独的文件或schema registry中
- 错误处理:始终检查编码/解码操作的错误
- 类型安全:解码后对数据进行验证,确保类型正确
- 版本兼容:考虑schema演化时的向前/向后兼容性
GoAvro提供了强大的Avro数据处理能力,特别适合在大数据管道、Kafka消息序列化等场景中使用。通过合理使用其特性,可以实现高效的数据序列化和反序列化。