golang管理可调整大小的上下文感知goroutine池插件库gpool的使用
golang管理可调整大小的上下文感知goroutine池插件库gpool的使用
介绍
gpool是一个通用的、上下文感知的可调整大小的goroutine池,用于限制并发度。一个Job被Enqueued到池中,同时最多只有N个job可以并发处理。
主要特点:
-
Job是一个简单的
func(){}
,当调用Enqueue(..)
时,该调用会在job开始处理时返回。如果池已满,则会阻塞直到:- 池中有空间可以处理该job
- job的
context
被取消 - 池被停止
-
可以通过
Resize()
调整池的大小,这是并发安全的操作。增大池会解除阻塞的enqueue调用,缩小池不会影响已经在处理/等待的jobs -
Enqueuing一个Job会在job开始时返回
nil
错误,如果池已关闭则返回ErrPoolClosed
,或者在等待池时job的context被取消则返回context的错误 -
使用
pool.Stop()
停止池会等待所有正在处理的jobs完成后再返回,同时会解除所有阻塞的job enqueues(这些enqueues会返回ErrPoolClosed) -
Start
、Stop
和Resize(N)
都是并发安全的,可以从多个goroutine调用
安装
$ go get github.com/sherifabdlnaby/gpool
import "github.com/sherifabdlnaby/gpool"
使用示例
示例1 - 简单Job入队
func main() {
concurrency := 2
// 创建并启动pool
pool, _ := gpool.NewPool(concurrency)
defer pool.Stop()
// 创建JOB
resultChan1 := make(chan int)
ctx := context.Background()
job := func() {
time.Sleep(2000 * time.Millisecond)
resultChan1 <- 1337
}
// 入队Job
err1 := pool.Enqueue(ctx, job)
if err1 != nil {
log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
return
}
log.Printf("Job Enqueued and started processing")
log.Printf("Job Done, Received: %v", <-resultChan1)
}
示例2 - 带超时的Job入队
func main() {
concurrency := 2
// 创建并启动pool
pool, _ := gpool.NewPool(concurrency)
defer pool.Stop()
// 创建JOB
resultChan := make(chan int)
ctx := context.Background()
job := func() {
resultChan <- 1337
}
// 入队2个Jobs填满pool(除非从resultChan拉取结果否则不会完成)
_ = pool.Enqueue(ctx, job)
_ = pool.Enqueue(ctx, job)
ctxWithTimeout, _ := context.WithTimeout(ctx, 1000 * time.Millisecond)
// 由于超时只会阻塞1秒
err1 := pool.Enqueue(ctxWithTimeout, job)
if err1 != nil {
log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
}
log.Printf("Job 1 Done, Received: %v", <-resultChan)
log.Printf("Job 2 Done, Received: %v", <-resultChan)
}
示例3 - 完整示例
// 池的大小/并发jobs数
const concurrency = 2
func main() {
pool, _ := gpool.NewPool(concurrency)
defer pool.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for i := 0; i < 10; i++ {
// 小间隔使输出更易读
time.Sleep(500 * time.Millisecond)
go func(i int) {
x := make(chan int, 1)
log.Printf("Job [%v] Enqueueing", i)
err := pool.Enqueue(ctx, func() {
time.Sleep(2000 * time.Millisecond)
x <- i
})
if err != nil {
log.Printf("Job [%v] was not enqueued. [%s]", i, err.Error())
return
}
log.Printf("Job [%v] Enqueue-ed ", i)
log.Printf("Job [%v] Receieved, Result: [%v]", i, <-x)
}(i)
}
}()
// 取消注释以演示ctx取消jobs
//time.Sleep(100 * time.Millisecond)
//cancel()
time.Sleep(5000 * time.Millisecond)
fmt.Println("Stopping...")
pool.Stop()
fmt.Println("Stopped")
fmt.Println("Sleeping for couple of seconds so canceled job have a chance to print out their status")
time.Sleep(4000 * time.Millisecond)
}
不同类型的"Enqueues"
Enqueue(ctx, job)
- 在job开始执行时返回(不是在job完成/返回后)EnqueueAndWait(ctx, job)
- 在job开始并完成执行时返回TryEnqueue(job)
- 如果池已满不会阻塞,在job开始执行时返回true
,池满返回false
TryEnqueueAndWait(job)
- 如果池已满不会阻塞,在job开始并完成执行时返回true
,池满返回false
基准测试
$ go test -bench=. -cpu=2 -benchmem
基准测试结果显示了不同池大小和批量作业数量下的性能表现。
许可证
MIT License
Copyright © 2019 Sherif Abdel-Naby
贡献
欢迎提交PR。
更多关于golang管理可调整大小的上下文感知goroutine池插件库gpool的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复