Golang中如何结合map与atomic/sync包实现并发安全

Golang中如何结合map与atomic/sync包实现并发安全 大家好,

我是Go语言的新手。我正在学习atomic/sync包。我遇到了以下代码:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type mapFunc struct {
	mapV map[int]int
}

func newMap() mapFunc {
	return mapFunc{
		mapV: make(map[int]int),
	}
}

func main() {
	var wg sync.WaitGroup
	var v atomic.Value
	//var mu sync.Mutex
	v.Store(mapFunc{mapV: map[int]int{0: 1, 1: 2}})

	go func() {
		var i int
		for {
			i++
			//mu.Lock()
			s := v.Load().(mapFunc)
			m := s.mapV
			m[0]++
			mapX := mapFunc{
				mapV: m,
			}
			v.Store(mapX)
			//mu.Unlock()
		}
	}()

	go func() {
		var i int
		for {
			i++
			//mu.Lock()
			s := v.Load().(mapFunc)
			m := s.mapV
			m[1]++
			mapX := mapFunc{
				mapV: m,
			}
			v.Store(mapX)
			//mu.Unlock()
		}
	}()

	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer wg.Done()
			//mu.Lock()
			mapX, ok := v.Load().(mapFunc)
			if !ok {
				panic("Error")
			}
			fmt.Println(mapX)
			//mu.Unlock()
		}()
	}

	wg.Wait()
}

我有一个疑问。如果我使用竞争检测标志执行上述代码,会得到一个数据竞争错误。但是当我使用锁时,一切正常。所以我的问题是,如何仅使用atomic.Value或atomic包而不使用互斥锁来实现这一点?因为使用互斥锁时,我不需要LoadStore方法。我可以简单地在主goroutine中初始化map并使用:

mu.Lock()
m[0]++
mu.Unlock()

任何帮助都将不胜感激。提前感谢。


更多关于Golang中如何结合map与atomic/sync包实现并发安全的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

好的,感谢您提供的解决方案和解释。这肯定能解决问题。我也会研究一下 sync.Map

更多关于Golang中如何结合map与atomic/sync包实现并发安全的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


是的,这绝对可行。但如果你能稍微解释一下,我如何使用 atomic.ValueLoadStore 方法来实现同样的功能?

实际上,这背后的核心原因是,我想对整数以外的数据类型使用原子包。这就是为什么我正在使用 LoadStore 方法,但却被竞态条件卡住了。

我们不能同时读写一个映射,因为对映射的写入可能导致映射结构发生变化。这与哈希映射的实现方式有关,已有许多优秀的文章对此进行了探讨,这里是实际源代码的链接

因此,如果我们想要写入一个映射,就需要始终使用互斥锁或其他机制来保护它,以实现对映射访问的序列化。然而,在这种特定情况下,我们可以使用一个技巧:映射在读取时不会发生变化。所以,如果我们从不直接更改映射的内容,就可以从任意多个goroutine中读取它。

我们可以做的是在映射中存储指针,创建后我们永远不会更改这些指针,只会更改它们所指向的值。从技术上讲,这效率较低,因为我们现在需要存储一个指针和实际的值,但这确实解决了竞态条件问题。我修改了你的代码以应用这个原则,修改后的代码如下:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// 这个函数允许我们从字面值创建指针
func intPtr(newInt int64) *int64 {
	return &newInt
}

func main() {
	var wg sync.WaitGroup

	mapX := map[int]*int64{0: intPtr(1), 1: intPtr(2)}
	go func() {
		for {
			// mapX[0]++ 的原子版本
			atomic.AddInt64(mapX[0], 1)
		}
	}()

	go func() {
		for {
			// mapX[1]++ 的原子版本
			atomic.AddInt64(mapX[1], 1)
		}
	}()

	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer wg.Done()

			for key, value := range mapX {
				// 我们需要使用 atomic.LoadInt64 来与 atomic.AddInt64 调用同步
				fmt.Printf("%d = %d\n", key, atomic.LoadInt64(value))
			}
		}()
	}

	wg.Wait()
}

希望这能解答你的问题。

实际上,我从未在实践中使用过 atomic.Value。其内部只是一个指向某些数据的指针,它允许你以原子方式更新这个指针。在我见过的所有示例中,它都用于RCU(读-拷贝-更新)模式。例如,这个函数来自 net/http 包:

// RegisterProtocol 使用给定的方案注册一个新协议。
// Transport 会将使用该方案的请求传递给 rt。
// rt 负责模拟 HTTP 请求语义。
//
// RegisterProtocol 可供其他包用于提供
// 像 "ftp" 或 "file" 这样的协议方案实现。
//
// 如果 rt.RoundTrip 返回 ErrSkipAltProtocol,Transport 将
// 为该请求自行处理 RoundTrip,就像
// 该协议未被注册一样。
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
	t.altMu.Lock()
	defer t.altMu.Unlock()
	oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
	if _, exists := oldMap[scheme]; exists {
		panic("protocol " + scheme + " already registered")
	}
	newMap := make(map[string]RoundTripper)
	for k, v := range oldMap {
		newMap[k] = v
	}
	newMap[scheme] = rt
	t.altProto.Store(newMap)
}

另一个例子是 sync/atomic 文档中给出的:链接

要更新映射,我们首先必须复制它,然后修改它,之后才能进行交换。然而,我们仍然需要一个互斥锁,因为在复制和交换之间,映射可能被修改。大多数情况下,人们在这种情况下会选择使用 sync.RWMutex

由于 RCU 要求我们复制整个映射/对象,对于大型映射/对象,写入操作会很慢,并且需要分配更多内存。其主要好处是读取者永远不需要等待 MutexRWMutex 解锁。因此,这里存在 CPU 与内存的权衡。根据你的用例和需求,你可能需要选择其中一种。

与所有这些相关的是 sync.Map,如果你还不熟悉,你可能想看看它。同样,这是一种相当小众的数据类型,用于解决非常特定的问题。与所有事情一样,请为工作选择合适的工具。

在Go中实现并发安全的map,仅使用atomic.Value而不使用互斥锁是可行的,但需要遵循不可变数据模式。你的代码存在数据竞争是因为多个goroutine在修改同一个map的底层数据。以下是修正后的示例:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type mapFunc struct {
	mapV map[int]int
}

func newMap() mapFunc {
	return mapFunc{
		mapV: make(map[int]int),
	}
}

func main() {
	var wg sync.WaitGroup
	var v atomic.Value
	v.Store(mapFunc{mapV: map[int]int{0: 1, 1: 2}})

	// 更新goroutine
	update := func(key int) {
		for i := 0; i < 100; i++ {
			// 加载当前值
			old := v.Load().(mapFunc)
			
			// 创建新的map副本
			newMap := make(map[int]int, len(old.mapV))
			for k, v := range old.mapV {
				newMap[k] = v
			}
			
			// 修改副本
			newMap[key]++
			
			// 存储新值
			v.Store(mapFunc{mapV: newMap})
		}
	}

	// 启动更新goroutine
	go update(0)
	go update(1)

	// 读取goroutine
	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer wg.Done()
			for j := 0; j < 20; j++ {
				mapX := v.Load().(mapFunc)
				fmt.Printf("Read: %v\n", mapX.mapV)
			}
		}()
	}

	wg.Wait()
	
	// 最终结果
	final := v.Load().(mapFunc)
	fmt.Printf("Final result: %v\n", final.mapV)
}

关键点:

  1. 不可变数据:每次修改都创建新的map副本,而不是修改原有map
  2. 原子替换:使用atomic.Value.Store()原子地替换整个mapFunc
  3. 读取安全atomic.Value.Load()总是返回完整的、一致的数据快照

对于更复杂的场景,可以使用sync/atomic包中的指针操作:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

type ConcurrentMap struct {
	ptr unsafe.Pointer // *map[int]int
}

func NewConcurrentMap() *ConcurrentMap {
	m := make(map[int]int)
	return &ConcurrentMap{
		ptr: unsafe.Pointer(&m),
	}
}

func (c *ConcurrentMap) Load() map[int]int {
	return *(*map[int]int)(atomic.LoadPointer(&c.ptr))
}

func (c *ConcurrentMap) Store(newMap map[int]int) {
	atomic.StorePointer(&c.ptr, unsafe.Pointer(&newMap))
}

func (c *ConcurrentMap) Update(key int, delta int) {
	for {
		oldPtr := atomic.LoadPointer(&c.ptr)
		oldMap := *(*map[int]int)(oldPtr)
		
		// 创建副本
		newMap := make(map[int]int, len(oldMap))
		for k, v := range oldMap {
			newMap[k] = v
		}
		newMap[key] += delta
		
		// CAS操作
		if atomic.CompareAndSwapPointer(&c.ptr, oldPtr, unsafe.Pointer(&newMap)) {
			break
		}
	}
}

func main() {
	cmap := NewConcurrentMap()
	cmap.Store(map[int]int{0: 1, 1: 2})
	
	var wg sync.WaitGroup
	
	// 并发更新
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			cmap.Update(0, 1)
		}
	}()
	
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			cmap.Update(1, 1)
		}
	}()
	
	wg.Wait()
	
	fmt.Println("Result:", cmap.Load())
}

这种方法通过CompareAndSwap实现了无锁更新,但需要注意:

  1. 每次更新都创建新map,内存开销较大
  2. 适合读多写少的场景
  3. 对于高频写入场景,sync.Mapsync.RWMutex配合普通map通常更高效

sync.Map是标准库提供的并发安全map实现:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var sm sync.Map
	
	// 存储值
	sm.Store("key1", 1)
	sm.Store("key2", 2)
	
	// 加载值
	if val, ok := sm.Load("key1"); ok {
		fmt.Println("key1:", val)
	}
	
	// 原子操作
	sm.LoadOrStore("key3", 3)
	
	// 遍历
	sm.Range(func(key, value interface{}) bool {
		fmt.Printf("%v: %v\n", key, value)
		return true
	})
}

选择方案:

  • 读多写少且键类型固定:sync.Map
  • 需要完全控制且不介意复制开销:atomic.Value + 不可变数据
  • 一般场景:sync.RWMutex + 普通map
回到顶部