golang高效读写Parquet文件插件库parquet的使用
Golang高效读写Parquet文件插件库parquet的使用
概述
Parquet是一个基于Go结构体生成Parquet读写器的库。它可以根据您定义的结构体或者通过读取现有Parquet文件生成的结构体来工作。
请注意:如果您基于现有Parquet文件生成代码,会有一些限制:每个PageHeader的PageType必须是DATA_PAGE,ColumnMetaData中定义的Codec必须是PLAIN或SNAPPY。此外,Parquet文件的模式必须由当前支持的类型组成。
安装
go get -u github.com/parsyl/parquet/...
这将同时安装parquet的唯一两个依赖项:thrift和snappy。
使用示例
1. 定义结构体
首先定义一个结构体,用于写入Parquet数据:
type Person struct {
ID int32 `parquet:"id"`
Age *int32 `parquet:"age"`
}
2. 添加生成指令
在代码中添加go:generate注释:
// go:generate parquetgen -input main.go -type Person -package main
3. 生成读写器代码
运行以下命令生成读写器代码:
$ go generate
这将生成一个新的文件(parquet.go),其中定义了ParquetWriter和ParquetReader。
4. 使用生成的读写器
package main
import (
"bytes"
"encoding/json"
)
func main() {
var buf bytes.Buffer
w, err := NewParquetWriter(&buf)
if err != nil {
log.Fatal(err)
}
w.Add(Person{ID: 1, Age: getAge(30)})
w.Add(Person{ID: 2})
// 每次Write调用都会创建一个新的parquet行组
if err := w.Write(); err != nil {
log.Fatal(err)
}
// 完成后必须调用Close,它会在文件末尾写入parquet元数据
if err := w.Close(); err != nil {
log.Fatal(err)
}
r, err := NewParquetReader(bytes.NewReader(buf.Bytes()))
if err != nil {
log.Fatal(err)
}
enc := json.NewEncoder(os.Stdout)
for r.Next() {
var p Person
r.Scan(&p)
enc.Encode(p)
}
if err := r.Error(); err != nil {
log.Fatal(err)
}
}
func getAge(a int32) *int32 { return &a }
5. 可选参数
NewParquetWriter有几个可选参数:MaxPageSize、Uncompressed和Snappy。例如:
w, err := NewParquetWriter(&buf, MaxPageSize(10000), Snappy)
支持的类型
用于定义Parquet数据的结构体可以包含以下类型:
int32
uint32
int64
uint64
float32
float64
string
bool
这些类型可以是指针,表示数据是可选的。结构体还可以嵌入另一个结构体:
type Being struct {
ID int32 `parquet:"id"`
Age *int32 `parquet:"age"`
}
type Person struct {
Being
Username string `parquet:"username"`
}
也支持嵌套和重复的结构体:
type Being struct {
ID int32 `parquet:"id"`
Age *int32 `parquet:"age"`
}
type Person struct {
Being Being
Username string `parquet:"username"`
Friends []Being
}
如果您想排除某个字段不写入Parquet,可以用破折号标记它或使其不被导出:
type Being struct {
ID int32 `parquet:"id"`
Password string`parquet:"-"` //不会写入parquet
age int32 //不会写入parquet
}
parquetgen工具
parquetgen是go generate应该调用的命令,用于为您的自定义类型生成代码。它还可以打印Parquet文件的页眉和文件元数据:
$ parquetgen --help
Usage of parquetgen:
-ignore
ignore unsupported fields in -type, otherwise log.Fatal is called when an unsupported type is encountered (default true)
-import string
import statement of -type if it doesn't live in -package
-input string
path to the go file that defines -type
-metadata
print the metadata of a parquet file (-parquet) and exit
-output string
name of the file that is produced, defaults to parquet.go (default "parquet.go")
-package string
package of the generated code
-pageheaders
print the page headers of a parquet file (-parquet) and exit (also prints the metadata)
-parquet string
path to a parquet file (if you are generating code based on an existing parquet file or printing the file metadata or page headers)
-struct-output string
name of the file that is produced, defaults to parquet.go (default "generated_struct.go")
-type string
name of the struct that will used for writing and reading
更多关于golang高效读写Parquet文件插件库parquet的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高效读写Parquet文件插件库parquet的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用Go高效读写Parquet文件 - parquet-go库详解
Parquet是一种高效的列式存储格式,特别适合大数据处理场景。在Go生态中,parquet-go
是一个高性能的Parquet文件读写库。
安装parquet-go
go get github.com/xitongsys/parquet-go
go get github.com/xitongsys/parquet-go-source
基本使用示例
1. 写入Parquet文件
package main
import (
"log"
"os"
"time"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/writer"
)
type Student struct {
Name string `parquet:"name=name, type=UTF8"`
Age int32 `parquet:"name=age, type=INT32"`
Id int64 `parquet:"name=id, type=INT64"`
Weight float32 `parquet:"name=weight, type=FLOAT"`
Sex bool `parquet:"name=sex, type=BOOLEAN"`
Day int32 `parquet:"name=day, type=DATE"`
Ignored int32 // 没有parquet标签的字段会被忽略
}
func main() {
// 创建本地文件写入器
fw, err := local.NewLocalFileWriter("output.parquet")
if err != nil {
log.Println("Can't create local file", err)
return
}
defer fw.Close()
// 创建Parquet writer
pw, err := writer.NewParquetWriter(fw, new(Student), 4)
if err != nil {
log.Println("Can't create parquet writer", err)
return
}
// 设置Parquet文件属性
pw.RowGroupSize = 128 * 1024 * 1024 // 128M
pw.CompressionType = parquet.CompressionCodec_SNAPPY
// 写入数据
students := []Student{
{Name: "张三", Age: 23, Id: 1, Weight: 60.5, Sex: true, Day: int32(time.Now().Unix() / 3600 / 24)},
{Name: "李四", Age: 24, Id: 2, Weight: 70.5, Sex: false, Day: int32(time.Now().Unix()/3600/24) + 1},
{Name: "王五", Age: 25, Id: 3, Weight: 80.5, Sex: true, Day: int32(time.Now().Unix()/3600/24) + 2},
}
for _, student := range students {
if err = pw.Write(student); err != nil {
log.Println("Write error", err)
}
}
// 停止写入并关闭
if err = pw.WriteStop(); err != nil {
log.Println("WriteStop error", err)
return
}
log.Println("Write Finished")
}
2. 读取Parquet文件
package main
import (
"log"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
)
type Student struct {
Name string `parquet:"name=name, type=UTF8"`
Age int32 `parquet:"name=age, type=INT32"`
Id int64 `parquet:"name=id, type=INT64"`
Weight float32 `parquet:"name=weight, type=FLOAT"`
Sex bool `parquet:"name=sex, type=BOOLEAN"`
Day int32 `parquet:"name=day, type=DATE"`
}
func main() {
// 打开Parquet文件
fr, err := local.NewLocalFileReader("output.parquet")
if err != nil {
log.Println("Can't open file", err)
return
}
defer fr.Close()
// 创建Parquet reader
pr, err := reader.NewParquetReader(fr, new(Student), 4)
if err != nil {
log.Println("Can't create parquet reader", err)
return
}
defer pr.ReadStop()
// 获取行数
num := int(pr.GetNumRows())
log.Printf("Total rows: %d\n", num)
// 读取数据
students := make([]Student, num)
if err = pr.Read(&students); err != nil {
log.Println("Read error", err)
return
}
// 打印数据
for i, student := range students {
log.Printf("Row %d: %+v\n", i, student)
}
}
高级特性
1. 自定义Schema
// 定义Schema
schema := `message test {
required binary name (UTF8);
required int32 age;
required int64 id;
required float weight;
required boolean sex;
required int32 day (DATE);
}`
// 使用Schema创建writer
pw, err := writer.NewParquetWriterFromWriter(fw, schema, 4)
2. 性能优化
// 增加并行度
pr, err := reader.NewParquetReader(fr, new(Student), 8) // 使用8个goroutine
// 预分配内存
students := make([]Student, 0, num) // 预分配足够容量
3. 读取部分列
// 只读取name和age列
pr.ReadPartial(&students, "name", "age")
最佳实践
- 批量写入:单次写入多条记录比多次写入单条记录更高效
- 合理设置RowGroup大小:通常128MB-1GB之间
- 选择合适的压缩算法:SNAPPY在CPU和压缩率间有较好平衡
- 列裁剪:读取时只选择需要的列
- 类型匹配:确保Go结构体类型与Parquet类型匹配
与其他库比较
parquet-go
相比其他Go Parquet库(如parquet-go
、goparquet
)的优势:
- 性能更高
- 支持更完整的Parquet特性
- 更活跃的维护
- 更好的文档和示例
希望这个指南能帮助你在Go中高效地处理Parquet文件!