Golang Go语言中的并发设计问题请教

我在做一个监测流量的项目。每秒会从数据源中获取 1w 条 json 格式的流量信息,我希望对这些流量进行分析,但是现在会出现丢数据的情况。

我的做法是
1.接受到数据后先传入 channelA
2.启动一个协程循环从 A 中读取数据存入切片 B
3.另起一个协程处理切片 B 的数据,同时在处理业务时利用 mutex 锁住 B

实际调试中发现,mutex 的次数会影响数据的丢失量
请问我这样设计是否有问题,是否会导致丢数据
Golang Go语言中的并发设计问题请教

40 回复

需要 demo 才能分析,你描述的逻辑里有很多不确定性。

更多关于Golang Go语言中的并发设计问题请教的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


要 demo, 你两个协程都会写切片 B

我就提一个可能,切片 B 扩容,导致这种特殊情况:
时刻 1 ,goroutine1 加锁,用 B=append(B, item)向切片 B 追加一个元素。刚好触发了扩容,B 的底层数组指针发生了转移。即,append 的参数 B 和返回值 B 中的 ptr 不同。
时刻 2 紧接着时刻 1 ,goroutine2 拿到锁,这个时候在 goroutine2 看来,B 只是一个由(size,cap,ptr)构成结构体,它察觉不到 B 底层数组指针的变化,所以看不到 goroutine1 追加的数据。

具体可以检查一下代码。

我感觉 sync.Pool 是干这个事情的。但是我一直没有掌握 sync.Pool 的正确用法,希望有大佬解释一下。

调大 A 的 bufsize

另外写入 B 的时候也要 mutex ,否则会被污染

“但是现在会出现丢数据的情况” 这是为什么呢?实际没有从数据源中获取到 1w 条?

有没有考虑用时序数据库来存储这些数据然后订阅? 1w/s ,这个数据量不小了

2 中存入 b 的过程也要锁。
其实这里不用切片用队列比较合适

没看到你的代码,粗略的分析,你的第一个协程,从 channelA 中写到切片 b 也需要先获得 b 的 mutex 锁的,要不然,如果在锁 b 的时候你从 channelA 中获取数据,因为 b 锁住了,你写不进去就丢了

The Golden Rule - Don’t Block the Event Loop or Coroutine.

写切片 B 和读切片 B 的时候都要加锁

先放 kafka ,再批量读出来处理呢

#11 +1 跟我理解的一样
而且整个过程感觉效率并不高
使用中间件哪怕 redis stream 整个代码都可以简单很多

能确定数量就用 channel ,不行的话用 linked list 。尽量避免用锁,传递锁的时候要传指针&。

丢数据算 bug 吗? 如果算请写个并发的单元测试并加上-race 测一下

数据量有点大, 建议使用 sync.Pool + 任务队列


老哥代码放上了






对,这里确实是有问题。但是我加上后发现还是会丢

conn 是什么协议?
把加放锁和处理数据的位置再标一下。

channel 里读 N 条出来直接处理掉,不要用切片缓存 /交互数据,就没这个问题了。

这个切片设计的根本没什么道理。

没理由 append B 加锁了还能丢数据啊
你可能丢数据的地方在 err := json.Unmarshal([]byte(<-A), &fs)

看你最终的代码感觉没什么问题。

建议写个可以复现的 demo ,之前我也是出 bug ,感觉是用的第三方的库的问题。后来写了个可以复现的 demo ,发现是我代码的问题。

我有很多莫名其妙的 bug 都是在写 demo 的时候发现代码真正错误的地方。

比如说你这个代码,里面有网络连接,写数据库啥的,都给简化了,最终就是纯粹的逻辑代码,慢慢调试就发现问题了。
而且也方便别人运行调试。

handleData 里加锁处理数据,但是 txData 里 append 却没有加锁,
所以当 handleData 正在处理数据的时候,txData 还在往里面 append 数据,
等 handleData 处理完,清空了 B ,txData 在 handleData 处理数据的过程中所添加的数据也就被清除了。
没有给写入加锁只给读取加锁,等于没加锁。

另外你想用 handleData 异步处理数据,但是如果在 txData 里给 append 加锁,其实就等于同步处理数据了,没什么意义。考虑在 txData 里对数据进行分块或按时间进行分块,再将分块的数据传给 handleData ,连锁都不用。

我的理解 handleData 这里完全没必要 也没必要用锁
可以把写库代码直接放到 appendB = append(B,fs) 位置执行
其次 db 本身是支持并发写库的,这里加锁意义不大,加了锁也都是在等待锁反而更慢

检查一下发送端的返回值。
如上面所说的,这样实现并没有并发。如果处理能力大于上游,同步处理就行;如果小于上游,最终结果就是一个协程在处理,一个在等锁,一个在等 channel 缓冲空间。

这个问题根本应该在于多个线程操作同一个切片导致的,这里就会有很大不确定性。我问了 GPT-4 ,它给了很好的建议,把 B 换成 chan 而不是切片试试
<br>var A = make(chan string, 1048576)<br>var B = make(chan flowStatistic, 1048576) // 使用带缓冲的 channel 而非切片<br>...<br><br>func txData() {<br> for {<br> var fs flowStatistic<br> err := json.Unmarshal([]byte(&lt;-A), &amp;fs)<br> ...<br> B &lt;- fs // 将 fs 传递给 handleData<br> }<br>}<br><br>func handleData() {<br> var buffer []flowStatistic<br> timer := time.NewTimer(5 * time.Second)<br><br> for {<br> select {<br> case fs := &lt;-B:<br> buffer = append(buffer, fs)<br> case &lt;-timer.C:<br> // 处理 buffer 中的数据<br> ...<br> buffer = make([]flowStatistic, 0)<br> timer.Reset(5 * time.Second)<br> }<br> }<br>}<br>
完整回复: https://flowus.cn/share/533684c0-2869-4507-8375-297103f09c77
PS: 顺便一提在我的小站就可以随时用 GPT-4 了, liaobots.com

go<br>var fmutex = sync.Mutex{}<br>var A = make(chan string, 1048576)<br>var B = sync.Pool{<br> New: func() interface{} {<br> return make([]flowStatistic, 0, 10000)<br> },<br>}<br><br>func foo(){<br> go getData()<br> go txData()<br> go handleData()<br>}<br><br>// 接收数据<br>func getData(){<br> for {<br> // ...<br> data := conn.Read()<br> A&lt;-data<br> }<br>}<br><br><br>func txData() {<br> for {<br> var fs flowStatistic<br> err := json.Unmarshal([]byte(&lt;-A), &amp;fs)<br> // ...<br> <br> fmutex.Lock()<br> currB := B.Get().([]flowStatistic)<br> currB = append(currB, fs)<br> B.Put(currB)<br> fmutex.Unlock()<br> }<br>}<br><br>func handleData() {<br> for {<br> time.Sleep(5 * time.Second)<br> fmutex.Lock()<br> currB := B.Get().([]flowStatistic)<br><br> // 进行数据聚合和存储操作<br> // ...<br> // 清空 B<br> currB = currB[:0]<br> B.Put(currB)<br> fmutex.Unlock()<br> }<br>}<br><br>

#29 怎么发送后格式就乱了呢

感觉可以换一种思路,通过 time.Ticker 和 select 来代替锁保证缓存数据不会被互相抢占影响
for {
select {
case data := <-A:

B = append(B,fs)
case t := <-ticker.C: // ticker := time.NewTicker(5 * time.Second)
// 聚合处理数据
process(B)

// 清空 B 保留容量
B = B[:0:cap(B)]
}
}

在楼主给的第二份代码其实也没有解决上面我提的那个问题,因为 goroutine1 在等待 goroutine2 放锁的时候,它栈里面的变量 B 就是旧的 B (底层指针不会变成你清空后新赋值的指针),所以 goroutine2 的清空操作 goroutine1 在这一次执行中是不可见的。

试试这样修改?
<br>var fmutex = sync.Mutex{}<br>var A = make(chan string, 1048576)<br>var B_array = make([]flowStatistic,0) // &lt;------<br>var B = &amp;B // &lt;------<br><br>func foo(){<br> go getData()<br> go txData()<br> go handleData()<br>}<br><br>//接受数据<br>func getData(){<br> for {<br> ...<br> addr, err := net.ResolveUnixAddr("unixgram", sock)<br> conn, err := net.ListenUnixgram("unixgram", addr)<br> data := conn.ReadFromUnix()<br> A&lt;-data<br> }<br>}<br><br>func txData(){<br> for{<br> var fs flowStatistic<br> err := json.Unmarshal([]byte(&lt;-A), &amp;fs) //这里不断解析 A 传过来的数据<br> ...<br> fmutex.Lock()<br> *B = append(*B,fs) // &lt;------<br> fmutex.Unlock()<br> }<br>}<br><br>func handleData(){<br> //这里每 5 秒钟对 B 中的数据进行聚合并入库,耗时较多。为了不丢数据,我锁住 B ,处理完后清空 B 中数据并解锁<br> for{<br> time.Sleep(5 * time.Second)<br> fmutex.Lock()<br> ...<br> *B = make([]flowStatistic,0) // &lt;------<br> fmutex.Unlock()<br> }<br>}<br>

感觉大概率是这里的问题

不好意思看错了,B 不在栈上,上面这个请忽略。。。

另外,在 handleData()里面,可以在加锁之后:
fmutex.Lock()
tmp := B
B = make([]flowStatistic, 0)
fmutex.Unlock()
… // processing tmp

可以减少加锁时间,看不能减少或者消除数据丢失?

数据量也不小了,感觉还是上 Flink 吧,基于滚动窗口+RocksDB 状态后端做实时分析。

感觉代码没有问题,但是有些能优化的地方,可以改成无锁化
go<br>func txData() {<br> ticker := time.NewTicker()<br> for {<br> select {<br> case &lt;- ticker.C:<br> go func() // report your data<br> B = make()<br> case evt &lt;- A:<br> B = append(B, evet)<br> case &lt;-ctx.Done():<br> return<br> }<br> }<br>}<br>

第二条附言的代码应该没问题了,golang 所有基础类型都不是线程安全的,txData()在不断自动扩容 B ,而 handleData()拿到的是旧指针,处理完旧指针的数据清空新 B 指针,导致了旧指针和新 B 指针这段时间 append()的数据丢失

第一个附言等于没锁,handleData()内部没有线程安全问题,是单线程的,竞态出在 txData()的 append()和 handleData()的 B = make([]flowStatistic,0)之间

这种地方 mutex 写 slice 不如写 chan……

如果还有问题,能不能试试 atomic.Value 来存取 B 。
txData 和 handleData 之间,能不能使用 chan 来传递 flowStatistic 。
5 秒处理一次的话,在 txData 缓存数据,每 5 秒调用一次 go handleData 行不行(传递缓存数据给 handleData ),不知道语义还对不对。
要不要考虑考虑 kafka 、flink 这种。

我之前做爬虫收集数据也遇到过类似的问题,把数据聚合进行批量插入减少 io 。

我采用的 chan ,然后启动一个协程监听 chan ,当收集一定数量的数据或者时间满足,就把数据写入 db

go<br><br>type DBWriter[T any] struct {<br> Size int<br> Interval time.Duration<br> done chan struct{}<br> ch chan T<br> insertDB func([]T, int)<br>}<br><br>func (w *DBWriter[T]) Start() {<br> ticker := time.NewTicker(w.Interval)<br> records := make([]T, 0, w.Size)<br> insert := func() {<br> if len(records) == 0 {<br> return<br> }<br> w.insertDB(records, w.Size)<br> records = make([]T, 0, w.Size)<br> }<br><br> for {<br> select {<br> case &lt;-w.done:<br> insert()<br> return<br><br> case &lt;-ticker.C:<br> insert()<br><br> case data := &lt;-<a target="_blank" href="http://w.ch" rel="nofollow noopener">w.ch</a>:<br> records = append(records, data)<br><br> if len(records) == w.Size {<br> insert()<br> }<br> }<br> }<br>}<br><br>

针对您提出的Golang并发设计问题,以下是我的专业解答:

Golang的并发设计是其一大亮点,但也伴随着一些挑战。在并发编程中,主要需关注数据竞态、死锁和缓冲区溢出等问题。

  1. 数据竞态:当多个Goroutine同时访问共享内存时,可能会发生数据竞态,导致数据损坏。为避免此问题,可使用互斥锁(Mutex)来保护共享资源,确保同一时间只有一个Goroutine能访问。
  2. 死锁:死锁发生在两个或多个Goroutine互相等待对方释放锁时,导致程序挂起。为避免死锁,应确保每个锁都能被适时释放,并尽量减少锁的嵌套使用。
  3. 缓冲区溢出:在使用Channel进行Goroutine间通信时,若向缓冲区写入的数据超过其容量,会发生缓冲区溢出。为此,应合理设置缓冲区大小,或使用无缓冲Channel进行同步通信。

此外,还应遵循Go语言并发编程的最佳实践,如优先使用Goroutine和Channel进行并发编程,避免不必要的共享内存,使用并发安全的数据结构等。

总之,Golang的并发设计虽强大,但也需谨慎使用,通过合理的并发控制和最佳实践,可确保程序的正确性和高效性。

回到顶部