golang BigQuery数据存储连接插件库bgc的使用

Golang BigQuery 数据存储连接插件库 bgc 的使用

概述

bgc 是一个用于连接 BigQuery 的 Go 语言库,兼容 Go 1.5+ 版本。它使用 SQL 模式和流式 API 作为默认的数据插入方式。

配置参数

insertMethod

控制插入方法,可以通过 config.parameters 设置:

_table_name_.insertMethod = "load"

注意:如果使用流式插入,目前不支持 UPDATE 和 DELETE 语句。

insertIdColumn

对于流式插入,可以指定用作 insertId 的列:

_table_name_.insertMethod = "stream"
_table_name_.insertIdColumn = "sessionId"

streamBatchCount

控制批处理中的行数(默认 9999)

insertWaitTimeoutInMs

插入数据时检查数据是否已添加的超时时间(默认 60 秒)

要禁用此机制,设置为:

insertWaitTimeoutInMs: -1

insertMaxRetires

当出现 503 内部错误时重试插入

datasetId

默认数据集

pageSize

默认 500,每页返回的最大数据行数

凭证配置

  1. Google 服务账户密钥

a) 设置 GOOGLE_APPLICATION_CREDENTIALS 环境变量

b) 凭证可以是放在 ~/.secret/ 文件夹中的 JSON 密钥文件名

config.yaml 示例:

driverName: bigquery
credentials: bq # 将你的 BigQuery 密钥 json 放在 ~/.secret/bg.json
parameters:
  datasetId: myDataset

c) 密钥文件的完整 URL

config.yaml 示例:

driverName: bigquery
credentials: file://tmp/secret/mySecret.json
parameters:
  datasetId: myDataset

密钥文件需要包含以下属性:

type Config struct {
	//google cloud credential
	ClientEmail  string `json:"client_email,omitempty"`
	TokenURL     string `json:"token_uri,omitempty"`
	PrivateKey   string `json:"private_key,omitempty"`
	PrivateKeyID string `json:"private_key_id,omitempty"`
	ProjectID  string `json:"project_id,omitempty"`
}
  1. 私钥 (pem)

config.yaml 示例:

driverName: bigquery
credentials: bq # 将你的 BigQuery 密钥 json 放在 ~/.secret/bg.json
parameters:
  serviceAccountId: "***@developer.gserviceaccount.com"
  datasetId: MyDataset
  projectId: spheric-arcadia-98015
  privateKeyPath: /tmp/secret/bq.pem

使用示例

以下是一个简单的数据读取和插入示例:

package main

import (
    "github.com/viant/bgc"
    "github.com/viant/dsc"
    "time"
    "fmt"
    "log"
)

type MostLikedCity struct {
	City      string
	Visits    int
	Souvenirs []string
}

type Traveler struct {
	Id            int
	Name          string
	LastVisitTime time.Time
	Achievements  []string
	MostLikedCity MostLikedCity
	VisitedCities []struct {
		City   string
		Visits int
	}
}

func main() {
    config, err := dsc.NewConfigWithParameters("bigquery", "",
    	    "bq", // google cloud secret placed in ~/.secret/bg.json
            map[string]string{
                "datasetId":"MyDataset",
            })

    if err != nil {
        log.Fatal(err)
    }
		
    factory := dsc.NewManagerFactory()
    manager, err := factory.Create(config)
    if err != nil {
        log.Fatalf("Failed to create manager %v", err)
    }
   
    traveler := Traveler{}
    success, err := manager.ReadSingle(&traveler, " SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity FROM travelers WHERE id = ?", []interface{}{4}, nil)
    if err != nil {
        panic(err.Error())
    }

    travelers := make([]Traveler, 0)
    err = manager.ReadAll(&travelers, "SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity", nil, nil)
    if err != nil {
        panic(err.Error())
    }

    inserted, updated, err := manager.PersistAll(&travelers, "travelers", nil)
    if err != nil {
        panic(err.Error())
    }
    
    // 自定义读取处理器,获取查询信息如 CacheHit, TotalRows, TotalBytesProcessed
    var resultInfo = &bgc.QueryResultInfo{}
    var perf = make(map[string]int)  
    err = manager.ReadAllWithHandler(`SELECT DATE(date), COUNT(*) FROM performance_agg WHERE DATE(date) = ? GROUP BY 1`, []interface{}{
        "2018-05-03",
        resultInfo,
    }, func(scanner dsc.Scanner) (toContinue bool, err error) {
        var date string
        var count int
        err = scanner.Scan(&date, &count)
        if err != nil {
            return false, err
        }
        perf[date] = count
        return true, nil
    })
    log.Printf("cache: %v, rows: %v, bytes: %v", resultInfo.CacheHit, resultInfo.TotalRows, resultInfo.TotalBytesProcessed)

    dialect := dsc.GetDatastoreDialect(config.DriverName)
    DDL, err := dialect.ShowCreateTable(manager, "performance_agg")
    fmt.Printf("%v %v\n", DDL, err)
}

许可证

源代码根据 Apache License, Version 2 的条款提供,详见 LICENSE 文件。

作者和贡献者

库作者: Adrian Witas
贡献者: Mikhail Berlyant


更多关于golang BigQuery数据存储连接插件库bgc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang BigQuery数据存储连接插件库bgc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang BigQuery 数据存储连接插件库 BGC 使用指南

BigQuery 是 Google Cloud 提供的一款强大的数据仓库服务,而 bgc 是一个用于 Go 语言连接 BigQuery 的库。下面我将详细介绍如何使用 bgc 库连接和操作 BigQuery。

安装 bgc 库

首先需要安装 bgc 库:

go get -u cloud.google.com/go/bigquery

基本使用方法

1. 初始化客户端

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/option"
)

func main() {
	ctx := context.Background()

	// 使用服务账号密钥文件初始化客户端
	client, err := bigquery.NewClient(ctx, "your-project-id", 
		option.WithCredentialsFile("path/to/service-account.json"))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer client.Close()

	// 使用客户端进行操作...
}

2. 执行查询

func queryData(client *bigquery.Client) error {
	ctx := context.Background()
	q := client.Query(`
		SELECT name, count
		FROM ` + "`your-project-id.your_dataset.your_table`" + `
		WHERE count > @threshold
		ORDER BY count DESC
		LIMIT 10
	`)

	q.Parameters = []bigquery.QueryParameter{
		{Name: "threshold", Value: 100},
	}

	// 运行查询
	it, err := q.Read(ctx)
	if err != nil {
		return fmt.Errorf("failed to run query: %v", err)
	}

	// 处理结果
	for {
		var row struct {
			Name  string
			Count int
		}
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("failed to read row: %v", err)
		}
		fmt.Printf("Name: %s, Count: %d\n", row.Name, row.Count)
	}

	return nil
}

3. 插入数据

func insertData(client *bigquery.Client) error {
	ctx := context.Background()
	inserter := client.Dataset("your_dataset").Table("your_table").Inserter()

	items := []*YourStruct{
		{Name: "Item1", Value: 10, Timestamp: time.Now()},
		{Name: "Item2", Value: 20, Timestamp: time.Now()},
	}

	if err := inserter.Put(ctx, items); err != nil {
		return fmt.Errorf("failed to insert data: %v", err)
	}
	return nil
}

4. 创建表和数据集

func createDatasetAndTable(client *bigquery.Client) error {
	ctx := context.Background()

	// 创建数据集
	dataset := client.Dataset("new_dataset")
	if err := dataset.Create(ctx, &bigquery.DatasetMetadata{
		Location: "US", // 选择合适的位置
	}); err != nil {
		return fmt.Errorf("failed to create dataset: %v", err)
	}

	// 创建表
	schema := bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "count", Type: bigquery.IntegerFieldType},
		{Name: "created_at", Type: bigquery.TimestampFieldType},
	}
	table := dataset.Table("new_table")
	if err := table.Create(ctx, &bigquery.TableMetadata{
		Schema: schema,
	}); err != nil {
		return fmt.Errorf("failed to create table: %v", err)
	}

	return nil
}

高级功能

1. 流式插入

func streamingInsert(client *bigquery.Client) error {
	ctx := context.Background()
	inserter := client.Dataset("your_dataset").Table("your_table").Inserter()

	// 实现 ValueSaver 接口
	type Item struct {
		ID   string
		Data string
	}

	func (i *Item) Save() (map[string]bigquery.Value, string, error) {
		return map[string]bigquery.Value{
			"id":   i.ID,
			"data": i.Data,
		}, "", nil
	}

	items := []*Item{
		{ID: "1", Data: "test1"},
		{ID: "2", Data: "test2"},
	}

	return inserter.Put(ctx, items)
}

2. 使用 CSV 文件加载数据

func loadFromCSV(client *bigquery.Client) error {
	ctx := context.Background()
	gcsRef := bigquery.NewGCSReference("gs://your-bucket/your-data.csv")
	gcsRef.SourceFormat = bigquery.CSV
	gcsRef.SkipLeadingRows = 1 // 跳过标题行
	gcsRef.Schema = bigquery.Schema{
		{Name: "name", Type: bigquery.StringFieldType},
		{Name: "value", Type: bigquery.IntegerFieldType},
	}

	loader := client.Dataset("your_dataset").Table("your_table").LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate // 覆盖现有数据

	job, err := loader.Run(ctx)
	if err != nil {
		return fmt.Errorf("failed to start job: %v", err)
	}

	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("job failed: %v", err)
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}

	return nil
}

最佳实践

  1. 重用客户端:BigQuery 客户端是线程安全的,应该重用而不是为每个操作创建新客户端
  2. 处理大结果集:对于大结果集,使用迭代器模式逐步处理,避免内存问题
  3. 错误处理:始终检查错误并适当处理
  4. 资源清理:使用 defer 确保客户端和资源被正确关闭
  5. 批处理操作:对于大量写入操作,使用批处理提高效率

总结

bgc (BigQuery Go Client) 提供了完整的 BigQuery 操作接口,从简单的查询到复杂的数据加载和表管理。通过合理使用这个库,你可以在 Go 应用中高效地与 BigQuery 交互,构建强大的数据分析功能。

记得在使用前设置好 Google Cloud 项目并获取适当的服务账号凭证。对于生产环境,考虑使用更安全的凭证管理方式,如 Google Cloud 的 Secret Manager 或 Workload Identity Federation。

回到顶部