golang高效goroutine管理与取消插件库pool的使用
Golang高效Goroutine管理与取消插件库Pool的使用
包介绍
Pool包实现了一个有限消费者goroutine或无限goroutine池,用于更简单的goroutine处理和取消。
特性
- 使用极其简单,不对您的使用方式做任何假设
- 从返回错误的消费者goroutine自动恢复
Pool v3相比v2的优势
- 对象不再是接口,减少了未来可能出现的破坏性变更
- 现在有两种完全可互换的Pool类型:有限工作池和无限池
- 工作单元使用更简单,现在可以使用
work.Wait()
而不是<-work.Done
安装
使用go get安装:
go get gopkg.in/go-playground/pool.v3
然后在代码中导入:
import "gopkg.in/go-playground/pool.v3"
重要提示
- 建议从调用函数中取消池或批处理,而不是在工作单元内部取消
- 批处理时不要忘记调用batch.QueueComplete(),否则会导致死锁
- 在阻塞操作后(如等待连接池中的连接)检查
WorkUnit.IsCancelled()
是您的责任
使用示例
有限池和无限池具有相同的签名并且完全可互换。
单个工作单元示例
package main
import (
"fmt"
"time"
"gopkg.in/go-playground/pool.v3"
)
func main() {
// 创建限制10个goroutine的池
p := pool.NewLimited(10)
defer p.Close()
// 排队获取用户信息
user := p.Queue(getUser(13))
// 排队获取其他信息
other := p.Queue(getOtherInfo(13))
// 等待用户信息任务完成
user.Wait()
if err := user.Error(); err != nil {
// 处理错误
}
// 使用返回的用户信息
username := user.Value().(string)
fmt.Println(username)
// 等待其他信息任务完成
other.Wait()
if err := other.Error(); err != nil {
// 处理错误
}
// 使用返回的其他信息
otherInfo := other.Value().(string)
fmt.Println(otherInfo)
}
// 获取用户信息的工作函数
func getUser(id int) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
// 模拟等待TCP连接建立或从连接池获取连接
time.Sleep(time.Second * 1)
if wu.IsCancelled() {
// 返回值不会被使用
return nil, nil
}
// 准备处理...
return "Joeybloggs", nil
}
}
// 获取其他信息的工作函数
func getOtherInfo(id int) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
// 模拟等待TCP连接建立或从连接池获取连接
time.Sleep(time.Second * 1)
if wu.IsCancelled() {
// 返回值不会被使用
return nil, nil
}
// 准备处理...
return "Other Info", nil
}
}
批处理工作示例
package main
import (
"fmt"
"time"
"gopkg.in/go-playground/pool.v3"
)
func main() {
// 创建限制10个goroutine的池
p := pool.NewLimited(10)
defer p.Close()
// 创建批处理
batch := p.Batch()
// 为了最大速度,在另一个goroutine中排队
// 这不是必须的,但在所有项目排队完成前不能开始读取结果
go func() {
for i := 0; i < 10; i++ {
batch.Queue(sendEmail("email content"))
}
// 不要忘记这个,否则goroutine会死锁
// 如果调用Cancel(),它会在内部调用QueueComplete()
batch.QueueComplete()
}()
// 处理结果
for email := range batch.Results() {
if err := email.Error(); err != nil {
// 处理错误
// 可能需要调用batch.Cancel()
}
// 使用返回值
fmt.Println(email.Value().(bool))
}
}
// 发送邮件的工作函数
func sendEmail(email string) pool.WorkFunc {
return func(wu pool.WorkUnit) (interface{}, error) {
// 模拟等待TCP连接建立或从连接池获取连接
time.Sleep(time.Second * 1)
if wu.IsCancelled() {
// 返回值不会被使用
return nil, nil
}
// 准备处理...
return true, nil // 一切正常,发送nil,如果有错误则发送错误
}
}
性能基准
在MacBook Pro (Retina, 15-inch, Late 2013) 2.6 GHz Intel Core i7 16 GB 1600 MHz DDR3上使用Go 1.6.2运行测试。
一些基准测试结果:
- BenchmarkLimitedSmallRun 在1.002492008秒内完成了10秒的处理
- BenchmarkLimitedSmallCancel 运行20个作业,在第6个作业取消,运行时间为0秒
- BenchmarkLimitedLargeCancel 运行1000个作业,在第6个作业取消,运行时间为0秒
- BenchmarkLimitedOverconsumeLargeRun 使用25个工作线程运行100个作业,耗时4.027153081秒
许可证
根据MIT许可证分发,更多详情请参见代码中的许可证文件。
更多关于golang高效goroutine管理与取消插件库pool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高效goroutine管理与取消插件库pool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高效Goroutine管理与取消:Pool库使用指南
在Go语言中,goroutine的轻量级特性使得并发编程变得简单,但如果不加以管理,可能会导致goroutine泄漏或资源耗尽问题。本文将介绍如何使用pool
库来高效管理goroutine的生命周期和取消机制。
为什么需要goroutine池
- 控制并发数量,防止资源耗尽
- 复用goroutine,减少创建销毁开销
- 统一管理goroutine生命周期
- 实现优雅的取消和超时机制
常用goroutine池库
Go生态中有几个优秀的goroutine池实现:
ants
:高性能goroutine池tunny
:工作池实现pond
:轻量级工作池go-playground/pool
:功能丰富的池实现
使用go-playground/pool示例
1. 基本使用
package main
import (
"context"
"fmt"
"time"
"github.com/go-playground/pool/v3"
)
func main() {
// 创建一个最大容量为5的goroutine池
p := pool.NewLimited(5)
defer p.Close() // 确保关闭池
// 提交任务到池中
batch := p.Batch()
// 提交10个任务
for i := 0; i < 10; i++ {
count := i
batch.Queue(func(wu pool.WorkUnit) (interface{}, error) {
if wu.IsCancelled() {
return nil, nil
}
time.Sleep(1 * time.Second)
fmt.Printf("Task %d completed\n", count)
return fmt.Sprintf("Result %d", count), nil
})
}
// 等待所有任务完成
batch.QueueComplete()
results := batch.Results()
for result := range results {
if result.Error() != nil {
fmt.Printf("Error: %v\n", result.Error())
continue
}
fmt.Printf("Success: %v\n", result.Value())
}
}
2. 带取消和超时的池
package main
import (
"context"
"fmt"
"time"
"github.com/go-playground/pool/v3"
)
func main() {
p := pool.NewLimited(3)
defer p.Close()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
batch := p.BatchWithContext(ctx)
for i := 0; i < 10; i++ {
count := i
batch.Queue(func(wu pool.WorkUnit) (interface{}, error) {
select {
case <-time.After(1 * time.Second):
if wu.IsCancelled() {
return nil, nil
}
fmt.Printf("Task %d completed\n", count)
return count, nil
case <-ctx.Done():
return nil, ctx.Err()
}
})
}
batch.QueueComplete()
results := batch.Results()
for result := range results {
if result.Error() != nil {
fmt.Printf("Error for task: %v\n", result.Error())
continue
}
fmt.Printf("Success: %v\n", result.Value())
}
}
使用ants库示例
ants
是另一个高性能的goroutine池实现:
package main
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
func taskFunc(i int) {
time.Sleep(time.Second)
fmt.Printf("Task %d executed\n", i)
}
func main() {
defer ants.Release() // 释放池
var wg sync.WaitGroup
// 创建容量为5的池
p, _ := ants.NewPool(5)
defer p.Release()
for i := 0; i < 10; i++ {
wg.Add(1)
count := i
_ = p.Submit(func() {
defer wg.Done()
taskFunc(count)
})
}
wg.Wait()
fmt.Printf("Running goroutines: %d\n", p.Running())
}
最佳实践
- 合理设置池大小:通常设置为CPU核心数的2-5倍
- 及时释放资源:使用defer确保池被关闭
- 处理取消信号:检查
IsCancelled()
或context取消 - 错误处理:妥善处理任务返回的错误
- 监控池状态:定期检查池的运行状况
性能考虑
- 对于CPU密集型任务,池大小应接近CPU核心数
- 对于IO密集型任务,可以设置更大的池
- 避免在任务中创建新的goroutine
- 使用
sync.Pool
复用任务中的临时对象
通过合理使用goroutine池,可以显著提高Go程序的稳定性和性能,同时避免资源泄漏问题。