Golang Go语言中的协程池本应该是这样的

看过了一下 star 比较高的协程池实现,还有字节开源的实现,完全是 java/c++之类的外行实现思路

协程/线程池,最基本的元件 就是 队列 + 协程/线程,M:N 模型

这两个组件在 go 里边天生就有啊,为什么再搞一套 task queue 呢?

控制队列容量:make(chan, cap) 第二参数就可以

想要控制协程/线程数量,再辅助一个 chan 就可以了,

代码实现如下,100 行搞定:

我把它放到 github 上 gopool 喜欢的老铁可以给个 star

// GoPool is a minimalistic goroutine pool that provides a pure Go implementation
type GoPool struct {
	noCopy
queueLen atomic.Int32
doTaskN  atomic.Int32
workerN  atomic.Int32
options  Options

workerSem chan struct{}
queue     chan func()

}

// NewGoPool provite fixed number of goroutines, reusable. M:N model // // M: the number of reusable goroutines, // N: the capacity for asynchronous task queue. func NewGoPool(opts …Option) *GoPool { opt := setOptions(opts…) if opt.minWorkers <= 0 { panic(“GoPool: min workers <= 0”) } if opt.minWorkers > opt.maxWorkers { panic(“GoPool: min workers > max workers”) } p := &GoPool{ options: opt, workerSem: make(chan struct{}, opt.maxWorkers), queue: make(chan func(), opt.queueCap), } for i := int32(0); i < p.options.minWorkers; i++ { // pre spawn p.workerSem <- struct{}{} go p.worker(func() {}) } go p.shrink() return p }

// QueueFree returns (capacity of task-queue - length of task-queue) func (p *GoPool) QueueFree() int { return int(p.options.queueCap - p.queueLen.Load()) }

// Workers returns current the number of workers func (p *GoPool) Workers() int { return int(p.workerN.Load()) }

// Go submits a task to this pool. func (p *GoPool) Go(task func()) { if task == nil { panic(“GoPool: Go task is nil”) } select { case p.queue <- task: p.queueLen.Add(1) case p.workerSem <- struct{}{}: go p.worker(task) } }

func (p *GoPool) worker(task func()) { p.workerN.Add(1) defer func() { <-p.workerSem p.workerN.Add(-1) if e := recover(); e != nil { if p.options.panicHandler != nil { p.options.panicHandler(e) } } }()

for {
	task()
	task = &lt;-p.queue
	if task == nil {
		break
	}
	p.doTaskN.Add(1)
	p.queueLen.Add(-1)
}

} func (p *GoPool) shrink() { ticker := time.NewTicker(p.options.shrinkPeriod) defer ticker.Stop()

for {
	select {
	case &lt;-ticker.C:
		doTaskN := p.doTaskN.Load()
		p.doTaskN.Store(0)
		if doTaskN &lt; p.options.tasksBelowN {
			closeN := p.workerN.Load() - p.options.minWorkers
			for closeN &gt; 0 {
				p.queue &lt;- nil
				closeN--
			}
		}
	}
}

}


Golang Go语言中的协程池本应该是这样的

更多关于Golang Go语言中的协程池本应该是这样的的实战教程也可以访问 https://www.itying.com/category-94-b0.html

28 回复

好好好

更多关于Golang Go语言中的协程池本应该是这样的的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


应该是这样的:

go<br>type channel chan struct{}<br><br>func (c channel) add() { c &lt;- struct{}{} }<br><br>func (c channel) done() { &lt;-c }<br><br>func (c channel) Go(f func()) {<br> c.add()<br> go func() {<br> f()<br> c.done()<br> }()<br>}<br>

都协程了还要池化?

并发太大时,代码没写好的话会造成协程数激增,池化能控制数量,而且性能也能得到提升

看上去好像也行,OP 代码这么多是完善了哪些方面?

这不算多吧,这已经算是最精简的了吧,我就是因为看到别人实现的太复杂了,所以写了一个,功能就是可以定时收缩,加了几个计数而已

抱歉回复错了

最精简的版本,没有之一

XY 问题.

都协程了就没必要池化, 协程模型需要的是速率控制. 用令牌桶算法 10 行代码就能解决问题. 谈起协程就自动类比为进程, 然后想当然认为应该有一个"协程池", “完全是 java/c++之类的外行实现思路”

之前看了有的框架有协程池化的组件,看里面的测试结果,除了内存占用优势,性能上没任何优势。

当队列满的时候,Go() 会阻塞调用线程,而不是加到任务队列里立刻返回,后面空闲了再由 worker 线程去执行。

一个 semaphore 就能解决的问题,不需要写这么多代码

是的,不会有肉眼可见的性能提升,至于协程复用啥的只是减少一些内存分配而已,主要还是控制数量

有 sem 不也得有个 job 队列吗

只是一个小品,总有它存在的应用场景。

国内大厂自己轮的东西基本都是性能驱动。如果不是为了解决他们自己生产环境里遇见的性能问题,应该不会搞这么个东西出来。

学过 Go 的都能写出这个代码,,但是并不一定能写出那些复杂的 pool 。
你这个本质上是一个并发控制而已,最基本的优雅关闭也没处理。

是的,自己造的轮子肯定是为了解决自己生产环境的问题,不用考虑通用性

也不一定,有些人的实现思路就够 gopher ,自己实现一套条件变量 + queue ,我是觉得没必要,go 天生就带这些东西。另外,你说的优雅关闭 是指关闭 pool 吗?我是觉得没必要,本来 go pool 就不是很必须,再加上一个动态的 pool 就更没必要了

代码就应该简单直接

不懂就问,也没改 runtime 复用什么东西,就单纯控制数量也叫池了吗?

所有的池化技术,无论是 mysql redis 还是 http 的池化技术核心都是复用连接再然后是控制连接的数量不至于把 server 的连接资源都耗尽,私以为你这顶多算是控制并发。但是如果只需要控制并发你这未免也太繁琐了

是的用个计数器也可以达到效果,但这样不就是更通用一点嘛,

官方池化实现不给出来了么,sync.Pool ,如果用不到的,那就说明压根不需要池化(比如协程)

他这个代码控制了协程数量,也复用了协程,shrink 这个函数就是发消息结束超时生存的协程的,worker 里面没任务会阻塞等待任务来,直到 shrink 发了结束信号退出。

为什么要控制协程数量?即使要限制也是限制请求并发

协程池与最大并发数控制傻傻分不清?

在Go语言中,协程池(Goroutine Pool)的设计旨在高效地管理和复用协程,以提高并发程序的性能和资源利用率。理论上,一个理想的协程池应该具备以下几个关键特性:

  1. 动态调整:协程池应能根据任务负载动态调整其大小,以避免资源闲置或过载。这通常涉及到工作协程的创建和销毁策略。

  2. 任务队列:为了平衡负载,任务应该被放入一个队列中,由空闲的协程从队列中取出并执行。队列的设计需要考虑并发安全性和性能。

  3. 复用机制:协程在完成一个任务后,不应立即销毁,而应被重新放入池中等待新的任务,以减少创建和销毁协程的开销。

  4. 优雅关闭:协程池应支持优雅关闭,即在关闭过程中能够处理完所有正在执行的任务,避免资源泄漏或任务中断。

  5. 监控与调试:为了便于调试和性能优化,协程池应提供必要的监控接口,如当前协程数量、任务队列长度等。

在实际实现中,可以利用Go语言的channel、sync包等内置并发原语来构建协程池。同时,要注意避免过度复杂的逻辑导致性能下降或引入新的bug。此外,还可以参考一些开源的协程池实现,如ants等,以获取更多的实践经验和最佳实践。

总之,设计和实现一个高效的协程池需要综合考虑多个方面,以达到最佳的性能和资源利用率。

回到顶部