golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用
golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用
bqwriter是一个Go语言包,用于以高吞吐量并发地将数据写入Google BigQuery。它默认使用InsertAll() API(基于REST API),但也可以配置使用Storage Write API(基于GRPC)。
安装
import "github.com/OTA-Insight/bqwriter"
在你的项目目录中运行:
go get github.com/OTA-Insight/bqwriter
注意:该包仍在开发中,可能会进行不兼容的更改。
支持的Go版本
目前支持Go 1.17及更高版本。
示例
基本InsertAll Streamer
import (
"context"
"time"
"github.com/OTA-Insight/bqwriter"
)
// 定义行数据结构
type myRow struct {
Timestamp time.Time
Username string
}
func (mr *myRow) Save() (row map[string]bigquery.Value, insertID string, err error) {
return map[string]bigquery.Value{
"timestamp": civil.DateTimeOf(mr.Timestamp),
"username": mr.Username,
}, "", nil
}
func main() {
ctx := context.Background()
// 创建BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
ctx,
"my-gcloud-project",
"my-bq-dataset",
"my-bq-table",
nil, // 使用默认配置
)
if err != nil {
panic(err)
}
defer bqWriter.Close()
// 写入数据
bqWriter.Write(&myRow{Timestamp: time.Now(), Username: "test"})
}
自定义InsertAll Streamer
import (
"context"
"time"
"github.com/OTA-Insight/bqwriter"
)
func main() {
ctx := context.Background()
// 创建自定义配置的BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
ctx,
"my-gcloud-project",
"my-bq-dataset",
"my-bq-table",
&bqwriter.StreamerConfig{
WorkerCount: 5, // 使用5个工作线程
InsertAllClient: &bqwriter.InsertAllClientConfig{
FailOnInvalidRows: true,
FailForUnknownValues: true,
},
},
)
if err != nil {
panic(err)
}
defer bqWriter.Close()
bqWriter.Write(&myRow{Timestamp: time.Now(), Username: "test"})
}
Storage Streamer
import (
"context"
"github.com/OTA-Insight/bqwriter"
"google.golang.org/protobuf/reflect/protodesc"
"path/to/my/proto/package/protodata"
)
func main() {
ctx := context.Background()
// 创建proto描述符
protoDescriptor := protodesc.ToDescriptorProto((&protodata.MyCustomProtoMessage{}).ProtoReflect().Descriptor())
// 创建Storage API驱动的BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
ctx,
"my-gcloud-project",
"my-bq-dataset",
"my-bq-table",
&bqwriter.StreamerConfig{
WorkerCount: 5,
StorageClient: &bqwriter.StorageClientConfig{
ProtobufDescriptor: protoDescriptor,
},
},
)
if err != nil {
panic(err)
}
defer bqWriter.Close()
// 创建并填充proto消息
msg := new(protodata.MyCustomProtoMessage)
// TODO: 填充msg字段
// 写入数据
bqWriter.Write(msg)
}
Batch Streamer
import (
"context"
"os"
"path/filepath"
"github.com/OTA-Insight/bqwriter"
"cloud.google.com/go/bigquery"
)
func main() {
ctx := context.Background()
// 创建Batch配置
batchConfig := new(bqwriter.BatchClientConfig)
// 创建Batch驱动的BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
ctx,
"my-gcloud-project",
"my-bq-dataset",
"my-bq-table",
&bqwriter.StreamerConfig{
BatchClient: batchConfig,
},
)
if err != nil {
panic(err)
}
defer bqWriter.Close()
// 读取数据文件
files, err := filepath.Glob("/path/to/data/*.json")
if err != nil {
panic(err)
}
for _, fp := range files {
file, err := os.Open(fp)
if err != nil {
panic(err)
}
// 写入数据到BigQuery
err = bqWriter.Write(file)
if err != nil {
panic(err)
}
}
}
授权
bqwriter使用Google应用默认凭证进行API端点调用的授权。这允许你的应用在许多环境中运行而无需显式配置。
错误处理
当前版本的bqwriter采用"fire-and-forget"(发送即忘记)的哲学设计。实际的写入错误发生在异步工作goroutine上并且只被记录。你可以通过实现自己的日志记录器来获取这些日志。
贡献
欢迎贡献。请参阅CONTRIBUTING文档了解详情。请注意,本项目发布时附带贡献者行为准则。参与本项目即表示你同意遵守其条款。
开发者指南
测试
使用以下命令运行测试:
go test -v ./...
golangci-lint run
你还可以运行集成测试:
go run ./internal/test/integration --help
常见问题
我的insertAll streamer似乎每请求插入1行而不是批量插入,这是怎么回事?
确保你的配置符合你的带宽需求。不要使用超过你需要的worker数量(WorkerCount)。同时确保MaxBatchDelay和BatchSize值配置得当。
更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用bqwriter高效写入Google BigQuery数据
Google BigQuery是Google提供的强大数据分析服务,而bqwriter是一个专为Golang设计的高性能BigQuery写入库。下面我将详细介绍如何使用bqwriter高效地将数据写入BigQuery。
bqwriter简介
bqwriter是一个开源的Golang库,专为高性能BigQuery数据写入设计。它具有以下特点:
- 支持流式写入和批量写入
- 自动处理表结构检测和Schema管理
- 内置缓冲和批处理机制
- 支持并发写入
- 自动重试失败的操作
安装bqwriter
首先安装bqwriter库:
go get github.com/viant/bqwriter
基本使用示例
1. 初始化bqwriter
package main
import (
"context"
"log"
"os"
"cloud.google.com/go/bigquery"
"github.com/viant/bqwriter"
)
type Record struct {
Name string
Age int
City string
Score float64
IsValid bool
}
func main() {
ctx := context.Background()
projectID := "your-project-id"
datasetID := "your_dataset"
tableID := "your_table"
// 创建BigQuery客户端
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("Failed to create BigQuery client: %v", err)
}
defer client.Close()
// 初始化bqwriter
writer, err := bqwriter.New(ctx, client, datasetID, tableID,
bqwriter.AutoCreateTable(), // 自动创建表
bqwriter.BatchSize(500), // 每500条记录批量写入一次
bqwriter.Concurrency(4), // 使用4个并发worker
)
if err != nil {
log.Fatalf("Failed to create bqwriter: %v", err)
}
defer writer.Close()
// 准备要写入的数据
records := []*Record{
{Name: "Alice", Age: 28, City: "New York", Score: 95.5, IsValid: true},
{Name: "Bob", Age: 35, City: "San Francisco", Score: 87.2, IsValid: true},
// 更多记录...
}
// 写入数据
if err := writer.Write(records); err != nil {
log.Printf("Failed to write records: %v", err)
}
log.Println("Data written successfully")
}
2. 高级配置选项
bqwriter提供了多种配置选项来优化性能:
writer, err := bqwriter.New(ctx, client, datasetID, tableID,
bqwriter.AutoCreateTable(), // 自动创建表
bqwriter.BatchSize(1000), // 每1000条记录批量写入
bqwriter.Concurrency(8), // 8个并发worker
bqwriter.MaxRetries(3), // 最多重试3次
bqwriter.RetryWait(time.Second * 5), // 重试等待5秒
bqwriter.BufferSize(5000), // 内存缓冲区大小
bqwriter.SchemaUpdate(true), // 允许自动更新schema
bqwriter.ErrorHandler(myErrorHandler), // 自定义错误处理
)
3. 自定义错误处理
func myErrorHandler(err error, data interface{}, attempt int) bool {
log.Printf("Error writing data (attempt %d): %v", attempt, err)
// 如果是配额错误,等待后重试
if strings.Contains(err.Error(), "quota exceeded") {
time.Sleep(time.Second * 10)
return true // 返回true表示应该重试
}
// 其他错误不重试
return false
}
性能优化建议
- 批量大小:根据数据大小调整BatchSize,通常500-1000条记录是一个好的起点
- 并发度:Concurrency设置为4-8通常能获得最佳性能,但需要根据实际环境测试
- 缓冲区大小:BufferSize应足够大以容纳突发流量,但不要过大导致内存问题
- 重试策略:对于临时性错误,配置适当的重试次数和等待时间
- 数据类型:使用指针字段可以更高效地处理NULL值
与原生BigQuery库对比
相比直接使用Google的BigQuery Go SDK,bqwriter提供了:
- 更简单的API
- 内置批处理和缓冲
- 自动重试机制
- 更好的性能优化选项
- 更灵活的Schema管理
注意事项
- 确保服务账号有足够的BigQuery权限
- 监控写入速率以避免超出配额
- 对于大量数据写入,考虑使用BigQuery的批量加载功能
- 定期检查错误日志,特别是重试失败的情况
通过合理配置bqwriter,你可以实现每秒数万条记录的高效写入,显著提高数据管道的性能。