golang高效读写Parquet文件插件库parquet的使用

Golang高效读写Parquet文件插件库parquet的使用

概述

Parquet是一个基于Go结构体生成Parquet读写器的库。它可以根据您定义的结构体或者通过读取现有Parquet文件生成的结构体来工作。

请注意:如果您基于现有Parquet文件生成代码,会有一些限制:每个PageHeader的PageType必须是DATA_PAGE,ColumnMetaData中定义的Codec必须是PLAIN或SNAPPY。此外,Parquet文件的模式必须由当前支持的类型组成。

安装

go get -u github.com/parsyl/parquet/...

这将同时安装parquet的唯一两个依赖项:thrift和snappy。

使用示例

1. 定义结构体

首先定义一个结构体,用于写入Parquet数据:

type Person struct {
  	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

2. 添加生成指令

在代码中添加go:generate注释:

// go:generate parquetgen -input main.go -type Person -package main

3. 生成读写器代码

运行以下命令生成读写器代码:

$ go generate

这将生成一个新的文件(parquet.go),其中定义了ParquetWriter和ParquetReader。

4. 使用生成的读写器

package main

import (
    "bytes"
    "encoding/json"
)

func main() {
    var buf bytes.Buffer
    w, err := NewParquetWriter(&buf)
    if err != nil {
        log.Fatal(err)
    }

    w.Add(Person{ID: 1, Age: getAge(30)})
    w.Add(Person{ID: 2})

    // 每次Write调用都会创建一个新的parquet行组
    if err := w.Write(); err != nil {
        log.Fatal(err)
    }

    // 完成后必须调用Close,它会在文件末尾写入parquet元数据
    if err := w.Close(); err != nil {
        log.Fatal(err)
    }

    r, err := NewParquetReader(bytes.NewReader(buf.Bytes()))
    if err != nil {
        log.Fatal(err)
    }

    enc := json.NewEncoder(os.Stdout)
    for r.Next() {
        var p Person
        r.Scan(&p)
        enc.Encode(p)
    }

    if err := r.Error(); err != nil {
        log.Fatal(err)
    }
}

func getAge(a int32) *int32 { return &a }

5. 可选参数

NewParquetWriter有几个可选参数:MaxPageSize、Uncompressed和Snappy。例如:

w, err := NewParquetWriter(&buf, MaxPageSize(10000), Snappy)

支持的类型

用于定义Parquet数据的结构体可以包含以下类型:

int32
uint32
int64
uint64
float32
float64
string
bool

这些类型可以是指针,表示数据是可选的。结构体还可以嵌入另一个结构体:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being
	Username string `parquet:"username"`
}

也支持嵌套和重复的结构体:

type Being struct {
	ID  int32  `parquet:"id"`
	Age *int32 `parquet:"age"`
}

type Person struct {
	Being    Being
	Username string `parquet:"username"`
	Friends  []Being
}

如果您想排除某个字段不写入Parquet,可以用破折号标记它或使其不被导出:

type Being struct {
  	ID  int32  `parquet:"id"`
	Password string`parquet:"-"` //不会写入parquet
	age int32                    //不会写入parquet
}

parquetgen工具

parquetgen是go generate应该调用的命令,用于为您的自定义类型生成代码。它还可以打印Parquet文件的页眉和文件元数据:

$ parquetgen --help
Usage of parquetgen:
  -ignore
        ignore unsupported fields in -type, otherwise log.Fatal is called when an unsupported type is encountered (default true)
  -import string
        import statement of -type if it doesn't live in -package
  -input string
        path to the go file that defines -type
  -metadata
        print the metadata of a parquet file (-parquet) and exit
  -output string
        name of the file that is produced, defaults to parquet.go (default "parquet.go")
  -package string
        package of the generated code
  -pageheaders
        print the page headers of a parquet file (-parquet) and exit (also prints the metadata)
  -parquet string
        path to a parquet file (if you are generating code based on an existing parquet file or printing the file metadata or page headers)
  -struct-output string
        name of the file that is produced, defaults to parquet.go (default "generated_struct.go")
  -type string
        name of the struct that will used for writing and reading

更多关于golang高效读写Parquet文件插件库parquet的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效读写Parquet文件插件库parquet的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用Go高效读写Parquet文件 - parquet-go库详解

Parquet是一种高效的列式存储格式,特别适合大数据处理场景。在Go生态中,parquet-go是一个高性能的Parquet文件读写库。

安装parquet-go

go get github.com/xitongsys/parquet-go
go get github.com/xitongsys/parquet-go-source

基本使用示例

1. 写入Parquet文件

package main

import (
	"log"
	"os"
	"time"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/parquet"
	"github.com/xitongsys/parquet-go/writer"
)

type Student struct {
	Name    string  `parquet:"name=name, type=UTF8"`
	Age     int32   `parquet:"name=age, type=INT32"`
	Id      int64   `parquet:"name=id, type=INT64"`
	Weight  float32 `parquet:"name=weight, type=FLOAT"`
	Sex     bool    `parquet:"name=sex, type=BOOLEAN"`
	Day     int32   `parquet:"name=day, type=DATE"`
	Ignored int32   // 没有parquet标签的字段会被忽略
}

func main() {
	// 创建本地文件写入器
	fw, err := local.NewLocalFileWriter("output.parquet")
	if err != nil {
		log.Println("Can't create local file", err)
		return
	}
	defer fw.Close()

	// 创建Parquet writer
	pw, err := writer.NewParquetWriter(fw, new(Student), 4)
	if err != nil {
		log.Println("Can't create parquet writer", err)
		return
	}

	// 设置Parquet文件属性
	pw.RowGroupSize = 128 * 1024 * 1024 // 128M
	pw.CompressionType = parquet.CompressionCodec_SNAPPY

	// 写入数据
	students := []Student{
		{Name: "张三", Age: 23, Id: 1, Weight: 60.5, Sex: true, Day: int32(time.Now().Unix() / 3600 / 24)},
		{Name: "李四", Age: 24, Id: 2, Weight: 70.5, Sex: false, Day: int32(time.Now().Unix()/3600/24) + 1},
		{Name: "王五", Age: 25, Id: 3, Weight: 80.5, Sex: true, Day: int32(time.Now().Unix()/3600/24) + 2},
	}

	for _, student := range students {
		if err = pw.Write(student); err != nil {
			log.Println("Write error", err)
		}
	}

	// 停止写入并关闭
	if err = pw.WriteStop(); err != nil {
		log.Println("WriteStop error", err)
		return
	}

	log.Println("Write Finished")
}

2. 读取Parquet文件

package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/reader"
)

type Student struct {
	Name   string  `parquet:"name=name, type=UTF8"`
	Age    int32   `parquet:"name=age, type=INT32"`
	Id     int64   `parquet:"name=id, type=INT64"`
	Weight float32 `parquet:"name=weight, type=FLOAT"`
	Sex    bool    `parquet:"name=sex, type=BOOLEAN"`
	Day    int32   `parquet:"name=day, type=DATE"`
}

func main() {
	// 打开Parquet文件
	fr, err := local.NewLocalFileReader("output.parquet")
	if err != nil {
		log.Println("Can't open file", err)
		return
	}
	defer fr.Close()

	// 创建Parquet reader
	pr, err := reader.NewParquetReader(fr, new(Student), 4)
	if err != nil {
		log.Println("Can't create parquet reader", err)
		return
	}
	defer pr.ReadStop()

	// 获取行数
	num := int(pr.GetNumRows())
	log.Printf("Total rows: %d\n", num)

	// 读取数据
	students := make([]Student, num)
	if err = pr.Read(&students); err != nil {
		log.Println("Read error", err)
		return
	}

	// 打印数据
	for i, student := range students {
		log.Printf("Row %d: %+v\n", i, student)
	}
}

高级特性

1. 自定义Schema

// 定义Schema
schema := `message test {
    required binary name (UTF8);
    required int32 age;
    required int64 id;
    required float weight;
    required boolean sex;
    required int32 day (DATE);
}`

// 使用Schema创建writer
pw, err := writer.NewParquetWriterFromWriter(fw, schema, 4)

2. 性能优化

// 增加并行度
pr, err := reader.NewParquetReader(fr, new(Student), 8) // 使用8个goroutine

// 预分配内存
students := make([]Student, 0, num) // 预分配足够容量

3. 读取部分列

// 只读取name和age列
pr.ReadPartial(&students, "name", "age")

最佳实践

  1. 批量写入:单次写入多条记录比多次写入单条记录更高效
  2. 合理设置RowGroup大小:通常128MB-1GB之间
  3. 选择合适的压缩算法:SNAPPY在CPU和压缩率间有较好平衡
  4. 列裁剪:读取时只选择需要的列
  5. 类型匹配:确保Go结构体类型与Parquet类型匹配

与其他库比较

parquet-go相比其他Go Parquet库(如parquet-gogoparquet)的优势:

  • 性能更高
  • 支持更完整的Parquet特性
  • 更活跃的维护
  • 更好的文档和示例

希望这个指南能帮助你在Go中高效地处理Parquet文件!

回到顶部