golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用
Golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用
简介
ClickHouse-Bulk是一个简单的Yandex ClickHouse插入收集器,它收集请求并发送到ClickHouse服务器。主要功能包括:
- 将多个请求分组并发送到任意ClickHouse服务器
- 按时间间隔发送收集的数据
- 支持VALUES、TabSeparated等格式
- 支持多服务器发送
- 支持查询参数和请求体中的查询
- 支持用户名、密码、数据库等查询参数
- 支持基本认证
安装
二进制安装
下载适合您平台的二进制文件
Docker安装
使用docker镜像
源码安装(Go 1.23+):
git clone https://github.com/nikepan/clickhouse-bulk
cd clickhouse-bulk
go build
配置示例
{
"listen": ":8124",
"flush_count": 10000, // 按\n字符检查
"flush_interval": 1000, // 毫秒
"clean_interval": 0, // 清理内部表的频率(毫秒)
"remove_query_id": true, // 某些驱动发送query_id会阻止批量插入
"dump_check_interval": 300, // 尝试发送转储的间隔(秒); -1表示禁用
"debug": false, // 记录传入请求
"dump_dir": "dumps", // 转储未发送数据的目录(如果clickhouse出错)
"clickhouse": {
"down_timeout": 60, // 服务器宕机等待时间(秒)
"connect_timeout": 10, // 服务器连接超时(秒)
"tls_server_name": "", // 覆盖TLS证书验证的serverName
"insecure_tls_skip_verify": false, // 不安全-完全跳过证书验证
"servers": [
"http://127.0.0.1:8123"
]
},
"metrics_prefix": "prefix"
}
Golang使用示例
package main
import (
"bytes"
"fmt"
"net/http"
"time"
)
func main() {
// clickhouse-bulk服务地址
bulkServer := "http://localhost:8124"
// 示例数据
data1 := "INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')"
data2 := "INSERT INTO table3 (c1, c2, c3) VALUES ('v4', 'v5', 'v6')"
// 创建HTTP客户端
client := &http.Client{
Timeout: 10 * time.Second,
}
// 发送第一个插入请求
resp1, err := client.Post(bulkServer, "text/plain", bytes.NewBufferString(data1))
if err != nil {
fmt.Printf("Error sending first request: %v\n", err)
return
}
defer resp1.Body.Close()
fmt.Println("First insert sent, status:", resp1.Status)
// 发送第二个插入请求
resp2, err := client.Post(bulkServer, "text/plain", bytes.NewBufferString(data2))
if err != nil {
fmt.Printf("Error sending second request: %v\n", err)
return
}
defer resp2.Body.Close()
fmt.Println("Second insert sent, status:", resp2.Status)
// 注意:实际clickhouse-bulk会将这两个插入合并为:
// INSERT INTO table3 (c1, c2, c3) VALUES ('v1', 'v2', 'v3')('v4', 'v5', 'v6')
}
批量插入优化示例
package main
import (
"bytes"
"fmt"
"net/http"
"time"
)
func main() {
bulkServer := "http://localhost:8124"
// 创建批量数据
var batch bytes.Buffer
for i := 0; i < 1000; i++ {
batch.WriteString(fmt.Sprintf("INSERT INTO metrics (time, value, tag) VALUES (now(), %f, 'tag%d')\n", float64(i), i%10))
}
// 发送批量请求
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Post(bulkServer, "text/plain", &batch)
if err != nil {
fmt.Printf("Error sending batch request: %v\n", err)
return
}
defer resp.Body.Close()
fmt.Println("Batch insert sent, status:", resp.Status)
}
环境变量配置
可以通过环境变量配置clickhouse-bulk:
CLICKHOUSE_BULK_DEBUG
- 启用调试日志CLICKHOUSE_SERVERS
- 逗号分隔的服务器列表CLICKHOUSE_FLUSH_COUNT
- 插入行数阈值CLICKHOUSE_FLUSH_INTERVAL
- 插入间隔CLICKHOUSE_CLEAN_INTERVAL
- 内部表清理间隔DUMP_CHECK_INTERVAL
- 重新发送转储的间隔CLICKHOUSE_DOWN_TIMEOUT
- 服务器宕机等待时间CLICKHOUSE_CONNECT_TIMEOUT
- 服务器连接超时CLICKHOUSE_TLS_SERVER_NAME
- TLS证书验证的服务器名称CLICKHOUSE_INSECURE_TLS_SKIP_VERIFY
- 跳过证书验证METRICS_PREFIX
- prometheus指标前缀
性能提示
为了获得更好的性能,FORMAT和VALUES关键字必须大写。
更多关于golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高效批量插入数据到ClickHouse插件库clickhouse-bulk的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用clickhouse-bulk高效批量插入数据到ClickHouse
clickhouse-bulk是一个高效的ClickHouse批量插入工具,它通过缓冲和批量处理数据来显著提高插入性能。下面我将详细介绍如何使用这个工具。
clickhouse-bulk简介
clickhouse-bulk作为中间代理服务,主要功能包括:
- 缓冲数据并批量插入
- 自动重试失败请求
- 支持多种输入格式
- 轻量级且易于部署
安装clickhouse-bulk
可以通过Docker快速安装:
docker pull propan/clickhouse-bulk
docker run -d -p 8123:8123 propan/clickhouse-bulk \
-clickhouse.url="http://clickhouse-server:8123" \
-clickhouse.db="default" \
-clickhouse.table="your_table" \
-workers=4 \
-flush.count=10000 \
-flush.interval=10s
或者下载二进制文件直接运行:
wget https://github.com/nikepan/clickhouse-bulk/releases/download/v2.4/clickhouse-bulk.linux-amd64.tgz
tar -xzvf clickhouse-bulk.linux-amd64.tgz
./clickhouse-bulk --config=config.yaml
Golang客户端示例
以下是使用Golang通过clickhouse-bulk批量插入数据的完整示例:
package main
import (
"bytes"
"fmt"
"net/http"
"time"
)
func main() {
// clickhouse-bulk服务地址
bulkURL := "http://localhost:8123/?query=INSERT%20INTO%20test_table%20FORMAT%20JSONEachRow"
// 模拟生成批量数据
var buf bytes.Buffer
for i := 0; i < 1000; i++ {
// JSONEachRow格式: 每行一个完整的JSON对象,用换行符分隔
buf.WriteString(fmt.Sprintf(`{"id":%d,"name":"user%d","created_at":"%s"}`+"\n",
i, i, time.Now().Format(time.RFC3339)))
}
// 发送HTTP POST请求
resp, err := http.Post(bulkURL, "application/x-www-form-urlencoded", &buf)
if err != nil {
panic(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
fmt.Printf("Insert failed with status: %s\n", resp.Status)
} else {
fmt.Println("Batch insert successful")
}
}
性能优化建议
-
批量大小:根据数据大小调整
-flush.count
和-flush.interval
参数- 典型值:10,000-100,000行或10-30秒间隔
-
并发工作线程:设置
-workers
参数匹配CPU核心数 -
数据格式:使用高效的格式
- JSONEachRow:灵活但稍慢
- TabSeparated:更高效但需要严格格式控制
-
网络优化:确保clickhouse-bulk与ClickHouse服务器在同一个网络
监控与管理
clickhouse-bulk提供监控端点:
curl http://localhost:8123/stat
典型输出:
{
"workers": 4,
"queue": 12,
"inserts": 123456,
"errors": 5,
"last_error": "2023-01-01T12:00:00Z"
}
配置示例
完整的config.yaml配置示例:
clickhouse:
url: "http://clickhouse-server:8123"
db: "analytics"
table: "events"
username: "user"
password: "password"
server:
listen: ":8123"
workers: 8
flush:
count: 50000
interval: "15s"
log:
level: "info"
错误处理
在Golang客户端中实现重试逻辑:
func sendWithRetry(url string, data []byte, maxRetries int) error {
var lastErr error
for i := 0; i < maxRetries; i++ {
resp, err := http.Post(url, "application/x-www-form-urlencoded", bytes.NewReader(data))
if err == nil && resp.StatusCode == http.StatusOK {
return nil
}
if err != nil {
lastErr = err
} else {
lastErr = fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
}
time.Sleep(time.Second * time.Duration(i+1)) // 指数退避
}
return fmt.Errorf("after %d retries, last error: %v", maxRetries, lastErr)
}
clickhouse-bulk是提高ClickHouse写入性能的有效工具,特别适合高吞吐量场景。通过合理配置和客户端优化,可以实现每秒数十万甚至百万级的写入性能。