Golang Go语言中求并发限制的最佳实现

发布于 1周前 作者 nodeper 来自 Go语言

Golang Go语言中求并发限制的最佳实现

情况如下: 有 N 个任务,每个任务执行完都会返回结果或者 error,通过固定的(M)协程去执行,如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表。

我自己写了一版,感觉有点复杂:https://play.golang.org/p/ono1S04XupK

不知道各位有没有什么更简单的实现。


更多关于Golang Go语言中求并发限制的最佳实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html

71 回复

context 不就可以实现吗

更多关于Golang Go语言中求并发限制的最佳实现的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


errgroup 了解一下

一起探讨,如果 M 不是大到离谱,是否需要控制 M ?还是无脑 go 就好了,因为这样感觉就是 go 程池 /线程池了,有点像把 go 程当线程用了?

不是 channel 就可以实现了吗?

context 实现不了等待任务全部执行完成吧

现在就是 M 如果太大了服务器会顶不住,比如并发查询 sql

errgroup 我试了下,第一不能限制并发数,第二不能在发生错误时立即返回。

借楼问一下,我怎么不能发帖了。。

我也不能发了

context 配合 waitGRoup,就可以做到失败一个全干掉,全完成,返回

一个 err chan 用来通知 root goroutine 发生错误,然后 root context 退出; wait group 用来做全部完成检查。
这样何如?

哦还有并发控制。
taskChannel + workerChannel,这样 wait group 也不需要了

线程池+channel,收到错误消息停止向队列里发送消息。
想要立刻回收,主线程直接 stop 线程池里所有线程,否则等待运行中线程跑完


直接上代码吧,这样说不明白🤣

errgroup + 1

并发控制:协程池
消息传递:channel
还有等待协程结束再结束 main 函数:sync.
甚至可以子任务报错直接 exit

那你用 errgroup 把,go 官方标准库中的,看文档直接用,就是 context 和 waitGroup 结合的

看不到代码。。。我的思路是,for 循环几个 goroutine,select 的时候,先从 errchannel 里看 err,然后 default 里从 taskchannel 里拿 task

go<br> var wg sync.WaitGroup<br> pool := makePool(pollSize)<br> func() {<br> for i := 0; i &lt; runTimes; i++ {<br> select {<br> case &lt;-errChan:<br> fmt.Println("error")<br> pool.Stop()<br> return<br> default:<br> wg.Add(1)<br> poll.Submit(task)<br> }<br> }<br> }()<br> if pool.IsRunning() <br> wg.Wait()<br>
我觉得核心就是这样一个结构,不过 go 没用过线程池,可以看看有什么库

一个比较简单的实现
https://play.golang.org/p/MDU_x7C2npI

如果有一个 routine 有 error,ctx 会 Done
如果 ctx 已经 Done 了,semaphore 会 error

package main

import (
“fmt”
“net/http”
)

/有 N 个任务,每个任务都会返回结果或者 error,通过固定的并发数(M)去执行。
如果其中有一个任务返回 error 时立即结束,否则全部执行完成时返回结果列表
/
func main() {

n := 10
m := 5
result := make([]string, n)
limitCh := make(chan interface{}, m)
errCh := make(chan error)
doneCh := make(chan interface{},1)

defer func() {
close(limitCh)
close(errCh)
}()

for state, i := true, 0; i < n; i++ {
state = true
for state {
select {
case limitCh <- nil:
fmt.Printf(“开始第%d 个任务\n”, i)
go func(i int) {
var err error
defer func() {
if i == n-1 {
close(doneCh)
}
if err != nil {
errCh <- err
}
<-limitCh
}()
ret, err := doTask()
if err != nil {
return
}
result[i] = ret
}(i)
state = false
case <-errCh:
return
default:
}
}
}
<- doneCh
fmt.Println(result)

}

func doTask() (string, error) {
// 模拟执行任务
resp, err := http.Get(“https://www.baidu.com”)
if err != nil {
return “”, err
}
defer resp.Body.Close()
return resp.Status, nil
}

#22 这个做不到有一个 task 发生 error 立即结束,例如: https://play.golang.org/p/sqlMbgW7z9Z

#22 比如第一个任务执行已经失败了,需要立即返回,而不是等到所有任务执行完

#23 这个好像也有点问题哦,就是判断任务全部执行完成的地方

<br>if i == n-1 {<br> close(doneCh)<br>}<br>
这里判断最后一个任务执行完成就结束,但是可能会存在还有正在执行的任务并且比最后一个任务执行还慢,就不对了。

你仔细看一下打出来的内容,第一批开始执行的任务里没有 1 ( 1 没有抢到信号量)。当 1 抢到信号量开始执行后整个程序都结束了(其他的任务都取消了,不然如果 i 那个循环大的话整个程序会运行很久)

  1. 声明一个 buffered chan 当做 goroutine 池子 。
    2. 启动 goroutine 的时候传入 context 用做取消(前提是你的 goroutine 任务可以取消)。

兄弟,errgroup 就是为了你这个场景设计的。不要重复造轮子了

说只要一个错误不能马上中断返回的,自己好好审计一下 errgroup

#27 错误发生时不会立刻结束,而是会等正在执行的任务全部完成才返回,你可以跑这个试试: https://play.golang.org/p/66Me2TYbVoK

错误发生了也要等 5 秒才结束。

#30 老哥贴个代码我跑一下看看,我自己测的 errgroup 是不能发生错误立即中断的

是滴 用 waitgroup 大家说的都对 其实你把原来的封装下 搞个协程池 代码就清晰了

我觉得这是个很好的面试题,既有实际意义也考验基本功,大家可以试试不用三方库实现一下~
talk is cheap, show me the code
一起娱乐娱乐,新年快乐~!
1. 控制并发
2. 等待所有任务返回
3. 一个任务错误,立刻结束

如果不解耦,并发控制和结果处理的逻辑混杂确实屎

go 实现的时候想当然了,只以最后提交的任务判断是否结束

kotlin 协程实现的时候发现 io,计算任务无法退出,必须要手动捕捉中止信号

java 须要手动捕捉中止信号 但是可以通过 thread.stop()强制停止,另外判断线程是否异常退出较难



最开始的版本里注释有写 DoWork 里的操作需要支持 context cancellation (比如如果操作是 http.Get 的话,可以使用 http.NewRequestWithContext)

这个里面实现了一个如果调用的操作不支持 context cancellation 的情况
https://play.golang.org/p/Cot1FYgIKLd

#34 哈哈,搞不好会加入大厂面试题库

#35 实际上很多 work 是不支持 cancel 的,而且也不一定要 cancel 掉,只要不阻塞主协程就行了,发送错误的时候主协程继续执行,其它正在执行的任务让它继续跑。

#35 不好意思前面没看仔细,这个确实可以,赞一个!

难道 slice+context.Context+errgroup 的组合不行?

1.分配 M 容量的 slice 。放 M 个 go 程正常运行的结果。每个 go 程都有自己的 index,所以存结果这块都不要加锁,相当舒服的操作。

2.context 当作异常终端点。每个 go 程都持有这个 context 变量。任意一个 go 程错误,cancel 。任意 go 程检查 ctx.Done()。所谓 “如果其中有一个任务返回 error 时立即结束” 完美实现。

3.检查 errgroup 的返回,err != nil,就返回错误,else 部分没有错误,返回第 1 步声明的 slice

分解问题:
1. 通过固定的 M 个携程执行。
解决方案:信号量或者条件变量或者 channel,很多种方法实现。

2. 只要出现一个任务返回 Error,就立即结束全部任务。
解决方案:C#的 TAP 的 Task.WhenAny 和 CancellationToken 模式可以很好的解决这些问题。
那么只需要用 go 来实现 Task.WhenAny 和 CancellationToken 模式即可。

CancellationToken 很好实现,用 struct {IsCancellationRequired: bool}就可以实现。
Task.WhenAny 可以用如下方式实现:有一个 chan ch,每个任务完成后,往 ch 里面写一个数字(或者写入任务信息),同时主线程阻塞读取 ch 。

#40 应该行的,但是对第一点有点疑问,用 slice 怎么实现 M 个协程的限制呢

//放结果伪代码
// 每个 go 程的 id 已经固定下来,就是 for 循环启动的 index.大家操作自己的私有 index 。为啥会有竞争?
for i :=0;i < M;i++ {
i:=i
go func(){
slice[i] = result
}
}

发生错误不能立即返回的,需要等待其它协程结束才行,不然协程泄露了。

所有在并发里要求错误立即中断所有并发返回的,显然是脑抽抽

当其中有一个协程有错误,另一个协程在密集计算,应该是没法打断直接返回的吧

楼上说的没错,协程好像只能自己结束

N 个的容量的 success channel.
再来一个 err channel

然后主线程那里 select 这两个 channel 做事情就可以啦.

----------------------
至于你纠结 err 之后协程能不能关闭, 那个不是你关心的事情了. 可以考虑传递一个 context 给 request, err 发生错误的时候进行 cancel context 即可.

https://play.golang.org/p/YYvynelzIHj
感兴趣写了下,如果想出现错误后中断其他 goroutine 的处理,其他 goroutine 必须可以被 cancel 掉。

errgroup 可以实现出错时结束流程,前提是你的代码实现了 cancel 逻辑

并发限制用协程池,退出机制需要自己实现。

如果懒得实现,就直接摘出来,用系统的实现。main 启动之后开一个协程监听退出信号,收到退出信号之后直接 os.Exit()

其实还是应该自己在代码中实现出来,那个操作只能玩玩。看看源码,用 runtime.Goexit() 来终止协程。

errgroup context 都是用来传递消息的,并不是来做终止的。你用这两个来做的话,只能每进行一步就检查一下是否有退出信号,不然就做不到及时退出。

#49 你好,请教一下,在 61 行的这一部分,为什么两个 channel 之间互相要写数据?
<br> select {<br> case err := &lt;-errCh:<br> handleErr(err, result[1:])<br> &lt;-done<br> case &lt;-done:<br> if len(errCh) &gt; 0 {<br> err := &lt;-errCh<br> handleErr(err, result[1:])<br> return<br> }<br> fmt.Println("success handle all task:", result[1:])<br> }<br>

改成这样是否可以?
<br> select {<br> case err := &lt;-errCh:<br> handleErr(err, result[1:])<br> case &lt;-done:<br> fmt.Println("success handle all task:", result[1:])<br> }<br>

#52

因为他代码中 errCh 和 done 作用是一样的,一个 chan 就可以了,检查 chan 中的内容是出错退出的还是最终完成退出就行了。

没有相互写数据,是为了一些健壮性考虑吧。第一个 case 如果从 errCh 中读到 err 了,<-done 表示等待其他 goroutine 也都结束,是怕有 goroutine 泄露。第二个 case 是考虑可能有极小概率 errCh 和 done 同时有数据,select 随机选择了 done channel,所以最终再判断下 errCh 是否有 err 。

#49 目前 35L 这种应该是最优雅的实现,我们用纯标准库实现的还是太复杂了哈哈

#54 “相互写数据”这个表述不太严谨。不过你说的有道理,从健壮性来说,确实需要考虑这些极端情况。


#55 个人觉得 35L 的写法确实比较优雅,但是从效率来说,还是 49L 的写法更好一些,因为 49L 的协程数量是 M 个,而 35L 的协程数量是 N 个,当 N>>M 时,虽然 golang 的协程足够轻量,但是也没必要这么浪费。

#56 35L 协程数量其实是 N 个,用信号量做了控制的

#57 说出了,是 M 个

#57 协程有 N 个,只不过通过信号量保证了其中只有 M 个在跑,其他的协程虽然在等待,也需要消耗少量资源

我之前研究过 go 的流控怎么写,感觉还是流控好,流控还可以控制几秒才做一个任务,单纯控制并发感觉不如流控。

#59 额,确实是 N 个协程,不过稍微改下就行了,把信号量控制放在循环里面
https://play.golang.org/p/SP7a8MaDd8B

感觉不需要写太多代码,直接用 channel 控制就好了
https://play.golang.org/p/JiD2EopqPkL

#62 这种思路好像也不错,不过如果要加上参数传递和结果、错误返回也还是挺复杂的

#62 可以尝试下用这种思路实现下这里的 run 方法,https://play.golang.org/p/Be7vNF4JH4-

嗯嗯,真要项目搞,还是用一些开源包好点

控制并发直接用 Semaphore 就行了,至于遇到错误怎么退出 goroutine 应该是你的业务代码自己实现,不同业务场景需要有不同的处理。

errgroup.withcontext 楼主是不是没学过 go 呀? 这样只要一个 err,context 就会取消,全部都返回了

把 channel 关了就可以

go-zero 下有个 mr 包解决这种场景,遇到 error 可以 cancel 所有任务

各位问一下,你们是怎样看 play.golang.org/p/xxxxxx 代码的?难道要 pa 强??

在Golang中,实现并发限制的最佳实践通常是使用Go的内置工具,如goroutines和通道(channels),以及结合使用sync.WaitGroupcontext包来管理并发任务的生命周期。

一个常见的模式是使用带缓冲区的通道作为信号量来控制同时运行的goroutine数量。你可以创建一个带N个缓冲区的通道,其中N是你希望同时运行的最大goroutine数量。每个goroutine在开始执行前,会从通道中获取一个空位(即发送一个空值到通道),执行完毕后释放这个空位(即从通道中接收一个值)。

示例代码如下:

func main() {
    maxWorkers := 5 // 并发限制
    jobs := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    sem := make(chan struct{}, maxWorkers)
    var wg sync.WaitGroup

    for _, job := range jobs {
        wg.Add(1)
        sem <- struct{}{} // 获取一个空位
        go func(j int) {
            defer wg.Done()
            defer func() { <-sem }() // 释放空位
            // 执行工作
            fmt.Println("Processing job", j)
        }(job)
    }

    wg.Wait()
}

这种方法简单且高效,能有效控制并发量,防止因创建过多goroutine而导致的资源耗尽问题。同时,结合sync.WaitGroup可以确保所有任务完成后程序才退出,非常适合处理大量并发任务但希望限制并发数的场景。

回到顶部