golang BigQuery数据存储连接插件库bgc的使用
Golang BigQuery 数据存储连接插件库 bgc 的使用
概述
bgc 是一个用于连接 BigQuery 的 Go 语言库,兼容 Go 1.5+ 版本。它使用 SQL 模式和流式 API 作为默认的数据插入方式。
配置参数
insertMethod
控制插入方法,可以通过 config.parameters 设置:
_table_name_.insertMethod = "load"
注意:如果使用流式插入,目前不支持 UPDATE 和 DELETE 语句。
insertIdColumn
对于流式插入,可以指定用作 insertId 的列:
_table_name_.insertMethod = "stream"
_table_name_.insertIdColumn = "sessionId"
streamBatchCount
控制批处理中的行数(默认 9999)
insertWaitTimeoutInMs
插入数据时检查数据是否已添加的超时时间(默认 60 秒)
要禁用此机制,设置为:
insertWaitTimeoutInMs: -1
insertMaxRetires
当出现 503 内部错误时重试插入
datasetId
默认数据集
pageSize
默认 500,每页返回的最大数据行数
凭证配置
- Google 服务账户密钥
a) 设置 GOOGLE_APPLICATION_CREDENTIALS 环境变量
b) 凭证可以是放在 ~/.secret/ 文件夹中的 JSON 密钥文件名
config.yaml 示例:
driverName: bigquery
credentials: bq # 将你的 BigQuery 密钥 json 放在 ~/.secret/bg.json
parameters:
datasetId: myDataset
c) 密钥文件的完整 URL
config.yaml 示例:
driverName: bigquery
credentials: file://tmp/secret/mySecret.json
parameters:
datasetId: myDataset
密钥文件需要包含以下属性:
type Config struct {
//google cloud credential
ClientEmail string `json:"client_email,omitempty"`
TokenURL string `json:"token_uri,omitempty"`
PrivateKey string `json:"private_key,omitempty"`
PrivateKeyID string `json:"private_key_id,omitempty"`
ProjectID string `json:"project_id,omitempty"`
}
- 私钥 (pem)
config.yaml 示例:
driverName: bigquery
credentials: bq # 将你的 BigQuery 密钥 json 放在 ~/.secret/bg.json
parameters:
serviceAccountId: "***@developer.gserviceaccount.com"
datasetId: MyDataset
projectId: spheric-arcadia-98015
privateKeyPath: /tmp/secret/bq.pem
使用示例
以下是一个简单的数据读取和插入示例:
package main
import (
"github.com/viant/bgc"
"github.com/viant/dsc"
"time"
"fmt"
"log"
)
type MostLikedCity struct {
City string
Visits int
Souvenirs []string
}
type Traveler struct {
Id int
Name string
LastVisitTime time.Time
Achievements []string
MostLikedCity MostLikedCity
VisitedCities []struct {
City string
Visits int
}
}
func main() {
config, err := dsc.NewConfigWithParameters("bigquery", "",
"bq", // google cloud secret placed in ~/.secret/bg.json
map[string]string{
"datasetId":"MyDataset",
})
if err != nil {
log.Fatal(err)
}
factory := dsc.NewManagerFactory()
manager, err := factory.Create(config)
if err != nil {
log.Fatalf("Failed to create manager %v", err)
}
traveler := Traveler{}
success, err := manager.ReadSingle(&traveler, " SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity FROM travelers WHERE id = ?", []interface{}{4}, nil)
if err != nil {
panic(err.Error())
}
travelers := make([]Traveler, 0)
err = manager.ReadAll(&travelers, "SELECT id, name, lastVisitTime, visitedCities, achievements, mostLikedCity", nil, nil)
if err != nil {
panic(err.Error())
}
inserted, updated, err := manager.PersistAll(&travelers, "travelers", nil)
if err != nil {
panic(err.Error())
}
// 自定义读取处理器,获取查询信息如 CacheHit, TotalRows, TotalBytesProcessed
var resultInfo = &bgc.QueryResultInfo{}
var perf = make(map[string]int)
err = manager.ReadAllWithHandler(`SELECT DATE(date), COUNT(*) FROM performance_agg WHERE DATE(date) = ? GROUP BY 1`, []interface{}{
"2018-05-03",
resultInfo,
}, func(scanner dsc.Scanner) (toContinue bool, err error) {
var date string
var count int
err = scanner.Scan(&date, &count)
if err != nil {
return false, err
}
perf[date] = count
return true, nil
})
log.Printf("cache: %v, rows: %v, bytes: %v", resultInfo.CacheHit, resultInfo.TotalRows, resultInfo.TotalBytesProcessed)
dialect := dsc.GetDatastoreDialect(config.DriverName)
DDL, err := dialect.ShowCreateTable(manager, "performance_agg")
fmt.Printf("%v %v\n", DDL, err)
}
许可证
源代码根据 Apache License, Version 2 的条款提供,详见 LICENSE 文件。
作者和贡献者
库作者: Adrian Witas
贡献者: Mikhail Berlyant
更多关于golang BigQuery数据存储连接插件库bgc的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang BigQuery数据存储连接插件库bgc的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang BigQuery 数据存储连接插件库 BGC 使用指南
BigQuery 是 Google Cloud 提供的一款强大的数据仓库服务,而 bgc
是一个用于 Go 语言连接 BigQuery 的库。下面我将详细介绍如何使用 bgc
库连接和操作 BigQuery。
安装 bgc 库
首先需要安装 bgc
库:
go get -u cloud.google.com/go/bigquery
基本使用方法
1. 初始化客户端
package main
import (
"context"
"fmt"
"log"
"cloud.google.com/go/bigquery"
"google.golang.org/api/option"
)
func main() {
ctx := context.Background()
// 使用服务账号密钥文件初始化客户端
client, err := bigquery.NewClient(ctx, "your-project-id",
option.WithCredentialsFile("path/to/service-account.json"))
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
defer client.Close()
// 使用客户端进行操作...
}
2. 执行查询
func queryData(client *bigquery.Client) error {
ctx := context.Background()
q := client.Query(`
SELECT name, count
FROM ` + "`your-project-id.your_dataset.your_table`" + `
WHERE count > @threshold
ORDER BY count DESC
LIMIT 10
`)
q.Parameters = []bigquery.QueryParameter{
{Name: "threshold", Value: 100},
}
// 运行查询
it, err := q.Read(ctx)
if err != nil {
return fmt.Errorf("failed to run query: %v", err)
}
// 处理结果
for {
var row struct {
Name string
Count int
}
err := it.Next(&row)
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("failed to read row: %v", err)
}
fmt.Printf("Name: %s, Count: %d\n", row.Name, row.Count)
}
return nil
}
3. 插入数据
func insertData(client *bigquery.Client) error {
ctx := context.Background()
inserter := client.Dataset("your_dataset").Table("your_table").Inserter()
items := []*YourStruct{
{Name: "Item1", Value: 10, Timestamp: time.Now()},
{Name: "Item2", Value: 20, Timestamp: time.Now()},
}
if err := inserter.Put(ctx, items); err != nil {
return fmt.Errorf("failed to insert data: %v", err)
}
return nil
}
4. 创建表和数据集
func createDatasetAndTable(client *bigquery.Client) error {
ctx := context.Background()
// 创建数据集
dataset := client.Dataset("new_dataset")
if err := dataset.Create(ctx, &bigquery.DatasetMetadata{
Location: "US", // 选择合适的位置
}); err != nil {
return fmt.Errorf("failed to create dataset: %v", err)
}
// 创建表
schema := bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType},
{Name: "count", Type: bigquery.IntegerFieldType},
{Name: "created_at", Type: bigquery.TimestampFieldType},
}
table := dataset.Table("new_table")
if err := table.Create(ctx, &bigquery.TableMetadata{
Schema: schema,
}); err != nil {
return fmt.Errorf("failed to create table: %v", err)
}
return nil
}
高级功能
1. 流式插入
func streamingInsert(client *bigquery.Client) error {
ctx := context.Background()
inserter := client.Dataset("your_dataset").Table("your_table").Inserter()
// 实现 ValueSaver 接口
type Item struct {
ID string
Data string
}
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
return map[string]bigquery.Value{
"id": i.ID,
"data": i.Data,
}, "", nil
}
items := []*Item{
{ID: "1", Data: "test1"},
{ID: "2", Data: "test2"},
}
return inserter.Put(ctx, items)
}
2. 使用 CSV 文件加载数据
func loadFromCSV(client *bigquery.Client) error {
ctx := context.Background()
gcsRef := bigquery.NewGCSReference("gs://your-bucket/your-data.csv")
gcsRef.SourceFormat = bigquery.CSV
gcsRef.SkipLeadingRows = 1 // 跳过标题行
gcsRef.Schema = bigquery.Schema{
{Name: "name", Type: bigquery.StringFieldType},
{Name: "value", Type: bigquery.IntegerFieldType},
}
loader := client.Dataset("your_dataset").Table("your_table").LoaderFrom(gcsRef)
loader.WriteDisposition = bigquery.WriteTruncate // 覆盖现有数据
job, err := loader.Run(ctx)
if err != nil {
return fmt.Errorf("failed to start job: %v", err)
}
status, err := job.Wait(ctx)
if err != nil {
return fmt.Errorf("job failed: %v", err)
}
if status.Err() != nil {
return fmt.Errorf("job completed with error: %v", status.Err())
}
return nil
}
最佳实践
- 重用客户端:BigQuery 客户端是线程安全的,应该重用而不是为每个操作创建新客户端
- 处理大结果集:对于大结果集,使用迭代器模式逐步处理,避免内存问题
- 错误处理:始终检查错误并适当处理
- 资源清理:使用 defer 确保客户端和资源被正确关闭
- 批处理操作:对于大量写入操作,使用批处理提高效率
总结
bgc
(BigQuery Go Client) 提供了完整的 BigQuery 操作接口,从简单的查询到复杂的数据加载和表管理。通过合理使用这个库,你可以在 Go 应用中高效地与 BigQuery 交互,构建强大的数据分析功能。
记得在使用前设置好 Google Cloud 项目并获取适当的服务账号凭证。对于生产环境,考虑使用更安全的凭证管理方式,如 Google Cloud 的 Secret Manager 或 Workload Identity Federation。