Golang并发写入的线程安全Map存储方案
Golang并发写入的线程安全Map存储方案 我正在开发一个使用MapReduce计算单词列表统计数据的应用程序。会有多个映射方法,我希望并发地启动它们,但显然需要收集结果。
在单线程情况下,我使用以下结构:
map[string][]string
这给了我如下数据:
{"length": ["1", "1", "33"], "colour": ["red", "blue", "red"]}
根据我的理解,我认为需要一个队列系统:将映射结果添加到队列中,队列工作器按时间顺序逐步处理每个条目并将其存储到数据映射中。假设标准队列是单线程的,就不需要互斥锁来锁定映射。
归约方法要等到所有映射完成后才会调用,所以我不需要担心这些方法访问映射的问题。
有几个问题:这听起来像是解决问题的正确方法吗?
我担心单线程队列会抵消多线程映射的好处,因为它会成为瓶颈。如果是这样,带有互斥锁的多线程队列会更好吗?因为互斥锁仍然会将其速度降低到几乎单线程的水平。
更多关于Golang并发写入的线程安全Map存储方案的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我打算为用户提供的另一个选项是Redis,但这适用于那些不希望有外部依赖的用户。
更多关于Golang并发写入的线程安全Map存储方案的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
确实如此 😄 这样看起来更美观。使用 sync.Map 肯定更简单,所以我同意你的观点。
这涉及到 goroutine,所以我会研究一下通道。除了官方文档,你能推荐一些好的示例吗?
我会尝试仅使用映射来看看效果如何。Ruby 不太喜欢这种方式,但希望 Go 能更好地处理。
来自标准库 - https://golang.org/pkg/sync/#Map
几十兆字节,20次循环时接近1GB。我的手机有4GB可用空间。你可以试试,如果出现内存不足的情况,再考虑其他方案。😊
在YouTube上搜索Go并发,有很多优质视频,通过实际编程练习,几个月后就能掌握,不用担心。
如果只在单个实例上运行,没必要使用Redis,可以看看BoltDB,链接已经发给你了。
通道也通过互斥锁进行"保护"。做一些基准测试,你可能会发现 sync.Map 速度相当快——这正是它在谷歌内部被使用的原因。
这是为你准备的基准测试代码:https://gist.github.com/craigmj/5770480
对于如此简单的任务,无需导入数据库。按以下步骤操作:
- 所有 goroutine 共享一个(只写)通道,用于发送修改映射的函数。为此定义一个类型:type mapFunc func() error
- 一个 goroutine 负责管理映射:它读取共享通道,并对每个接收到的对象执行函数。
- 请记住,如果您的类型不是不可变的(例如数字或字符串),那么在通道发送的函数之外使用它们时需要小心。
如果您需要更多提示,请将目前编写的代码发给我们!
type mapFunc func() error
我记得在StackOverflow的一条评论中看到过,wg.Add 应该在 goroutine 之前执行,而不是之后,但刚才试了一下,仍然失败。
我想我已经弄明白是怎么回事了。所有的 add_one 和 double 函数都被调用了,但 handle_ret 在处理完所有内容之前就被关闭了,导致队列没有被清空,结果也没有被打印出来。但如果真是这样,那就意味着通道不会按照它们被放入的顺序处理内容,因为所有的加法和加倍操作肯定已经被调用了,第一个等待组才会结束。我刚刚通过移除 done 并交换 wg.Add 的顺序来测试这一点,现在似乎可以正常工作了。我是真的修复了问题,还是只是运气好?
func main() {
fmt.Println("hello world")
}
在Github上,Boltdb项目显示已被所有者归档,但从他的留言来看似乎仍然完全可用,你同意吗?
使用数据库的原因是,尽管我处理的大多数列表可能不会太大,但偶尔需要处理大型列表,有时甚至达到几十兆字节。在Ruby中,将20个不同检查的结果保存在内存中效果不佳,很快就占满了内存,因此我希望在这种情况下让用户能够将存储转移到磁盘。我还没有编写任何代码,但你的提示很有道理,我会看看能写出什么。
这是我正在尝试重写的内容:
digininja/pipal
pipal - Pipal,密码分析器
以下是我目前使用通道实现的内容:
https://play.golang.org/p/u62m3Yu4jwD
我最初遇到的问题是,在发送完成信号并退出之前,并非所有的goroutine都能执行完毕,所以我添加了WaitGroup,但问题依然存在。最后几个goroutine有时在应用程序退出前仍未能完成执行。double函数本应每次都返回1998,但实际并非如此:
channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
channels $ go run basic.go |grep 1998
Message passed: 1998
我尝试将wg作为模块变量(这个术语正确吗?)使用,也尝试将其作为参数传递,看看是否会有不同效果,但结果都一样。我哪里做错了?
另外,我是否需要通过发送完成消息来终止handle_ret,还是可以在wg.Wait()完成后直接继续执行应用程序的其他部分而不管它?似乎正确处理它比直接不管更为妥当。
在Go语言中处理并发写入Map的场景,标准做法是使用sync.Map或结合互斥锁的普通Map。对于你的MapReduce单词统计场景,我建议使用sync.Map,因为它专门为并发访问优化,无需额外的队列系统。
以下是一个实现示例:
package main
import (
"fmt"
"sync"
)
func main() {
var result sync.Map
var wg sync.WaitGroup
// 模拟多个映射goroutine并发写入
words := []string{"length", "colour", "length", "colour", "length"}
values := []string{"1", "red", "33", "blue", "1"}
for i, key := range words {
wg.Add(1)
go func(k string, v string) {
defer wg.Done()
// 使用LoadOrStore处理并发写入
if actual, loaded := result.LoadOrStore(k, []string{v}); loaded {
// 如果key已存在,追加新值
existing := actual.([]string)
updated := append(existing, v)
result.Store(k, updated)
}
}(key, values[i])
}
wg.Wait()
// 归约阶段:处理所有映射结果
finalResult := make(map[string][]string)
result.Range(func(key, value interface{}) bool {
finalResult[key.(string)] = value.([]string)
return true
})
fmt.Printf("Final result: %v\n", finalResult)
}
另一个使用互斥锁和普通Map的替代方案:
package main
import (
"fmt"
"sync"
)
type SafeMap struct {
mu sync.RWMutex
data map[string][]string
}
func NewSafeMap() *SafeMap {
return &SafeMap{
data: make(map[string][]string),
}
}
func (m *SafeMap) Append(key string, value string) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = append(m.data[key], value)
}
func (m *SafeMap) GetResult() map[string][]string {
m.mu.RLock()
defer m.mu.RUnlock()
// 返回副本避免数据竞争
result := make(map[string][]string)
for k, v := range m.data {
result[k] = append([]string{}, v...)
}
return result
}
func main() {
safeMap := NewSafeMap()
var wg sync.WaitGroup
words := []string{"length", "colour", "length", "colour", "length"}
values := []string{"1", "red", "33", "blue", "1"}
for i, key := range words {
wg.Add(1)
go func(k string, v string) {
defer wg.Done()
safeMap.Append(k, v)
}(key, values[i])
}
wg.Wait()
finalResult := safeMap.GetResult()
fmt.Printf("Final result: %v\n", finalResult)
}
关于你的队列方案:在Go中引入额外的队列层会增加复杂度且可能成为瓶颈。Go的并发原语(goroutine和channel)已经提供了高效的并发处理机制。使用sync.Map或带互斥锁的Map能够直接安全地处理并发写入,无需队列中间层。
对于MapReduce模式,典型做法是让映射goroutine直接写入共享数据结构,使用适当的同步机制,然后在所有映射完成后进行归约操作。这种方法避免了单点瓶颈,充分利用了Go的并发优势。

