golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用

Golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用

bqwriter是一个Go语言包,用于以高吞吐量并发地将数据写入Google BigQuery。它默认使用InsertAll() API(基于REST API),但也可以配置使用Storage Write API(基于GRPC)。

安装

导入bqwriter包:

import "github.com/OTA-Insight/bqwriter"

安装命令:

go get github.com/OTA-Insight/bqwriter

基本示例

基本InsertAll Streamer

import (
    "context"

    "github.com/OTA-Insight/bqwriter"
)

// 创建上下文
ctx := context.Background()

// 创建BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
    ctx,
    "my-gcloud-project",
    "my-bq-dataset",
    "my-bq-table",
    nil, // 使用默认配置
)
if err != nil {
    panic(err)
}
defer bqWriter.Close()

// 写入数据到BQ表
bqWriter.Write(&myRow{Timestamp: time.UTC().Now(), Username: "test"})

定义行数据结构:

import (
    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/civil"
)

type myRow struct {
    Timestamp time.Time
    Username  string
}

func (mr *myRow) Save() (row map[string]bigquery.Value, insertID string, err error) {
    return map[string]bigquery.Value{
        "timestamp": civil.DateTimeOf(rr.Timestamp),
        "username":  mr.Username,
    }, "", nil
}

自定义InsertAll Streamer

import (
    "context"

    "github.com/OTA-Insight/bqwriter"
)

ctx := context.Background()

// 创建自定义配置的BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
    ctx,
    "my-gcloud-project",
    "my-bq-dataset",
    "my-bq-table",
    &bqwriter.StreamerConfig{
        // 使用5个工作线程
        WorkerCount: 5,
        InsertAllClient: &bqwriter.InsertAllClientConfig{
            // 对于无效/未知行/值的错误使写入失败
            FailOnInvalidRows:    true,
            FailForUnknownValues: true, 
        },
    },
)
if err != nil {
    panic(err)
}
defer bqWriter.Close()

bqWriter.Write(&myRow{Timestamp: time.UTC().Now(), Username: "test"})

Storage Streamer

import (
    "context"

    "github.com/OTA-Insight/bqwriter"
    "google.golang.org/protobuf/reflect/protodesc"

    // 定义实际预编译protobuf Go代码的路径
    "path/to/my/proto/package/protodata"
)

ctx := context.Background()

// 创建proto描述符
protoDescriptor := protodesc.ToDescriptorProto((&protodata.MyCustomProtoMessage{}).ProtoReflect().Descriptor())

// 创建BQ写入客户端
bqWriter, err := bqwriter.NewStreamer(
    ctx,
    "my-gcloud-project",
    "my-bq-dataset",
    "my-bq-table",
    &bqwriter.StreamerConfig{
        WorkerCount: 5,
        StorageClient: &bqwriter.StorageClientConfig{
            ProtobufDescriptor: protoDescriptor,
        },
    },
)
if err != nil {
    panic(err)
}
defer bqWriter.Close()

// 填充proto消息字段
msg := new(protodata.MyCustomProtoMessage)

// 写入数据
bqWriter.Write(msg)

Batch Streamer

import (
    "context"
    "path/filepath"

    "github.com/OTA-Insight/bqwriter"
    "cloud.google.com/go/bigquery"
)

func main() {
    ctx := context.Background()
	
    // 使用默认配置创建BatchClientConfig
    batchConfig := new(bqwriter.BatchClientConfig)
	
    // 创建BQ写入客户端
    bqWriter, err := bqwriter.NewStreamer(
        ctx,
        "my-gcloud-project",
        "my-bq-dataset",
        "my-bq-table",
        &bqwriter.StreamerConfig{
            BatchClient: batchConfig
        },
    )

    if err != nil {
        panic(err)
    }
    defer bqWriter.Close()

    // 从文件读取数据
    files, err := filepath.Glob("/usr/joe/my/data/path/exported_data_*.json")
    if err != nil {
        panic(err)
    }
    for _, fp := range files {
        file, err := os.Open(fp)
        if err != nil {
            panic(err)
        }

        // 写入数据到BigQuery
        err := bqWriter.Write(file)
        if err != nil {
            panic(err)
        }
    }
}

授权

bqwriter使用Google Application Default Credentials进行授权。这允许您的应用程序在许多环境中运行而无需显式配置。

性能考虑

  • 对于高性能场景,建议使用Storage API而非InsertAll API
  • 适当设置WorkerCount以匹配您的需求
  • 调整MaxBatchDelay和BatchSize以获得最佳性能
  • 使用ProtobufDescriptor而非BigQuerySchema以获得更好的性能

常见问题

InsertAll Streamer似乎每请求只插入一行而不是批量插入

确保您的配置与您的带宽需求匹配。不要使用超过您需要的工作线程数(WorkerCount),并确保MaxBatchDelay和BatchSize值配置适当。


更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用bqwriter高效写入Google BigQuery数据

Google BigQuery是Google提供的强大数据分析服务,而bqwriter是一个专门为Go语言设计的高性能BigQuery写入库。下面我将详细介绍如何使用bqwriter高效地将数据写入BigQuery。

bqwriter简介

bqwriter是一个开源库,专为高性能BigQuery写入场景设计,具有以下特点:

  • 支持流式写入和批量写入
  • 自动重试机制
  • 内存缓冲管理
  • 并发控制
  • 简单易用的API

安装

go get github.com/OTA-Insight/bqwriter

基本使用示例

1. 初始化客户端

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
	"github.com/OTA-Insight/bqwriter"
)

func main() {
	ctx := context.Background()
	
	// 创建BigQuery客户端
	bqClient, err := bigquery.NewClient(ctx, "your-project-id")
	if err != nil {
		log.Fatalf("failed to create bigquery client: %v", err)
	}
	defer bqClient.Close()

	// 定义数据结构
	type User struct {
		Name      string    `bigquery:"name"`
		Age       int       `bigquery:"age"`
		CreatedAt time.Time `bigquery:"created_at"`
	}

	// 创建bqwriter实例
	writer, err := bqwriter.New(
		bqClient,
		"your-dataset-id",
		"your-table-id",
		User{},
		bqwriter.WithBufferSize(1000),      // 缓冲1000条记录
		bqwriter.WithFlushInterval(30*time.Second), // 每30秒刷新一次
		bqwriter.WithMaxRetries(3),         // 最大重试次数
		bqwriter.WithConcurrency(5),        // 并发数
	)
	if err != nil {
		log.Fatalf("failed to create bqwriter: %v", err)
	}
	defer writer.Stop()
}

2. 写入数据

// 写入单条记录
err := writer.Write(ctx, User{
	Name:      "John Doe",
	Age:       30,
	CreatedAt: time.Now(),
})
if err != nil {
	log.Printf("failed to write record: %v", err)
}

// 批量写入多条记录
users := []User{
	{Name: "Alice", Age: 25, CreatedAt: time.Now()},
	{Name: "Bob", Age: 35, CreatedAt: time.Now()},
}
err = writer.Write(ctx, users)
if err != nil {
	log.Printf("failed to write records: %v", err)
}

高级配置选项

bqwriter提供了多种配置选项来优化性能:

writer, err := bqwriter.New(
	bqClient,
	"dataset-id",
	"table-id",
	User{},
	// 缓冲大小 - 内存中保留的记录数
	bqwriter.WithBufferSize(5000),
	
	// 刷新间隔 - 定期刷新缓冲
	bqwriter.WithFlushInterval(1*time.Minute),
	
	// 最大并发数
	bqwriter.WithConcurrency(10),
	
	// 最大重试次数
	bqwriter.WithMaxRetries(5),
	
	// 重试间隔
	bqwriter.WithRetryInterval(2*time.Second),
	
	// 自定义记录ID字段(用于去重)
	bqwriter.WithRecordIDField("id"),
	
	// 忽略未知值(跳过BigQuery中不存在的字段)
	bqwriter.WithIgnoreUnknownValues(true),
	
	// 跳过无效行
	bqwriter.WithSkipInvalidRows(true),
	
	// 自定义表名后缀(用于分片表)
	bqwriter.WithTableSuffix("20230101"),
)

性能优化建议

  1. 批量写入:尽量使用批量写入而不是单条写入
  2. 适当缓冲:根据内存情况设置合理的缓冲大小
  3. 并发控制:根据网络和BigQuery配额调整并发数
  4. 重试策略:设置合理的重试次数和间隔
  5. 监控指标:利用bqwriter提供的指标监控写入性能

监控写入状态

// 获取当前缓冲中的记录数
pending := writer.Pending()

// 获取写入统计信息
stats := writer.Stats()
log.Printf("Total written: %d, failed: %d", stats.Written, stats.Failed)

// 自定义监控指标
go func() {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
			stats := writer.Stats()
			log.Printf("Write stats: %+v", stats)
		}
	}
}()

错误处理

// 设置错误处理回调
writer, err := bqwriter.New(
	// ...其他参数...
	bqwriter.WithErrorHandler(func(err error, record interface{}) {
		log.Printf("failed to write record %+v: %v", record, err)
		// 可以在这里实现自定义错误处理逻辑
	}),
)

// 检查写入错误
if err := writer.Write(ctx, data); err != nil {
	if bqErr, ok := err.(*bqwriter.BulkError); ok {
		// 批量错误处理
		for _, e := range bqErr.Errors {
			log.Printf("record at index %d failed: %v", e.Index, e.Err)
		}
	} else {
		// 其他错误处理
		log.Printf("write failed: %v", err)
	}
}

总结

bqwriter为Go语言提供了高效写入BigQuery的解决方案,通过合理的配置可以显著提高数据写入性能。关键点包括:

  1. 使用缓冲减少API调用次数
  2. 设置适当的并发数
  3. 实现健壮的错误处理
  4. 监控写入性能指标
  5. 根据业务需求调整刷新策略

以上示例展示了bqwriter的基本和高级用法,您可以根据实际需求进行调整和扩展。

回到顶部