golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用

neilotoole/errgroup 的使用

neilotoole/errgroup 是 Go 标准库 sync/errgroup 的一个替代方案,主要特点是能够限制工作 goroutine 的数量。这对于与速率受限的 API、数据库等交互非常有用。

注意

sync/errgroup 包现在已经有 Group.SetLimit 方法,这使得 neilotoole/errgroup 不再必要。该库将不再维护,建议使用 sync/errgroup 替代。

概述

neilotoole/errgroup 本质上是一个带有 N 个 goroutine 工作池的 sync/errgroup。其导出 API 完全相同,只是多了一个 WithContextN 函数,允许调用者指定最大 goroutine 数量 (numG) 和工作队列通道的容量 (qSize)。

使用示例

基本用法

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/neilotoole/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	// 启动多个工作goroutine
	for i := 0; i < 10; i++ {
		i := i // 创建局部变量副本
		g.Go(func() error {
			time.Sleep(time.Second) // 模拟工作
			fmt.Printf("Task %d completed\n", i)
			return nil
		})
	}

	// 等待所有goroutine完成
	if err := g.Wait(); err != nil {
		fmt.Println("Error:", err)
	}
}

自定义 goroutine 数量和队列大小

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/neilotoole/errgroup"
)

func main() {
	// 限制最多4个goroutine,队列大小为2
	numG, qSize := 4, 2
	g, ctx := errgroup.WithContextN(context.Background(), numG, qSize)

	// 启动多个工作goroutine
	for i := 0; i < 10; i++ {
		i := i // 创建局部变量副本
		g.Go(func() error {
			time.Sleep(time.Second) // 模拟工作
			fmt.Printf("Task %d completed\n", i)
			return nil
		})
	}

	// 等待所有goroutine完成
	if err := g.Wait(); err != nil {
		fmt.Println("Error:", err)
	}
}

错误处理示例

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/neilotoole/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())

	// 启动多个工作goroutine,其中一个会返回错误
	for i := 0; i < 5; i++ {
		i := i
		g.Go(func() error {
			time.Sleep(time.Second)
			if i == 2 {
				return errors.New("error in task 2")
			}
			fmt.Printf("Task %d completed\n", i)
			return nil
		})
	}

	// 等待所有goroutine完成
	if err := g.Wait(); err != nil {
		fmt.Println("Encountered error:", err)
	}
}

性能考虑

基准测试表明,对于某些工作负载,neilotoole/errgroup 可能比 sync/errgroup 更高效。性能差异取决于任务复杂度和 goroutine 数量等参数。

设计说明

sync/errgroup 不同,当工作队列满时,neilotoole/errgroupGo 方法会阻塞,直到有工作 goroutine 腾出空间。这是有意为之的设计选择。


更多关于golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang限制工作goroutine数量的替代errgroup插件库neilotoole/errgroup的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


使用neilotoole/errgroup限制Goroutine数量

neilotoole/errgroup是标准库errgroup的增强版本,提供了限制并发Goroutine数量的功能。下面我将详细介绍它的使用方法和示例代码。

基本特性

neilotoole/errgroup在标准errgroup基础上增加了:

  1. 限制最大并发Goroutine数量的能力
  2. 更灵活的上下文控制
  3. 额外的实用方法

安装

go get github.com/neilotoole/errgroup

基本使用示例

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/neilotoole/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	g.SetLimit(3) // 限制最多3个goroutine并发执行

	for i := 0; i < 10; i++ {
		i := i // 创建局部变量副本
		g.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err() // 如果上下文取消,返回错误
			default:
				time.Sleep(time.Second)
				fmt.Printf("Task %d completed\n", i)
				return nil
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Printf("Error occurred: %v\n", err)
	} else {
		fmt.Println("All tasks completed successfully")
	}
}

高级功能示例

1. 带取消功能的限流

func processWithCancel() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	g := errgroup.WithCancel(ctx)
	g.SetLimit(2) // 限制2个并发

	for i := 0; i < 5; i++ {
		i := i
		g.Go(func() error {
			select {
			case <-g.Done():
				return nil // 上下文已取消,直接返回
			default:
				time.Sleep(2 * time.Second)
				if i == 3 {
					return fmt.Errorf("task %d failed", i)
				}
				fmt.Printf("Task %d done\n", i)
				return nil
			}
		})
	}

	if err := g.Wait(); err != nil {
		fmt.Println("Error:", err)
	}
}

2. 收集所有错误

func collectAllErrors() {
	g := errgroup.Group{}
	g.SetLimit(4) // 限制4个并发

	// 启用收集所有错误而不仅是第一个
	g.CollectErrs(true)

	for i := 0; i < 10; i++ {
		i := i
		g.Go(func() error {
			if i%2 == 0 {
				return fmt.Errorf("error in task %d", i)
			}
			fmt.Printf("Task %d succeeded\n", i)
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		if multiErr, ok := err.(errgroup.MultiError); ok {
			fmt.Printf("Encountered %d errors:\n", len(multiErr))
			for _, e := range multiErr {
				fmt.Println("-", e)
			}
		} else {
			fmt.Println("Error:", err)
		}
	}
}

3. 带初始化的Worker池

func workerPoolExample() {
	const workerCount = 3
	const taskCount = 10

	g := errgroup.Group{}
	g.SetLimit(workerCount)

	results := make(chan int, taskCount)

	for i := 0; i < taskCount; i++ {
		i := i
		g.Go(func() error {
			// 模拟工作负载
			time.Sleep(500 * time.Millisecond)
			results <- i * 2
			return nil
		})
	}

	// 等待所有任务完成
	go func() {
		_ = g.Wait()
		close(results)
	}()

	// 收集结果
	for res := range results {
		fmt.Println("Result:", res)
	}
}

与标准库errgroup对比

特性 标准库errgroup neilotoole/errgroup
限制并发数 ❌ 不支持 ✅ 支持
收集所有错误 ❌ 不支持 ✅ 支持
额外的上下文控制 ❌ 有限 ✅ 更灵活
Worker池模式 ❌ 不支持 ✅ 支持

最佳实践

  1. 合理设置并发限制数,通常与CPU核心数或外部服务限制相关
  2. 总是处理返回的错误
  3. 对于长时间运行的任务,使用上下文实现取消功能
  4. 考虑使用CollectErrs(true)收集所有错误而不仅是第一个

neilotoole/errgroup是一个功能强大且灵活的并发控制库,特别适合需要限制Goroutine数量或需要更精细错误处理的场景。

回到顶部