golang高效Avro数据编码与解码插件库goavro的使用

Golang高效Avro数据编码与解码插件库goavro的使用

重要说明

在LinkedIn内部,大多数团队已经转向使用hamba/avro进行Avro序列化/反序列化需求,因为在大规模场景下发现其性能显著更高。goavro目前没有积极开发。

goavro简介

Goavro是一个用于编码和解码Avro数据的库。

功能特性

  • 支持二进制和文本JSON格式的Avro数据编码与解码
  • Codec是无状态的,可以安全地被多个goroutine并发使用

依赖说明

所有对gopkg.in的使用已被移除,转而支持Go模块。请将导入路径更新为github.com/linkedin/goavro/v2

对于v1用户,可以通过在go.modGopkg.toml文件中添加约束来继续使用旧版本的goavro:

require (
    github.com/linkedin/goavro v1.0.5
)

使用示例

基本使用示例

package main

import (
    "fmt"
    "github.com/linkedin/goavro/v2"
)

func main() {
    // 创建Avro编解码器
    codec, err := goavro.NewCodec(`
        {
          "type": "record",
          "name": "LongList",
          "fields" : [
            {"name": "next", "type": ["null", "LongList"], "default": null}
          ]
        }`)
    if err != nil {
        fmt.Println(err)
    }

    // 注意:使用默认值时可以省略字段
    textual := []byte(`{"next":{"LongList":{}}}`)

    // 将文本Avro数据(Avro JSON格式)转换为原生Go格式
    native, _, err := codec.NativeFromTextual(textual)
    if err != nil {
        fmt.Println(err)
    }

    // 将原生Go格式转换为二进制Avro数据
    binary, err := codec.BinaryFromNative(nil, native)
    if err != nil {
        fmt.Println(err)
    }

    // 将二进制Avro数据转换回原生Go格式
    native, _, err = codec.NativeFromBinary(binary)
    if err != nil {
        fmt.Println(err)
    }

    // 将原生Go格式转换为文本Avro数据
    textual, err = codec.TextualFromNative(nil, native)
    if err != nil {
        fmt.Println(err)
    }

    // 注意:文本编码将显示所有字段,包括那些值与默认值匹配的字段
    fmt.Println(string(textual))
    // 输出: {"next":{"LongList":{"next":null}}}
}

OCF文件读写示例

package main

import (
    "bytes"
    "fmt"
    "strings"
    "github.com/linkedin/goavro/v2"
)

func main() {
    avroSchema := `
    {
      "type": "record",
      "name": "test_schema",
      "fields": [
        {
          "name": "time",
          "type": "long"
        },
        {
          "name": "customer",
          "type": "string"
        }
      ]
    }`

    // 写入OCF数据
    var ocfFileContents bytes.Buffer
    writer, err := goavro.NewOCFWriter(goavro.OCFConfig{
        W:      &ocfFileContents,
        Schema: avroSchema,
    })
    if err != nil {
        fmt.Println(err)
    }
    err = writer.Append([]map[string]interface{}{
        {
            "time":     1617104831727,
            "customer": "customer1",
        },
        {
            "time":     1717104831727,
            "customer": "customer2",
        },
    })
    fmt.Println("ocfFileContents", ocfFileContents.String())

    // 读取OCF数据
    ocfReader, err := goavro.NewOCFReader(strings.NewReader(ocfFileContents.String()))
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("Records in OCF File")
    for ocfReader.Scan() {
        record, err := ocfReader.Read()
        if err != nil {
            fmt.Println(err)
        }
        fmt.Println("record", record)
    }
}

数据类型转换

Avro到Go数据转换

Avro类型 Go类型
null nil
boolean bool
bytes []byte
float float32
double float64
long int64
int int32
string string
array []interface{}
enum string
fixed []byte
maprecord map[string]interface{}
union 见下文

对于联合类型(union),当值为null时返回简单的Go nil;当值为非nil时,返回一个带有单个键的Go map[string]interface{}

Go到Avro数据转换

func ExampleUnion() {
    codec, err := goavro.NewCodec(`["null","string","int"]`)
    if err != nil {
        fmt.Println(err)
    }
    buf, err := codec.TextualFromNative(nil, goavro.Union("string", "some string"))
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println(string(buf))
    // 输出: {"string":"some string"}
}

工具程序

  • ab2t: 将Avro OCF文件转换为Avro JSON编码
  • arw: Avro重写程序,可重写Avro OCF文件
  • avroheader: 打印OCF文件的各种头信息
  • splice: 将Avro模式文件和原始Avro二进制数据文件拼接成OCF文件

局限性

  • 别名支持不完整
  • Kafka流支持暂未实现
  • 默认最大块计数和块大小限制
  • 模式演化存在限制

许可证

Goavro许可证

Apache License 2.0

Google Snappy许可证

BSD许可证


更多关于golang高效Avro数据编码与解码插件库goavro的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效Avro数据编码与解码插件库goavro的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


GoAvro:高效的Avro数据编码与解码库

GoAvro是一个用于Go语言的Avro数据序列化/反序列化库,它提供了高效的二进制编码和解码功能,支持Avro协议的所有特性。

安装

go get github.com/linkedin/goavro

基本使用

1. 编码数据

package main

import (
	"fmt"
	"github.com/linkedin/goavro"
)

func main() {
	// 定义Avro schema
	schema := `{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age", "type": "int"},
			{"name": "emails", "type": {"type": "array", "items": "string"}},
			{"name": "active", "type": "boolean"}
		]
	}`

	// 创建Codec
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		panic(err)
	}

	// 准备数据
	data := map[string]interface{}{
		"name":   "John Doe",
		"age":    30,
		"emails": []string{"john@example.com", "jdoe@example.com"},
		"active": true,
	}

	// 编码为二进制
	binary, err := codec.BinaryFromNative(nil, data)
	if err != nil {
		panic(err)
	}

	fmt.Printf("Encoded binary: %x\n", binary)
}

2. 解码数据

func decodeExample() {
	// 假设这是之前编码的二进制数据
	binary := []byte{0x0c, 0x4a, 0x6f, 0x68, 0x6e, 0x20, 0x44, 0x6f, 0x65, 0x3c, 0x04, 0x0c, 0x6a, 0x6f, 0x68, 0x6e, 0x40, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x0c, 0x6a, 0x64, 0x6f, 0x65, 0x40, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x2e, 0x63, 0x6f, 0x6d, 0x00, 0x01}

	// 使用相同的schema创建Codec
	schema := `{
		"type": "record",
		"name": "User",
		"fields": [
			{"name": "name", "type": "string"},
			{"name": "age", "type": "int"},
			{"name": "emails", "type": {"type": "array", "items": "string"}},
			{"name": "active", "type": "boolean"}
		]
	}`
	
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		panic(err)
	}

	// 解码二进制数据
	native, _, err := codec.NativeFromBinary(binary)
	if err != nil {
		panic(err)
	}

	// 类型断言为map[string]interface{}
	decoded := native.(map[string]interface{})
	fmt.Printf("Decoded data: %+v\n", decoded)
}

高级特性

1. 使用OCF (Object Container File) 格式

func ocfExample() {
	// 创建OCF写入器
	schema := `{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}`
	
	// 创建文件
	f, err := os.Create("users.avro")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// 创建OCF写入器
	ocfWriter, err := goavro.NewOCFWriter(goavro.OCFConfig{
		W:      f,
		Schema: schema,
	})
	if err != nil {
		panic(err)
	}

	// 写入数据
	err = ocfWriter.Append([]map[string]interface{}{
		{"name": "Alice"},
		{"name": "Bob"},
	})
	if err != nil {
		panic(err)
	}

	// 读取OCF文件
	f, err = os.Open("users.avro")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	ocfReader, err := goavro.NewOCFReader(f)
	if err != nil {
		panic(err)
	}

	for ocfReader.Scan() {
		record, err := ocfReader.Read()
		if err != nil {
			panic(err)
		}
		fmt.Printf("Read record: %v\n", record)
	}
}

2. 性能优化技巧

  1. 重用Codec:Codec创建开销较大,应该重用
  2. 批量处理:使用Append方法批量写入数据
  3. 缓冲IO:使用bufio包装文件IO
func performanceExample() {
	schema := `{"type": "record", "name": "Log", "fields": [{"name": "message", "type": "string"}]}`
	codec, _ := goavro.NewCodec(schema)
	
	// 批量编码
	messages := []map[string]interface{}{
		{"message": "log entry 1"},
		{"message": "log entry 2"},
		// ... 更多日志
	}
	
	// 使用缓冲写入
	file, _ := os.Create("logs.avro")
	buffered := bufio.NewWriter(file)
	defer func() {
		buffered.Flush()
		file.Close()
	}()
	
	// 创建OCF写入器
	ocfWriter, _ := goavro.NewOCFWriter(goavro.OCFConfig{
		W:      buffered,
		Schema: schema,
		Codec:  codec, // 重用codec
	})
	
	// 批量写入
	_ = ocfWriter.Append(messages)
}

最佳实践

  1. Schema管理:将schema存储在单独的文件或schema registry中
  2. 错误处理:始终检查编码/解码操作的错误
  3. 类型安全:解码后对数据进行验证,确保类型正确
  4. 版本兼容:考虑schema演化时的向前/向后兼容性

GoAvro提供了强大的Avro数据处理能力,特别适合在大数据管道、Kafka消息序列化等场景中使用。通过合理使用其特性,可以实现高效的数据序列化和反序列化。

回到顶部