golang并发限制协程池管理插件gowp的使用
golang并发限制协程池管理插件gowp的使用
简介
gowp是一个Golang的worker pool实现,主要特点包括:
- 限制任务执行的并发数,而不是排队任务的数量
- 提交任务时不会阻塞,无论有多少任务排队
- 支持超时设置
- 支持通过安全队列(queue)管理任务
安装
使用以下命令安装gowp:
$ go get github.com/xxjwxc/gowp
基本使用示例
1. 基本协程池使用
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workpool"
)
func main() {
wp := workpool.New(10) // 设置最大线程数
for i := 0; i < 20; i++ { // 开启20个请求
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ { // 每个任务打印0-10的值
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
}
wp.Wait()
fmt.Println("down")
}
2. 支持错误返回
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workpool"
)
func main() {
wp := workpool.New(10) // 设置最大线程数
for i := 0; i < 20; i++ {
ii := i
wp.Do(func() error {
for j := 0; j < 10; j++ {
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
if ii == 1 {
return errors.Cause(errors.New("my test err")) // 返回错误
}
time.Sleep(1 * time.Second)
}
return nil
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}
3. 非阻塞完成判断
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workpool"
)
func main() {
wp := workpool.New(5) // 设置最大线程数
for i := 0; i < 10; i++ {
wp.Do(func() error {
for j := 0; j < 5; j++ {
time.Sleep(1 * time.Second)
}
return nil
})
fmt.Println(wp.IsDone()) // 非阻塞判断是否完成
}
wp.Wait()
fmt.Println(wp.IsDone())
fmt.Println("down")
}
4. 同步等待结果
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workpool"
)
func main() {
wp := workpool.New(5) // 设置最大线程数
for i := 0; i < 10; i++ {
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ {
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}
5. 支持超时退出
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp/workpool"
)
func main() {
wp := workpool.New(5) // 设置最大线程数
wp.SetTimeout(time.Millisecond) // 设置超时时间
for i := 0; i < 10; i++ {
ii := i
wp.DoWait(func() error {
for j := 0; j < 5; j++ {
fmt.Println(fmt.Sprintf("%v->\t%v", ii, j))
time.Sleep(1 * time.Second)
}
return nil
})
}
err := wp.Wait()
if err != nil {
fmt.Println(err)
}
fmt.Println("down")
}
限流器(Limiter)使用
1. 基于缓存的限流器
package main
import (
"fmt"
"sync"
"time"
"github.com/xxjwxc/gowp/limiter"
)
func main() {
limiter := limiter.NewLimiter(limiter.WithLimit(10), limiter.WithNamespace("test"), limiter.WithTsTimeout(true))
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
token, _ := limiter.Acquire(10) // 获取令牌
fmt.Println(token)
time.Sleep(1 * time.Second)
limiter.Release(token) // 释放令牌
}()
}
wg.Wait()
fmt.Println("down")
}
2. 基于Redis的限流器
package main
import (
"fmt"
"sync"
"time"
"github.com/xxjwxc/gowp/limiter"
"github.com/xxjwxc/public/myredis"
)
func main() {
conf := myredis.InitRedis(myredis.WithAddr("127.0.0.1:6379"), myredis.WithPwd("123456"), myredis.WithGroupName("test"))
res, err := myredis.NewRedis(conf)
if err != nil {
fmt.Println(err)
return
}
limiter := limiter.NewLimiter(limiter.WithRedis(res), limiter.WithLimit(10), limiter.WithNamespace("test"))
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
token, _ := limiter.Acquire(10)
fmt.Println(token)
time.Sleep(1 * time.Second)
limiter.Release(token)
}()
}
wg.Wait()
fmt.Println("down")
}
更多关于golang并发限制协程池管理插件gowp的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
1 回复
更多关于golang并发限制协程池管理插件gowp的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
GoWP - Golang 并发协程池管理插件
GoWP 是一个轻量级的 Golang 并发协程池管理库,它可以帮助你控制并发 goroutine 的数量,避免无限制创建 goroutine 导致的资源耗尽问题。
基本特性
- 限制最大并发 goroutine 数量
- 任务队列管理
- 支持同步/异步任务提交
- 任务超时控制
- 简单易用的 API 接口
安装
go get github.com/xxjwxc/gowp
基本使用示例
1. 创建协程池
package main
import (
"fmt"
"time"
"github.com/xxjwxc/gowp"
)
func main() {
// 创建一个最大并发数为3的协程池
wp := gowp.New(3)
defer wp.Wait() // 等待所有任务完成
for i := 0; i < 10; i++ {
idx := i
wp.Do(func() error {
fmt.Printf("Task %d started\n", idx)
time.Sleep(time.Second) // 模拟耗时任务
fmt.Printf("Task %d done\n", idx)
return nil
})
}
}
2. 带错误处理的协程池
func main() {
wp := gowp.New(3)
defer wp.Wait()
results := make(chan int, 10)
for i := 0; i < 10; i++ {
idx := i
wp.Do(func() error {
if idx%2 == 0 {
return fmt.Errorf("error on task %d", idx)
}
results <- idx
return nil
})
}
// 处理结果和错误
go func() {
for err := range wp.ErrChan() {
fmt.Println("Error:", err)
}
}()
for result := range results {
fmt.Println("Success:", result)
}
}
3. 带超时控制的协程池
func main() {
// 创建协程池,设置最大并发数和超时时间
wp := gowp.New(3, gowp.WithTimeout(time.Second*2))
defer wp.Wait()
for i := 0; i < 5; i++ {
idx := i
wp.Do(func() error {
if idx == 3 {
time.Sleep(time.Second * 3) // 这个任务会超时
} else {
time.Sleep(time.Second)
}
fmt.Printf("Task %d completed\n", idx)
return nil
})
}
}
高级功能
1. 动态调整协程池大小
func main() {
wp := gowp.New(3)
defer wp.Wait()
// 动态调整协程池大小
wp.Resize(5) // 扩大协程池
wp.Resize(2) // 缩小协程池
for i := 0; i < 10; i++ {
idx := i
wp.Do(func() error {
fmt.Printf("Task %d running\n", idx)
time.Sleep(time.Second)
return nil
})
}
}
2. 获取协程池状态
func main() {
wp := gowp.New(3)
defer wp.Wait()
for i := 0; i < 10; i++ {
idx := i
wp.Do(func() error {
time.Sleep(time.Second)
return nil
})
// 获取协程池状态
fmt.Printf("Running: %d, Waiting: %d\n",
wp.Running(), wp.Waiting())
}
}
最佳实践
-
合理设置并发数:根据任务类型和系统资源设置合适的并发数,通常设置为 CPU 核心数的 2-4 倍。
-
及时处理错误:使用
ErrChan()
方法获取错误通道,及时处理任务中的错误。 -
资源清理:使用
defer wp.Wait()
确保所有任务完成后再退出程序。 -
避免任务阻塞:长时间运行的任务应该支持取消或超时机制。
-
监控协程池状态:通过
Running()
和Waiting()
方法监控协程池状态,必要时动态调整。
性能考虑
- GoWP 使用 channel 实现任务队列,性能开销小
- 协程复用减少了 goroutine 创建销毁的开销
- 适合 IO 密集型任务,对于 CPU 密集型任务效果有限
GoWP 是一个简单但功能完备的协程池实现,适合大多数需要控制并发数的场景。对于更复杂的需求,可以考虑使用更强大的工作池实现如 ants
或 tunny
。