Golang解决通道冲突问题

Golang解决通道冲突问题 我正在尝试使用具有以下操作的API:

object := Get(key) // 这个过程很慢
object := increment(object, count) // 增加该对象的计数
Set(key, object) // 这个过程也很慢

问题在于:在Get和Set操作之间,有可能其他goroutine已经修改了这个对象!

我尝试通过channel来解决这个问题:

worker(){
for {
select  {
case task <- ch:
 这里执行我上面描述的操作
    }
}
}

我启动了N个这样的工作协程。

我从外部源获取连续的keycount流。

我该如何防止上面描述的问题?


更多关于Golang解决通道冲突问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

我的理解是否正确:您希望在三个步骤的操作期间"锁定"key?这样当一个goroutine正在处理某个key的任务时,即使有新的任务通过ch到达,也不允许其他goroutine处理相同key的任务?

您需要某种能够跨越这三个基本操作的事务机制。API是否提供了类似的功能?这是哪种类型的API?

更多关于Golang解决通道冲突问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


我认为所描述的问题是"key"(一个可变对象)在Get和Set操作之间被修改了。

这是什么对象?我猜是某个结构体指针?

没有解决办法:你必须重新组织代码结构,确保一旦将对象传入通道后,就不再在通道的这一侧使用该对象(你传递了所有权)。

zero-master,你能向我们展示通道的生产者部分吗(即key被创建的地方)?

一位真正的Reddit大神给了我更好的实现方案:https://play.golang.org/p/AXEWJhSg9sT

如果你开启failOnConflict标志,就会发现问题所在!

讨论链接:https://www.reddit.com/r/golang/comments/84f5iq/how_do_i_cleanly_close_the_program_when_the/

lutzhorn:

你需要某种能够跨越这三个基本操作的事务机制。该API是否提供类似功能?这是哪种类型的API?

是的,它提供了事务功能。这是项目地址:GitHub - dgraph-io/badger: Go语言快速键值数据库
该项目建议在单个事务中批量处理更新以提高吞吐量。
如果在此期间任何其他goroutine在其他事务中修改了键,则事务会失败并返回badger.ErrConflict。此时可以重新尝试该事务。如果我的理解正确,每次事务失败时都需要重新执行批量SET/GET操作,这显然会影响吞吐量。

Giulio_Iotti:

这是什么对象?我猜是某种结构体指针?

我只是在对键的值进行递增操作。

Giulio_Iotti:

zero-master,能否展示通道的生产者部分(创建键的位置)?

你可以将其想象为磁盘上存在大量的键:值对,我从网络接收键的数据流,需要对这些键的值进行递增操作。

这是一个典型的并发数据竞争问题。你可以通过为每个key创建一个专用的channel来确保同一key的操作按顺序处理。以下是解决方案:

type Task struct {
    key   string
    count int
}

type WorkerManager struct {
    workers map[string]chan Task
    mu      sync.RWMutex
}

func NewWorkerManager() *WorkerManager {
    return &WorkerManager{
        workers: make(map[string]chan Task),
    }
}

func (wm *WorkerManager) ProcessTask(key string, count int) {
    wm.mu.Lock()
    ch, exists := wm.workers[key]
    if !exists {
        ch = make(chan Task, 100) // 缓冲channel
        wm.workers[key] = ch
        go wm.worker(key, ch)
    }
    wm.mu.Unlock()
    
    ch <- Task{key: key, count: count}
}

func (wm *WorkerManager) worker(key string, ch chan Task) {
    for task := range ch {
        // 获取对象
        object := Get(task.key)
        
        // 增加计数
        object = increment(object, task.count)
        
        // 设置对象
        Set(task.key, object)
    }
}

func (wm *WorkerManager) Close() {
    wm.mu.Lock()
    defer wm.mu.Unlock()
    
    for _, ch := range wm.workers {
        close(ch)
    }
}

// 使用示例
func main() {
    wm := NewWorkerManager()
    
    // 处理连续的key和count流
    go func() {
        for {
            key, count := getNextKeyAndCount() // 从外部源获取
            wm.ProcessTask(key, count)
        }
    }()
    
    // 程序结束时关闭
    // wm.Close()
}

另一种更简洁的方法是使用单个channel配合map来跟踪处理状态:

type ProcessingTracker struct {
    tasks   chan Task
    working map[string]bool
    mu      sync.Mutex
}

func NewProcessingTracker(workerCount int) *ProcessingTracker {
    pt := &ProcessingTracker{
        tasks:   make(chan Task, 1000),
        working: make(map[string]bool),
    }
    
    // 启动工作协程
    for i := 0; i < workerCount; i++ {
        go pt.worker()
    }
    
    return pt
}

func (pt *ProcessingTracker) SubmitTask(key string, count int) {
    pt.mu.Lock()
    if pt.working[key] {
        pt.mu.Unlock()
        return // 该key正在处理中,跳过
    }
    pt.working[key] = true
    pt.mu.Unlock()
    
    pt.tasks <- Task{key: key, count: count}
}

func (pt *ProcessingTracker) worker() {
    for task := range pt.tasks {
        object := Get(task.key)
        object = increment(object, task.count)
        Set(task.key, object)
        
        pt.mu.Lock()
        delete(pt.working, task.key)
        pt.mu.Unlock()
    }
}

第一种方案确保同一key的操作严格按顺序处理,第二种方案通过跟踪处理状态来避免并发冲突。选择哪种取决于你的具体需求。

回到顶部