Golang实现从Couchbase N1QL查询获取记录的轮询偏移值
大家好,
请问有人能帮我为 OFFSET/LIMIT 分页添加轮询逻辑吗?我只想使用 10 个线程:先生成全部偏移值,再以轮询方式把这些偏移值分配给这 10 个线程。我需要从 Couchbase 获取 300 万条记录,单条 N1QL 查询会因此在内部超时,而分页并行查询应该能解决这个问题。有人能在这方面帮帮我吗?
// Count the documents for this APPID, split them into pages of
// configuration.Limit documents, and fetch the pages with 10 goroutines in
// round-robin order: worker w fetches page indexes w, w+10, w+20, ...
// (OFFSET = page index * Limit).
query := "select count(1) as count from `" + configuration.BucketName + "` where APPID='" + configuration.APPID + "'"
var countVariable int = 0
countVariable = countVariable + 1
fmt.Println("countVariable value: ", countVariable)
rows, err := cluster.Query(query, &gocb.QueryOptions{})
if err != nil {
	panic(err)
}
var row map[string]interface{} // To fetch n1ql query output
for rows.Next() {
	err := rows.Row(&row)
	if err != nil {
		panic(err)
	}
}
// Next() returning false can also mean the result stream failed part-way.
if err := rows.Err(); err != nil {
	panic(err)
}
// With zero rows, row stays nil and a bare type assertion would panic with an
// unhelpful message.
count, ok := row["count"].(float64)
if !ok {
	panic(fmt.Sprintf("count query returned no usable count: %v", row))
}
countTotal := int64(count)
fmt.Println("countTotal num value : ", countTotal)
if countTotal == 0 {
	fmt.Println("No rows returned")
	os.Exit(3)
}
if configuration.Limit <= 0 {
	panic(fmt.Sprintf("configuration.Limit must be positive, got %d", configuration.Limit))
}
// Convert to float before dividing: int/int truncates, which made math.Ceil a
// no-op, and the old "+1" then overcounted by one page whenever countTotal was
// an exact multiple of Limit.
offsetCount := int(math.Ceil(float64(countTotal) / float64(configuration.Limit)))
fmt.Println("offset_count :", offsetCount)

const workers = 10 // should be only 10 threads
// NOTE(review): the original Add(1)-then-`go n1qlFetch(..., &wg, ...)` pattern
// implies n1qlFetch calls wg.Done() exactly once per call — confirm. Reserving
// one count per page up front keeps wg.Wait() blocked until every page is done.
wg.Add(offsetCount)
for w := 0; w < workers; w++ {
	go func(worker int) {
		// Each worker fetches its pages one at a time, so at most `workers`
		// queries run concurrently.
		for page := worker; page < offsetCount; page += workers {
			n1qlFetch(worker, &wg, page*configuration.Limit) // limit is 20000
		}
	}(w)
}
我正在尝试实现如下逻辑:
total documents - 2000000
limit :20000
offset count : 2000000/20000 = 100
threads :10
so offset value will be as below :
0
20000
40000
60000
80000
100000
.
.
.
.
1980000
thread 1 [0,20000,40000,60000,80000,100000,120000,140000,160000,180000] --> records 0 to 199999 (2 lakh records)
thread 2 [200000,220000,240000,260000,280000,300000,320000,340000,360000,380000]
thread 3 ""should be divided same as above
thread 4 ""should be divided same as above
thread 5 ""should be divided same as above
thread 6 ""should be divided same as above
thread 7 ""should be divided same as above
thread 8 ""should be divided same as above
thread 9 ""should be divided same as above
thread 10 ""should be divided same as above
for each ele in offsetlist:
offset = ele
更多关于Golang实现从Couchbase N1QL查询获取记录的轮询偏移值的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于Golang实现从Couchbase N1QL查询获取记录的轮询偏移值的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
以下是实现轮询偏移值分配的Go代码示例:
package main
import (
"fmt"
"sync"
"math"
)
// Example workload: page through totalDocuments documents, limit at a time,
// using numThreads concurrent workers.
const (
	numThreads     = 10      // concurrent workers
	limit          = 20000   // documents per page (N1QL LIMIT)
	totalDocuments = 2000000 // documents to page through
)
// main splits the example workload into limit-sized pages and lets numThreads
// goroutines walk their round-robin share of page indexes, printing each
// page index together with the OFFSET it corresponds to.
func main() {
	pages := int(math.Ceil(float64(totalDocuments) / float64(limit)))
	assigner := NewOffsetPoller(pages, numThreads)

	var wg sync.WaitGroup
	wg.Add(numThreads)
	for t := 0; t < numThreads; t++ {
		go func(tid int) {
			defer wg.Done()
			for _, page := range assigner.GetOffsetsForThread(tid) {
				fmt.Printf("Thread %d processing offset: %d (actual: %d)\n",
					tid, page, page*limit)
				// Call n1qlFetch for page*limit here. If n1qlFetch marks the
				// WaitGroup it receives as Done, give it its own WaitGroup
				// rather than &wg, which is already released by the defer above.
			}
		}(t)
	}
	wg.Wait()
}
// OffsetPoller distributes the page indexes 0..totalOffsets-1 across a fixed
// number of workers in round-robin order: worker t receives t, t+numThreads,
// t+2*numThreads, ...
type OffsetPoller struct {
	totalOffsets int // number of pages to distribute
	numThreads   int // number of workers sharing the pages
}

// NewOffsetPoller returns an OffsetPoller for totalOffsets pages shared by
// numThreads workers.
func NewOffsetPoller(totalOffsets, numThreads int) *OffsetPoller {
	return &OffsetPoller{
		totalOffsets: totalOffsets,
		numThreads:   numThreads,
	}
}

// GetOffsetsForThread returns, in ascending order, the page indexes assigned
// to threadID. Multiply an index by the page size to get the N1QL OFFSET.
//
// It returns nil when numThreads is not positive (a zero stride would never
// terminate), when threadID is outside [0, numThreads) (such IDs would
// overlap another worker's share), or when threadID has no pages assigned.
func (p *OffsetPoller) GetOffsetsForThread(threadID int) []int {
	if p.numThreads <= 0 || threadID < 0 || threadID >= p.numThreads || threadID >= p.totalOffsets {
		return nil
	}
	// ceil((totalOffsets-threadID)/numThreads) pages land on this thread;
	// written this way to avoid overflowing near MaxInt.
	offsets := make([]int, 0, (p.totalOffsets-threadID-1)/p.numThreads+1)
	for offset := threadID; offset < p.totalOffsets; offset += p.numThreads {
		offsets = append(offsets, offset)
	}
	return offsets
}
针对你的具体场景,这里是一个完整的实现:
// processWithPolling counts the documents in configuration.BucketName whose
// APPID equals configuration.APPID and splits them into pages of
// configuration.Limit documents. It then fetches the pages with ten goroutines
// in round-robin order: worker t fetches page indexes t, t+10, t+20, ...
// (OFFSET = page index * Limit).
//
// It panics on query errors or a non-positive Limit. It exits the process with
// status 3 when the count is zero.
//
// NOTE(review): OFFSET paging only yields disjoint, complete pages if the query
// inside n1qlFetch has a stable ORDER BY — confirm.
func processWithPolling(configuration Config) {
	const workers = 10

	if configuration.Limit <= 0 {
		panic(fmt.Sprintf("configuration.Limit must be positive, got %d", configuration.Limit))
	}

	// 查询总记录数 (count the matching documents)
	query := "select count(1) as count from `" + configuration.BucketName +
		"` where APPID='" + configuration.APPID + "'"
	rows, err := cluster.Query(query, &gocb.QueryOptions{})
	if err != nil {
		panic(err)
	}
	var row map[string]interface{}
	for rows.Next() {
		if err := rows.Row(&row); err != nil {
			panic(err)
		}
	}
	// Next() returning false can also mean the result stream failed part-way.
	if err := rows.Err(); err != nil {
		panic(err)
	}
	// With zero rows, row stays nil; fail with a clear message instead of a
	// bare type-assertion panic.
	count, ok := row["count"].(float64)
	if !ok {
		panic(fmt.Sprintf("count query returned no usable count: %v", row))
	}
	countTotal := int64(count)
	fmt.Println("Total records: ", countTotal)
	if countTotal == 0 {
		fmt.Println("No rows returned")
		os.Exit(3)
	}

	totalOffsets := int(math.Ceil(float64(countTotal) / float64(configuration.Limit)))
	fmt.Println("Total offsets needed: ", totalOffsets)

	var wg sync.WaitGroup
	for threadID := 0; threadID < workers; threadID++ {
		wg.Add(1)
		go func(tid int) {
			defer wg.Done()
			for offsetIndex := tid; offsetIndex < totalOffsets; offsetIndex += workers {
				actualOffset := offsetIndex * configuration.Limit
				fmt.Printf("Thread %d processing offset index: %d, actual offset: %d\n",
					tid, offsetIndex, actualOffset)
				// Passing &wg here would let n1qlFetch call Done once per page
				// while this worker Added only once, driving the counter
				// negative. Give each call its own WaitGroup. This assumes
				// n1qlFetch calls Done exactly once, as the question's
				// Add(1)-then-`go n1qlFetch(..., &wg, ...)` usage implies
				// (TODO confirm). Waiting on it keeps at most `workers` fetches
				// in flight.
				var fetchWG sync.WaitGroup
				fetchWG.Add(1)
				n1qlFetch(tid, &fetchWG, actualOffset)
				fetchWG.Wait()
			}
		}(threadID)
	}
	wg.Wait()
}
// queryTotalCount returns the number of documents in configuration.BucketName
// whose APPID equals configuration.APPID. It panics on query failure or when
// the query yields no usable count.
func queryTotalCount(configuration Config) int64 {
	query := "select count(1) as count from `" + configuration.BucketName +
		"` where APPID='" + configuration.APPID + "'"
	rows, err := cluster.Query(query, &gocb.QueryOptions{})
	if err != nil {
		panic(err)
	}
	var row map[string]interface{}
	for rows.Next() {
		if err := rows.Row(&row); err != nil {
			panic(err)
		}
	}
	// Next() returning false can also mean the result stream failed part-way.
	if err := rows.Err(); err != nil {
		panic(err)
	}
	count, ok := row["count"].(float64)
	if !ok {
		panic(fmt.Sprintf("count query returned no usable count: %v", row))
	}
	return int64(count)
}

// processWithChannels is a variant that coordinates ten workers through a
// channel of OFFSET values instead of fixed round-robin shares. A worker that
// finishes a page early simply takes the next pending one.
//
// It panics on query errors or a non-positive Limit. It exits the process with
// status 3 when the count is zero.
func processWithChannels(configuration Config) {
	const workers = 10

	if configuration.Limit <= 0 {
		panic(fmt.Sprintf("configuration.Limit must be positive, got %d", configuration.Limit))
	}
	countTotal := queryTotalCount(configuration)
	fmt.Println("Total records: ", countTotal)
	if countTotal == 0 {
		fmt.Println("No rows returned")
		os.Exit(3)
	}

	totalOffsets := int(math.Ceil(float64(countTotal) / float64(configuration.Limit)))

	// The buffer holds every offset, so the channel can be filled and closed
	// up front without a separate producer goroutine.
	offsetChan := make(chan int, totalOffsets)
	for i := 0; i < totalOffsets; i++ {
		offsetChan <- i * configuration.Limit
	}
	close(offsetChan)

	var wg sync.WaitGroup
	for workerID := 0; workerID < workers; workerID++ {
		wg.Add(1)
		go func(wid int) {
			defer wg.Done()
			for offset := range offsetChan {
				fmt.Printf("Worker %d processing offset: %d\n", wid, offset)
				// Use a per-call WaitGroup: handing n1qlFetch &wg would let it
				// call Done once per page on a counter this worker Added once.
				// This assumes n1qlFetch calls Done exactly once (TODO confirm).
				var fetchWG sync.WaitGroup
				fetchWG.Add(1)
				n1qlFetch(wid, &fetchWG, offset)
				fetchWG.Wait()
			}
		}(workerID)
	}
	wg.Wait()
}
对于你示例中的具体分配逻辑:
// calculateThreadOffsets prints the contiguous-range split from the question.
// With 2,000,000 documents, a page size of 20,000 and 10 threads, thread 1 gets
// OFFSETs 0..180000, thread 2 gets 200000..380000, and so on.
func calculateThreadOffsets() {
	totalDocuments := 2000000
	limit := 20000
	threads := 10

	perThread := splitOffsetsContiguous(totalDocuments, limit, threads)

	// Every thread gets at least pagesPerThread pages. When the pages do not
	// divide evenly, the first threads take one extra page each.
	pagesPerThread := ((totalDocuments-1)/limit + 1) / threads
	fmt.Println("Records per thread:", pagesPerThread*limit)
	fmt.Println("Offsets per thread:", pagesPerThread)

	for thread, threadOffsets := range perThread {
		fmt.Printf("Thread %d offsets: %v\n", thread+1, threadOffsets)
	}
}

// splitOffsetsContiguous divides the ceil(totalDocuments/limit) pages into
// `threads` contiguous runs. It returns, for each thread, the OFFSET values of
// its pages. When the pages do not divide evenly, the first pages%threads
// threads receive one extra page, so every page is assigned exactly once. It
// returns nil if any argument is not positive.
func splitOffsetsContiguous(totalDocuments, limit, threads int) [][]int {
	if totalDocuments <= 0 || limit <= 0 || threads <= 0 {
		return nil
	}
	pages := (totalDocuments-1)/limit + 1 // ceil without overflow
	base, extra := pages/threads, pages%threads

	result := make([][]int, threads)
	firstPage := 0
	for t := range result {
		n := base
		if t < extra {
			n++
		}
		offsets := make([]int, 0, n)
		for p := firstPage; p < firstPage+n; p++ {
			offsets = append(offsets, p*limit)
		}
		result[t] = offsets
		firstPage += n
	}
	return result
}
这个实现确保了:
- 10个线程均匀分配所有偏移值
- 每个线程以轮询方式获取偏移索引(thread 1: 0, 10, 20…; thread 2: 1, 11, 21…;实际 OFFSET = 索引 × limit);calculateThreadOffsets 演示的则是按连续区间分配的方式
- 避免了单个大查询的超时问题
- 可以并行处理300万条记录

