Golang文档增大导致性能下降问题探讨

Golang文档增大导致性能下降问题探讨 大家好,

有人能帮我理解如何处理Go语言的性能问题吗? 我正在向Couchbase数据库中插入50万份文档(尝试了通道和WaitGroup两种方法)。

每份文档4KB(总计50万条记录)- 191秒 每份文档1KB(总计50万条记录)- 53秒

性能差异巨大。而在C++中,相同操作分别耗时7秒和11秒。

以下是我的代码:

package main
import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
    "strconv"
    "time"
    "sync"
    "github.com/tkanos/gonfig"
    "math/rand"
)   
    
type Insert_doc struct {
    Thread_id int 
    KTAB, SyncBuffer string
} 

type Configuration struct {
    NumofDoc  int
    ServerIp  string
    Username   string
    Password   string
    Randstr   int
    BucketName string
    ThreadCount int
    Port int
    OP_TYPE int
}
func main() {
    configuration := Configuration{}
    _ = gonfig.GetConf("Couchbase_config.json", &configuration)
    fmt.Println("Config File name Passed :  Couchbase_config.json")
    fmt.Println("ThreadCount : ",configuration.ThreadCount)  //ThreadCount:2
    fmt.Println("Server IP : ",configuration.ServerIp)
    fmt.Println("Number of Requests per thread : ",configuration.NumofDoc) //NumofDoc: 250000
    var wg sync.WaitGroup 
    for i := 0; i < configuration.ThreadCount; i++ {   //ThreadCount : 2
        wg.Add(1)
        go worker(&wg,i,configuration.OP_TYPE)
    }

        wg.Wait()
}

func worker(wg *sync.WaitGroup,id int,s int) {
    configuration := Configuration{}
    _ = gonfig.GetConf("Couchbase_config.json", &configuration)
    var insertCount int64 = 0
    var readCount int64 = 0
    var readproportion int
    var updateproportion int
    var opsSequence[100]int
    operation_type := s
    cluster, err := gocb.Connect(configuration.ServerIp) //Connects to the cluster

    if err != nil {
        fmt.Println(err.Error())
        return
    }

    cluster.Authenticate(gocb.PasswordAuthenticator{
        Username: configuration.Username,
        Password: configuration.Password,
    })

    var bucket *gocb.Bucket
    bucket, err = cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
    
    if err != nil {
    fmt.Println(err.Error())
    return
    }

    if operation_type == 1 {
        updateproportion = 100
        readproportion = 0
    } else if operation_type == 2 {
        updateproportion = 0
        readproportion = 100
    } else if operation_type == 3 {
        updateproportion = 50
        readproportion = 50
    }

    count:=0
    for b := 0; b < updateproportion; b++ {
        opsSequence[b] =1
        count++
    }

    for b := 0; b < readproportion; b++ {
        opsSequence[count+b]=2
    }
    
    Thread_Start := time.Now().Unix()
    for j :=0; j < configuration.NumofDoc; j++ {   //NumofDoc : 250000
       k := j%100;
       optype := opsSequence[k];
       var x int = int(readCount % 5000);
       switch(optype){
           case 1:
               document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: RandomString(configuration.Randstr)} // Randstr - 4000
               test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(int(insertCount))
               createDocument(bucket,test, &document)
               insertCount++;
               break;
           case 2:
               test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(x)
               getDocument(bucket,test) 
               readCount++;
               break;

           break;
               default:
               fmt.Println("Invalid Operation Type ",optype)
       }
    }

    Thread_End := time.Now().Unix()
    timediff := Thread_End - Thread_Start
    var avgLatency float64 = float64(timediff)/float64(insertCount+readCount);
    var opsPerSec float64 = 1/avgLatency;
    fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n",id, insertCount,readCount, timediff, avgLatency,opsPerSec);
    wg.Done()
}

func createDocument(bucket *gocb.Bucket,documentId string, document *Insert_doc) {
    _, error := bucket.Upsert(documentId, document, 0)
    if error != nil {
    fmt.Println(error.Error())
    }
}

func RandomString(n int) string {
    var letter = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
    b := make([]rune, n)
    for i := range b {
    b[i] = letter[rand.Intn(len(letter))]
    }
    return string(b)
}

func getDocument(bucket *gocb.Bucket,documentId string) {
    var get_data Insert_doc
    _, error := bucket.Get(documentId, &get_data)
    if error != nil {
    fmt.Println(error.Error())
    return
    }

}

更多关于Golang文档增大导致性能下降问题探讨的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

谢谢Hooloway,是的,就是这个问题。现在已经解决了。非常感谢 😊

更多关于Golang文档增大导致性能下降问题探讨的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


你能否告诉我们哪个修复对性能影响最大?是你指出的变量声明还是其他建议?你现在的性能如何(#秒)?

谢谢!

sanjum080816:

var letter = rune(“abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789”)

另外,将其移至常量组。你不需要在每次迭代中都为同一个字符串声明变量。再加上 rune 转换和 O(n^2) 的影响,这完全是在浪费计算周期。

var letter = rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")

两个发现的陷阱和一些问题:


陷阱

O(n^2)

在你的 for j :=0; j < configuration.NumofDoc; j++ { //NumofDoc : 250000 循环中,由于 RandomString(..) 的存在,存在一个 O(n^2) 的复杂度。如果 configuration.Randstr 不是常量,当它的值很大时,你会看到巨大的性能差异。

低效地使用 rune 类型(先做这个修改,然后重新进行基准测试)

其次,由于随机字符串字符是 ASCII 兼容的,没有必要使用 rune。只有当你的字符列表包含特殊字符(例如日语、中文、印地语、俄语等)时才需要使用 rune。这里有一个使用 []byte 生成随机性的例子:https://gist.github.com/dopey/c69559607800d2f2f90b1b1ed4e550fb#file-main-go-L31

供参考,一个 rune 值是一个“多字节”值。即使作为字符串处理器,使用 []rune[]byte 之间的性能差异也是巨大的。


问题

并行性?

C++ 代码是否以并行方式运行?如果是,你需要设置 Go 以并行方式运行。Go 是并发优先的。请参阅:

  1. https://www.ardanlabs.com/blog/2014/01/concurrency-goroutines-and-gomaxprocs.html
  2. https://stackoverflow.com/questions/44039223/golang-why-are-goroutines-not-running-in-parallel
  3. https://stackoverflow.com/questions/52975260/concurrency-vs-parallelism-when-executing-a-goroutine
  4. https://blog.golang.org/waza-talk
  5. https://golang.org/pkg/runtime/#GOMAXPROCS

在相同的硬件和操作系统上运行?

虽然听起来有点傻,但为了再次确认,C++ 和 Go 都是无符号编译版本,运行在相同的硬件、相同的操作系统以及相同的网络配置上,对吗?


补充说明

如果以上问题都已解决,你仍然得到较慢的结果,那么问题很可能出在包的集成上。在这种情况下,你需要模拟每个包,并确定是哪个包导致了性能问题。

然后,根据你的调查结果继续推进工作。

性能下降的主要原因是文档增大导致序列化开销增加和网络传输时间增长。以下是具体分析和优化方案:

问题分析

  1. 序列化开销:4KB文档的JSON序列化/反序列化时间显著增加
  2. 网络传输:更大的文档需要更多网络传输时间
  3. 内存分配:更大的字符串导致更多的内存分配和GC压力

优化方案

1. 使用连接池和批处理

type BatchWorker struct {
    bucket *gocb.Bucket
    docs   []gocb.BulkOp
    mu     sync.Mutex
    size   int
}

func NewBatchWorker(bucket *gocb.Bucket, batchSize int) *BatchWorker {
    return &BatchWorker{
        bucket: bucket,
        docs:   make([]gocb.BulkOp, 0, batchSize),
        size:   batchSize,
    }
}

func (bw *BatchWorker) AddInsert(id string, doc interface{}) error {
    bw.mu.Lock()
    defer bw.mu.Unlock()
    
    bw.docs = append(bw.docs, &gocb.InsertOp{
        Key:   id,
        Value: doc,
    })
    
    if len(bw.docs) >= bw.size {
        return bw.Flush()
    }
    return nil
}

func (bw *BatchWorker) Flush() error {
    bw.mu.Lock()
    defer bw.mu.Unlock()
    
    if len(bw.docs) == 0 {
        return nil
    }
    
    err := bw.bucket.Do(bw.docs)
    bw.docs = bw.docs[:0]
    return err
}

2. 预分配内存和复用对象

type DocumentPool struct {
    pool sync.Pool
}

func NewDocumentPool() *DocumentPool {
    return &DocumentPool{
        pool: sync.Pool{
            New: func() interface{} {
                return &Insert_doc{
                    KTAB:       "INSERT",
                    SyncBuffer: make([]byte, 0, 4096), // 预分配容量
                }
            },
        },
    }
}

func (dp *DocumentPool) Get() *Insert_doc {
    return dp.pool.Get().(*Insert_doc)
}

func (dp *DocumentPool) Put(doc *Insert_doc) {
    doc.SyncBuffer = doc.SyncBuffer[:0] // 重置但不释放内存
    dp.pool.Put(doc)
}

// 使用预生成的随机字符串
var randomStrings []string

func initRandomStrings(count, size int) {
    randomStrings = make([]string, count)
    for i := 0; i < count; i++ {
        randomStrings[i] = RandomString(size)
    }
}

3. 优化序列化

// 使用自定义序列化减少反射开销
func (d *Insert_doc) MarshalJSON() ([]byte, error) {
    // 手动构建JSON避免反射
    buf := make([]byte, 0, len(d.SyncBuffer)+100)
    buf = append(buf, `{"Thread_id":`...)
    buf = strconv.AppendInt(buf, int64(d.Thread_id), 10)
    buf = append(buf, `,"KTAB":"`...)
    buf = append(buf, d.KTAB...)
    buf = append(buf, `","SyncBuffer":"`...)
    buf = append(buf, d.SyncBuffer...)
    buf = append(buf, `"}`...)
    return buf, nil
}

4. 并发优化版本

func optimizedWorker(wg *sync.WaitGroup, id int, config Configuration) {
    defer wg.Done()
    
    // 预连接
    cluster, _ := gocb.Connect(config.ServerIp)
    cluster.Authenticate(gocb.PasswordAuthenticator{
        Username: config.Username,
        Password: config.Password,
    })
    
    bucket, _ := cluster.OpenBucket(config.BucketName, "")
    
    // 使用批处理
    batchWorker := NewBatchWorker(bucket, 100)
    defer batchWorker.Flush()
    
    // 预生成文档ID
    docIDs := make([]string, config.NumofDoc)
    for i := 0; i < config.NumofDoc; i++ {
        docIDs[i] = fmt.Sprintf("Go_Demo_%d_%d", id, i)
    }
    
    // 预生成随机数据
    randStrs := make([]string, 1000)
    for i := 0; i < 1000; i++ {
        randStrs[i] = RandomString(config.Randstr)
    }
    
    start := time.Now()
    
    // 并发插入
    var insertWg sync.WaitGroup
    sem := make(chan struct{}, 10) // 控制并发数
    
    for i := 0; i < config.NumofDoc; i++ {
        insertWg.Add(1)
        go func(idx int) {
            defer insertWg.Done()
            sem <- struct{}{}
            defer func() { <-sem }()
            
            doc := &Insert_doc{
                Thread_id:  id,
                KTAB:       "INSERT",
                SyncBuffer: randStrs[idx%1000],
            }
            
            batchWorker.AddInsert(docIDs[idx], doc)
        }(i)
    }
    
    insertWg.Wait()
    batchWorker.Flush()
    
    elapsed := time.Since(start)
    opsPerSec := float64(config.NumofDoc) / elapsed.Seconds()
    fmt.Printf("Thread %d: %d ops in %v (%.2f ops/sec)\n", 
        id, config.NumofDoc, elapsed, opsPerSec)
}

5. 性能监控

import (
    "runtime"
    "runtime/debug"
)

func monitorPerformance() {
    // 设置GC参数
    debug.SetGCPercent(100)
    
    // 监控内存
    var m runtime.MemStats
    runtime.ReadMemStats(&m)
    fmt.Printf("Alloc = %v MiB", m.Alloc/1024/1024)
    fmt.Printf("\tTotalAlloc = %v MiB", m.TotalAlloc/1024/1024)
    fmt.Printf("\tSys = %v MiB", m.Sys/1024/1024)
    fmt.Printf("\tNumGC = %v\n", m.NumGC)
}

主要优化点:

  1. 批处理减少网络往返
  2. 对象池减少内存分配
  3. 预生成数据避免重复计算
  4. 控制并发度避免资源竞争
  5. 自定义序列化减少反射开销

这些优化应该能显著改善大文档插入的性能问题。

回到顶部