golang高性能轻量级goroutine工作池插件pond的使用
Golang高性能轻量级goroutine工作池插件pond的使用
pond是一个极简且高性能的Go库,旨在优雅地管理并发任务。它基于Worker Pool模式,允许同时运行大量任务,同时限制同时运行的goroutine数量。
安装
go get -u github.com/alitto/pond/v2
基本使用示例
package main
import (
"fmt"
"github.com/alitto/pond/v2"
)
func main() {
// 创建并发限制为100的工作池
pool := pond.NewPool(100)
// 提交1000个任务
for i := 0; i < 1000; i++ {
i := i
pool.Submit(func() {
fmt.Printf("Running task #%d\n", i)
})
}
// 停止池并等待所有提交的任务完成
pool.StopAndWait()
}
提交返回错误的任务
// 创建并发限制为100的工作池
pool := pond.NewPool(100)
// 提交返回错误的任务
task := pool.SubmitErr(func() error {
return errors.New("An error occurred")
})
// 等待任务完成并获取错误
err := task.Wait()
提交返回结果的任务
// 创建接受返回字符串任务的池
pool := pond.NewResultPool[string](10)
// 提交返回字符串的任务
task := pool.Submit(func() (string) {
return "Hello, World!"
})
// 等待任务完成并获取结果
result, err := task.Wait()
// result = "Hello, World!" 且 err = nil
任务组使用示例
// 创建并发限制为10的工作池
pool := pond.NewPool(10)
// 创建任务组
group := pool.NewGroup()
// 提交一组任务
for i := 0; i < 20; i++ {
i := i
group.Submit(func() {
fmt.Printf("Running group task #%d\n", i)
})
}
// 等待组内所有任务完成
err := group.Wait()
带上下文的任务组
// 创建并发限制为10的工作池
pool := pond.NewPool(10)
// 创建5秒超时的上下文
timeout, _ := context.WithTimeout(context.Background(), 5*time.Second)
// 创建带上下文的任务组
group := pool.NewGroupContext(timeout)
// 提交一组任务
for i := 0; i < 20; i++ {
i := i
group.Submit(func() {
fmt.Printf("Running group task #%d\n", i)
})
}
// 等待组内所有任务完成或超时发生,以先到者为准
err := group.Wait()
子池使用示例
// 创建并发限制为10的工作池
pool := pond.NewPool(10)
// 创建子池,使用父池最多5个worker
subpool := pool.NewSubpool(5)
// 向子池提交任务
subpool.Submit(func() {
fmt.Println("Running task in subpool")
})
// 停止子池并等待所有提交的任务完成
subpool.StopAndWait()
默认池使用示例
// 向默认池提交任务并等待完成
err := pond.SubmitErr(func() error {
fmt.Println("Running task in default pool")
return nil
}).Wait()
if err != nil {
fmt.Printf("Failed to run task: %v", err)
} else {
fmt.Println("Task completed successfully")
}
有界任务队列
// 创建队列最多10个任务的池
pool := pond.NewPool(1, pond.WithQueueSize(10))
// 尝试向池提交任务
task, ok := pool.TrySubmit(func() {
// 执行一些工作
})
// 检查任务是否提交成功
if !ok {
fmt.Println("Task submission failed because the queue is full")
}
动态调整池大小
// 创建5个worker的池
pool := pond.NewPool(5)
// 提交一些任务
for i := 0; i < 20; i++ {
pool.Submit(func() {
// 执行一些工作
})
}
// 将池大小增加到10个worker
pool.Resize(10)
// 提交更多任务,将使用增加后的容量
for i := 0; i < 20; i++ {
pool.Submit(func() {
// 执行一些工作
})
}
// 将池大小减少回5个worker
pool.Resize(5)
监控指标
pond提供了以下监控指标方法:
pool.RunningWorkers() int64
: 当前运行的worker数量pool.SubmittedTasks() uint64
: 自池创建以来提交的任务总数pool.WaitingTasks() uint64
: 当前等待执行的任务数pool.SuccessfulTasks() uint64
: 成功完成的任务数pool.FailedTasks() uint64
: 因panic失败的任务数pool.CompletedTasks() uint64
: 已完成的任务总数(成功+失败)pool.DroppedTasks() uint64
: 因队列满被丢弃的任务数
pond是一个功能强大且灵活的工作池实现,适用于各种并发场景,特别是需要限制并发量的情况。
更多关于golang高性能轻量级goroutine工作池插件pond的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang高性能轻量级goroutine工作池插件pond的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang高性能轻量级Goroutine工作池Pond使用指南
Pond是一个轻量级、高性能的Goroutine工作池库,专为Go语言设计。它提供了简单易用的API来管理并发任务,可以有效控制资源使用,避免无限制创建Goroutine带来的问题。
基本特性
- 轻量级实现,核心代码精简
- 高性能任务调度
- 动态调整工作池大小
- 任务超时控制
- 优雅关闭支持
- 任务优先级支持
安装
go get github.com/alitto/pond
基本使用示例
package main
import (
"fmt"
"time"
"github.com/alitto/pond"
)
func main() {
// 创建一个工作池
// 参数:worker数量(2),任务队列容量(100)
pool := pond.New(2, 100)
// 提交任务到工作池
for i := 0; i < 10; i++ {
n := i
pool.Submit(func() {
fmt.Printf("Running task #%d\n", n)
time.Sleep(1 * time.Second)
})
}
// 等待所有任务完成
pool.StopAndWait()
fmt.Println("All tasks completed")
}
高级功能示例
1. 带返回值的任务
pool := pond.New(2, 100)
future := pool.Submit(func() interface{} {
time.Sleep(1 * time.Second)
return 42
})
// 获取任务结果
result := future.Value()
fmt.Printf("Task result: %v\n", result)
2. 任务超时控制
pool := pond.New(2, 100, pond.IdleTimeout(5*time.Second))
future := pool.Submit(func() interface{} {
time.Sleep(10 * time.Second) // 长时间运行的任务
return "done"
})
// 等待结果,带超时
result, err := future.Get(2 * time.Second)
if err == pond.ErrTimeout {
fmt.Println("Task timed out")
} else {
fmt.Printf("Task result: %v\n", result)
}
3. 动态调整工作池大小
pool := pond.New(2, 100)
// 扩大工作池
pool.SetMaxWorkers(4)
// 缩小工作池
pool.SetMaxWorkers(1)
4. 批量提交任务
pool := pond.New(4, 100)
tasks := make([]func(), 0, 10)
for i := 0; i < 10; i++ {
n := i
tasks = append(tasks, func() {
fmt.Printf("Running batch task #%d\n", n)
time.Sleep(500 * time.Millisecond)
})
}
// 批量提交
pool.SubmitGroup(tasks)
pool.StopAndWait()
5. 优雅关闭
pool := pond.New(4, 100, pond.Strategy(pond.Eager()))
// 提交一些任务
for i := 0; i < 20; i++ {
n := i
pool.Submit(func() {
fmt.Printf("Task %d running\n", n)
time.Sleep(1 * time.Second)
})
}
// 优雅关闭,等待正在运行的任务完成
pool.Stop()
性能优化建议
-
合理设置工作池大小:根据CPU核心数和任务类型调整
- CPU密集型任务:接近CPU核心数
- IO密集型任务:可以设置更大值
-
使用适当的策略:
pond.Eager()
:立即创建worker直到达到最大值pond.Lazy()
:按需创建worker
-
监控工作池状态:
fmt.Printf("Running workers: %d\n", pool.RunningWorkers()) fmt.Printf("Idle workers: %d\n", pool.IdleWorkers()) fmt.Printf("Waiting tasks: %d\n", pool.WaitingTasks())
与其他工作池库对比
-
与标准库相比:
- 避免无限制创建Goroutine
- 提供更丰富的控制功能
-
与其他工作池库(如ants)相比:
- 更轻量级
- API更简洁
- 动态调整能力更强
总结
Pond是一个非常适合需要控制并发度的场景的工作池库,它的轻量级设计和高性能特性使其成为许多Go项目的首选。通过合理配置,可以在资源控制和性能之间取得良好平衡。
// 完整示例:处理HTTP请求的工作池
func main() {
pool := pond.New(100, 1000)
defer pool.StopAndWait()
http.HandleFunc("/process", func(w http.ResponseWriter, r *http.Request) {
// 将耗时任务提交到工作池
future := pool.Submit(func() interface{} {
return processRequest(r)
})
result := future.Value().(string)
w.Write([]byte(result))
})
http.ListenAndServe(":8080", nil)
}
func processRequest(r *http.Request) string {
// 模拟耗时处理
time.Sleep(100 * time.Millisecond)
return "Processed: " + r.URL.Query().Get("id")
}