golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用

golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用

go-waitgroup介绍

go-waitgroup是一个增强版的WaitGroup包,它允许你使用类似sync.WaitGroup的结构来创建goroutine池并控制并发。

基本用法

使用方式与标准sync.WaitGroup类似,主要区别在于初始化。使用waitgroup.NewWaitGroup时,可以指定并发限制大小。

  • 任何大于0的整数都会限制并发goroutine数量
  • 如果指定-1或0,所有goroutine将同时运行(和标准sync.WaitGroup一样)
package main

import (
    "fmt"
    "net/http"

    "github.com/pieterclaerhout/go-waitgroup"
)

func main() {
    
    urls := []string{
        "https://www.easyjet.com/",
        "https://www.skyscanner.de/",
        "https://www.ryanair.com",
        "https://wizzair.com/",
        "https://www.swiss.com/",
    }

    // 限制并发数为3
    wg := waitgroup.NewWaitGroup(3)

    for _, url := range urls {
        wg.BlockAdd()  // 阻塞等待直到有可用槽位
        go func(url string) {
            defer wg.Done()
            fmt.Printf("%s: checking\n", url)
            res, err := http.Get(url)
            if err != nil {
                fmt.Println("Error: %v")
            } else {
                defer res.Body.Close()
                fmt.Printf("%s: result: %v\n", url, err)
            }
        }(url)
    }

    wg.Wait()
    fmt.Println("Finished")
}

使用闭包

也可以使用函数闭包使代码更易读:

package main

import (
    "fmt"
    "net/http"

    "github.com/pieterclaerhout/go-waitgroup"
)

func main() {

    urls := []string{
        "https://www.easyjet.com/",
        "https://www.skyscanner.de/",
        "https://www.ryanair.com",
        "https://wizzair.com/",
        "https://www.swiss.com/",
    }

    // 限制并发数为3
    wg := waitgroup.NewWaitGroup(3)

    for _, url := range urls {
        urlToCheck := url
        wg.Add(func() {
            fmt.Printf("%s: checking\n", urlToCheck)
            res, err := http.Get(urlToCheck)
            if err != nil {
                fmt.Println("Error: %v")
            } else {
                defer res.Body.Close()
                fmt.Printf("%s: result: %v\n", urlToCheck, err)
            }
        })
    }

    wg.Wait()
    fmt.Println("Finished")
}

错误处理

如果需要处理错误,可以使用ErrorGroup。它与普通WaitGroup类似但有两点不同:

  1. 只能添加返回error的函数
  2. 当其中一个任务失败时,其余任务将被取消
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/pieterclaerhout/go-waitgroup"
)

func main() {

    ctx := context.Background()

    // 创建错误组,限制并发数
    wg, ctx := waitgroup.NewErrorGroup(ctx, tc.size)
    if err != nil {
        fmt.Println("Error: %v")
        os.Exit(1)
    }

    // 添加成功任务
    wg.Add(func() error {
        return nil
    })

    // 添加失败任务
    wg.Add(func() error {
        return errors.New("An error occurred")
    })

    if err := wg.Wait(); err != nil {
        fmt.Println("Error: %v")
        os.Exit(1)
    }
}

也可以一次性添加多个函数:

package main

import (
    "context"
    "errors"
    "fmt"
    "os"

    "github.com/pieterclaerhout/go-waitgroup"
)

func main() {

    ctx := context.Background()

    wg, ctx := waitgroup.NewErrorGroup(ctx, tc.size)
    if err != nil {
        fmt.Println("Error: %v")
        os.Exit(1)
    }

    // 批量添加多个函数
    wg.Add(
        func() error {
            return nil
        },
        func() error {
            return errors.New("An error occurred")
        },
    )

    if err := wg.Wait(); err != nil {
        fmt.Println("Error: %v")
        os.Exit(1)
    }
}

更多关于golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


go-waitgroup 使用指南

go-waitgroup 是一个增强版的 WaitGroup,它在标准库 sync.WaitGroup 的基础上增加了错误处理和并发控制功能。下面我将详细介绍它的使用方法和示例代码。

主要特性

  1. 支持错误传播:当 goroutine 发生错误时,可以收集并返回错误
  2. 并发控制:可以限制最大并发 goroutine 数量
  3. 上下文支持:可以与 context.Context 集成
  4. 超时控制:可以设置超时时间

安装

go get github.com/pieterclaerhout/go-waitgroup

基础用法

基本等待

package main

import (
	"fmt"
	"time"

	"github.com/pieterclaerhout/go-waitgroup"
)

func main() {
	wg := waitgroup.NewWaitGroup()

	for i := 0; i < 5; i++ {
		wg.Add(func() {
			time.Sleep(1 * time.Second)
			fmt.Println("Task completed")
		})
	}

	wg.Wait()
	fmt.Println("All tasks completed")
}

带错误处理的用法

package main

import (
	"errors"
	"fmt"

	"github.com/pieterclaerhout/go-waitgroup"
)

func main() {
	wg := waitgroup.NewWaitGroup()

	for i := 0; i < 3; i++ {
		i := i // 创建局部变量
		wg.Add(func() error {
			if i == 1 {
				return errors.New("error in task 1")
			}
			fmt.Printf("Task %d completed\n", i)
			return nil
		})
	}

	err := wg.Wait()
	if err != nil {
		fmt.Printf("Error occurred: %v\n", err)
	}
}

高级用法

并发控制

package main

import (
	"fmt"
	"time"

	"github.com/pieterclaerhout/go-waitgroup"
)

func main() {
	// 限制最大并发数为2
	wg := waitgroup.NewWaitGroup(waitgroup.WithMaxConcurrency(2))

	for i := 0; i < 5; i++ {
		i := i
		wg.Add(func() {
			fmt.Printf("Task %d started\n", i)
			time.Sleep(1 * time.Second)
			fmt.Printf("Task %d completed\n", i)
		})
	}

	wg.Wait()
	fmt.Println("All tasks completed")
}

带上下文的用法

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pieterclaerhout/go-waitgroup"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	wg := waitgroup.NewWaitGroup(waitgroup.WithContext(ctx))

	for i := 0; i < 5; i++ {
		i := i
		wg.Add(func() error {
			select {
			case <-time.After(3 * time.Second):
				fmt.Printf("Task %d completed\n", i)
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}

	err := wg.Wait()
	if err != nil {
		fmt.Printf("Error: %v\n", err) // 会输出context deadline exceeded
	}
}

收集所有错误

package main

import (
	"errors"
	"fmt"

	"github.com/pieterclaerhout/go-waitgroup"
)

func main() {
	wg := waitgroup.NewWaitGroup(waitgroup.WithCollectErrs())

	for i := 0; i < 5; i++ {
		i := i
		wg.Add(func() error {
			if i%2 == 0 {
				return fmt.Errorf("error in task %d", i)
			}
			return nil
		})
	}

	err := wg.Wait()
	if err != nil {
		if multiErr, ok := err.(interface{ Unwrap() []error }); ok {
			fmt.Println("Multiple errors occurred:")
			for _, e := range multiErr.Unwrap() {
				fmt.Println("-", e)
			}
		} else {
			fmt.Println("Error:", err)
		}
	}
}

配置选项

go-waitgroup 提供了多种配置选项:

  • WithMaxConcurrency(n int) - 设置最大并发数
  • WithContext(ctx context.Context) - 设置上下文
  • WithCollectErrs() - 收集所有错误而不仅仅是第一个错误
  • WithTimeout(d time.Duration) - 设置超时时间

最佳实践

  1. 总是处理返回的错误
  2. 对于长时间运行的任务,使用上下文控制
  3. 合理设置最大并发数以避免资源耗尽
  4. 考虑使用 WithCollectErrs() 当需要知道所有失败的任务时

go-waitgroup 是一个功能强大的工具,特别适合需要错误处理和并发控制的场景,比标准库的 WaitGroup 提供了更多的控制能力。

回到顶部