Golang循环生成500万条记录并插入Couchbase数据库的性能测试
Golang循环生成500万条记录并插入Couchbase数据库的性能测试 大家好,
我是Go语言的新手,正在为我的项目进行概念验证。由于时间限制,如果有人能提供一个示例程序,帮助我生成500万条记录并通过循环将这些记录插入到Couchbase数据库中,以测试性能,我将非常感激。
任何可以运行并检查性能的示例程序都会非常有帮助。
非常感谢您的鼎力相助,我现在已经能够通过配置多线程来插入记录了。必须感谢您及时响应并解决了问题 🙂 再次感谢。
更多关于Golang循环生成500万条记录并插入Couchbase数据库的性能测试的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
非常感谢您的及时帮助,能否请您澄清几个疑问?
- 我将
createDocument函数作为 goroutine 调用,我应该将其作为普通函数调用吗? - 当我使用您提供的代码时,我应该在何处调用我的
createDocument函数?
我已移除了所有与Couchbase相关的代码:
https://play.golang.org/p/XKUyquqMSep
########### 应该传递
jobs而不是workerJobs吗?我尝试传递jobs但出错了。
不,你应该传递 workerJobs,因为你希望每个工作线程/线程运行完全相同数量的任务。
你有什么现成的东西可以展示给我们看看吗?你需要准备:
- 一个 Couchbase 数据库安装环境
- 一个用于 Go 的 Couchbase 数据库客户端库
- 一个简单的 Go 程序,用于连接数据库并生成和插入记录
sanjum080816: 检查性能
你具体想测量什么?Go 部分不会成为问题。如果有什么会影响性能,那将是数据库插入记录的速度以及你的网络连接速度的快慢。
对数据库进行基准测试或性能测试并非易事。首先,您需要对数据库有全面的了解。您需要在给定的数据库中定义正确的模式/表/集合/索引/分析器/引擎,然后才能运行测试。如果您对数据库没有深入的了解,我建议您先进行研究,查看您使用场景中现有的基准测试,然后咨询更懂行的人。
如果这是一个兴趣项目,可以从某个地方开始,进行测试,尝试提高性能,减少资源使用,然后如果您认为这是您能做到的全部,再寻求帮助。
将 workerJobs := make(chan interface{}, jobsCount) 改为 workerJobs := make(chan Person, jobsCount)。
并在 worker 中调用 createDocument:
func worker(id int, jobs <-chan Person, results chan<- interface{}) {
// 在此处进行你的线程测量
fmt.Printf("worker %d started \n", id)
for person := range jobs {
createDocument("randomdoc", person)
results <- job
}
fmt.Printf("worker %d completed \n", id)
}
var jobs chan interface{}
必须是
var jobs chan Person
转换此代码:
for _, job := range jobs {
for j := 1; j <= jobsCount; j++ {
job <- j
}
close(job)
}
为此代码:
for _, job := range jobs {
for j := 1; j <= jobsCount; j++ {
job <- Person{FirstName: "Syeda", LastName: "Anjum"}
}
close(job)
}
以下是一个在Go语言中实现工作线程/线程的示例。在Go中,你无法直接控制操作系统线程,但你可以限制Go调度器在给定任务中仅使用指定数量的Go协程。
首先,我们需要有限数量的Go协程来达到你期望的5个并发数:
for i := 0; i < 5; i++ {
// 为每个工作线程/线程创建任务
workerJobs := make(chan interface{}, jobsCount)
jobs = append(jobs, workerJobs)
go worker(i, workerJobs, results)
}
然后将任务发送给所有已创建的工作线程:
for _, job := range jobs {
for j := 1; j <= jobsCount; j++ {
job <- j
}
close(job)
}
等待所有工作线程完成它们的任务:
// 等待所有工作线程完成它们的任务
for a := 1; a <= jobsCount*len(jobs); a++ {
//等待所有工作线程返回结果
<-results
}
我需要检查Go语言执行批量插入和从Couchbase获取操作需要多长时间。它相比其他编程语言如何更高效,以及如何通过Go实现线程生成并检查性能,就像我在C++中测试的那样:
Number of Requests per thread : 100000
Total Number of threads : 5
WRITE PROPORTION : 50%
READ PROPORTION : 50%
THREAD_ID 3 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.13 S, AVG_LATENCY = 0.000301 S, OPS_PER_SECOND = 3319.12
THREAD_ID 2 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.22 S, AVG_LATENCY = 0.000302 S, OPS_PER_SECOND = 3309.02
THREAD_ID 1 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.25 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3305.39
THREAD_ID 0 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.29 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3301.78
THREAD_ID 4 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.31 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3298.83
我已经完成了所有这些步骤。以下是我的程序。我在数据库方面有扎实的知识,但对GOLANG是新手,编写测试程序花费的时间比我项目预定的时间线要多。您能帮我修改代码吗?我想配置5个线程,每个线程并行插入10万条记录,总共插入50万条记录。
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
"strconv"
"github.com/zenthangplus/goccm"
"time"
"runtime"
)
var (
bucket *gocb.Bucket
)
type Person struct {
FirstName, LastName string
}
func main(){
cluster, err := gocb.Connect("couchbase://ip") //Connects to the cluster
if err != nil {
fmt.Println(err.Error())
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: "",
Password: "",
})
fmt.Println("Cluster:%v",cluster)
bucket, err = cluster.OpenBucket("", "") //Connects to the bucket
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println("Bucket:%v",bucket)
// Limit 3 goroutines to run concurrently.
c := goccm.New(3)
dt := time.Now()
fmt.Println("Start Current date and time is: ", dt.String())
fmt.Println(runtime.NumCPU())
for i := 1; i <= 500000; i++ {
fmt.Println("Starting new thread... %v",i )
person := Person{FirstName: "Syeda", LastName: "Anjum"}
test := "Go_Demo_"+strconv.Itoa(i)
// This function have to call before any goroutine
c.Wait()
go createDocument(test, &person)
fmt.Println("Completed thread... %v",i)
c.Done()
}
fmt.Println(runtime.NumCPU())
// This function have to call to ensure all goroutines have finished after close the main program.
c.WaitAllDone()
fmt.Println("End Current date and time is: ", dt.String())
fmt.Println(runtime.NumCPU())
}
func createDocument(documentId string, person *Person) {
fmt.Println("Upserting a full document...")
_, error := bucket.Upsert(documentId, person, 0)
if error != nil {
fmt.Println(error.Error())
return
}
fmt.Println("After Upsert fetching the document")
}
按照建议修改了我的代码,但在下面这个错误上卡住了。尝试将 j 声明为结构体,但后来无法在 for 操作中使用它。
command-line-arguments
./goprocs.go:54:8: 不能在 send 中使用 j(类型 int)作为 Person 类型
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
)
var (
bucket *gocb.Bucket
)
type Person struct {
FirstName, LastName string
}
func main() {
// if you have more than 5 core, make it 5 to use 5 os thread
//runtime.GOMAXPROCS(5)
jobsCount := 50
cluster, err := gocb.Connect("couchbase://") //Connects to the cluster
if err != nil {
fmt.Println(err.Error())
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: "",
Password: "",
})
fmt.Println("Cluster:%v",cluster)
bucket, err = cluster.OpenBucket("", "") //Connects to the bucket
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println("Bucket:%v",bucket)
person := Person{FirstName: "Syeda", LastName: "Anjum"}
var jobs []chan Person
results := make(chan interface{}, jobsCount)
//jobs := make(chan int, jobsCount)
//results := make(chan int, jobsCount)
fmt.Println("value of results: %v",&results)
for i := 0; i < 5; i++ {
// create jobs per worker/thread
workerJobs := make(chan Person, jobsCount)
jobs = append(jobs, workerJobs)
go worker(i, workerJobs, results)
}
for _, job := range jobs {
for j:= 1; j <= jobsCount; j++ {
job <- j
}
close(job)
}
// wait all workers to complete their jobs
for a := 1; a <= jobsCount*len(jobs); a++ {
//wait all the workers to return results
<-results
}
fmt.Println("finished")
}
func worker(id int, jobs <-chan Person, results chan<- interface{}) {
// do your thread measurements here
fmt.Printf("worker %d started \n", id)
for person := range jobs {
createDocument("randomdoc", &person)
results <- person
}
fmt.Printf("worker %d completed \n", id)
}
func createDocument(documentId string, person *Person) {
fmt.Println("Upserting a full document...")
_, error := bucket.Upsert(documentId, person, 0)
if error != nil {
fmt.Println(error.Error())
return
}
fmt.Println("After Upsert fetching the document")
}
-
在
main()中定义的变量job在函数worker中未被识别,我使用了您提供的同一个worker函数。我将其编辑为jobs后可以工作,但只有一个文档被插入到 COUCHBASE 中。command-line-arguments
./goprocs.go:63:14: undefined: job
-
不理解下面这段代码中的
job是做什么的?for _, job := range jobs { for j := 1; j <= jobsCount; j++ { job <- Person{FirstName: "Syeda", LastName: "Anjum"} } close(job) }您能检查一下代码中提到的 ## 注释吗? 以下是完整的代码:
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
)
var (
bucket *gocb.Bucket
)
type Person struct {
FirstName, LastName string
}
func main() {
// 如果你有超过5个核心,将其设为5以使用5个操作系统线程 runtime.GOMAXPROCS(5)
jobsCount := 500
cluster, err := gocb.Connect("couchbase://10.10.216.53") // 连接到集群
if err != nil {
fmt.Println(err.Error())
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: "golang",
Password: "mavenir",
})
fmt.Println("Cluster:%v", cluster)
bucket, err = cluster.OpenBucket("golang", "") // 连接到桶
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println("Bucket:%v", bucket)
var jobs []chan Person
results := make(chan interface{}, jobsCount)
for i := 0; i < 5; i++ {
// 为每个工作线程/线程创建工作
workerJobs := make(chan Person, jobsCount)
jobs = append(jobs, workerJobs)
go worker(i, workerJobs, results) // ########### 这里应该传递 jobs 而不是 workerJobs 吗?我尝试传递 jobs 但遇到了错误 ./goprocs.go:43:18: cannot use jobs (type []chan Person) as type <-chan Person in argument to worker
}
for _, job := range jobs {
for j := 1; j <= jobsCount; j++ {
job <- Person{FirstName: "Syeda", LastName: "Anjum"}
}
close(job)
}
// 等待所有工作线程完成它们的任务
for a := 1; a <= jobsCount*len(jobs); a++ {
// 等待所有工作线程返回结果
<-results
}
fmt.Println("finished")
}
func worker(id int, jobs <-chan Person, results chan<- interface{}) {
// 在这里进行你的线程测量
fmt.Printf("worker %d started \n", id)
for person := range jobs {
createDocument("randomdoc", &person)
results <- jobs
}
fmt.Printf("worker %d completed \n", id)
}
func createDocument(documentId string, person *Person) int {
//fmt.Println("Upserting a full document...")
_, error := bucket.Upsert(documentId, person, 0)
if error != nil {
fmt.Println(error.Error())
return -1
}
return 1
}
以下是一个Go语言程序示例,用于生成500万条记录并批量插入Couchbase数据库,同时包含性能测试功能:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/couchbase/gocb/v2"
"sync/atomic"
)
type TestRecord struct {
ID string `json:"id"`
Name string `json:"name"`
Value int `json:"value"`
Timestamp int64 `json:"timestamp"`
}
func main() {
// Couchbase连接配置
cluster, err := gocb.Connect("couchbase://localhost", gocb.ClusterOptions{
Authenticator: gocb.PasswordAuthenticator{
Username: "Administrator",
Password: "password",
},
})
if err != nil {
log.Fatal(err)
}
defer cluster.Close(nil)
bucket := cluster.Bucket("test_bucket")
collection := bucket.DefaultCollection()
// 等待bucket就绪
err = bucket.WaitUntilReady(30*time.Second, nil)
if err != nil {
log.Fatal(err)
}
totalRecords := 5_000_000
batchSize := 1000
var opsCompleted int64
fmt.Printf("开始插入 %d 条记录...\n", totalRecords)
startTime := time.Now()
// 使用goroutine池并发插入
workers := 10
jobs := make(chan []TestRecord, workers)
var wg sync.WaitGroup
// 启动worker goroutines
for w := 0; w < workers; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for batch := range jobs {
insertBatch(collection, batch, &opsCompleted)
}
}(w)
}
// 生成数据并发送到jobs通道
go func() {
for i := 0; i < totalRecords; i += batchSize {
batch := make([]TestRecord, 0, batchSize)
for j := 0; j < batchSize && i+j < totalRecords; j++ {
record := TestRecord{
ID: fmt.Sprintf("record_%d", i+j),
Name: fmt.Sprintf("Test Record %d", i+j),
Value: i + j,
Timestamp: time.Now().UnixNano(),
}
batch = append(batch, record)
}
jobs <- batch
}
close(jobs)
}()
// 进度监控
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
completed := atomic.LoadInt64(&opsCompleted)
progress := float64(completed) / float64(totalRecords) * 100
elapsed := time.Since(startTime).Seconds()
rate := float64(completed) / elapsed
fmt.Printf("进度: %.2f%% | 已插入: %d | 速率: %.0f 条/秒\n",
progress, completed, rate)
}
}()
wg.Wait()
elapsed := time.Since(startTime)
fmt.Printf("\n插入完成!\n")
fmt.Printf("总耗时: %v\n", elapsed)
fmt.Printf("平均速率: %.0f 条/秒\n", float64(totalRecords)/elapsed.Seconds())
}
func insertBatch(collection *gocb.Collection, batch []TestRecord, opsCompleted *int64) {
ops := make([]gocb.BulkOp, len(batch))
for i, record := range batch {
ops[i] = &gocb.UpsertOp{
ID: record.ID,
Value: record,
}
}
err := collection.Do(ops, nil)
if err != nil {
log.Printf("批量插入错误: %v", err)
return
}
atomic.AddInt64(opsCompleted, int64(len(batch)))
}
需要安装的依赖:
go get github.com/couchbase/gocb/v2
运行前请修改以下配置:
- Couchbase连接字符串和认证信息
- bucket名称(默认为"test_bucket")
- 根据硬件调整
workers数量(并发goroutine数) - 根据需求调整
batchSize(批量大小)
性能优化建议:
- 使用
go run main.go运行程序 - 监控Couchbase服务器资源使用情况
- 调整批量大小和worker数量以获得最佳性能
- 确保网络延迟最小化
程序会实时显示插入进度和速率,完成时会输出总耗时和平均插入速率。

