golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用
neilotoole/errgroup 的使用
neilotoole/errgroup
是 Go 标准库 sync/errgroup
的一个替代方案,主要特点是能够限制工作 goroutine 的数量。这对于与速率受限的 API、数据库等交互非常有用。
注意
sync/errgroup
包现在已经有 Group.SetLimit
方法,这使得 neilotoole/errgroup
不再必要。该库将不再维护,建议使用 sync/errgroup
替代。
概述
neilotoole/errgroup
本质上是一个带有 N 个 goroutine 工作池的 sync/errgroup
。其导出 API 完全相同,只是多了一个 WithContextN
函数,允许调用者指定最大 goroutine 数量 (numG
) 和工作队列通道的容量 (qSize
)。
使用示例
基本用法
package main
import (
"context"
"fmt"
"time"
"github.com/neilotoole/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
// 启动多个工作goroutine
for i := 0; i < 10; i++ {
i := i // 创建局部变量副本
g.Go(func() error {
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Task %d completed\n", i)
return nil
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}
自定义 goroutine 数量和队列大小
package main
import (
"context"
"fmt"
"time"
"github.com/neilotoole/errgroup"
)
func main() {
// 限制最多4个goroutine,队列大小为2
numG, qSize := 4, 2
g, ctx := errgroup.WithContextN(context.Background(), numG, qSize)
// 启动多个工作goroutine
for i := 0; i < 10; i++ {
i := i // 创建局部变量副本
g.Go(func() error {
time.Sleep(time.Second) // 模拟工作
fmt.Printf("Task %d completed\n", i)
return nil
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}
错误处理示例
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/neilotoole/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
// 启动多个工作goroutine,其中一个会返回错误
for i := 0; i < 5; i++ {
i := i
g.Go(func() error {
time.Sleep(time.Second)
if i == 2 {
return errors.New("error in task 2")
}
fmt.Printf("Task %d completed\n", i)
return nil
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
fmt.Println("Encountered error:", err)
}
}
性能考虑
基准测试表明,对于某些工作负载,neilotoole/errgroup
可能比 sync/errgroup
更高效。性能差异取决于任务复杂度和 goroutine 数量等参数。
设计说明
与 sync/errgroup
不同,当工作队列满时,neilotoole/errgroup
的 Go
方法会阻塞,直到有工作 goroutine 腾出空间。这是有意为之的设计选择。
更多关于golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
使用neilotoole/errgroup限制Goroutine数量
neilotoole/errgroup
是标准库errgroup
的增强版本,提供了限制并发Goroutine数量的功能。下面我将详细介绍它的使用方法和示例代码。
基本特性
neilotoole/errgroup
在标准errgroup
基础上增加了:
- 限制最大并发Goroutine数量的能力
- 更灵活的上下文控制
- 额外的实用方法
安装
go get github.com/neilotoole/errgroup
基本使用示例
package main
import (
"context"
"fmt"
"time"
"github.com/neilotoole/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(3) // 限制最多3个goroutine并发执行
for i := 0; i < 10; i++ {
i := i // 创建局部变量副本
g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err() // 如果上下文取消,返回错误
default:
time.Sleep(time.Second)
fmt.Printf("Task %d completed\n", i)
return nil
}
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Error occurred: %v\n", err)
} else {
fmt.Println("All tasks completed successfully")
}
}
高级功能示例
1. 带取消功能的限流
func processWithCancel() {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
g := errgroup.WithCancel(ctx)
g.SetLimit(2) // 限制2个并发
for i := 0; i < 5; i++ {
i := i
g.Go(func() error {
select {
case <-g.Done():
return nil // 上下文已取消,直接返回
default:
time.Sleep(2 * time.Second)
if i == 3 {
return fmt.Errorf("task %d failed", i)
}
fmt.Printf("Task %d done\n", i)
return nil
}
})
}
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}
2. 收集所有错误
func collectAllErrors() {
g := errgroup.Group{}
g.SetLimit(4) // 限制4个并发
// 启用收集所有错误而不仅是第一个
g.CollectErrs(true)
for i := 0; i < 10; i++ {
i := i
g.Go(func() error {
if i%2 == 0 {
return fmt.Errorf("error in task %d", i)
}
fmt.Printf("Task %d succeeded\n", i)
return nil
})
}
if err := g.Wait(); err != nil {
if multiErr, ok := err.(errgroup.MultiError); ok {
fmt.Printf("Encountered %d errors:\n", len(multiErr))
for _, e := range multiErr {
fmt.Println("-", e)
}
} else {
fmt.Println("Error:", err)
}
}
}
3. 带初始化的Worker池
func workerPoolExample() {
const workerCount = 3
const taskCount = 10
g := errgroup.Group{}
g.SetLimit(workerCount)
results := make(chan int, taskCount)
for i := 0; i < taskCount; i++ {
i := i
g.Go(func() error {
// 模拟工作负载
time.Sleep(500 * time.Millisecond)
results <- i * 2
return nil
})
}
// 等待所有任务完成
go func() {
_ = g.Wait()
close(results)
}()
// 收集结果
for res := range results {
fmt.Println("Result:", res)
}
}
与标准库errgroup对比
特性 | 标准库errgroup | neilotoole/errgroup |
---|---|---|
限制并发数 | ❌ 不支持 | ✅ 支持 |
收集所有错误 | ❌ 不支持 | ✅ 支持 |
额外的上下文控制 | ❌ 有限 | ✅ 更灵活 |
Worker池模式 | ❌ 不支持 | ✅ 支持 |
最佳实践
- 合理设置并发限制数,通常与CPU核心数或外部服务限制相关
- 总是处理返回的错误
- 对于长时间运行的任务,使用上下文实现取消功能
- 考虑使用
CollectErrs(true)
收集所有错误而不仅是第一个
neilotoole/errgroup
是一个功能强大且灵活的并发控制库,特别适合需要限制Goroutine数量或需要更精细错误处理的场景。