Golang解决通道冲突问题
Golang解决通道冲突问题 我正在尝试使用具有以下操作的API:
object := Get(key) // 这个过程很慢
object := increment(object, count) // 增加该对象的计数
Set(key, object) // 这个过程也很慢
问题在于:在Get和Set操作之间,有可能其他goroutine已经修改了这个对象!
我尝试通过channel来解决这个问题:
worker(){
for {
select {
case task <- ch:
这里执行我上面描述的操作
}
}
}
我启动了N个这样的工作协程。
我从外部源获取连续的key和count流。
我该如何防止上面描述的问题?
更多关于Golang解决通道冲突问题的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我的理解是否正确:您希望在三个步骤的操作期间"锁定"key?这样当一个goroutine正在处理某个key的任务时,即使有新的任务通过ch到达,也不允许其他goroutine处理相同key的任务?
您需要某种能够跨越这三个基本操作的事务机制。API是否提供了类似的功能?这是哪种类型的API?
更多关于Golang解决通道冲突问题的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
我认为所描述的问题是"key"(一个可变对象)在Get和Set操作之间被修改了。
这是什么对象?我猜是某个结构体指针?
没有解决办法:你必须重新组织代码结构,确保一旦将对象传入通道后,就不再在通道的这一侧使用该对象(你传递了所有权)。
zero-master,你能向我们展示通道的生产者部分吗(即key被创建的地方)?
一位真正的Reddit大神给了我更好的实现方案:https://play.golang.org/p/AXEWJhSg9sT
如果你开启failOnConflict标志,就会发现问题所在!
讨论链接:https://www.reddit.com/r/golang/comments/84f5iq/how_do_i_cleanly_close_the_program_when_the/
lutzhorn:
你需要某种能够跨越这三个基本操作的事务机制。该API是否提供类似功能?这是哪种类型的API?
是的,它提供了事务功能。这是项目地址:GitHub - dgraph-io/badger: Go语言快速键值数据库
该项目建议在单个事务中批量处理更新以提高吞吐量。
如果在此期间任何其他goroutine在其他事务中修改了键,则事务会失败并返回badger.ErrConflict。此时可以重新尝试该事务。如果我的理解正确,每次事务失败时都需要重新执行批量SET/GET操作,这显然会影响吞吐量。
Giulio_Iotti:
这是什么对象?我猜是某种结构体指针?
我只是在对键的值进行递增操作。
Giulio_Iotti:
zero-master,能否展示通道的生产者部分(创建键的位置)?
你可以将其想象为磁盘上存在大量的键:值对,我从网络接收键的数据流,需要对这些键的值进行递增操作。
这是一个典型的并发数据竞争问题。你可以通过为每个key创建一个专用的channel来确保同一key的操作按顺序处理。以下是解决方案:
type Task struct {
key string
count int
}
type WorkerManager struct {
workers map[string]chan Task
mu sync.RWMutex
}
func NewWorkerManager() *WorkerManager {
return &WorkerManager{
workers: make(map[string]chan Task),
}
}
func (wm *WorkerManager) ProcessTask(key string, count int) {
wm.mu.Lock()
ch, exists := wm.workers[key]
if !exists {
ch = make(chan Task, 100) // 缓冲channel
wm.workers[key] = ch
go wm.worker(key, ch)
}
wm.mu.Unlock()
ch <- Task{key: key, count: count}
}
func (wm *WorkerManager) worker(key string, ch chan Task) {
for task := range ch {
// 获取对象
object := Get(task.key)
// 增加计数
object = increment(object, task.count)
// 设置对象
Set(task.key, object)
}
}
func (wm *WorkerManager) Close() {
wm.mu.Lock()
defer wm.mu.Unlock()
for _, ch := range wm.workers {
close(ch)
}
}
// 使用示例
func main() {
wm := NewWorkerManager()
// 处理连续的key和count流
go func() {
for {
key, count := getNextKeyAndCount() // 从外部源获取
wm.ProcessTask(key, count)
}
}()
// 程序结束时关闭
// wm.Close()
}
另一种更简洁的方法是使用单个channel配合map来跟踪处理状态:
type ProcessingTracker struct {
tasks chan Task
working map[string]bool
mu sync.Mutex
}
func NewProcessingTracker(workerCount int) *ProcessingTracker {
pt := &ProcessingTracker{
tasks: make(chan Task, 1000),
working: make(map[string]bool),
}
// 启动工作协程
for i := 0; i < workerCount; i++ {
go pt.worker()
}
return pt
}
func (pt *ProcessingTracker) SubmitTask(key string, count int) {
pt.mu.Lock()
if pt.working[key] {
pt.mu.Unlock()
return // 该key正在处理中,跳过
}
pt.working[key] = true
pt.mu.Unlock()
pt.tasks <- Task{key: key, count: count}
}
func (pt *ProcessingTracker) worker() {
for task := range pt.tasks {
object := Get(task.key)
object = increment(object, task.count)
Set(task.key, object)
pt.mu.Lock()
delete(pt.working, task.key)
pt.mu.Unlock()
}
}
第一种方案确保同一key的操作严格按顺序处理,第二种方案通过跟踪处理状态来避免并发冲突。选择哪种取决于你的具体需求。

