Golang向Couchbase插入50万条记录时性能极低的问题
Golang向Couchbase插入50万条记录时性能极低的问题 谁能帮我优化一下Couchbase数据库的性能,让我的代码运行得更快、更高效?在C++中,向同一个数据库插入50万条记录只需19秒,但我的Go代码完成同样的任务却需要大约19分钟。
以下是配置值: “NumofDoc”:100000, “ThreadCount”:5,
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
"strconv"
"time"
//"io/ioutil"
//"strings"
"github.com/tkanos/gonfig"
//"os"
//"encoding/json"
"math/rand"
// "math"
)
var (
bucket *gocb.Bucket
)
type Insert_doc struct {
Thread_id int
KTAB, SyncBuffer string
}
type Configuration struct {
NumofDoc int
Username string
Password string
BucketName string
ThreadCount int
Port int
OP_TYPE int
}
func main() {
configuration := Configuration{}
err := gonfig.GetConf("Couchbase_config.json", &configuration)
fmt.Println("Config File name Passed : Couchbase_config.json")
fmt.Println("ThreadCount : ",configuration.ThreadCount)
fmt.Println("Number of Requests per thread : ",configuration.NumofDoc)
// if you have more than 5 core, make it 5 to use 5 os thread runtime.GOMAXPROCS(5)
cluster, err := gocb.Connect("couchbase://") //Connects to the cluster
if err != nil {
fmt.Println(err.Error())
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: configuration.Username,
Password: configuration.Password,
})
fmt.Println("Cluster:%v",cluster)
bucket, err = cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Println("Bucket:%v",bucket)
var jobs []chan int
results := make(chan interface{}, configuration.NumofDoc)
start := time.Now() // current local time
starttime := start.Unix() // number of seconds since January 1, 1970 UTC
for i := 0; i < configuration.ThreadCount; i++ {
workerJobs := make(chan int, configuration.NumofDoc) // create jobs per worker/thread
jobs = append(jobs, workerJobs)
go worker(i, workerJobs, results,configuration.OP_TYPE)
}
for _, jobs := range jobs {
for j := 1; j <= configuration.NumofDoc; j++ {
jobs <- j
}
close(jobs)
}
// wait all workers to complete their jobs
for a := 1; a <= configuration.NumofDoc*len(jobs); a++ {
//wait all the workers to return results
<-results
}
end := time.Now() // current local time
Endtime := end.Unix() // number of seconds since January 1, 1970 UTC
fmt.Printf("Script Starting time : %v \n",starttime)
fmt.Printf("Script Ending time : %v \n",Endtime)
//fmt.Println("Process finished")
}
func worker(id int, jobs <-chan int, results chan<- interface{},s int) {
// fmt.Printf("worker %d started \n", id) // do your thread measurements here
thread_start := time.Now() // current local time
Thread_Start := thread_start.Unix() // number of seconds since January 1, 1970 UTC
//fmt.Printf("Thread_%d Starting time : %v \n",id,Thread_Start)
var readproportion int
var updateproportion int
var opsSequence[100]int
operation_type := s
if operation_type == 1 {
updateproportion = 100
readproportion = 0
} else if operation_type == 2 {
updateproportion = 0
readproportion = 100
} else if operation_type == 3 {
updateproportion = 50
readproportion = 50
}
count:=0
for b := 0; b < updateproportion; b++ {
opsSequence[b] =1
count++
}
for b := 0; b < readproportion; b++ {
opsSequence[count+b]=2
}
var insertCount int64 = 0
var readCount int64 = 0
for j := range jobs {
k := j%100;
optype := opsSequence[k];
//fmt.Println("operation_type : ",optype)
var x int = int(readCount % 50000);
switch(optype){
case 1:
document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: RandomString(10000)}
test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(int(insertCount))
//fmt.Println("createDocument %v",test)
createDocument(test, &document)
insertCount++;
break;
case 2:
test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(x)
//fmt.Println("getDocument %v",test)
getDocument(test)
readCount++;
break;
default:
fmt.Println("Invalid Operation Type ",optype)
}
results <- jobs
}
//fmt.Printf("worker %d completed \n", id)
thread_end := time.Now() // current local time
Thread_End := thread_end.Unix() // number of seconds since January 1, 1970 UTC
//fmt.Printf("Thread_%d Ending time : %v \n",id,Thread_End)
timediff := Thread_End - Thread_Start
var avgLatency float64 = float64(timediff)/float64(insertCount+readCount);
var opsPerSec float64 = 1/avgLatency;
fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n",id, insertCount,readCount, timediff, avgLatency,opsPerSec);
}
func createDocument(documentId string, document *Insert_doc) {
//fmt.Println("Upserting a full document...")
_, error := bucket.Upsert(documentId, document, 0)
if error != nil {
fmt.Println(error.Error())
}
}
func RandomString(n int) string {
var letter = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, n)
for i := range b {
b[i] = letter[rand.Intn(len(letter))]
}
return string(b)
}
func getDocument(documentId string) {
//fmt.Println("Getting the full document by id...")
var get_data Insert_doc
_, error := bucket.Get(documentId, &get_data)
if error != nil {
fmt.Println(error.Error())
return
}
// jsonPerson, _ := json.Marshal(&person)
// fmt.Println(string(jsonPerson))
}
更多关于Golang向Couchbase插入50万条记录时性能极低的问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
好的,我会试试这个。
更多关于Golang向Couchbase插入50万条记录时性能极低的问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
你为 NumOfDoc 和 ThreadCount 提供了什么值?
太棒了!很高兴能帮上忙 😊
sanjum080816:
我正在使用 Couchbase。
请在你那边查阅相关支持文档,同时我会审查你的新代码集。
"NumofDoc":100000,
"ThreadCount":5,
另外,一旦你完全完成,请告诉我 Go 与 C++ 的最终耗时对比。这对大家来说会是很好的信息。
一种可以尝试的调试方法是,将线程数设置为1,并在代码中各处使用 time.Now() 或记录与上一次日志的时间间隔来打印日志。检查代码的哪一部分耗时过长。这样我们就可以只专注于那一部分。
感谢Abhay,问题找到了……问题出在每次调用时随机字符串的生成上。非常感谢你的快速帮助。
我已经完成了代码审查,你能从这里测试源代码吗:https://play.golang.org/p/vR0x91wYyoe
看看有哪些运行时错误(我没有你的任何测试基准)。如果一切运行正常,我将在你的新源代码中添加注释。
我按照您的建议进行了修正,但仍然发现获取文档的速度并不快,它花费的时间与插入操作相同。我关心的是“获取文档的行为是否与插入操作相同?”因为在C++和Python中,获取文档花费的时间是插入操作的一半(如果插入需要10秒,那么获取只需要5秒)。
sanjum080816:
我关心的是“获取文档的行为是否与插入相同?”因为在C++和Python中,获取文档所需时间是插入的一半(如果插入需要10秒,那么获取需要5秒)。
您能否向我指出相关的支持文档(我相信您使用的是CouchDB)。顺便问一下,新的代码在哪里?我可以看一下吗?
抱歉!!我的错,我实际上尝试的是 wg.Add(1),粘贴了错误的代码……当我尝试 wg.Add(2) 时,正如你所说,程序挂起了。
总计数是 2*线程数本身,就像在 for 循环里一样 我在 worker 函数里处理了这个……你能帮帮我吗!!我该如何改进这一点?
随着文档大小(通过随机函数生成)的增加,性能在下降
好的,@abhayanoop。目前我已经成功完成了写入操作,使用Go插入50万条每条10KB的记录需要26秒……而在C++中只需要18秒。
然而,我使用的GetDocument是一个简单的直接获取操作,但获取操作却需要27秒,与C++和Python相比时间增加了两倍。通常读取操作应该更快,在C++和Python中,读取所花的时间是插入操作的一半(例如,如果写入是20秒,那么读取就是10秒)。
对于获取文档这部分,有什么想法或建议吗?
sanjum080816:
对于获取文档部分有什么想法或建议吗?
对于初学者来说,你应该在完成 C++ 代码翻译后,将代码重构为真正的 Go 代码。在 Go 语言中,你需要非常小心 O(n^2) 的 for 循环,因为你会在一些微处理过程中损失计算周期(在你另一个主题中提到的 []rune ↔ string 转换就是其中之一)。
另一件可以优化的事情是错误传递,而不是立即打印出来。最好将错误传递出去,而不是直接调用 fmt 来打印消息。fmt 打印使用了接口,当规模扩大时会显著降低性能(你可以通过使用 fmt.Sprintf 与 + 进行字符串连接来对比验证这一点)。
一个建议是,最初不要构建一个功能完备的代码。采用增量方式构建代码,并在开发过程中运行它,这样你就能知道是添加的哪段代码导致了输出延迟。如果Go代码执行的是相同的操作,那么它不可能比C代码慢。
复制你现有的文件,并尝试以下基础代码(使用等待组代替通道):
package main
import (
"fmt"
"sync"
"time"
)
func worker(wg *sync.WaitGroup, /* 其他参数 */) {
defer wg.Done()
// 你的工作器代码
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ { // 线程数(在你的例子中是5)
wg.Add(1)
go worker(&wg, /* 其他参数 */)
}
wg.Wait()
}
也可以尝试不使用并发。尝试在没有goroutine的情况下处理10万条记录。比较性能。
我没有深入研究代码。只是乍一看,我认为这可能是问题所在。
在这段代码中,
var wg sync.WaitGroup
for i := 0; i < configuration.ThreadCount; i++ {
wg.Add(2)
go worker(&wg,i,configuration.OP_TYPE)
}
wg.Wait()
你需要使用 wg.Add(1) 而不是 2。每次调用 goroutine 时,你需要将其增加 1。每次你在 worker 函数中调用 wg.Done() 时,计数会减少 1。
所以现在的情况是,在每次迭代中,wg(等待组)被增加了 2。因此总计数将是 2 * threadcount。然而,每个 worker 函数只调用一次 wg.Done(),现在总数变成了 threadcount 的值,而不是 0。
wg.Wait() 会一直等待,直到计数为 0。因此它需要更长的时间。
据我所知,在所有 worker 完成后,你的代码应该会卡在 wg.Wait() 处。我说得对吗?代码没有退出?
你只需要将 wg.Add(2) 改为 wg.Add(1)。我认为这应该能解决问题。
如果有效,请告诉我。
问题出在并发模型和连接管理上。以下是优化后的代码,使用连接池和批量操作:
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
"strconv"
"sync"
"time"
"github.com/tkanos/gonfig"
"math/rand"
)
type Configuration struct {
NumofDoc int
Username string
Password string
BucketName string
ThreadCount int
Port int
OP_TYPE int
BatchSize int
}
type InsertDoc struct {
Thread_id int
KTAB string
SyncBuffer string
}
func main() {
configuration := Configuration{}
err := gonfig.GetConf("Couchbase_config.json", &configuration)
if err != nil {
fmt.Println("Config error:", err)
return
}
// 设置批量大小,默认1000
if configuration.BatchSize == 0 {
configuration.BatchSize = 1000
}
// 创建连接池
clusters := make([]*gocb.Cluster, configuration.ThreadCount)
buckets := make([]*gocb.Bucket, configuration.ThreadCount)
for i := 0; i < configuration.ThreadCount; i++ {
cluster, err := gocb.Connect("couchbase://localhost")
if err != nil {
fmt.Println("Cluster connect error:", err)
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: configuration.Username,
Password: configuration.Password,
})
clusters[i] = cluster
bucket, err := cluster.OpenBucket(configuration.BucketName, "")
if err != nil {
fmt.Println("Bucket open error:", err)
return
}
buckets[i] = bucket
}
start := time.Now()
var wg sync.WaitGroup
docsPerThread := configuration.NumofDoc / configuration.ThreadCount
for i := 0; i < configuration.ThreadCount; i++ {
wg.Add(1)
go func(threadID int, bucket *gocb.Bucket) {
defer wg.Done()
insertBatch(threadID, bucket, docsPerThread, configuration.BatchSize)
}(i, buckets[i])
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("插入 %d 条记录耗时: %v\n", configuration.NumofDoc, elapsed)
fmt.Printf("吞吐量: %.2f ops/sec\n",
float64(configuration.NumofDoc)/elapsed.Seconds())
// 关闭连接
for _, bucket := range buckets {
bucket.Close()
}
for _, cluster := range clusters {
cluster.Close()
}
}
func insertBatch(threadID int, bucket *gocb.Bucket, totalDocs, batchSize int) {
var batch []gocb.BulkOp
var mu sync.Mutex
for i := 0; i < totalDocs; i++ {
doc := InsertDoc{
Thread_id: threadID,
KTAB: "INSERT",
SyncBuffer: RandomString(10000),
}
key := fmt.Sprintf("Go_Demo_%d_%d", threadID, i)
op := &gocb.UpsertOp{
Key: key,
Value: doc,
}
mu.Lock()
batch = append(batch, op)
// 批量执行
if len(batch) >= batchSize || i == totalDocs-1 {
err := bucket.Do(batch)
if err != nil {
fmt.Printf("Thread %d batch error: %v\n", threadID, err)
}
batch = nil // 清空批次
}
mu.Unlock()
}
}
// 使用sync.Pool优化字符串生成
var letterPool = sync.Pool{
New: func() interface{} {
return []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
},
}
func RandomString(n int) string {
letters := letterPool.Get().([]rune)
defer letterPool.Put(letters)
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
关键优化点:
- 连接池管理:每个goroutine使用独立的Couchbase连接,避免连接竞争
- 批量操作:使用
bucket.Do()进行批量插入,减少网络往返 - 内存池优化:使用
sync.Pool重用随机字符串生成的内存 - 简化并发模型:使用
sync.WaitGroup替代复杂的channel通信 - 移除同步等待:原代码中的results channel同步会严重降低性能
配置文件中添加BatchSize参数:
{
"NumofDoc": 100000,
"ThreadCount": 5,
"BatchSize": 1000,
"Username": "admin",
"Password": "password",
"BucketName": "test"
}
这个优化版本应该能将性能从19分钟提升到与C++相近的水平。
var wg sync.WaitGroup
我尝试了这种方法,并对建立Couchbase连接做了一些修改。我传入了每个文档大小为4KB,插入了50万条记录。完成这个过程需要200秒,而同样的任务在C++中只需10秒。
以下是代码:
package main
import (
"fmt"
"gopkg.in/couchbase/gocb.v1"
"strconv"
"time"
"sync"
"github.com/tkanos/gonfig"
"math/rand"
)
type Insert_doc struct {
Thread_id int
KTAB, SyncBuffer string
}
type Configuration struct {
NumofDoc int
ServerIp string
Username string
Password string
Randstr int
BucketName string
ThreadCount int
Port int
OP_TYPE int
}
func main() {
configuration := Configuration{}
_ = gonfig.GetConf("Couchbase_config.json", &configuration)
fmt.Println("Config File name Passed : Couchbase_config.json")
fmt.Println("ThreadCount : ",configuration.ThreadCount)
fmt.Println("Server IP : ",configuration.ServerIp)
fmt.Println("Number of Requests per thread : ",configuration.NumofDoc)
var wg sync.WaitGroup
for i := 0; i < configuration.ThreadCount; i++ {
wg.Add(2)
go worker(&wg,i,configuration.OP_TYPE)
}
wg.Wait()
}
func worker(wg *sync.WaitGroup,id int,s int) {
configuration := Configuration{}
_ = gonfig.GetConf("Couchbase_config.json", &configuration)
var insertCount int64 = 0
var readCount int64 = 0
var readproportion int
var updateproportion int
var opsSequence[100]int
operation_type := s
cluster, err := gocb.Connect(configuration.ServerIp) //Connects to the cluster
if err != nil {
fmt.Println(err.Error())
return
}
cluster.Authenticate(gocb.PasswordAuthenticator{
Username: configuration.Username,
Password: configuration.Password,
})
var bucket *gocb.Bucket
bucket, err = cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
if err != nil {
fmt.Println(err.Error())
return
}
if operation_type == 1 {
updateproportion = 100
readproportion = 0
} else if operation_type == 2 {
updateproportion = 0
readproportion = 100
} else if operation_type == 3 {
updateproportion = 50
readproportion = 50
}
count:=0
for b := 0; b < updateproportion; b++ {
opsSequence[b] =1
count++
}
for b := 0; b < readproportion; b++ {
opsSequence[count+b]=2
}
Thread_Start := time.Now().Unix()
for j :=0; j < configuration.NumofDoc; j++ {
k := j%100;
optype := opsSequence[k];
var x int = int(readCount % 5000);
switch(optype){
case 1:
document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: RandomString(configuration.Randstr)}
test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(int(insertCount))
createDocument(bucket,test, &document)
insertCount++;
break;
case 2:
test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(x)
getDocument(bucket,test)
readCount++;
break;
break;
default:
fmt.Println("Invalid Operation Type ",optype)
}
}
Thread_End := time.Now().Unix()
timediff := Thread_End - Thread_Start
var avgLatency float64 = float64(timediff)/float64(insertCount+readCount);
var opsPerSec float64 = 1/avgLatency;
fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n",id, insertCount,readCount, timediff, avgLatency,opsPerSec);
wg.Done()
}
func createDocument(bucket *gocb.Bucket,documentId string, document *Insert_doc) {
_, error := bucket.Upsert(documentId, document, 0)
if error != nil {
fmt.Println(error.Error())
}
}
func RandomString(n int) string {
var letter = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
b := make([]rune, n)
for i := range b {
b[i] = letter[rand.Intn(len(letter))]
}
return string(b)
}
func getDocument(bucket *gocb.Bucket,documentId string) {
var get_data Insert_doc
_, error := bucket.Get(documentId, &get_data)
if error != nil {
fmt.Println(error.Error())
return
}
}


