Golang协程池实现 - 简单泛用版

Golang协程池实现 - 简单泛用版 GitHub 链接:https://github.com/txaty/gool

市面上已经有很多优秀的 Goroutine 池实现,我非常欣赏它们的设计。

在此,我分享我自己的实现。几个月前,我为并行执行 Merkle Tree 库 而创建了这个库。

尽管 Goroutine 的开销显著低于常规线程或进程,但通过池化和复用 Goroutine,仍然有助于提升程序性能(并且确实对 Merkle Tree 库的性能有所帮助)。

我的 Goroutine 池的方法类似于 Python 的 ThreadPoolExecutor:

  • Submit:提交一个任务并返回结果(如果有)。
  • AsyncSubmit:提交一个任务并返回结果的 Future(如果有)。Future 是结果通道。
  • Map:提交一批任务并按顺序返回结果(如果有)。
  • AsyncMap:提交一批任务并返回结果的 Futures(如果有)。Futures 是结果通道。

使用它时,你需要定义以下内容:

  • 处理函数:handler func(A) R,以及
  • 参数:arg A

其中类型 AR 可以是任意的泛型类型。

创建新池时,你可以指定工作协程的数量 numWorkers 和任务队列的容量 cap

欢迎大家查看并提出宝贵意见。谢谢!


更多关于Golang协程池实现 - 简单泛用版的实战教程也可以访问 https://www.itying.com/category-94-b0.html

5 回复

谢谢!

更多关于Golang协程池实现 - 简单泛用版的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


非常感谢您的详细评论!

干得漂亮 👍

你好 @tommytim0515

你的库非常不错,对许多人来说可能是一个有价值的工具。但是,它也可以从以下改进中受益:

  • 在 README 中添加示例,说明如何使用它
  • 为池和工作器添加单元测试(每个公共方法都应该有一个单元测试)
  • 添加一个包含使用该库的示例应用程序的 examples 文件夹
  • 添加包注释
  • 添加方法注释
  • 在 README 中添加更多信息(如何测试、有哪些限制、基准测试)
  • 添加基准测试
  • 添加优雅关闭功能
  • 添加一种方式来配置池的“任务在被取消前允许运行的最长时间”;如果你的任务耗时过长,它们会占用工作器并导致所有阻塞
  • 测试其极限,添加一个图表,显示一个轴上是 CPU 数量,另一个轴上是(对于相同的模拟任务)能够运行而不阻塞或崩溃的最大任务数
  • 添加冒烟测试(这在一个单独的文件中)
  • 添加一个包含向用户公开的方法的接口,以及一个创建实现该接口的对象的方法
  • 添加 linter
  • 为其添加 CI 流水线配置
  • 添加一个 Dockerfile 来构建其镜像,并将该镜像上传到注册表
  • 修复错误(可以就此询问 chatGPT)
  • 添加更好的并发支持

这是一个设计良好的协程池实现,特别是对泛型的运用让类型安全得到了保障。以下是一个使用示例:

package main

import (
    "fmt"
    "time"
    "github.com/txaty/gool"
)

func main() {
    // 创建协程池,3个工作协程,任务队列容量10
    pool := gool.NewPool[int, string](3, 10)
    defer pool.Close() // 确保资源释放

    // 定义处理函数
    handler := func(num int) string {
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("处理结果: %d", num*2)
    }

    // 使用Submit同步提交任务
    result, err := pool.Submit(handler, 5)
    if err != nil {
        panic(err)
    }
    fmt.Println(result) // 输出: 处理结果: 10

    // 使用AsyncSubmit异步提交任务
    future := pool.AsyncSubmit(handler, 8)
    select {
    case result := <-future:
        fmt.Println(result) // 输出: 处理结果: 16
    case <-time.After(200 * time.Millisecond):
        fmt.Println("任务超时")
    }

    // 使用Map批量处理
    tasks := []int{1, 2, 3, 4, 5}
    results, err := pool.Map(handler, tasks)
    if err != nil {
        panic(err)
    }
    fmt.Println(results) // 输出: [处理结果: 2 处理结果: 4 处理结果: 6 处理结果: 8 处理结果: 10]

    // 使用AsyncMap异步批量处理
    futures := pool.AsyncMap(handler, tasks)
    for _, future := range futures {
        select {
        case result := <-future:
            fmt.Println(result)
        case <-time.After(200 * time.Millisecond):
            fmt.Println("批量任务超时")
        }
    }
}

对于需要错误处理的场景:

type Result struct {
    Value string
    Error error
}

func main() {
    pool := gool.NewPool[int, Result](5, 20)
    defer pool.Close()

    handler := func(num int) Result {
        if num < 0 {
            return Result{Error: fmt.Errorf("负数无效: %d", num)}
        }
        return Result{Value: fmt.Sprintf("成功: %d", num)}
    }

    // 错误处理示例
    result, err := pool.Submit(handler, -5)
    if err != nil {
        fmt.Println("提交错误:", err)
    } else if result.Error != nil {
        fmt.Println("处理错误:", result.Error)
    } else {
        fmt.Println("结果:", result.Value)
    }
}

这个实现通过泛型提供了类型安全,四种任务提交方式覆盖了常见的使用场景。任务队列容量限制可以防止内存无限制增长,defer pool.Close() 确保协程正确回收。

回到顶部