Golang Go语言中 channel 的一个疑问
我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。
我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。
现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。
我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?
由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。
Golang Go语言中 channel 的一个疑问
更多关于Golang Go语言中 channel 的一个疑问的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力
评估下内存够用不 不够了需要改架构持久化
更多关于Golang Go语言中 channel 的一个疑问的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。
绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。
如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
可以借鉴下拥塞算法
你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
gocrawl(开源爬虫类库)的一种方案
https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go <br>type popChannel chan []*URLContext<br>// The stack function ensures the specified URLs are added to the pop channel<br>// with minimal blocking (since the channel is stacked, it is virtually equivalent<br>// to an infinitely buffered channel).<br>func (pc popChannel) stack(cmd ...*URLContext) {<br> toStack := cmd<br> for {<br> select {<br> case pc <- toStack:<br> return<br> case old := <-pc:<br> // Content of the channel got emptied and is now in old, so append whatever<br> // is in toStack to it, so that it can either be inserted in the channel,<br> // or appended to some other content that got through in the meantime.<br> toStack = append(old, toStack...)<br> }<br> }<br>}<br>
如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
对,是不能保证有序的。
在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
纯 golang 的 nanomsg : https://github.com/go-mangos/mangos