Golang Go语言中 channel 的一个疑问

发布于 1周前 作者 sinazl 来自 Go语言

我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。

我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。

现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。

我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?

由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。


Golang Go语言中 channel 的一个疑问

更多关于Golang Go语言中 channel 的一个疑问的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html

14 回复

这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力

评估下内存够用不 不够了需要改架构持久化

更多关于Golang Go语言中 channel 的一个疑问的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。

绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。

如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka

可以借鉴下拥塞算法

你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里

使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。

不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。

生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的

gocrawl(开源爬虫类库)的一种方案
https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go

<br>type popChannel chan []*URLContext<br>// The stack function ensures the specified URLs are added to the pop channel<br>// with minimal blocking (since the channel is stacked, it is virtually equivalent<br>// to an infinitely buffered channel).<br>func (pc popChannel) stack(cmd ...*URLContext) {<br> toStack := cmd<br> for {<br> select {<br> case pc &lt;- toStack:<br> return<br> case old := &lt;-pc:<br> // Content of the channel got emptied and is now in old, so append whatever<br> // is in toStack to it, so that it can either be inserted in the channel,<br> // or appended to some other content that got through in the meantime.<br> toStack = append(old, toStack...)<br> }<br> }<br>}<br>

如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。

对,是不能保证有序的。

在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。

好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
纯 golang 的 nanomsg : https://github.com/go-mangos/mangos

在Go语言中,channel是用于在不同的goroutine之间进行通信的一种机制。它提供了一种安全的方式来传递数据,避免竞态条件的发生。针对你提到的关于channel的疑问,这里有几个常见的点可能有助于解答:

  1. channel的阻塞行为:向一个无缓冲的channel发送数据时,如果接收方还没有准备好接收,发送方将会阻塞,直到接收方准备好。同样,从一个无缓冲的channel接收数据时,如果发送方还没有准备好发送数据,接收方也会阻塞。

  2. 缓冲channel:可以创建带有缓冲区的channel,这样发送方可以在缓冲区未满时发送数据而不会立即阻塞,接收方也可以在缓冲区不为空时接收数据而不会阻塞。

  3. 关闭channel:关闭一个channel表明没有更多的值会被发送到channel。接收方可以通过检测语法来判断channel是否已关闭,格式为v, ok := <-ch

  4. channel的类型:channel是有类型的,例如chan int表示一个可以传递整数的channel。

  5. 多goroutine通信:channel是goroutine间通信的桥梁,通过channel,多个goroutine可以协同工作,实现复杂的并发逻辑。

如果你有更具体的问题,比如关于channel的某些特定行为或错误处理,请提供更详细的描述,以便我能给出更精确的解答。希望这些信息能帮到你!

回到顶部