golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用
golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用
go-waitgroup介绍
go-waitgroup是一个增强版的WaitGroup包,它允许你使用类似sync.WaitGroup
的结构来创建goroutine池并控制并发。
基本用法
使用方式与标准sync.WaitGroup
类似,主要区别在于初始化。使用waitgroup.NewWaitGroup
时,可以指定并发限制大小。
- 任何大于0的整数都会限制并发goroutine数量
- 如果指定-1或0,所有goroutine将同时运行(和标准
sync.WaitGroup
一样)
package main
import (
"fmt"
"net/http"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
urls := []string{
"https://www.easyjet.com/",
"https://www.skyscanner.de/",
"https://www.ryanair.com",
"https://wizzair.com/",
"https://www.swiss.com/",
}
// 限制并发数为3
wg := waitgroup.NewWaitGroup(3)
for _, url := range urls {
wg.BlockAdd() // 阻塞等待直到有可用槽位
go func(url string) {
defer wg.Done()
fmt.Printf("%s: checking\n", url)
res, err := http.Get(url)
if err != nil {
fmt.Println("Error: %v")
} else {
defer res.Body.Close()
fmt.Printf("%s: result: %v\n", url, err)
}
}(url)
}
wg.Wait()
fmt.Println("Finished")
}
使用闭包
也可以使用函数闭包使代码更易读:
package main
import (
"fmt"
"net/http"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
urls := []string{
"https://www.easyjet.com/",
"https://www.skyscanner.de/",
"https://www.ryanair.com",
"https://wizzair.com/",
"https://www.swiss.com/",
}
// 限制并发数为3
wg := waitgroup.NewWaitGroup(3)
for _, url := range urls {
urlToCheck := url
wg.Add(func() {
fmt.Printf("%s: checking\n", urlToCheck)
res, err := http.Get(urlToCheck)
if err != nil {
fmt.Println("Error: %v")
} else {
defer res.Body.Close()
fmt.Printf("%s: result: %v\n", urlToCheck, err)
}
})
}
wg.Wait()
fmt.Println("Finished")
}
错误处理
如果需要处理错误,可以使用ErrorGroup
。它与普通WaitGroup
类似但有两点不同:
- 只能添加返回
error
的函数 - 当其中一个任务失败时,其余任务将被取消
package main
import (
"context"
"fmt"
"os"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
ctx := context.Background()
// 创建错误组,限制并发数
wg, ctx := waitgroup.NewErrorGroup(ctx, tc.size)
if err != nil {
fmt.Println("Error: %v")
os.Exit(1)
}
// 添加成功任务
wg.Add(func() error {
return nil
})
// 添加失败任务
wg.Add(func() error {
return errors.New("An error occurred")
})
if err := wg.Wait(); err != nil {
fmt.Println("Error: %v")
os.Exit(1)
}
}
也可以一次性添加多个函数:
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
ctx := context.Background()
wg, ctx := waitgroup.NewErrorGroup(ctx, tc.size)
if err != nil {
fmt.Println("Error: %v")
os.Exit(1)
}
// 批量添加多个函数
wg.Add(
func() error {
return nil
},
func() error {
return errors.New("An error occurred")
},
)
if err := wg.Wait(); err != nil {
fmt.Println("Error: %v")
os.Exit(1)
}
}
更多关于golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang支持错误处理和并发控制的WaitGroup增强插件go-waitgroup的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
go-waitgroup 使用指南
go-waitgroup 是一个增强版的 WaitGroup,它在标准库 sync.WaitGroup 的基础上增加了错误处理和并发控制功能。下面我将详细介绍它的使用方法和示例代码。
主要特性
- 支持错误传播:当 goroutine 发生错误时,可以收集并返回错误
- 并发控制:可以限制最大并发 goroutine 数量
- 上下文支持:可以与 context.Context 集成
- 超时控制:可以设置超时时间
安装
go get github.com/pieterclaerhout/go-waitgroup
基础用法
基本等待
package main
import (
"fmt"
"time"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
wg := waitgroup.NewWaitGroup()
for i := 0; i < 5; i++ {
wg.Add(func() {
time.Sleep(1 * time.Second)
fmt.Println("Task completed")
})
}
wg.Wait()
fmt.Println("All tasks completed")
}
带错误处理的用法
package main
import (
"errors"
"fmt"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
wg := waitgroup.NewWaitGroup()
for i := 0; i < 3; i++ {
i := i // 创建局部变量
wg.Add(func() error {
if i == 1 {
return errors.New("error in task 1")
}
fmt.Printf("Task %d completed\n", i)
return nil
})
}
err := wg.Wait()
if err != nil {
fmt.Printf("Error occurred: %v\n", err)
}
}
高级用法
并发控制
package main
import (
"fmt"
"time"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
// 限制最大并发数为2
wg := waitgroup.NewWaitGroup(waitgroup.WithMaxConcurrency(2))
for i := 0; i < 5; i++ {
i := i
wg.Add(func() {
fmt.Printf("Task %d started\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("Task %d completed\n", i)
})
}
wg.Wait()
fmt.Println("All tasks completed")
}
带上下文的用法
package main
import (
"context"
"fmt"
"time"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
wg := waitgroup.NewWaitGroup(waitgroup.WithContext(ctx))
for i := 0; i < 5; i++ {
i := i
wg.Add(func() error {
select {
case <-time.After(3 * time.Second):
fmt.Printf("Task %d completed\n", i)
return nil
case <-ctx.Done():
return ctx.Err()
}
})
}
err := wg.Wait()
if err != nil {
fmt.Printf("Error: %v\n", err) // 会输出context deadline exceeded
}
}
收集所有错误
package main
import (
"errors"
"fmt"
"github.com/pieterclaerhout/go-waitgroup"
)
func main() {
wg := waitgroup.NewWaitGroup(waitgroup.WithCollectErrs())
for i := 0; i < 5; i++ {
i := i
wg.Add(func() error {
if i%2 == 0 {
return fmt.Errorf("error in task %d", i)
}
return nil
})
}
err := wg.Wait()
if err != nil {
if multiErr, ok := err.(interface{ Unwrap() []error }); ok {
fmt.Println("Multiple errors occurred:")
for _, e := range multiErr.Unwrap() {
fmt.Println("-", e)
}
} else {
fmt.Println("Error:", err)
}
}
}
配置选项
go-waitgroup 提供了多种配置选项:
WithMaxConcurrency(n int)
- 设置最大并发数WithContext(ctx context.Context)
- 设置上下文WithCollectErrs()
- 收集所有错误而不仅仅是第一个错误WithTimeout(d time.Duration)
- 设置超时时间
最佳实践
- 总是处理返回的错误
- 对于长时间运行的任务,使用上下文控制
- 合理设置最大并发数以避免资源耗尽
- 考虑使用
WithCollectErrs()
当需要知道所有失败的任务时
go-waitgroup 是一个功能强大的工具,特别适合需要错误处理和并发控制的场景,比标准库的 WaitGroup 提供了更多的控制能力。