Golang中[]byte的单个字节访问是否并发安全?
Golang中[]byte的单个字节访问是否并发安全?
我正在尝试理解对 []byte 中字节进行原子访问的限制。
我在下面的代码中模拟了以下场景:
- 我有一个
[]byte数据缓冲区,其中每个字节的低半字节是一个所有者ID,显然范围是 0 到 15。 - 我有 16 个 goroutine,ID 为 0 到 15。假设每个 goroutine 拥有那些低半字节等于其 ID 的字节。
- 这些 goroutine 持续且并发地扫描所有数据以寻找属于它们的字节,当找到一个时,它们会更改缓冲区中该字节的高半字节。
重要的是,15 个 goroutine 可以并发访问同一个字节,而一个 goroutine 可以向该字节写入,但由于有且仅有一个 goroutine 会写入给定的字节,所以不存在竞态条件。我相信,这只是原子加载和设置的问题。
在 16+ 核的 AMD x64 硬件(包括 Ryzen 和 Threadripper)上长时间运行似乎没有问题。如果这确实有效,将为我们节省巨大的同步开销。
这实际上是安全的吗?还是依赖于硬件?或者我只是在这次测试中运气好?
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func changeMyData(myID byte, data []byte, running *int64, wg *sync.WaitGroup) {
// each worker will only change bytes that match
// their id in the low order nibble
var passNum byte
for {
// All workers are concurrently reading all of data, and all workers
// are concurrently writing to data, but just to the bytes they own.
for i, b := range data {
// if id is in the low order nibble, then this byte is owned by this routine.
if b&0x0F == myID {
// put passNum in the high order nibble and myID back in the low
data[i] = passNum<<4 | myID
}
}
// we only check for a stop between complete passes through the data
// so all all bytes with this id have the same passNum in the high nibble.
if atomic.LoadInt64(running) == 0 {
wg.Done()
return
}
passNum++
}
}
func main() {
// Make the length shorter (min 32) to increse the velocity of changes at every byte.
// Make longer to have a better chance catching write errors with wrong passNum.
data := make([]byte, 64)
var id byte
for i := range data {
data[i] = byte(i & 0x0F) // just set the low order nibble to incrementing 0-15
}
// turn on the running flag
running := new(int64)
*running = 1
var wg sync.WaitGroup
for id = 0; id < 0x0F; id++ {
wg.Add(1)
go changeMyData(id, data, running, &wg)
}
// Make this longer to increase the chance of a conflict
time.Sleep(10 * time.Second)
atomic.StoreInt64(running, 0)
// Wait for the workers to finish
wg.Wait()
// Now make sure the data is what we expect...
// make sure the low order nibble is still correct for every byte.
for bNum, b := range data {
// expected low order nibble
id = b & 0x0F
if byte(bNum&0x0F) != id {
fmt.Printf("Error: owning ID of bNum %v=%X changed to %X\n", bNum, bNum, id)
}
}
// make sure the high order nibble is the same for every byte owned by a given id.
// This just means all bytes for an id are the same: same high and same low nibble.
for id = 0; id < 0x0F; id++ {
expectedByte := data[int(id)]
// loop over each byte owned by this id.
for bNum := int(id); bNum < len(data); bNum += 0x10 {
if data[bNum] != expectedByte {
fmt.Printf("Error: byte at bNum=%X got %X but expected %X\n", bNum, data[bNum], expectedByte)
}
}
}
fmt.Printf("ByteNum:")
for bNum, _ := range data {
fmt.Printf("%3X", bNum)
}
fmt.Printf("\n Value:")
for _, b := range data {
fmt.Printf("%3X", b)
}
fmt.Println()
}
更多关于Golang中[]byte的单个字节访问是否并发安全?的实战教程也可以访问 https://www.itying.com/category-94-b0.html
我不认为这是安全的做法。Go 内存模型 - Go 编程语言
更多关于Golang中[]byte的单个字节访问是否并发安全?的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
好的,明白了。谢谢你的帮助和提供的链接。我想我们的理解是一致的。你说得对,如果我们有字节级别的原子操作,就能解决这个问题。
显然,如果能将其用于一些并发安全的“无锁编程”,那将会非常棒。
是的,我完全理解你的意思。这也是我提出这个问题的原因。
基本上,这个问题可以归结为:
如果多个协程正在读取一个字节,而一个协程在没有同步的情况下写入该字节,那么读取方是否有可能看到除以下两种情况之外的其他值:a) 旧的字节值,或 b) 新的字节值。
如果答案是否定的,正如看起来的那样,那么了解这一点是非常有益的。
我很好奇:你正在解决什么问题,需要在共享内存中无同步地读写半字节?
我认为这样是可以的,但我怀疑你实际上正在承受性能损失,我建议你进行测量!在 x86 平台上,读写单个字节是原子性的,但底层内存系统使用缓存行作为进出主内存的最小单位;而不是单个字节,所以我怀疑你的 []byte 切片底层的 [16]byte 数组正在遭受“伪共享”。根据 Ryzen 和 Threadripper 的缓存布局,这可能不是问题,因为例如,它们可能都可以访问相同的 L3 缓存,因此它们可以持续修改缓存的内存,而不需要“提交”到主内存,所以性能可能仍然可以(请测量!)。
如果你可以为此进程多分配一点内存,你可以尝试类似这样的方法来消除所有伪共享,以便每个 goroutine 拥有自己的缓存行,这样就不会在缓存行上产生(潜在的)争用,然而,现在你在内存中移动了多个缓存行,而以前只有一个,所以这实际上可能会对你的性能产生负面影响(请测量!)。
func main() {
fmt.Println("hello world")
}
你好,肖恩,感谢你提供的具体建议。
回答你的问题:我之前提到的半字节(nibble)其实只是一个例子,我用高半字节作为测试数据,来检查我是否在丢失写入操作。
实际的问题是一个针对海量数据点的离散优化问题。更具体地说,向量中的每个完整字节表示多个搜索线程(<=32个)中的哪一个拥有特定的数据行。搜索线程以复杂、非线性的方式遍历数据,对于它们看到的每一行,它们需要知道自己是否拥有它。如果拥有,它们就可以进行一些处理,然后将其分配给另一个所有者。
相对于集群问题,我们的特定问题算不上大数据,但相对于在单台机器上运行,数据量是巨大的。它包含数千个特征列,每个列有一百万个float32数据点,加上一大堆相关的图、分箱和互连数据(总共约32 GB)。因此,我们必须对我们添加的任何数据结构都保持极高的内存效率。显然,我们不能做任何像重新分配切片或依赖垃圾回收来处理与数据绑定的项目这类事情。
如果我将所有权字节放在与每行关联的另一个结构中,或许可以避免伪共享(false sharing)问题,但这会破坏该结构的对齐,并且使我无法利用GoLang的内存清零优化来快速将整个字节切片的所有权重置为零(这种情况经常发生)。
我打算研究一下,看看哪种方案总体成本更低。
再次感谢你花时间提供帮助!
你的代码在并发访问 []byte 的单个字节时不是并发安全的,尽管在x86架构上可能暂时没有观察到问题。这是因为Go的内存模型不保证对字节的并发读写是原子的,即使这些读写操作来自不同的goroutine。
问题分析
-
非原子性操作:
data[i] = passNum<<4 | myID这个赋值操作在底层可能不是原子的,特别是在32位或某些ARM架构上。 -
内存可见性:Go的内存模型不保证一个goroutine对字节的写入会立即对其他goroutine可见,即使是在x86架构上。
-
编译器优化:编译器可能会对循环进行优化,导致意外的内存访问顺序。
正确的实现方式
使用 sync/atomic 包提供的原子操作来确保并发安全:
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
)
func changeMyData(myID byte, data []byte, running *int64, wg *sync.WaitGroup) {
var passNum byte
for {
for i := range data {
// 原子加载当前值
addr := (*byte)(unsafe.Pointer(&data[i]))
old := atomic.LoadUint32((*uint32)(unsafe.Pointer(addr)))
b := byte(old)
// 检查是否属于当前goroutine
if b&0x0F == myID {
// 准备新值
newVal := passNum<<4 | myID
// 原子比较并交换
for {
current := atomic.LoadUint32((*uint32)(unsafe.Pointer(addr)))
if byte(current) != b {
break // 值已改变,重新开始
}
if atomic.CompareAndSwapUint32(
(*uint32)(unsafe.Pointer(addr)),
uint32(b),
uint32(newVal),
) {
break
}
}
}
}
if atomic.LoadInt64(running) == 0 {
wg.Done()
return
}
passNum++
}
}
// 或者使用更简洁的atomic.Value方式
type ByteSlice struct {
data []byte
mu sync.RWMutex
}
func (bs *ByteSlice) AtomicUpdate(index int, updater func(byte) byte) {
bs.mu.Lock()
defer bs.mu.Unlock()
bs.data[index] = updater(bs.data[index])
}
func (bs *ByteSlice) AtomicRead(index int) byte {
bs.mu.RLock()
defer bs.mu.RUnlock()
return bs.data[index]
}
使用sync/atomic的替代方案
package main
import (
"sync/atomic"
)
// 使用uint32数组,每个元素原子操作
type AtomicByteArray struct {
data []uint32
}
func NewAtomicByteArray(size int) *AtomicByteArray {
return &AtomicByteArray{
data: make([]uint32, size),
}
}
func (a *AtomicByteArray) AtomicStore(idx int, value byte) {
atomic.StoreUint32(&a.data[idx], uint32(value))
}
func (a *AtomicByteArray) AtomicLoad(idx int) byte {
return byte(atomic.LoadUint32(&a.data[idx]))
}
func (a *AtomicByteArray) AtomicUpdate(idx int, myID, passNum byte) bool {
for {
old := atomic.LoadUint32(&a.data[idx])
oldByte := byte(old)
if oldByte&0x0F != myID {
return false
}
newVal := passNum<<4 | myID
if atomic.CompareAndSwapUint32(&a.data[idx], old, uint32(newVal)) {
return true
}
}
}
关键点
-
不要依赖架构特性:x86的强内存模型可能暂时掩盖问题,但代码在ARM或其他弱内存模型架构上会失败。
-
使用标准库的原子操作:
sync/atomic包提供了跨平台的原子操作保证。 -
内存顺序:原子操作确保内存顺序,避免编译器重排导致的问题。
你的测试在x86上可能暂时运行正常,但这是一种未定义行为,不能保证在所有平台或未来Go版本中都能正常工作。


