Golang循环生成500万条记录并插入Couchbase数据库的性能测试

Golang循环生成500万条记录并插入Couchbase数据库的性能测试 大家好,

我是Go语言的新手,正在为我的项目进行概念验证。由于时间限制,如果有人能提供一个示例程序,帮助我生成500万条记录并通过循环将这些记录插入到Couchbase数据库中,以测试性能,我将非常感激。

任何可以运行并检查性能的示例程序都会非常有帮助。

13 回复

非常感谢您的鼎力相助,我现在已经能够通过配置多线程来插入记录了。必须感谢您及时响应并解决了问题 🙂 再次感谢。

更多关于Golang循环生成500万条记录并插入Couchbase数据库的性能测试的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


非常感谢您的及时帮助,能否请您澄清几个疑问?

  1. 我将 createDocument 函数作为 goroutine 调用,我应该将其作为普通函数调用吗?
  2. 当我使用您提供的代码时,我应该在何处调用我的 createDocument 函数?

我已移除了所有与Couchbase相关的代码:

https://play.golang.org/p/XKUyquqMSep

########### 应该传递 jobs 而不是 workerJobs 吗?我尝试传递 jobs 但出错了。

不,你应该传递 workerJobs,因为你希望每个工作线程/线程运行完全相同数量的任务。

你有什么现成的东西可以展示给我们看看吗?你需要准备:

  • 一个 Couchbase 数据库安装环境
  • 一个用于 Go 的 Couchbase 数据库客户端库
  • 一个简单的 Go 程序,用于连接数据库并生成和插入记录

sanjum080816: 检查性能

你具体想测量什么?Go 部分不会成为问题。如果有什么会影响性能,那将是数据库插入记录的速度以及你的网络连接速度的快慢。

对数据库进行基准测试或性能测试并非易事。首先,您需要对数据库有全面的了解。您需要在给定的数据库中定义正确的模式/表/集合/索引/分析器/引擎,然后才能运行测试。如果您对数据库没有深入的了解,我建议您先进行研究,查看您使用场景中现有的基准测试,然后咨询更懂行的人。

如果这是一个兴趣项目,可以从某个地方开始,进行测试,尝试提高性能,减少资源使用,然后如果您认为这是您能做到的全部,再寻求帮助。

workerJobs := make(chan interface{}, jobsCount) 改为 workerJobs := make(chan Person, jobsCount)

并在 worker 中调用 createDocument

func worker(id int, jobs <-chan Person, results chan<- interface{}) {
	// 在此处进行你的线程测量
	fmt.Printf("worker %d started \n", id)
	for person := range jobs {
        createDocument("randomdoc", person)
		results <- job
	}
	fmt.Printf("worker %d completed \n", id)
}
var jobs chan interface{}

必须是

var jobs chan Person

转换此代码:

 for _, job := range jobs {
 		for j := 1; j <= jobsCount; j++ {
 			job <- j
 		}
 		close(job)
 	}

为此代码:

for _, job := range jobs {
		for j := 1; j <= jobsCount; j++ {
			job <- Person{FirstName: "Syeda", LastName: "Anjum"}
		}
		close(job)
	}

以下是一个在Go语言中实现工作线程/线程的示例。在Go中,你无法直接控制操作系统线程,但你可以限制Go调度器在给定任务中仅使用指定数量的Go协程。

首先,我们需要有限数量的Go协程来达到你期望的5个并发数:

for i := 0; i < 5; i++ {
	// 为每个工作线程/线程创建任务
	workerJobs := make(chan interface{}, jobsCount)
	jobs = append(jobs, workerJobs)
	go worker(i, workerJobs, results)
}

然后将任务发送给所有已创建的工作线程:

for _, job := range jobs {
	for j := 1; j <= jobsCount; j++ {
		job <- j
	}
	close(job)
}

等待所有工作线程完成它们的任务:

    // 等待所有工作线程完成它们的任务
	for a := 1; a <= jobsCount*len(jobs); a++ {
		//等待所有工作线程返回结果
		<-results
	}

play链接

我需要检查Go语言执行批量插入和从Couchbase获取操作需要多长时间。它相比其他编程语言如何更高效,以及如何通过Go实现线程生成并检查性能,就像我在C++中测试的那样:

Number of Requests per thread : 100000
Total Number of threads       : 5
WRITE PROPORTION              : 50%
READ  PROPORTION              : 50%
THREAD_ID 3 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.13 S, AVG_LATENCY = 0.000301 S, OPS_PER_SECOND = 3319.12 
THREAD_ID 2 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.22 S, AVG_LATENCY = 0.000302 S, OPS_PER_SECOND = 3309.02 
THREAD_ID 1 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.25 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3305.39 
THREAD_ID 0 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.29 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3301.78 
THREAD_ID 4 TOTAL WRITE : 50000, TOTAL READ : 50000, TOTAL OPERATION TIME : 30.31 S, AVG_LATENCY = 0.000303 S, OPS_PER_SECOND = 3298.83

我已经完成了所有这些步骤。以下是我的程序。我在数据库方面有扎实的知识,但对GOLANG是新手,编写测试程序花费的时间比我项目预定的时间线要多。您能帮我修改代码吗?我想配置5个线程,每个线程并行插入10万条记录,总共插入50万条记录。

package main

import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
    "strconv"
    "github.com/zenthangplus/goccm"
    "time"
    "runtime"
)

var  (    
    bucket *gocb.Bucket
)
type Person struct {
	FirstName, LastName string
}

func main(){
    cluster, err := gocb.Connect("couchbase://ip") //Connects to the cluster
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    cluster.Authenticate(gocb.PasswordAuthenticator{
        Username: "",
        Password: "",
    })
    fmt.Println("Cluster:%v",cluster)
        bucket, err = cluster.OpenBucket("", "") //Connects to the bucket
        if err != nil {
            fmt.Println(err.Error())
            return
        }
    fmt.Println("Bucket:%v",bucket)

    // Limit 3 goroutines to run concurrently.
    c := goccm.New(3)
        dt := time.Now()
    fmt.Println("Start Current date and time is: ", dt.String())
    fmt.Println(runtime.NumCPU())
    for i := 1; i <= 500000; i++ {
        fmt.Println("Starting new thread... %v",i )
        person := Person{FirstName: "Syeda", LastName: "Anjum"}
        test := "Go_Demo_"+strconv.Itoa(i)    
        // This function have to call before any goroutine
        c.Wait()
        go createDocument(test, &person)
        fmt.Println("Completed thread... %v",i)
        c.Done()
    }
    fmt.Println(runtime.NumCPU())
    // This function have to call to ensure all goroutines have finished after close the main program.
    c.WaitAllDone()
    fmt.Println("End Current date and time is: ", dt.String())
    fmt.Println(runtime.NumCPU())
} 

func createDocument(documentId string, person *Person) {
	fmt.Println("Upserting a full document...")
	_, error := bucket.Upsert(documentId, person, 0)
	if error != nil {
		fmt.Println(error.Error())
		return
	}
	fmt.Println("After Upsert fetching the document")
}

按照建议修改了我的代码,但在下面这个错误上卡住了。尝试将 j 声明为结构体,但后来无法在 for 操作中使用它。

command-line-arguments

./goprocs.go:54:8: 不能在 send 中使用 j(类型 int)作为 Person 类型

package main

import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
)   
    
var  (    
    bucket *gocb.Bucket
)   
type Person struct {
    FirstName, LastName string
}

func main() {
	// if you have more than 5 core, make it 5 to use 5 os thread
	//runtime.GOMAXPROCS(5)
	jobsCount := 50
    cluster, err := gocb.Connect("couchbase://") //Connects to the cluster
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    cluster.Authenticate(gocb.PasswordAuthenticator{
        Username: "",
        Password: "",
    })
    fmt.Println("Cluster:%v",cluster)
        bucket, err = cluster.OpenBucket("", "") //Connects to the bucket
        if err != nil {
            fmt.Println(err.Error())
            return
    }
    fmt.Println("Bucket:%v",bucket)


    person := Person{FirstName: "Syeda", LastName: "Anjum"}
    var jobs []chan Person
    results := make(chan interface{}, jobsCount)

    //jobs := make(chan int, jobsCount)
    //results := make(chan int, jobsCount)

    fmt.Println("value of results: %v",&results)

	for i := 0; i < 5; i++ {
		// create jobs per worker/thread
		workerJobs := make(chan Person, jobsCount)
		jobs = append(jobs, workerJobs)
		go worker(i, workerJobs, results)
	}
	for _, job := range jobs {
		for j:= 1; j <= jobsCount; j++ {
			job <- j
		}
		close(job)
	}
	
	// wait all workers to complete their jobs
	for a := 1; a <= jobsCount*len(jobs); a++ {
		//wait all the workers to return results
		<-results
	}
	fmt.Println("finished")

}
func worker(id int, jobs <-chan Person, results chan<- interface{}) {
	// do your thread measurements here
	fmt.Printf("worker %d started \n", id)
	for person := range jobs {
        createDocument("randomdoc", &person)
		results <- person
	}
	fmt.Printf("worker %d completed \n", id)
}
func createDocument(documentId string, person *Person) {
    fmt.Println("Upserting a full document...")
    _, error := bucket.Upsert(documentId, person, 0)
    if error != nil {
        fmt.Println(error.Error())
        return
    }
    fmt.Println("After Upsert fetching the document")
}
  1. main() 中定义的变量 job 在函数 worker 中未被识别,我使用了您提供的同一个 worker 函数。我将其编辑为 jobs 后可以工作,但只有一个文档被插入到 COUCHBASE 中。

    command-line-arguments

    ./goprocs.go:63:14: undefined: job

  2. 不理解下面这段代码中的 job 是做什么的?

    for _, job := range jobs {
        for j := 1; j <= jobsCount; j++ {
            job <- Person{FirstName: "Syeda", LastName: "Anjum"}
        }
        close(job)
    }
    

    您能检查一下代码中提到的 ## 注释吗? 以下是完整的代码:

package main

import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
)

var (
    bucket *gocb.Bucket
)

type Person struct {
    FirstName, LastName string
}

func main() {
    // 如果你有超过5个核心,将其设为5以使用5个操作系统线程 runtime.GOMAXPROCS(5)
    jobsCount := 500
    cluster, err := gocb.Connect("couchbase://10.10.216.53") // 连接到集群
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    cluster.Authenticate(gocb.PasswordAuthenticator{
        Username: "golang",
        Password: "mavenir",
    })
    fmt.Println("Cluster:%v", cluster)
    bucket, err = cluster.OpenBucket("golang", "") // 连接到桶
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    fmt.Println("Bucket:%v", bucket)

    var jobs []chan Person
    results := make(chan interface{}, jobsCount)

    for i := 0; i < 5; i++ {
        // 为每个工作线程/线程创建工作
        workerJobs := make(chan Person, jobsCount)
        jobs = append(jobs, workerJobs)
        go worker(i, workerJobs, results) // ########### 这里应该传递 jobs 而不是 workerJobs 吗?我尝试传递 jobs 但遇到了错误 ./goprocs.go:43:18: cannot use jobs (type []chan Person) as type <-chan Person in argument to worker
    }
    for _, job := range jobs {
        for j := 1; j <= jobsCount; j++ {
            job <- Person{FirstName: "Syeda", LastName: "Anjum"}
        }
        close(job)
    }
    // 等待所有工作线程完成它们的任务
    for a := 1; a <= jobsCount*len(jobs); a++ {
        // 等待所有工作线程返回结果
        <-results
    }
    fmt.Println("finished")
}

func worker(id int, jobs <-chan Person, results chan<- interface{}) {
    // 在这里进行你的线程测量
    fmt.Printf("worker %d started \n", id)
    for person := range jobs {
        createDocument("randomdoc", &person)
        results <- jobs
    }
    fmt.Printf("worker %d completed \n", id)
}

func createDocument(documentId string, person *Person) int {
    //fmt.Println("Upserting a full document...")
    _, error := bucket.Upsert(documentId, person, 0)
    if error != nil {
        fmt.Println(error.Error())
        return -1
    }
    return 1
}

以下是一个Go语言程序示例,用于生成500万条记录并批量插入Couchbase数据库,同时包含性能测试功能:

package main

import (
    "context"
    "fmt"
    "log"
    "time"
    "github.com/couchbase/gocb/v2"
    "sync/atomic"
)

type TestRecord struct {
    ID      string `json:"id"`
    Name    string `json:"name"`
    Value   int    `json:"value"`
    Timestamp int64 `json:"timestamp"`
}

func main() {
    // Couchbase连接配置
    cluster, err := gocb.Connect("couchbase://localhost", gocb.ClusterOptions{
        Authenticator: gocb.PasswordAuthenticator{
            Username: "Administrator",
            Password: "password",
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cluster.Close(nil)

    bucket := cluster.Bucket("test_bucket")
    collection := bucket.DefaultCollection()

    // 等待bucket就绪
    err = bucket.WaitUntilReady(30*time.Second, nil)
    if err != nil {
        log.Fatal(err)
    }

    totalRecords := 5_000_000
    batchSize := 1000
    var opsCompleted int64

    fmt.Printf("开始插入 %d 条记录...\n", totalRecords)
    startTime := time.Now()

    // 使用goroutine池并发插入
    workers := 10
    jobs := make(chan []TestRecord, workers)
    var wg sync.WaitGroup

    // 启动worker goroutines
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for batch := range jobs {
                insertBatch(collection, batch, &opsCompleted)
            }
        }(w)
    }

    // 生成数据并发送到jobs通道
    go func() {
        for i := 0; i < totalRecords; i += batchSize {
            batch := make([]TestRecord, 0, batchSize)
            for j := 0; j < batchSize && i+j < totalRecords; j++ {
                record := TestRecord{
                    ID:        fmt.Sprintf("record_%d", i+j),
                    Name:      fmt.Sprintf("Test Record %d", i+j),
                    Value:     i + j,
                    Timestamp: time.Now().UnixNano(),
                }
                batch = append(batch, record)
            }
            jobs <- batch
        }
        close(jobs)
    }()

    // 进度监控
    go func() {
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
        for range ticker.C {
            completed := atomic.LoadInt64(&opsCompleted)
            progress := float64(completed) / float64(totalRecords) * 100
            elapsed := time.Since(startTime).Seconds()
            rate := float64(completed) / elapsed
            fmt.Printf("进度: %.2f%% | 已插入: %d | 速率: %.0f 条/秒\n", 
                progress, completed, rate)
        }
    }()

    wg.Wait()

    elapsed := time.Since(startTime)
    fmt.Printf("\n插入完成!\n")
    fmt.Printf("总耗时: %v\n", elapsed)
    fmt.Printf("平均速率: %.0f 条/秒\n", float64(totalRecords)/elapsed.Seconds())
}

func insertBatch(collection *gocb.Collection, batch []TestRecord, opsCompleted *int64) {
    ops := make([]gocb.BulkOp, len(batch))
    for i, record := range batch {
        ops[i] = &gocb.UpsertOp{
            ID:    record.ID,
            Value: record,
        }
    }

    err := collection.Do(ops, nil)
    if err != nil {
        log.Printf("批量插入错误: %v", err)
        return
    }

    atomic.AddInt64(opsCompleted, int64(len(batch)))
}

需要安装的依赖:

go get github.com/couchbase/gocb/v2

运行前请修改以下配置:

  1. Couchbase连接字符串和认证信息
  2. bucket名称(默认为"test_bucket")
  3. 根据硬件调整workers数量(并发goroutine数)
  4. 根据需求调整batchSize(批量大小)

性能优化建议:

  1. 使用go run main.go运行程序
  2. 监控Couchbase服务器资源使用情况
  3. 调整批量大小和worker数量以获得最佳性能
  4. 确保网络延迟最小化

程序会实时显示插入进度和速率,完成时会输出总耗时和平均插入速率。

回到顶部