Golang Go语言中多个协程池如何优雅退出

发布于 1周前 作者 sinazl 来自 Go语言

Golang Go语言中多个协程池如何优雅退出

假设有三个协程池(A,B,C), 三个分别处理不一样的业务且每个协程池中的 worker 数量不一致

当前数据流向为 A => B => C,任务在任一协程池中都有出现 err 导致该任务跳过的可能

在如下两种情况下:

  • 进入 A 协程 100 个任务,任务不会继续向下拆分,最后从 C 出来最多也就是 100 个
  • 进入 A 协程 100 个任务,后续每个协程池中都会继续拆分任务,比如转到 B 就是 500 个,继续转到 C 就是 1W 个

最后在主程序中,针对上述两种情况,有没有优雅的办法知道任务已经全部完成且让主程序退出


更多关于Golang Go语言中多个协程池如何优雅退出的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html

24 回复

怎么听着像是 WaitGroup 该干的事儿

更多关于Golang Go语言中多个协程池如何优雅退出的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


如果用 sync.WaitGroup 只适合情况 1,且需要在 err 出现的时候考虑到如何 done 好像挺恶心的

激进:在每个任务的事件循环,每一轮循环都检测一下某个标识退出的全局变量是不是该退出了。如果没有循环那就每个阻塞操作都设超时,超时或完成时检测这个标识。
保守:关掉 A 的入口,然后当协程池工作线程 = 0 时计数 +1,计数等于协程池数量时退。

如果你要 Go 风格的话,起手一个全局 hash 表(带锁),里面 K 是每个 context 的指针。

激进:锁表,每个 context 都发 cancel,然后退出。
保守:关掉 A 的入口,轮训这个 hash 表,内容物为 0 时退出。

另外两个奇葩方案。

不想思考:关掉 A 的入口,sleep 一个所有任务超时的事件,退出。
做 CDN 的:用 dlopen 打开备用的 .so ,切工作函数。按照激进策略等旧的 .so 不再运行,释放旧 .so 然后移动新的 .so 。

谢谢回复 这个包可以比较好的处理情况 1 中 err 的部分,但是无法处理情况 2

谢谢你提供的思路,但是对你说的内容有些疑问
> 然后当协程池工作线程 = 0 时计数 +1

如何知道某个协程中任务已经空了?

#5 2 这种有拆分的应该是用 context 吧。。。

看不太懂,为什么不让 MQ 做消息转接呢?

Context + WaitGroup 不是可以实现么,向下传递 Context 并附加 WaitGroup,每级往 WaitGroup 里添加真实的需要采集的数量并在出错的时候 defer wg.Done 并添加到 Context 中的 ErrArray,或者 errgroup,外层 wait 就好了,如果想让程序提前终止,就可以用带 cancel 的 context 从最外层一关,里面的所有任务就都会收到通知了

context 的只是起到超时或者从上到下关闭作用,并不能感知到任务是否完成吧

倒是全局使用一个 waitgroup 可以解决,只是全局变量到处飞略微有些蛋疼。。。

难道不是

wg.Add(1)
pool.Go(func(){
defer wg.Done()

})

吗?

是倒是是 就是 pool 会有多个

你只要是整体的别落下 add done,每个 Go 是一个,不管 Go 的 func 里有没有失败跳过,只要 defer done 了就能确保 wg.wait 正常结束,所以,好像不应该有这个困扰

应该是想要 completefuture 那种?

但是这样其实也有一个问题,就是你上面的代码示例中,协程数量是不固定的,那要是固定的协程数量怎么写比较优雅一点?

#10 context 是上下文啊,用来把 waitgroup 传递下去的,让不管多下层都能用同一个 waitgroup 并且能通过 context 感知上层要主动关闭 比如超时,而上层也可以通过 waitgroup 来感知下层任务是否已经完成,不管你下层开多少个协程,你每一层都是知道数量的,添加进去就好了

协程池本身就应该自带控制协程数量的属性,否则协程池还不如直接 go 。你看我上面写的也是 pool.Go

#17 只要你生产速度大于协程池消费速度,一样能充分利用这些数量的协程并发。最简单的实现,一个带容量的 chan,生产者往 chan 里写,多个协程去读,当前协程都忙、就被 chan 缓冲了、发送数量大于协程数量和 chan size 生产者就阻塞,这些细节看你怎么设计,姿势太多了,我这个库里就有好几种定制的,以前有些特殊的 hash 和时序需要所以没用其他三方的:
https://github.com/lesismal/nbio/tree/master/taskpool

其他第三方的也很多

给协程加 id,有任务就塞 map,ABC 三个 map 都空了不就表示任务干完了。。

谢谢你们的回复和思路

和四楼的一个意思 单还是谢谢回复:)

先说结论:要在 main 中感知任务的完成状态可以通过 chan,控制数据流转也是用 chan,如果有超时机制或者每个任务属性之类数据需要用到 context 。

1 )步骤大约是
main 函数中创建三个用于接受结果的 A B C chan,大小为 A 任务个数,和一个 notify chan 用于完成通知,再来一个通知主程序关闭的 doneChan

起一个 goruntine 去 for notify chan
go func() {
for a := range nChan {
if (ACount == BCount == CCount == 100) {
doneChan <- struct{}
}

}
}


起三个 goruntine 去 for 分别处理 A B C chan,里面写处理逻辑,大约是这样子
go func(nChan chan Notify) {
for a := range AChan {
// dosomething

if ok {
bChan <- weiboEntryID
} else {
nChan <- weiboEntryID
}
}
close(bChan)
}(nChan)

go func(chan Notify) {
for b := range BChan {
// dosomething

if ok {
cChan <- commentID
} else {
nChan <- commentID
}
}
close(cChan)
}(nChan)

go func(chan Notify) {
for c := range CChan {
// dosomething

if ok {
cChan <- commentUserID
} else {
nChan <- commentUserID
}
}
}(nChan)

这时候就 main 中可以往 AChan 里写数据了,写完直接 close AChan,然后直接用 doneChan 阻塞

for userID := range userIDs {
AChan <- userID
}
close(aChan)
<-doneChan


大致流程就是这样,需要注意的是需要正确关闭 ABC chan,就是在发送完成后关闭,nChan 用于任务信息回收还可以用于任务回放,另外为了保证每个任务都会返回,需要弄一个 timeout context 超时当作失败处理

2 )第二种情况,由于任务个数是未知的,上面的 100 就不能用了,有两种方式可以解决,一是可以预先知道每次数量,用另外的一个 chan 再任务开始时候传给发送 doneChan 那个 goruntine,把 100 改成最后总数即可。二是未知任务数量,这时候只能再来一个 chan,在做完 ABC 后发送一个 aDone 、bDone 、cDone,给发送 doneChan 那个 goruntine,当确认三个都完成后,就可以发送 done 了。

------

code 硬敲的可能很多不对,变量和具体结构可酌情改变 :-)

谢谢回复 我还是觉得上面说的由协程池中的任务数量来判断是否已经完成简单一点

在Golang中管理多个协程池并优雅地退出是一个常见的挑战,但可以通过一些设计模式和技巧来实现。以下是一些建议:

  1. 使用上下文(context)管理生命周期: 创建一个全局或局部的context.Context,并将其传递给所有协程。在需要优雅退出时,通过取消该上下文来通知所有协程。协程应定期检查上下文是否已被取消,并据此进行清理和退出。

  2. 协程池实现: 使用如sync.WaitGroup来跟踪协程的完成情况。在退出时,等待所有协程完成或超时后强制关闭。同时,可以使用一个信号通道(channel)来协调协程池的关闭请求。

  3. 优雅的关闭逻辑: 设计协程时,确保它们能够响应中断信号(如上下文取消、通道关闭等),并能在接收到信号后进行必要的资源清理(如关闭文件、网络连接等)。

  4. 监控与日志: 为协程池添加监控和日志功能,以便在退出过程中跟踪协程的状态和发现潜在问题。

  5. 测试与验证: 编写单元测试或集成测试,模拟协程池的优雅退出场景,确保在实际部署前验证退出逻辑的正确性。

通过上述方法,可以构建一个能够优雅退出的多协程池系统。关键在于协程之间的协作、上下文的使用以及适当的资源管理和监控。这不仅提高了系统的稳定性,还确保了资源在退出时的正确释放。

回到顶部