golang高性能无限任务队列与并发工作池插件库kyoo的使用
Golang高性能无限任务队列与并发工作池插件库kyoo的使用
关于kyoo
kyoo是队列(queue)一词的音标转写。它提供了一个任务队列,可以容纳尽可能多的任务,仅受运行系统可用资源的限制。
该队列具有以下特点:
- 任务队列无限制(仅受系统资源=内存限制)
- 使用工作池并发处理任务
- 停止队列时,待处理的任务仍会被处理
该库包含一个简单的Job
接口和一个简单的FuncExecutorJob
,它只是执行给定的函数并实现该接口。通过这种方式,几乎所有类型的工作负载都应该可以被处理,当然也可以添加Job
接口的自定义实现。
kyoo的典型使用场景包括:
- RabbitMQ或Amazon SQS等消息队列的消费者
- 将Web服务器请求中耗时的处理工作卸载到后台任务
- 各种后台处理任务,如图像优化等
示例代码
以下示例展示了一个简单的HTTP服务器,将任务卸载到持续在后台处理的任务队列中。
package main
import (
"fmt"
"log"
"net/http"
"runtime"
"time"
jobqueue "github.com/dirkaholic/kyoo"
"github.com/dirkaholic/kyoo/job"
)
var queue *jobqueue.JobQueue
// HTTP处理函数,将任务提交到队列
func handler(w http.ResponseWriter, r *http.Request) {
queue.Submit(&job.FuncExecutorJob{Func: func() error {
return doTheHeavyBackgroundWork(r.URL.Path)
}})
fmt.Printf("%s - submitted %s !!\n", time.Now().String(), r.URL.Path)
fmt.Fprint(w, "Job added to queue.")
}
func main() {
// 创建工作队列,工作池大小为CPU核心数的2倍
queue = jobqueue.NewJobQueue(runtime.NumCPU() * 2)
queue.Start()
// 设置HTTP路由并启动服务器
http.HandleFunc("/", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
// 模拟耗时后台任务
func doTheHeavyBackgroundWork(path string) error {
time.Sleep(2 * time.Second)
fmt.Printf("%s - processed %s !!\n", time.Now().String(), path)
return nil
}
测试示例
可以通过向服务器发送一系列HTTP请求来测试任务卸载功能:
$ for i in {1..10}; do http http://127.0.0.1:8080/test/$i; done
服务器端的输出应该类似于这样:
2020-01-09 21:36:36.156277 +0100 CET m=+5.733617272 - submitted /test/1 !!
2020-01-09 21:36:36.443521 +0100 CET m=+6.020861136 - submitted /test/2 !!
2020-01-09 21:36:36.730535 +0100 CET m=+6.307874793 - submitted /test/3 !!
2020-01-09 21:36:37.021405 +0100 CET m=+6.598744533 - submitted /test/4 !!
2020-01-09 21:36:37.311973 +0100 CET m=+6.889312431 - submitted /test/5 !!
2020-01-09 21:36:37.609868 +0100 CET m=+7.187208115 - submitted /test/6 !!
2020-01-09 21:36:37.895222 +0100 CET m=+7.472561850 - submitted /test/7 !!
2020-01-09 21:36:38.160524 +0100 CET m=+7.737863891 - processed /test/1 !!
2020-01-09 21:36:38.171491 +0100 CET m=+7.748830724 - submitted /test/8 !!
2020-01-09 21:36:38.445832 +0100 CET m=+8.023171514 - processed /test/2 !!
2020-01-09 21:36:38.448423 +0100 CET m=+8.025762679 - submitted /test/9 !!
2020-01-09 21:36:38.730541 +0100 CET m=+8.307880933 - submitted /test/10 !!
2020-01-09 21:36:38.735158 +0100 CET m=+8.312497505 - processed /test/3 !!
2020-01-09 21:36:39.024788 +0100 CET m=+8.602128093 - processed /test/4 !!
2020-01-09 21:36:39.315991 +0100 CET m=+8.893331115 - processed /test/5 !!
2020-01-09 21:36:39.614848 +0100 CET m=+9.192187633 - processed /test/6 !!
2020-01-09 21:36:39.896692 +0100 CET m=+9.474031970 - processed /test/7 !!
2020-01-09 21:36:40.175952 +0100 CET m=+9.753291345 - processed /test/8 !!
2020-01-09 21:36:40.451877 +0100 CET m=+10.029216847 - processed /test/9 !!
2020-01-09 21:36:40.734289 +0100 CET m=+10.311628415 - processed /test/10 !!
更多示例
kyoo还提供了其他使用示例,如SQS worker等,可以参考库中的示例代码。
更多关于golang高性能无限任务队列与并发工作池插件库kyoo的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang高性能无限任务队列与并发工作池插件库kyoo的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
kyoo - Go高性能无限任务队列与并发工作池库
kyoo 是一个轻量级的Go语言库,用于实现高性能的任务队列和并发工作池模式。下面我将详细介绍它的使用方法和示例代码。
基本概念
kyoo 的核心特点:
- 无限任务队列容量
- 动态工作池大小调整
- 任务优先级支持
- 优雅关闭机制
- 极简API设计
安装
go get github.com/rytsh/kyoo
基础使用示例
1. 创建简单工作池
package main
import (
"fmt"
"github.com/rytsh/kyoo"
"time"
)
func main() {
// 创建一个工作池,默认5个worker
pool := kyoo.New(5)
// 提交任务
for i := 0; i < 10; i++ {
taskID := i
pool.Add(func() {
fmt.Printf("处理任务 %d\n", taskID)
time.Sleep(1 * time.Second)
})
}
// 等待所有任务完成
pool.Wait()
fmt.Println("所有任务完成")
}
2. 动态调整工作池大小
pool := kyoo.New(3) // 初始3个worker
// 动态调整worker数量
pool.Tune(5) // 增加到5个
pool.Tune(2) // 减少到2个
3. 带优先级的任务
pool := kyoo.New(3)
// 高优先级任务
pool.AddPriority(func() {
fmt.Println("高优先级任务")
}, kyoo.High)
// 普通优先级任务(默认)
pool.AddPriority(func() {
fmt.Println("普通优先级任务")
}, kyoo.Normal)
// 低优先级任务
pool.AddPriority(func() {
fmt.Println("低优先级任务")
}, kyoo.Low)
高级功能示例
1. 优雅关闭
pool := kyoo.New(3)
// 提交一些任务
for i := 0; i < 10; i++ {
pool.Add(func() {
time.Sleep(500 * time.Millisecond)
fmt.Println("任务完成")
})
}
// 优雅关闭 - 等待所有任务完成
pool.Close()
2. 获取任务结果
pool := kyoo.New(3)
// 使用ResultChan获取任务结果
resultChan := make(chan int, 10)
for i := 0; i < 10; i++ {
i := i
pool.Add(func() {
// 模拟工作
time.Sleep(100 * time.Millisecond)
resultChan <- i * 2
})
}
// 收集结果
go func() {
for res := range resultChan {
fmt.Printf("结果: %d\n", res)
}
}()
pool.Wait()
close(resultChan)
3. 错误处理
pool := kyoo.New(3)
errChan := make(chan error, 10)
for i := 0; i < 10; i++ {
i := i
pool.Add(func() {
defer func() {
if r := recover(); r != nil {
errChan <- fmt.Errorf("任务 %d 出错: %v", i, r)
}
}()
if i == 5 {
panic("模拟错误")
}
fmt.Printf("处理任务 %d\n", i)
})
}
go func() {
for err := range errChan {
fmt.Println("捕获到错误:", err)
}
}()
pool.Wait()
close(errChan)
性能优化建议
-
批量提交任务:减少锁竞争
tasks := make([]func(), 0, 100) for i := 0; i < 100; i++ { tasks = append(tasks, func() { // 任务逻辑 }) } pool.AddBatch(tasks...)
-
合理设置worker数量:通常设置为CPU核心数的2-3倍
-
使用对象池减少GC压力:
var taskPool = sync.Pool{ New: func() interface{} { return new(MyTask) }, } pool.Add(func() { task := taskPool.Get().(*MyTask) defer taskPool.Put(task) // 使用task })
与标准库sync.WaitGroup对比
kyoo相比sync.WaitGroup的优势:
- 自动的任务分发和负载均衡
- 内置并发控制
- 优先级支持
- 更丰富的管理接口
总结
kyoo是一个简单但功能强大的Go并发任务处理库,适合需要处理大量异步任务的场景。它的API设计简洁,同时提供了足够的灵活性来处理各种并发模式。
对于更复杂的场景,你还可以考虑结合其他库如:
github.com/gammazero/workerpool
- 另一个流行的worker pool实现github.com/Jeffail/tunny
- 基于goroutine池的实现
希望这个介绍对你有所帮助!如果需要更高级的功能或有任何问题,可以查阅kyoo的官方文档或源代码。