golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用

golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用

bqwriter是一个Go语言包,用于以高吞吐量并发地将数据写入Google BigQuery。它默认使用InsertAll() API(基于REST API),但也可以配置使用Storage Write API(基于GRPC)。

安装

import "github.com/OTA-Insight/bqwriter"

在你的项目目录中运行:

go get github.com/OTA-Insight/bqwriter

注意:该包仍在开发中,可能会进行不兼容的更改。

支持的Go版本

目前支持Go 1.17及更高版本。

示例

基本InsertAll Streamer

import (
    "context"
    "time"

    "github.com/OTA-Insight/bqwriter"
)

// 定义行数据结构
type myRow struct {
    Timestamp time.Time
    Username  string
}

func (mr *myRow) Save() (row map[string]bigquery.Value, insertID string, err error) {
    return map[string]bigquery.Value{
        "timestamp": civil.DateTimeOf(mr.Timestamp),
        "username":  mr.Username,
    }, "", nil
}

func main() {
    ctx := context.Background()

    // 创建BQ写入客户端
    bqWriter, err := bqwriter.NewStreamer(
        ctx,
        "my-gcloud-project",
        "my-bq-dataset",
        "my-bq-table",
        nil, // 使用默认配置
    )
    if err != nil {
        panic(err)
    }
    defer bqWriter.Close()

    // 写入数据
    bqWriter.Write(&myRow{Timestamp: time.Now(), Username: "test"})
}

自定义InsertAll Streamer

import (
    "context"
    "time"

    "github.com/OTA-Insight/bqwriter"
)

func main() {
    ctx := context.Background()

    // 创建自定义配置的BQ写入客户端
    bqWriter, err := bqwriter.NewStreamer(
        ctx,
        "my-gcloud-project",
        "my-bq-dataset",
        "my-bq-table",
        &bqwriter.StreamerConfig{
            WorkerCount: 5, // 使用5个工作线程
            InsertAllClient: &bqwriter.InsertAllClientConfig{
                FailOnInvalidRows:    true,
                FailForUnknownValues: true,
            },
        },
    )
    if err != nil {
        panic(err)
    }
    defer bqWriter.Close()

    bqWriter.Write(&myRow{Timestamp: time.Now(), Username: "test"})
}

Storage Streamer

import (
    "context"

    "github.com/OTA-Insight/bqwriter"
    "google.golang.org/protobuf/reflect/protodesc"
    "path/to/my/proto/package/protodata"
)

func main() {
    ctx := context.Background()

    // 创建proto描述符
    protoDescriptor := protodesc.ToDescriptorProto((&protodata.MyCustomProtoMessage{}).ProtoReflect().Descriptor())

    // 创建Storage API驱动的BQ写入客户端
    bqWriter, err := bqwriter.NewStreamer(
        ctx,
        "my-gcloud-project",
        "my-bq-dataset",
        "my-bq-table",
        &bqwriter.StreamerConfig{
            WorkerCount: 5,
            StorageClient: &bqwriter.StorageClientConfig{
                ProtobufDescriptor: protoDescriptor,
            },
        },
    )
    if err != nil {
        panic(err)
    }
    defer bqWriter.Close()

    // 创建并填充proto消息
    msg := new(protodata.MyCustomProtoMessage)
    // TODO: 填充msg字段

    // 写入数据
    bqWriter.Write(msg)
}

Batch Streamer

import (
    "context"
    "os"
    "path/filepath"

    "github.com/OTA-Insight/bqwriter"
    "cloud.google.com/go/bigquery"
)

func main() {
    ctx := context.Background()
    
    // 创建Batch配置
    batchConfig := new(bqwriter.BatchClientConfig)
    
    // 创建Batch驱动的BQ写入客户端
    bqWriter, err := bqwriter.NewStreamer(
        ctx,
        "my-gcloud-project",
        "my-bq-dataset",
        "my-bq-table",
        &bqwriter.StreamerConfig{
            BatchClient: batchConfig,
        },
    )
    if err != nil {
        panic(err)
    }
    defer bqWriter.Close()

    // 读取数据文件
    files, err := filepath.Glob("/path/to/data/*.json")
    if err != nil {
        panic(err)
    }
    
    for _, fp := range files {
        file, err := os.Open(fp)
        if err != nil {
            panic(err)
        }

        // 写入数据到BigQuery
        err = bqWriter.Write(file)
        if err != nil {
            panic(err)
        }
    }
}

授权

bqwriter使用Google应用默认凭证进行API端点调用的授权。这允许你的应用在许多环境中运行而无需显式配置。

错误处理

当前版本的bqwriter采用"fire-and-forget"(发送即忘记)的哲学设计。实际的写入错误发生在异步工作goroutine上并且只被记录。你可以通过实现自己的日志记录器来获取这些日志。

贡献

欢迎贡献。请参阅CONTRIBUTING文档了解详情。请注意,本项目发布时附带贡献者行为准则。参与本项目即表示你同意遵守其条款。

开发者指南

测试

使用以下命令运行测试:

go test -v ./...
golangci-lint run

你还可以运行集成测试:

go run ./internal/test/integration --help

常见问题

我的insertAll streamer似乎每请求插入1行而不是批量插入,这是怎么回事?

确保你的配置符合你的带宽需求。不要使用超过你需要的worker数量(WorkerCount)。同时确保MaxBatchDelay和BatchSize值配置得当。


更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效写入Google BigQuery数据的高性能插件库bqwriter的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用bqwriter高效写入Google BigQuery数据

Google BigQuery是Google提供的强大数据分析服务,而bqwriter是一个专为Golang设计的高性能BigQuery写入库。下面我将详细介绍如何使用bqwriter高效地将数据写入BigQuery。

bqwriter简介

bqwriter是一个开源的Golang库,专为高性能BigQuery数据写入设计。它具有以下特点:

  • 支持流式写入和批量写入
  • 自动处理表结构检测和Schema管理
  • 内置缓冲和批处理机制
  • 支持并发写入
  • 自动重试失败的操作

安装bqwriter

首先安装bqwriter库:

go get github.com/viant/bqwriter

基本使用示例

1. 初始化bqwriter

package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"github.com/viant/bqwriter"
)

type Record struct {
	Name    string
	Age     int
	City    string
	Score   float64
	IsValid bool
}

func main() {
	ctx := context.Background()
	projectID := "your-project-id"
	datasetID := "your_dataset"
	tableID := "your_table"

	// 创建BigQuery客户端
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("Failed to create BigQuery client: %v", err)
	}
	defer client.Close()

	// 初始化bqwriter
	writer, err := bqwriter.New(ctx, client, datasetID, tableID,
		bqwriter.AutoCreateTable(), // 自动创建表
		bqwriter.BatchSize(500),    // 每500条记录批量写入一次
		bqwriter.Concurrency(4),    // 使用4个并发worker
	)
	if err != nil {
		log.Fatalf("Failed to create bqwriter: %v", err)
	}
	defer writer.Close()

	// 准备要写入的数据
	records := []*Record{
		{Name: "Alice", Age: 28, City: "New York", Score: 95.5, IsValid: true},
		{Name: "Bob", Age: 35, City: "San Francisco", Score: 87.2, IsValid: true},
		// 更多记录...
	}

	// 写入数据
	if err := writer.Write(records); err != nil {
		log.Printf("Failed to write records: %v", err)
	}

	log.Println("Data written successfully")
}

2. 高级配置选项

bqwriter提供了多种配置选项来优化性能:

writer, err := bqwriter.New(ctx, client, datasetID, tableID,
    bqwriter.AutoCreateTable(),                  // 自动创建表
    bqwriter.BatchSize(1000),                   // 每1000条记录批量写入
    bqwriter.Concurrency(8),                    // 8个并发worker
    bqwriter.MaxRetries(3),                     // 最多重试3次
    bqwriter.RetryWait(time.Second * 5),        // 重试等待5秒
    bqwriter.BufferSize(5000),                  // 内存缓冲区大小
    bqwriter.SchemaUpdate(true),                // 允许自动更新schema
    bqwriter.ErrorHandler(myErrorHandler),      // 自定义错误处理
)

3. 自定义错误处理

func myErrorHandler(err error, data interface{}, attempt int) bool {
    log.Printf("Error writing data (attempt %d): %v", attempt, err)
    
    // 如果是配额错误,等待后重试
    if strings.Contains(err.Error(), "quota exceeded") {
        time.Sleep(time.Second * 10)
        return true // 返回true表示应该重试
    }
    
    // 其他错误不重试
    return false
}

性能优化建议

  1. 批量大小:根据数据大小调整BatchSize,通常500-1000条记录是一个好的起点
  2. 并发度:Concurrency设置为4-8通常能获得最佳性能,但需要根据实际环境测试
  3. 缓冲区大小:BufferSize应足够大以容纳突发流量,但不要过大导致内存问题
  4. 重试策略:对于临时性错误,配置适当的重试次数和等待时间
  5. 数据类型:使用指针字段可以更高效地处理NULL值

与原生BigQuery库对比

相比直接使用Google的BigQuery Go SDK,bqwriter提供了:

  • 更简单的API
  • 内置批处理和缓冲
  • 自动重试机制
  • 更好的性能优化选项
  • 更灵活的Schema管理

注意事项

  1. 确保服务账号有足够的BigQuery权限
  2. 监控写入速率以避免超出配额
  3. 对于大量数据写入,考虑使用BigQuery的批量加载功能
  4. 定期检查错误日志,特别是重试失败的情况

通过合理配置bqwriter,你可以实现每秒数万条记录的高效写入,显著提高数据管道的性能。

回到顶部