golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用

Golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用

简介

ClickHouse-Bulk是一个简单的Yandex ClickHouse插入收集器,它收集请求并发送到ClickHouse服务器。主要功能包括:

  • 将多个请求分组并发送到任意ClickHouse服务器
  • 按时间间隔发送收集的数据
  • 支持VALUES、TabSeparated等格式
  • 支持多服务器发送
  • 支持查询参数和请求体中的查询
  • 支持用户名、密码、数据库等查询参数
  • 支持基本认证

安装

二进制安装

下载适合您平台的二进制文件

Docker安装

使用docker镜像

源码安装(Go 1.23+):

git clone https://github.com/nikepan/clickhouse-bulk
cd clickhouse-bulk
go build

配置示例

{
  "listen": ":8124",
  "flush_count": 10000, // 按\n字符检查
  "flush_interval": 1000, // 毫秒
  "clean_interval": 0, // 清理内部表的频率(毫秒)
  "remove_query_id": true, // 某些驱动发送query_id会阻止批量插入
  "dump_check_interval": 300, // 尝试发送转储的间隔(秒); -1表示禁用
  "debug": false, // 记录传入请求
  "dump_dir": "dumps", // 转储未发送数据的目录(如果clickhouse出错)
  "clickhouse": {
    "down_timeout": 60, // 服务器宕机等待时间(秒)
    "connect_timeout": 10, // 服务器连接超时(秒)
    "tls_server_name": "", // 覆盖TLS证书验证的serverName
    "insecure_tls_skip_verify": false, // 不安全-完全跳过证书验证
    "servers": [
      "http://127.0.0.1:8123"
    ]
  },
  "metrics_prefix": "prefix"
}

Golang使用示例

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// clickhouse-bulk服务地址
	bulkServer := "http://localhost:8124"
	
	// 示例数据
	data1 := "INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')"
	data2 := "INSERT INTO table3 (c1, c2, c3) VALUES ('v4', 'v5', 'v6')"
	
	// 创建HTTP客户端
	client := &http.Client{
		Timeout: 10 * time.Second,
	}
	
	// 发送第一个插入请求
	resp1, err := client.Post(bulkServer, "text/plain", bytes.NewBufferString(data1))
	if err != nil {
		fmt.Printf("Error sending first request: %v\n", err)
		return
	}
	defer resp1.Body.Close()
	fmt.Println("First insert sent, status:", resp1.Status)
	
	// 发送第二个插入请求
	resp2, err := client.Post(bulkServer, "text/plain", bytes.NewBufferString(data2))
	if err != nil {
		fmt.Printf("Error sending second request: %v\n", err)
		return
	}
	defer resp2.Body.Close()
	fmt.Println("Second insert sent, status:", resp2.Status)
	
	// 注意:实际clickhouse-bulk会将这两个插入合并为:
	// INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')
}

批量插入优化示例

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

func main() {
	bulkServer := "http://localhost:8124"
	
	// 创建批量数据
	var batch bytes.Buffer
	for i := 0; i < 1000; i++ {
		batch.WriteString(fmt.Sprintf("INSERT INTO metrics (time, value, tag) VALUES (now(), %f, 'tag%d')\n", float64(i), i%10))
	}
	
	// 发送批量请求
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Post(bulkServer, "text/plain", &batch)
	if err != nil {
		fmt.Printf("Error sending batch request: %v\n", err)
		return
	}
	defer resp.Body.Close()
	
	fmt.Println("Batch insert sent, status:", resp.Status)
}

环境变量配置

可以通过环境变量配置clickhouse-bulk:

  • CLICKHOUSE_BULK_DEBUG - 启用调试日志
  • CLICKHOUSE_SERVERS - 逗号分隔的服务器列表
  • CLICKHOUSE_FLUSH_COUNT - 插入行数阈值
  • CLICKHOUSE_FLUSH_INTERVAL - 插入间隔
  • CLICKHOUSE_CLEAN_INTERVAL - 内部表清理间隔
  • DUMP_CHECK_INTERVAL - 重新发送转储的间隔
  • CLICKHOUSE_DOWN_TIMEOUT - 服务器宕机等待时间
  • CLICKHOUSE_CONNECT_TIMEOUT - 服务器连接超时
  • CLICKHOUSE_TLS_SERVER_NAME - TLS证书验证的服务器名称
  • CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY - 跳过证书验证
  • METRICS_PREFIX - prometheus指标前缀

性能提示

为了获得更好的性能,FORMAT和VALUES关键字必须大写。


更多关于golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用clickhouse-bulk高效批量插入数据到ClickHouse

clickhouse-bulk是一个高效的ClickHouse批量插入工具,它通过缓冲和批量处理数据来显著提高插入性能。下面我将详细介绍如何使用这个工具。

clickhouse-bulk简介

clickhouse-bulk作为中间代理服务,主要功能包括:

  • 缓冲数据并批量插入
  • 自动重试失败请求
  • 支持多种输入格式
  • 轻量级且易于部署

安装clickhouse-bulk

可以通过Docker快速安装:

docker pull propan/clickhouse-bulk
docker run -d -p 8123:8123 propan/clickhouse-bulk \
    -clickhouse.url="http://clickhouse-server:8123" \
    -clickhouse.db="default" \
    -clickhouse.table="your_table" \
    -workers=4 \
    -flush.count=10000 \
    -flush.interval=10s

或者下载二进制文件直接运行:

wget https://github.com/nikepan/clickhouse-bulk/releases/download/v2.4/clickhouse-bulk.linux-amd64.tgz
tar -xzvf clickhouse-bulk.linux-amd64.tgz
./clickhouse-bulk --config=config.yaml

Golang客户端示例

以下是使用Golang通过clickhouse-bulk批量插入数据的完整示例:

package main

import (
	"bytes"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// clickhouse-bulk服务地址
	bulkURL := "http://localhost:8123/?query=INSERT%20INTO%20test_table%20FORMAT%20JSONEachRow"

	// 模拟生成批量数据
	var buf bytes.Buffer
	for i := 0; i < 1000; i++ {
		// JSONEachRow格式: 每行一个完整的JSON对象,用换行符分隔
		buf.WriteString(fmt.Sprintf(`{"id":%d,"name":"user%d","created_at":"%s"}`+"\n", 
			i, i, time.Now().Format(time.RFC3339)))
	}

	// 发送HTTP POST请求
	resp, err := http.Post(bulkURL, "application/x-www-form-urlencoded", &buf)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		fmt.Printf("Insert failed with status: %s\n", resp.Status)
	} else {
		fmt.Println("Batch insert successful")
	}
}

性能优化建议

  1. 批量大小:根据数据大小调整-flush.count-flush.interval参数

    • 典型值:10,000-100,000行或10-30秒间隔
  2. 并发工作线程:设置-workers参数匹配CPU核心数

  3. 数据格式:使用高效的格式

    • JSONEachRow:灵活但稍慢
    • TabSeparated:更高效但需要严格格式控制
  4. 网络优化:确保clickhouse-bulk与ClickHouse服务器在同一个网络

监控与管理

clickhouse-bulk提供监控端点:

curl http://localhost:8123/stat

典型输出:

{
  "workers": 4,
  "queue": 12,
  "inserts": 123456,
  "errors": 5,
  "last_error": "2023-01-01T12:00:00Z"
}

配置示例

完整的config.yaml配置示例:

clickhouse:
  url: "http://clickhouse-server:8123"
  db: "analytics"
  table: "events"
  username: "user"
  password: "password"
  
server:
  listen: ":8123"
  
workers: 8
flush:
  count: 50000
  interval: "15s"
  
log:
  level: "info"

错误处理

在Golang客户端中实现重试逻辑:

func sendWithRetry(url string, data []byte, maxRetries int) error {
	var lastErr error
	
	for i := 0; i < maxRetries; i++ {
		resp, err := http.Post(url, "application/x-www-form-urlencoded", bytes.NewReader(data))
		if err == nil && resp.StatusCode == http.StatusOK {
			return nil
		}
		
		if err != nil {
			lastErr = err
		} else {
			lastErr = fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
		}
		
		time.Sleep(time.Second * time.Duration(i+1)) // 指数退避
	}
	
	return fmt.Errorf("after %d retries, last error: %v", maxRetries, lastErr)
}

clickhouse-bulk是提高ClickHouse写入性能的有效工具,特别适合高吞吐量场景。通过合理配置和客户端优化,可以实现每秒数十万甚至百万级的写入性能。

回到顶部