Golang中如何结合map与atomic/sync包实现并发安全
Golang中如何结合map与atomic/sync包实现并发安全 大家好,
我是Go语言的新手。我正在学习atomic/sync包。我遇到了以下代码:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type mapFunc struct {
mapV map[int]int
}
func newMap() mapFunc {
return mapFunc{
mapV: make(map[int]int),
}
}
func main() {
var wg sync.WaitGroup
var v atomic.Value
//var mu sync.Mutex
v.Store(mapFunc{mapV: map[int]int{0: 1, 1: 2}})
go func() {
var i int
for {
i++
//mu.Lock()
s := v.Load().(mapFunc)
m := s.mapV
m[0]++
mapX := mapFunc{
mapV: m,
}
v.Store(mapX)
//mu.Unlock()
}
}()
go func() {
var i int
for {
i++
//mu.Lock()
s := v.Load().(mapFunc)
m := s.mapV
m[1]++
mapX := mapFunc{
mapV: m,
}
v.Store(mapX)
//mu.Unlock()
}
}()
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
//mu.Lock()
mapX, ok := v.Load().(mapFunc)
if !ok {
panic("Error")
}
fmt.Println(mapX)
//mu.Unlock()
}()
}
wg.Wait()
}
我有一个疑问。如果我使用竞争检测标志执行上述代码,会得到一个数据竞争错误。但是当我使用锁时,一切正常。所以我的问题是,如何仅使用atomic.Value或atomic包而不使用互斥锁来实现这一点?因为使用互斥锁时,我不需要Load和Store方法。我可以简单地在主goroutine中初始化map并使用:
mu.Lock()
m[0]++
mu.Unlock()
任何帮助都将不胜感激。提前感谢。
更多关于Golang中如何结合map与atomic/sync包实现并发安全的实战教程也可以访问 https://www.itying.com/category-94-b0.html
好的,感谢您提供的解决方案和解释。这肯定能解决问题。我也会研究一下 sync.Map。
更多关于Golang中如何结合map与atomic/sync包实现并发安全的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
是的,这绝对可行。但如果你能稍微解释一下,我如何使用 atomic.Value 的 Load 和 Store 方法来实现同样的功能?
实际上,这背后的核心原因是,我想对整数以外的数据类型使用原子包。这就是为什么我正在使用 Load 和 Store 方法,但却被竞态条件卡住了。
我们不能同时读写一个映射,因为对映射的写入可能导致映射结构发生变化。这与哈希映射的实现方式有关,已有许多优秀的文章对此进行了探讨,这里是实际源代码的链接。
因此,如果我们想要写入一个映射,就需要始终使用互斥锁或其他机制来保护它,以实现对映射访问的序列化。然而,在这种特定情况下,我们可以使用一个技巧:映射在读取时不会发生变化。所以,如果我们从不直接更改映射的内容,就可以从任意多个goroutine中读取它。
我们可以做的是在映射中存储指针,创建后我们永远不会更改这些指针,只会更改它们所指向的值。从技术上讲,这效率较低,因为我们现在需要存储一个指针和实际的值,但这确实解决了竞态条件问题。我修改了你的代码以应用这个原则,修改后的代码如下:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
// 这个函数允许我们从字面值创建指针
func intPtr(newInt int64) *int64 {
return &newInt
}
func main() {
var wg sync.WaitGroup
mapX := map[int]*int64{0: intPtr(1), 1: intPtr(2)}
go func() {
for {
// mapX[0]++ 的原子版本
atomic.AddInt64(mapX[0], 1)
}
}()
go func() {
for {
// mapX[1]++ 的原子版本
atomic.AddInt64(mapX[1], 1)
}
}()
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for key, value := range mapX {
// 我们需要使用 atomic.LoadInt64 来与 atomic.AddInt64 调用同步
fmt.Printf("%d = %d\n", key, atomic.LoadInt64(value))
}
}()
}
wg.Wait()
}
希望这能解答你的问题。
实际上,我从未在实践中使用过 atomic.Value。其内部只是一个指向某些数据的指针,它允许你以原子方式更新这个指针。在我见过的所有示例中,它都用于RCU(读-拷贝-更新)模式。例如,这个函数来自 net/http 包:
// RegisterProtocol 使用给定的方案注册一个新协议。
// Transport 会将使用该方案的请求传递给 rt。
// rt 负责模拟 HTTP 请求语义。
//
// RegisterProtocol 可供其他包用于提供
// 像 "ftp" 或 "file" 这样的协议方案实现。
//
// 如果 rt.RoundTrip 返回 ErrSkipAltProtocol,Transport 将
// 为该请求自行处理 RoundTrip,就像
// 该协议未被注册一样。
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
t.altMu.Lock()
defer t.altMu.Unlock()
oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
if _, exists := oldMap[scheme]; exists {
panic("protocol " + scheme + " already registered")
}
newMap := make(map[string]RoundTripper)
for k, v := range oldMap {
newMap[k] = v
}
newMap[scheme] = rt
t.altProto.Store(newMap)
}
另一个例子是 sync/atomic 文档中给出的:链接
要更新映射,我们首先必须复制它,然后修改它,之后才能进行交换。然而,我们仍然需要一个互斥锁,因为在复制和交换之间,映射可能被修改。大多数情况下,人们在这种情况下会选择使用 sync.RWMutex。
由于 RCU 要求我们复制整个映射/对象,对于大型映射/对象,写入操作会很慢,并且需要分配更多内存。其主要好处是读取者永远不需要等待 Mutex 或 RWMutex 解锁。因此,这里存在 CPU 与内存的权衡。根据你的用例和需求,你可能需要选择其中一种。
与所有这些相关的是 sync.Map,如果你还不熟悉,你可能想看看它。同样,这是一种相当小众的数据类型,用于解决非常特定的问题。与所有事情一样,请为工作选择合适的工具。
在Go中实现并发安全的map,仅使用atomic.Value而不使用互斥锁是可行的,但需要遵循不可变数据模式。你的代码存在数据竞争是因为多个goroutine在修改同一个map的底层数据。以下是修正后的示例:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type mapFunc struct {
mapV map[int]int
}
func newMap() mapFunc {
return mapFunc{
mapV: make(map[int]int),
}
}
func main() {
var wg sync.WaitGroup
var v atomic.Value
v.Store(mapFunc{mapV: map[int]int{0: 1, 1: 2}})
// 更新goroutine
update := func(key int) {
for i := 0; i < 100; i++ {
// 加载当前值
old := v.Load().(mapFunc)
// 创建新的map副本
newMap := make(map[int]int, len(old.mapV))
for k, v := range old.mapV {
newMap[k] = v
}
// 修改副本
newMap[key]++
// 存储新值
v.Store(mapFunc{mapV: newMap})
}
}
// 启动更新goroutine
go update(0)
go update(1)
// 读取goroutine
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for j := 0; j < 20; j++ {
mapX := v.Load().(mapFunc)
fmt.Printf("Read: %v\n", mapX.mapV)
}
}()
}
wg.Wait()
// 最终结果
final := v.Load().(mapFunc)
fmt.Printf("Final result: %v\n", final.mapV)
}
关键点:
- 不可变数据:每次修改都创建新的map副本,而不是修改原有map
- 原子替换:使用
atomic.Value.Store()原子地替换整个mapFunc - 读取安全:
atomic.Value.Load()总是返回完整的、一致的数据快照
对于更复杂的场景,可以使用sync/atomic包中的指针操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
type ConcurrentMap struct {
ptr unsafe.Pointer // *map[int]int
}
func NewConcurrentMap() *ConcurrentMap {
m := make(map[int]int)
return &ConcurrentMap{
ptr: unsafe.Pointer(&m),
}
}
func (c *ConcurrentMap) Load() map[int]int {
return *(*map[int]int)(atomic.LoadPointer(&c.ptr))
}
func (c *ConcurrentMap) Store(newMap map[int]int) {
atomic.StorePointer(&c.ptr, unsafe.Pointer(&newMap))
}
func (c *ConcurrentMap) Update(key int, delta int) {
for {
oldPtr := atomic.LoadPointer(&c.ptr)
oldMap := *(*map[int]int)(oldPtr)
// 创建副本
newMap := make(map[int]int, len(oldMap))
for k, v := range oldMap {
newMap[k] = v
}
newMap[key] += delta
// CAS操作
if atomic.CompareAndSwapPointer(&c.ptr, oldPtr, unsafe.Pointer(&newMap)) {
break
}
}
}
func main() {
cmap := NewConcurrentMap()
cmap.Store(map[int]int{0: 1, 1: 2})
var wg sync.WaitGroup
// 并发更新
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
cmap.Update(0, 1)
}
}()
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
cmap.Update(1, 1)
}
}()
wg.Wait()
fmt.Println("Result:", cmap.Load())
}
这种方法通过CompareAndSwap实现了无锁更新,但需要注意:
- 每次更新都创建新map,内存开销较大
- 适合读多写少的场景
- 对于高频写入场景,
sync.Map或sync.RWMutex配合普通map通常更高效
sync.Map是标准库提供的并发安全map实现:
package main
import (
"fmt"
"sync"
)
func main() {
var sm sync.Map
// 存储值
sm.Store("key1", 1)
sm.Store("key2", 2)
// 加载值
if val, ok := sm.Load("key1"); ok {
fmt.Println("key1:", val)
}
// 原子操作
sm.LoadOrStore("key3", 3)
// 遍历
sm.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})
}
选择方案:
- 读多写少且键类型固定:
sync.Map - 需要完全控制且不介意复制开销:
atomic.Value+ 不可变数据 - 一般场景:
sync.RWMutex+ 普通map

