golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用
Golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用
关于
concurrency-limiter允许您限制访问资源的goroutine数量,支持超时、goroutine的动态优先级和上下文取消。
安装
安装concurrency-limiter:
go get github.com/vivek-ng/concurrency-limiter
然后导入concurrency-limiter使用它:
import(
github.com/vivek-ng/concurrency-limiter/limiter
)
nl := limiter.New(3)
ctx := context.Background()
nl.Wait(ctx)
// 执行操作...
nl.Finish()
使用示例
基本限流器
import(
github.com/vivek-ng/concurrency-limiter/limiter
)
func main() {
nl := limiter.New(3)
var wg sync.WaitGroup
wg.Add(15)
for i := 0; i < 15; i++ {
go func(index int) {
defer wg.Done()
ctx := context.Background()
nl.Wait(ctx)
// 实际应用中,这可能是数据库操作、消息发布到队列等...
fmt.Println("executing action...: ", "index: ", index, "current number of goroutines: ", nl.Count())
nl.Finish()
}(i)
}
wg.Wait()
}
在上面的例子中,最多可以有3个goroutine同时访问资源。其他goroutine被添加到等待列表中,按照FIFO顺序获得访问资源的机会。如果上下文被取消,goroutine将从等待列表中移除。
带超时的限流器
nl := limiter.New(3,
WithTimeout(10),
)
ctx := context.Background()
nl.Wait(ctx)
// 执行操作...
nl.Finish()
在上面的例子中,goroutine最多等待10毫秒。即使并发goroutine数量大于指定限制,goroutine也会在10毫秒后从等待列表中移除。
优先级限流器
import(
github.com/vivek-ng/concurrency-limiter/priority
)
func main() {
pr := priority.NewLimiter(1)
var wg sync.WaitGroup
wg.Add(15)
for i := 0; i < 15; i++ {
go func(index int) {
defer wg.Done()
ctx := context.Background()
if index%2 == 1 {
pr.Wait(ctx, priority.High)
} else {
pr.Wait(ctx, priority.Low)
}
// 实际应用中,这可能是数据库操作、消息发布到队列等...
fmt.Println("executing action...: ", "index: ", index, "current number of goroutines: ", pr.Count())
pr.Finish()
}(i)
}
wg.Wait()
}
在优先级限流器中,优先级较高的goroutine将优先从等待列表中移除。在上面的例子中,goroutine将获得最高优先级。在优先级相同的情况下,goroutine将按照FIFO顺序从等待列表中移除。
带动态优先级的限流器
nl := priority.NewLimiter(3,
WithDynamicPriority(5),
)
ctx := context.Background()
nl.Wait(ctx , priority.Low)
// 执行操作...
nl.Finish()
在动态优先级限流器中,低优先级的goroutine会按照指定的时间周期定期提高其优先级。在上面的例子中,goroutine每5毫秒提高一次优先级。这样可以确保低优先级的goroutine不会饿死。强烈建议使用动态优先级限流器来避免低优先级goroutine饿死。
带超时的优先级限流器
nl := priority.NewLimiter(3,
WithTimeout(30),
WithDynamicPriority(5),
)
ctx := context.Background()
nl.Wait(ctx , priority.Low)
// 执行操作...
nl.Finish()
这与普通限流器中的超时类似。在上面的例子中,goroutine最多等待30毫秒。低优先级的goroutine每5毫秒提高一次优先级。
可运行函数
nl := priority.NewLimiter(3)
ctx := context.Background()
nl.Run(ctx , priority.Low , func()error {
return sendMetrics()
})
可运行函数允许您包装您的函数并在并发限制下执行它们。这个函数是Wait()和Finish()函数的包装器。
贡献
欢迎提交问题和创建PR来修复bug或添加新功能。所有贡献都是受欢迎的 :)
更多关于golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang 并发限制器:支持超时和动态优先级的 concurrency-limiter
在 Go 语言中,管理并发是一个常见需求。concurrency-limiter 是一个支持超时和动态优先级的并发限制插件库,可以帮助开发者更好地控制并发执行的任务。
基本概念
concurrency-limiter 提供了以下核心功能:
- 限制最大并发数
- 支持任务优先级
- 支持超时控制
- 动态调整并发限制
安装
go get github.com/your-repo/concurrency-limiter
基本使用示例
package main
import (
"context"
"fmt"
"time"
"github.com/your-repo/concurrency-limiter"
)
func main() {
// 创建一个最大并发数为3的限流器
limiter := concurrency_limiter.NewLimiter(3)
// 模拟10个任务
for i := 0; i < 10; i++ {
taskID := i
go func() {
// 获取许可,如果没有可用许可会阻塞
token, err := limiter.Acquire(context.Background())
if err != nil {
fmt.Printf("Task %d failed to acquire token: %v\n", taskID, err)
return
}
defer token.Release() // 确保释放许可
// 执行任务
fmt.Printf("Task %d is running\n", taskID)
time.Sleep(time.Second)
fmt.Printf("Task %d completed\n", taskID)
}()
}
// 等待所有任务完成
time.Sleep(5 * time.Second)
}
带超时的使用示例
func main() {
limiter := concurrency_limiter.NewLimiter(2)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
for i := 0; i < 5; i++ {
taskID := i
go func() {
token, err := limiter.Acquire(ctx)
if err != nil {
fmt.Printf("Task %d timed out waiting for token\n", taskID)
return
}
defer token.Release()
fmt.Printf("Task %d started\n", taskID)
time.Sleep(time.Second)
fmt.Printf("Task %d finished\n", taskID)
}()
}
time.Sleep(3 * time.Second)
}
带优先级的使用示例
func main() {
// 创建支持优先级的限流器
limiter := concurrency_limiter.NewPriorityLimiter(3)
for i := 0; i < 10; i++ {
taskID := i
priority := i % 3 // 优先级0-2
go func() {
token, err := limiter.AcquireWithPriority(context.Background(), priority)
if err != nil {
fmt.Printf("Task %d failed to acquire token\n", taskID)
return
}
defer token.Release()
fmt.Printf("Task %d (priority %d) started\n", taskID, priority)
time.Sleep(time.Second)
fmt.Printf("Task %d completed\n", taskID)
}()
}
time.Sleep(5 * time.Second)
}
动态调整并发限制
func main() {
limiter := concurrency_limiter.NewLimiter(2)
// 初始限制为2
for i := 0; i < 5; i++ {
go func(id int) {
token, _ := limiter.Acquire(context.Background())
defer token.Release()
fmt.Printf("Task %d running with limit %d\n", id, limiter.GetLimit())
time.Sleep(time.Second)
}(i)
}
time.Sleep(1 * time.Second)
// 动态调整限制为4
limiter.SetLimit(4)
fmt.Println("Limit increased to 4")
time.Sleep(3 * time.Second)
}
高级功能:带权重的并发限制
func main() {
// 创建带权重的限流器,总权重限制为10
limiter := concurrency_limiter.NewWeightedLimiter(10)
for i := 0; i < 5; i++ {
taskID := i
weight := i + 1 // 权重1-5
go func() {
token, err := limiter.AcquireWithWeight(context.Background(), weight)
if err != nil {
fmt.Printf("Task %d failed to acquire token\n", taskID)
return
}
defer token.Release()
fmt.Printf("Task %d (weight %d) started\n", taskID, weight)
time.Sleep(2 * time.Second)
fmt.Printf("Task %d completed\n", taskID)
}()
}
time.Sleep(10 * time.Second)
}
性能考虑
- 对于高并发场景,考虑使用
TryAcquire
非阻塞方法 - 合理设置超时时间避免长时间阻塞
- 动态调整限制时要考虑当前正在执行的任务
总结
concurrency-limiter 提供了灵活的并发控制机制,通过超时、优先级和动态调整等功能,可以满足各种复杂的并发控制需求。在实际应用中,可以根据具体场景选择合适的限制策略和参数配置。
注意:上述代码示例中的 concurrency_limiter
是一个假设的库名,实际使用时需要替换为真实的库名或实现。