Golang实现从Couchbase N1QL查询获取记录的轮询偏移值

Golang实现从Couchbase N1QL查询获取记录的轮询偏移值 大家好,

请问有人能帮我为 offset/limit 分页添加轮询(round-robin)逻辑吗?我希望只使用10个线程:先生成所有的偏移值,再以轮询的方式把这些偏移值分配给这10个线程。我需要从Couchbase获取300万条记录,一次性查询会导致N1QL内部查询超时。这样做应该可以解决我的问题,有人能在这方面帮助我吗?

    // Count the documents that belong to the configured application so the
    // number of LIMIT-sized pages can be derived from it.
    query := "select count(1) as count from `" + configuration.BucketName + "` where APPID='" + configuration.APPID + "'"
    var countVariable int = 0
    countVariable = countVariable + 1
    fmt.Println("countVariable value: ", countVariable)
    rows, err := cluster.Query(query, &gocb.QueryOptions{})
    if err != nil {
        panic(err)
    }
    var row map[string]interface{} // receives the single row of the count query
    for rows.Next() {
        if err := rows.Row(&row); err != nil {
            panic(err)
        }
    }
    // Next also returns false when the result stream fails, so the stream
    // error has to be checked separately.
    if err := rows.Err(); err != nil {
        panic(err)
    }
    // Guard the assertion: with no row, or a non-numeric value, a bare type
    // assertion would panic without saying what was wrong.
    count, ok := row["count"].(float64)
    if !ok {
        panic(fmt.Sprintf("count query returned no numeric count: %v", row))
    }
    countTotal := int64(count)
    fmt.Println("countTotal num value : ", countTotal)
    if countTotal == 0 {
        fmt.Println("No rows returned")
        os.Exit(3)
    }
    // Divide as floats so that Ceil can round a partial last page up. Dividing
    // the integers first truncates, which makes Ceil a no-op and needs a "+1"
    // that adds an empty page whenever the count is an exact multiple of Limit.
    float_offset_count := math.Ceil(float64(countTotal) / float64(configuration.Limit))
    offset_count := int(float_offset_count)
    fmt.Println("offset_count :", offset_count)
    for i := 0; i < 10; i++ { // should be only 10 threads
        wg.Add(1)
        offset_limit := configuration.Limit * i // limit is 20000
        // n1qlFetch runs the N1QL query that fetches 3 to 4 million records.
        // Each goroutine receives exactly one offset here, so only the first
        // 10 pages are requested.
        go n1qlFetch(i, &wg, offset_limit)
    }

我正在尝试实现如下逻辑:

total documents - 2000000

limit :20000
offset : 2000000/20000 = 100
threads :10

so offset value will be as below :
0
20000
40000
60000
80000
100000
.
.
.
.
2000000

thread 1 [0,20000,40000,60000,80000,100000,120000,140000,160000,180000] --> 0 to 2 lakh (200000) records 
thread 2  [200000,220000,240000,260000,280000,300000,320000,340000,360000,380000]
thread 3  ""should be divided same as above
thread 4  ""should be divided same as above
thread 5  ""should be divided same as above
thread 6  ""should be divided same as above
thread 7   ""should be divided same as above
thread 8   ""should be divided same as above
thread 9   ""should be divided same as above
thread 10  ""should be divided same as above

for each ele in offsetlist:
      offset = ele

更多关于Golang实现从Couchbase N1QL查询获取记录的轮询偏移值的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于Golang实现从Couchbase N1QL查询获取记录的轮询偏移值的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


以下是实现轮询偏移值分配的Go代码示例:

package main

import (
    "fmt"
    "sync"
    "math"
)

const (
    // totalDocuments is the example number of records to page through.
    totalDocuments = 2000000
    // limit is the page size; a page index multiplied by limit gives the
    // actual offset of that page.
    limit = 20000
    // numThreads is the number of goroutines the pages are shared between.
    numThreads = 10
)

// main demonstrates the round-robin distribution: it derives the number of
// pages from totalDocuments and limit, starts numThreads goroutines and lets
// each one print the page indexes (and the resulting offsets) assigned to it.
func main() {
    // Number of limit-sized pages needed to cover every document.
    pageCount := int(math.Ceil(float64(totalDocuments) / float64(limit)))
    poller := NewOffsetPoller(pageCount, numThreads)

    var wg sync.WaitGroup
    wg.Add(numThreads)
    for id := 0; id < numThreads; id++ {
        go func(tid int) {
            defer wg.Done()

            // Walk every page index owned by this goroutine.
            for _, pageIndex := range poller.GetOffsetsForThread(tid) {
                fmt.Printf("Thread %d processing offset: %d (actual: %d)\n",
                    tid, pageIndex, pageIndex*limit)

                // The N1QL fetch for this page (offset pageIndex*limit)
                // would be issued here.
            }
        }(id)
    }

    // Block until every goroutine has finished its pages.
    wg.Wait()
}

// OffsetPoller distributes the page indexes 0..totalOffsets-1 between a fixed
// number of workers in round-robin order: worker t owns t, t+numThreads,
// t+2*numThreads, and so on.
type OffsetPoller struct {
    totalOffsets int // number of page indexes to hand out
    numThreads   int // number of workers sharing the page indexes
}

// NewOffsetPoller returns an OffsetPoller that shares totalOffsets page
// indexes between numThreads workers.
func NewOffsetPoller(totalOffsets, numThreads int) *OffsetPoller {
    return &OffsetPoller{
        totalOffsets: totalOffsets,
        numThreads:   numThreads,
    }
}

// GetOffsetsForThread returns, in ascending order, every page index owned by
// the worker with the given threadID.
//
// It returns nil when there is nothing to hand out: numThreads is not
// positive, threadID is outside [0, numThreads), or threadID is not below
// totalOffsets.
func (p *OffsetPoller) GetOffsetsForThread(threadID int) []int {
    // A non-positive stride would never advance the loop below, and an
    // out-of-range worker ID would yield negative indexes or repeat the pages
    // of another worker.
    if p.numThreads <= 0 || threadID < 0 || threadID >= p.numThreads {
        return nil
    }
    if threadID >= p.totalOffsets {
        return nil
    }

    // Count of values in threadID, threadID+numThreads, ... below totalOffsets.
    n := (p.totalOffsets - threadID + p.numThreads - 1) / p.numThreads
    offsets := make([]int, 0, n)
    for offset := threadID; offset < p.totalOffsets; offset += p.numThreads {
        offsets = append(offsets, offset)
    }
    return offsets
}

针对你的具体场景,这里是一个完整的实现:

// processWithPolling counts the documents whose APPID equals
// configuration.APPID in configuration.BucketName and then fetches them page
// by page with a fixed pool of goroutines. Page indexes are assigned
// round-robin: worker t handles pages t, t+numWorkers, t+2*numWorkers, and so
// on, and each page is fetched with offset pageIndex*configuration.Limit.
//
// It panics when configuration.Limit is not positive or when the count query
// fails, and exits the process with status 3 when the count is zero.
func processWithPolling(configuration Config) {
    // Number of worker goroutines. It is also the stride between the page
    // indexes handled by one worker, so the two must always be the same value.
    const numWorkers = 10

    // A non-positive page size would make the page count meaningless.
    if configuration.Limit <= 0 {
        panic("configuration.Limit must be positive")
    }

    // Count the matching records.
    // NOTE(review): the statement is built by string concatenation; if APPID
    // can come from untrusted input, switch to query parameters.
    query := "select count(1) as count from `" + configuration.BucketName +
        "` where APPID='" + configuration.APPID + "'"

    rows, err := cluster.Query(query, &gocb.QueryOptions{})
    if err != nil {
        panic(err)
    }

    var row map[string]interface{}
    for rows.Next() {
        if err := rows.Row(&row); err != nil {
            panic(err)
        }
    }
    // Next also returns false when the result stream fails, so the stream
    // error has to be checked separately.
    if err := rows.Err(); err != nil {
        panic(err)
    }

    // Guard the assertion: with no row, or a non-numeric value, a bare type
    // assertion would panic without saying what was wrong.
    count, ok := row["count"].(float64)
    if !ok {
        panic(fmt.Sprintf("count query returned no numeric count: %v", row))
    }
    countTotal := int64(count)
    fmt.Println("Total records: ", countTotal)

    if countTotal == 0 {
        fmt.Println("No rows returned")
        os.Exit(3)
    }

    // Number of Limit-sized pages, rounding a partial last page up.
    totalOffsets := int(math.Ceil(float64(countTotal) / float64(configuration.Limit)))
    fmt.Println("Total offsets needed: ", totalOffsets)

    var wg sync.WaitGroup

    for threadID := 0; threadID < numWorkers; threadID++ {
        wg.Add(1)

        go func(tid int) {
            defer wg.Done()

            // Round-robin: this worker takes every numWorkers-th page.
            for offsetIndex := tid; offsetIndex < totalOffsets; offsetIndex += numWorkers {
                actualOffset := offsetIndex * configuration.Limit

                fmt.Printf("Thread %d processing offset index: %d, actual offset: %d\n",
                    tid, offsetIndex, actualOffset)

                // NOTE(review): n1qlFetch is not visible here. If it calls
                // wg.Done itself, the counter is decremented more often than
                // it is incremented and will go negative — confirm, and drop
                // the Done from n1qlFetch if so.
                n1qlFetch(tid, &wg, actualOffset)
            }
        }(threadID)
    }

    wg.Wait()
}

// processWithChannels is a variant of the paged fetch that coordinates the
// workers through a channel: every page offset is queued on a buffered
// channel and 10 worker goroutines receive offsets from it until it is
// closed and drained.
func processWithChannels(configuration Config) {
    // ... obtain countTotal here with the same count query as in processWithPolling ...

    // Number of Limit-sized pages, rounding a partial last page up.
    pageCount := int(math.Ceil(float64(countTotal) / float64(configuration.Limit)))

    // The buffer holds every offset, so the producer below never blocks.
    pending := make(chan int, pageCount)

    // Producer: queue the offset of each page, then close the channel so the
    // workers' range loops terminate.
    go func() {
        defer close(pending)
        for page := 0; page < pageCount; page++ {
            pending <- page * configuration.Limit
        }
    }()

    var wg sync.WaitGroup

    // Consumers: each worker receives offsets from the channel until none
    // are left.
    for id := 0; id < 10; id++ {
        wg.Add(1)

        go func(wid int) {
            defer wg.Done()

            for offset := range pending {
                fmt.Printf("Worker %d processing offset: %d\n", wid, offset)
                // NOTE(review): n1qlFetch is not visible here; if it calls
                // wg.Done itself the counter will go negative — confirm.
                n1qlFetch(wid, &wg, offset)
            }
        }(id)
    }

    wg.Wait()
}

对于你示例中的具体分配逻辑:

// calculateThreadOffsets prints how 2000000 documents, read in pages of 20000,
// are split into one contiguous range of offsets per thread for 10 threads:
// first the records and offsets per thread, then the offset list of each
// thread (threads are numbered from 1 in the output).
func calculateThreadOffsets() {
    const (
        totalDocuments = 2000000
        limit          = 20000
        threads        = 10
    )

    // Integer division: each thread gets the same whole number of pages.
    perThreadRecords := totalDocuments / threads
    perThreadPages := perThreadRecords / limit

    fmt.Println("Records per thread:", perThreadRecords)
    fmt.Println("Offsets per thread:", perThreadPages)

    for number := 1; number <= threads; number++ {
        // First offset of this thread's contiguous range.
        base := (number - 1) * perThreadRecords

        pages := make([]int, perThreadPages)
        for k := range pages {
            pages[k] = base + k*limit
        }

        fmt.Printf("Thread %d offsets: %v\n", number, pages)
    }
}

这个实现确保了:

  1. 10个线程均匀分配所有偏移值
  2. 每个线程以轮询方式获取偏移索引(thread 1: 0, 10, 20…; thread 2: 1, 11, 21…),实际偏移值 = 索引 × limit;calculateThreadOffsets 则是按连续区间给每个线程分配偏移值
  3. 避免了单个大查询的超时问题
  4. 可以并行处理300万条记录
回到顶部