golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用
Golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用
介绍
go-workerpool是一个受Java线程池启发的Golang工作池库,旨在控制大量的Goroutine并发。它提供了高效的方式来管理并发任务,避免资源耗尽。
安装
安装该库的最简单方法是运行:
go get github.com/zenthangplus/go-workerpool
基本示例
package main
import (
"fmt"
"github.com/zenthangplus/go-workerpool"
)
func main() {
// 初始化具有3个工作线程的线程池
pool := workerpool.NewFixedSize(3)
// 启动线程池
pool.Start()
// Submit会阻塞直到池队列中有可用槽位
// 提交一个可识别的工作,ID将随机生成(使用UUID)
pool.Submit(workerpool.NewIdentifiableJob(func() {
// 执行耗时任务
}))
// 使用NewCustomIdentifierJob如果你不想随机生成ID
pool.Submit(workerpool.NewCustomIdentifierJob("custom-id", func() {
// 执行耗时任务
}))
// 或者提交一个没有标识符的简单函数
pool.SubmitFunc(func() { // Submit(FuncJob(func() {}))的简化方式
// 执行耗时任务
})
// SubmitConfidently以自信模式提交工作
// 当池队列满时,此函数将返回ErrPoolFull
err := pool.SubmitConfidently(workerpool.NewIdentifiableJob(func() {
// 执行耗时任务
}))
if err == workerpool.ErrPoolFull {
fmt.Println("池已满")
}
}
高级用法
package main
import (
"fmt"
"github.com/zenthangplus/go-workerpool"
)
func main() {
// 初始化固定大小的线程池,例如:3个工作线程并发运行
pool := workerpool.NewFixedSize(3)
// 或者使用自定义选项初始化固定大小的线程池
pool = workerpool.NewFixedSize(3,
// 自定义模式
workerpool.WithMode(workerpool.FixedSize),
// 自定义工作线程数量
workerpool.WithNumberWorkers(5),
// 自定义容量
workerpool.WithCapacity(6),
// 自定义日志函数
workerpool.WithLogFunc(func(msgFormat string, args ...interface{}) {
fmt.Printf(msgFormat+"\n", args...)
}),
)
// 启动线程池
pool.Start()
// 初始化一个ID随机生成的功能性工作
job1 := workerpool.NewIdentifiableJob(func() {})
// 初始化一个预定义ID的功能性工作
job2 := workerpool.NewCustomIdentifierJob("test-an-id", func() {})
// 以正常模式提交工作,它将阻塞直到池中有可用槽位
pool.Submit(job1)
// 或者提交一个简单函数
pool.SubmitFunc(func() {})
// 以自信模式提交工作,当池满时将返回ErrPoolFull
err := pool.SubmitConfidently(job2)
if err != nil {
fmt.Print(err)
}
}
// CompressDirJob
// 你可以通过实现`Job`接口来创建自定义工作
type CompressDirJob struct {
directory string
}
func NewCompressDirJob(directory string) *CompressDirJob {
return &CompressDirJob{directory: directory}
}
func (c CompressDirJob) Id() string {
return "directory-" + c.directory
}
func (c CompressDirJob) Exec() {
// 执行目录压缩
}
主要功能
- 固定大小线程池:可以控制并发Goroutine的数量
- 多种提交方式:
Submit()
:阻塞直到有可用槽位SubmitConfidently()
:立即返回,池满时返回错误SubmitFunc()
:提交简单函数的便捷方法
- 工作标识:可以为每个工作分配唯一ID
- 自定义工作:通过实现
Job
接口创建自定义工作类型 - 可配置选项:可以自定义工作线程数量、队列容量、日志函数等
这个库非常适合需要控制Goroutine并发数量的场景,避免资源耗尽,同时保持高效的并发处理能力。
更多关于golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高效控制Goroutine并发数量的线程池管理插件go-workerpool的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Go-WorkerPool: 高效控制Goroutine并发数量的线程池管理
在Go语言中,虽然goroutine非常轻量级,但在实际开发中我们仍然需要控制并发数量,以避免资源耗尽或系统过载。下面我将介绍如何使用go-workerpool这个库来高效管理goroutine并发。
安装go-workerpool
go get github.com/gammazero/workerpool
基本使用方法
package main
import (
"fmt"
"time"
"github.com/gammazero/workerpool"
)
func main() {
// 创建一个最大并发数为3的worker pool
wp := workerpool.New(3)
// 提交5个任务
for i := 0; i < 5; i++ {
id := i
wp.Submit(func() {
fmt.Printf("任务 %d 开始\n", id)
time.Sleep(1 * time.Second) // 模拟耗时操作
fmt.Printf("任务 %d 完成\n", id)
})
}
// 等待所有任务完成
wp.StopWait()
fmt.Println("所有任务完成")
}
高级功能
1. 动态调整worker数量
func main() {
wp := workerpool.New(2)
// 动态增加worker数量
wp.SetSize(5)
// 提交10个任务
for i := 0; i < 10; i++ {
id := i
wp.Submit(func() {
fmt.Printf("处理任务 %d\n", id)
time.Sleep(500 * time.Millisecond)
})
}
// 完成后减少worker数量
wp.SetSize(2)
wp.StopWait()
}
2. 带超时的任务提交
func main() {
wp := workerpool.New(2)
// 提交带超时的任务
err := wp.SubmitWithTimeout(100*time.Millisecond, func() {
time.Sleep(200 * time.Millisecond) // 这个任务会超时
fmt.Println("这个任务会超时")
})
if err != nil {
fmt.Println("任务提交超时:", err)
}
wp.StopWait()
}
3. 获取当前worker数量
func main() {
wp := workerpool.New(5)
fmt.Println("当前worker数量:", wp.Size())
wp.SetSize(3)
fmt.Println("调整后worker数量:", wp.Size())
wp.StopWait()
}
实际应用示例
package main
import (
"fmt"
"net/http"
"sync/atomic"
"time"
"github.com/gammazero/workerpool"
)
func main() {
wp := workerpool.New(10) // 限制并发请求数为10
var totalRequests int32
// 模拟100个HTTP请求
for i := 0; i < 100; i++ {
url := fmt.Sprintf("http://example.com/page%d", i)
wp.Submit(func() {
resp, err := http.Get(url)
if err != nil {
fmt.Printf("请求 %s 失败: %v\n", url, err)
return
}
defer resp.Body.Close()
atomic.AddInt32(&totalRequests, 1)
fmt.Printf("成功请求 %s, 状态码: %d\n", url, resp.StatusCode)
})
}
wp.StopWait()
fmt.Printf("总共完成 %d 个请求\n", totalRequests)
}
性能考虑
-
worker数量选择:
- CPU密集型任务:通常设置为CPU核心数
- IO密集型任务:可以设置更高的数量,如CPU核心数的2-5倍
-
任务队列:
- go-workerpool使用无界队列,适合任务量不确定但需要控制并发数的场景
- 如果内存有限,可以考虑有界队列的实现
-
错误处理:
- 确保任务函数内部有适当的错误处理
- 可以使用channel收集任务执行结果和错误
替代方案
除了go-workerpool,还有其他一些goroutine池实现:
- ants:高性能goroutine池
- tunny:另一个goroutine池实现
- 自实现简单池:
func SimplePool(size int, tasks <-chan func()) {
var wg sync.WaitGroup
for i := 0; i < size; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for task := range tasks {
task()
}
}()
}
wg.Wait()
}
总结
go-workerpool是一个简单易用但功能强大的goroutine池实现,特别适合需要控制并发数量的场景。它提供了动态调整worker数量、带超时的任务提交等实用功能,能够帮助开发者更好地管理系统资源。