Golang协程池实现 - 简单泛用版
Golang协程池实现 - 简单泛用版 GitHub 链接:https://github.com/txaty/gool
市面上已经有很多优秀的 Goroutine 池实现,我非常欣赏它们的设计。
在此,我分享我自己的实现。几个月前,我为并行执行 Merkle Tree 库 而创建了这个库。
尽管 Goroutine 的开销显著低于常规线程或进程,但通过池化和复用 Goroutine,仍然有助于提升程序性能(并且确实对 Merkle Tree 库的性能有所帮助)。
我的 Goroutine 池的方法类似于 Python 的 ThreadPoolExecutor:
Submit:提交一个任务并返回结果(如果有)。AsyncSubmit:提交一个任务并返回结果的 Future(如果有)。Future 是结果通道。Map:提交一批任务并按顺序返回结果(如果有)。AsyncMap:提交一批任务并返回结果的 Futures(如果有)。Futures 是结果通道。
使用它时,你需要定义以下内容:
- 处理函数:handler
func(A) R,以及 - 参数:
arg A。
其中类型 A 和 R 可以是任意的泛型类型。
创建新池时,你可以指定工作协程的数量 numWorkers 和任务队列的容量 cap。
欢迎大家查看并提出宝贵意见。谢谢!
更多关于Golang协程池实现 - 简单泛用版的实战教程也可以访问 https://www.itying.com/category-94-b0.html
非常感谢您的详细评论!
干得漂亮 👍
你好 @tommytim0515,
你的库非常不错,对许多人来说可能是一个有价值的工具。但是,它也可以从以下改进中受益:
- 在 README 中添加示例,说明如何使用它
- 为池和工作器添加单元测试(每个公共方法都应该有一个单元测试)
- 添加一个包含使用该库的示例应用程序的
examples文件夹 - 添加包注释
- 添加方法注释
- 在 README 中添加更多信息(如何测试、有哪些限制、基准测试)
- 添加基准测试
- 添加优雅关闭功能
- 添加一种方式来配置池的“任务在被取消前允许运行的最长时间”;如果你的任务耗时过长,它们会占用工作器并导致所有阻塞
- 测试其极限,添加一个图表,显示一个轴上是 CPU 数量,另一个轴上是(对于相同的模拟任务)能够运行而不阻塞或崩溃的最大任务数
- 添加冒烟测试(这在一个单独的文件中)
- 添加一个包含向用户公开的方法的接口,以及一个创建实现该接口的对象的方法
- 添加 linter
- 为其添加 CI 流水线配置
- 添加一个 Dockerfile 来构建其镜像,并将该镜像上传到注册表
- 修复错误(可以就此询问 chatGPT)
- 添加更好的并发支持
这是一个设计良好的协程池实现,特别是对泛型的运用让类型安全得到了保障。以下是一个使用示例:
package main
import (
"fmt"
"time"
"github.com/txaty/gool"
)
func main() {
// 创建协程池,3个工作协程,任务队列容量10
pool := gool.NewPool[int, string](3, 10)
defer pool.Close() // 确保资源释放
// 定义处理函数
handler := func(num int) string {
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("处理结果: %d", num*2)
}
// 使用Submit同步提交任务
result, err := pool.Submit(handler, 5)
if err != nil {
panic(err)
}
fmt.Println(result) // 输出: 处理结果: 10
// 使用AsyncSubmit异步提交任务
future := pool.AsyncSubmit(handler, 8)
select {
case result := <-future:
fmt.Println(result) // 输出: 处理结果: 16
case <-time.After(200 * time.Millisecond):
fmt.Println("任务超时")
}
// 使用Map批量处理
tasks := []int{1, 2, 3, 4, 5}
results, err := pool.Map(handler, tasks)
if err != nil {
panic(err)
}
fmt.Println(results) // 输出: [处理结果: 2 处理结果: 4 处理结果: 6 处理结果: 8 处理结果: 10]
// 使用AsyncMap异步批量处理
futures := pool.AsyncMap(handler, tasks)
for _, future := range futures {
select {
case result := <-future:
fmt.Println(result)
case <-time.After(200 * time.Millisecond):
fmt.Println("批量任务超时")
}
}
}
对于需要错误处理的场景:
type Result struct {
Value string
Error error
}
func main() {
pool := gool.NewPool[int, Result](5, 20)
defer pool.Close()
handler := func(num int) Result {
if num < 0 {
return Result{Error: fmt.Errorf("负数无效: %d", num)}
}
return Result{Value: fmt.Sprintf("成功: %d", num)}
}
// 错误处理示例
result, err := pool.Submit(handler, -5)
if err != nil {
fmt.Println("提交错误:", err)
} else if result.Error != nil {
fmt.Println("处理错误:", result.Error)
} else {
fmt.Println("结果:", result.Value)
}
}
这个实现通过泛型提供了类型安全,四种任务提交方式覆盖了常见的使用场景。任务队列容量限制可以防止内存无限制增长,defer pool.Close() 确保协程正确回收。

