golang高性能ClickHouse SQL客户端插件库clickhouse-go的使用

golang高性能ClickHouse SQL客户端插件库clickhouse-go的使用

clickhouse-go 是 ClickHouse 的 Golang SQL 数据库客户端库,提供高性能的数据访问能力。

主要特性

  • 使用 ClickHouse 原生格式以获得最佳性能
  • 支持原生 ClickHouse TCP 客户端-服务器协议
  • 兼容 database/sql 接口
  • 支持 HTTP 协议传输(实验性)
  • 支持将行数据编组到结构体(ScanStruct, Select)
  • 支持将结构体解组到行(AppendStruct)
  • 连接池
  • 故障转移和负载均衡
  • 批量写入支持
  • 异步插入支持
  • 命名和数字占位符支持
  • LZ4/ZSTD 压缩支持
  • 外部数据
  • 查询参数

安装

go get -u github.com/ClickHouse/clickhouse-go/v2

使用示例

原生接口(native interface)示例

conn, err := clickhouse.Open(&clickhouse.Options{
    Addr: []string{"127.0.0.1:9000"},
    Auth: clickhouse.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    DialContext: func(ctx context.Context, addr string) (net.Conn, error) {
        dialCount++
        var d net.Dialer
        return d.DialContext(ctx, "tcp", addr)
    },
    Debug: true,
    Debugf: func(format string, v ...any) {
        fmt.Printf(format+"\n", v...)
    },
    Settings: clickhouse.Settings{
        "max_execution_time": 60,
    },
    Compression: &clickhouse.Compression{
        Method: clickhouse.CompressionLZ4,
    },
    DialTimeout:      time.Second * 30,
    MaxOpenConns:     5,
    MaxIdleConns:     5,
    ConnMaxLifetime:  time.Duration(10) * time.Minute,
    ConnOpenStrategy: clickhouse.ConnOpenInOrder,
    BlockBufferSize: 10,
    MaxCompressionBuffer: 10240,
    ClientInfo: clickhouse.ClientInfo{ // 可选
        Products: []struct {
            Name    string
            Version string
        }{
            {Name: "my-app", Version: "0.1"},
        },
    },
})
if err != nil {
    return err
}
return conn.Ping(context.Background())

database/sql 接口示例

conn := clickhouse.OpenDB(&clickhouse.Options{
    Addr: []string{"127.0.0.1:9999"},
    Auth: clickhouse.Auth{
        Database: "default",
        Username: "default",
        Password: "",
    },
    TLS: &tls.Config{
        InsecureSkipVerify: true,
    },
    Settings: clickhouse.Settings{
        "max_execution_time": 60,
    },
    DialTimeout: time.Second * 30,
    Compression: &clickhouse.Compression{
        Method: clickhouse.CompressionLZ4,
    },
    Debug: true,
    BlockBufferSize: 10,
    MaxCompressionBuffer: 10240,
    ClientInfo: clickhouse.ClientInfo{ // 可选
        Products: []struct {
            Name    string
            Version string
        }{
            {Name: "my-app", Version: "0.1"},
        },
    },
})
conn.SetMaxIdleConns(5)
conn.SetMaxOpenConns(10)
conn.SetConnMaxLifetime(time.Hour)

DSN 连接字符串示例

clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

批量写入示例

// 原生接口批量写入
batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example")
if err != nil {
    return err
}
for i := 0; i < 1000; i++ {
    err := batch.Append(
        uint32(i),
        "example string",
        []uint8{1, 2, 3},
        time.Now(),
    )
    if err != nil {
        return err
    }
}
err = batch.Send()
if err != nil {
    return err
}

// database/sql 接口批量写入
tx, err := conn.Begin()
if err != nil {
    return err
}
stmt, err := tx.Prepare("INSERT INTO example (col1, col2) VALUES (?, ?)")
if err != nil {
    return err
}
for i := 0; i < 1000; i++ {
    if _, err := stmt.Exec(i, fmt.Sprintf("value %d", i)); err != nil {
        return err
    }
}
if err := tx.Commit(); err != nil {
    return err
}

查询示例

// 原生接口查询
rows, err := conn.Query(context.Background(), "SELECT * FROM example WHERE col1 > @col1", clickhouse.Named("col1", 10))
if err != nil {
    return err
}
defer rows.Close()

for rows.Next() {
    var (
        col1 uint32
        col2 string
        col3 []uint8
        col4 time.Time
    )
    if err := rows.Scan(&col1, &col2, &col3, &col4); err != nil {
        return err
    }
    fmt.Printf("row: col1=%d, col2=%s, col3=%v, col4=%s\n", col1, col2, col3, col4)
}

// database/sql 接口查询
rows, err := conn.Query("SELECT * FROM example WHERE col1 > ?", 10)
if err != nil {
    return err
}
defer rows.Close()

for rows.Next() {
    var (
        col1 uint32
        col2 string
        col3 []uint8
        col4 time.Time
    )
    if err := rows.Scan(&col1, &col2, &col3, &col4); err != nil {
        return err
    }
    fmt.Printf("row: col1=%d, col2=%s, col3=%v, col4=%s\n", col1, col2, col3, col4)
}

性能对比

操作类型 V1 版本 V2 版本(std) V2 版本(native)
读取 1.218s 924.390ms 675.721ms
写入 1.899s 1.177s 699.203ms

支持的 ClickHouse 和 Golang 版本

支持的 ClickHouse 版本

客户端测试针对 ClickHouse 当前支持的版本。

支持的 Golang 版本

客户端版本 Golang 版本
>= 2.0 <= 2.2 1.17, 1.18
>= 2.3 1.18.4+, 1.19
>= 2.14 1.20, 1.21
>= 2.19 1.21, 1.22
>= 2.28 1.22, 1.23
>= 2.29 1.21, 1.22, 1.23, 1.24

更多示例

更多使用示例可以参考官方仓库中的示例代码,包括批量操作、异步插入、结构体映射等高级用法。


更多关于golang高性能ClickHouse SQL客户端插件库clickhouse-go的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高性能ClickHouse SQL客户端插件库clickhouse-go的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


ClickHouse Go 客户端使用指南

clickhouse-go 是 Go 语言中高性能的 ClickHouse 数据库客户端库,支持原生接口和标准 database/sql 接口。下面详细介绍其使用方法。

1. 安装

go get github.com/ClickHouse/clickhouse-go/v2

2. 基本使用

2.1 使用原生接口

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"127.0.0.1:9000"},
		Auth: clickhouse.Auth{
			Database: "default",
			Username: "default",
			Password: "",
		},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	if err := conn.Ping(ctx); err != nil {
		panic(err)
	}

	// 创建表
	if err := conn.Exec(ctx, `
		CREATE TABLE IF NOT EXISTS example (
			col1 UInt32,
			col2 String,
			col3 DateTime
		) ENGINE = MergeTree()
		ORDER BY (col1)
	`); err != nil {
		panic(err)
	}

	// 批量插入
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO example (col1, col2, col3)")
	if err != nil {
		panic(err)
	}
	for i := 0; i < 100; i++ {
		if err := batch.Append(uint32(i), fmt.Sprintf("value_%d", i), time.Now()); err != nil {
			panic(err)
		}
	}
	if err := batch.Send(); err != nil {
		panic(err)
	}

	// 查询
	rows, err := conn.Query(ctx, "SELECT col1, col2, col3 FROM example WHERE col1 < 10")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			col1 uint32
			col2 string
			col3 time.Time
		)
		if err := rows.Scan(&col1, &col2, &col3); err != nil {
			panic(err)
		}
		fmt.Printf("row: col1=%d, col2=%s, col3=%s\n", col1, col2, col3)
	}
}

2.2 使用 database/sql 接口

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	dsn := "clickhouse://default:@127.0.0.1:9000/default?dial_timeout=10s"
	conn, err := sql.Open("clickhouse", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// 创建表
	if _, err := conn.Exec(`
		CREATE TABLE IF NOT EXISTS example_sql (
			id UInt64,
			name String,
			created_at DateTime
		) ENGINE = MergeTree()
		ORDER BY (id)
	`); err != nil {
		log.Fatal(err)
	}

	// 插入数据
	tx, err := conn.Begin()
	if err != nil {
		log.Fatal(err)
	}
	stmt, err := tx.Prepare("INSERT INTO example_sql (id, name, created_at) VALUES (?, ?, ?)")
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		if _, err := stmt.Exec(i, fmt.Sprintf("name_%d", i), "2023-01-01 00:00:00"); err != nil {
			log.Fatal(err)
		}
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}

	// 查询数据
	rows, err := conn.Query("SELECT id, name, created_at FROM example_sql WHERE id < 5")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var (
			id        uint64
			name      string
			createdAt string
		)
		if err := rows.Scan(&id, &name, &createdAt); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("id: %d, name: %s, created_at: %s\n", id, name, createdAt)
	}
}

3. 高级特性

3.1 连接池配置

conn, err := clickhouse.Open(&clickhouse.Options{
	Addr: []string{"127.0.0.1:9000"},
	Auth: clickhouse.Auth{
		Database: "default",
		Username: "default",
		Password: "",
	},
	DialTimeout:      time.Second * 5,
	MaxOpenConns:     10,  // 最大连接数
	MaxIdleConns:     5,   // 最大空闲连接数
	ConnMaxLifetime:  time.Hour, // 连接最大生命周期
	ConnOpenStrategy: clickhouse.ConnOpenInOrder, // 连接打开策略
})

3.2 压缩支持

conn, err := clickhouse.Open(&clickhouse.Options{
	Addr: []string{"127.0.0.1:9000"},
	Compression: &clickhouse.Compression{
		Method: clickhouse.CompressionLZ4, // 或 clickhouse.CompressionZSTD
		Level:  3,
	},
})

3.3 异步插入

batch, err := conn.PrepareBatch(ctx, "INSERT INTO example", clickhouse.WithAsync())
if err != nil {
	panic(err)
}
// 添加数据...
if err := batch.Send(); err != nil {
	panic(err)
}
// 不需要等待服务器响应

3.4 使用结构体映射

type Example struct {
	Col1 uint32    `ch:"col1"`
	Col2 string    `ch:"col2"`
	Col3 time.Time `ch:"col3"`
}

var results []Example
if err := conn.Select(ctx, &results, "SELECT col1, col2, col3 FROM example"); err != nil {
	panic(err)
}

4. 性能优化建议

  1. 使用批量插入:批量插入比单条插入性能高很多
  2. 启用压缩:特别是大数据量传输时
  3. 合理配置连接池:根据应用负载调整
  4. 使用原生接口:比 database/sql 接口性能更高
  5. 考虑异步插入:对写入延迟不敏感的场景

5. 错误处理

if err := conn.Exec(ctx, "SELECT 1"); err != nil {
	if exception, ok := err.(*clickhouse.Exception); ok {
		fmt.Printf("ClickHouse exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
	} else {
		fmt.Printf("Other error: %s\n", err)
	}
}

clickhouse-go 提供了丰富的功能和良好的性能,是 Go 语言中访问 ClickHouse 数据库的优选方案。根据你的具体需求选择合适的接口和配置,可以获得最佳的性能和使用体验。

回到顶部