golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用

Golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用

关于

concurrency-limiter允许您限制访问资源的goroutine数量,支持超时、goroutine的动态优先级和上下文取消。

安装

安装concurrency-limiter:

go get github.com/vivek-ng/concurrency-limiter

然后导入concurrency-limiter使用它:

import(
    github.com/vivek-ng/concurrency-limiter/limiter
)

nl := limiter.New(3)
ctx := context.Background()
nl.Wait(ctx)
// 执行操作...
nl.Finish()

使用示例

基本限流器

import(
    github.com/vivek-ng/concurrency-limiter/limiter
)

func main() {
    nl := limiter.New(3)

    var wg sync.WaitGroup
    wg.Add(15)

    for i := 0; i < 15; i++ {
        go func(index int) {
            defer wg.Done()
            ctx := context.Background()
            nl.Wait(ctx)
            // 实际应用中,这可能是数据库操作、消息发布到队列等...
            fmt.Println("executing action...: ", "index: ", index, "current number of goroutines: ", nl.Count())
            nl.Finish()
        }(i)
    }
    wg.Wait()
}

在上面的例子中,最多可以有3个goroutine同时访问资源。其他goroutine被添加到等待列表中,按照FIFO顺序获得访问资源的机会。如果上下文被取消,goroutine将从等待列表中移除。

带超时的限流器

nl := limiter.New(3,
WithTimeout(10),
)
ctx := context.Background()
nl.Wait(ctx)
// 执行操作...
nl.Finish()

在上面的例子中,goroutine最多等待10毫秒。即使并发goroutine数量大于指定限制,goroutine也会在10毫秒后从等待列表中移除。

优先级限流器

import(
    github.com/vivek-ng/concurrency-limiter/priority
)

func main() {
    pr := priority.NewLimiter(1)
    var wg sync.WaitGroup
    wg.Add(15)
    for i := 0; i < 15; i++ {
        go func(index int) {
            defer wg.Done()
            ctx := context.Background()
            if index%2 == 1 {
                pr.Wait(ctx, priority.High)
            } else {
                pr.Wait(ctx, priority.Low)
            }
            // 实际应用中,这可能是数据库操作、消息发布到队列等...
            fmt.Println("executing action...: ", "index: ", index, "current number of goroutines: ", pr.Count())
            pr.Finish()
        }(i)
    }
    wg.Wait()
}

在优先级限流器中,优先级较高的goroutine将优先从等待列表中移除。在上面的例子中,goroutine将获得最高优先级。在优先级相同的情况下,goroutine将按照FIFO顺序从等待列表中移除。

带动态优先级的限流器

nl := priority.NewLimiter(3,
WithDynamicPriority(5),
)
ctx := context.Background()
nl.Wait(ctx , priority.Low)
// 执行操作...
nl.Finish()

在动态优先级限流器中,低优先级的goroutine会按照指定的时间周期定期提高其优先级。在上面的例子中,goroutine每5毫秒提高一次优先级。这样可以确保低优先级的goroutine不会饿死。强烈建议使用动态优先级限流器来避免低优先级goroutine饿死。

带超时的优先级限流器

nl := priority.NewLimiter(3,
WithTimeout(30),
WithDynamicPriority(5),
)
ctx := context.Background()
nl.Wait(ctx , priority.Low)
// 执行操作...
nl.Finish()

这与普通限流器中的超时类似。在上面的例子中,goroutine最多等待30毫秒。低优先级的goroutine每5毫秒提高一次优先级。

可运行函数

nl := priority.NewLimiter(3)
ctx := context.Background()
nl.Run(ctx , priority.Low , func()error {
    return sendMetrics()
})

可运行函数允许您包装您的函数并在并发限制下执行它们。这个函数是Wait()和Finish()函数的包装器。

贡献

欢迎提交问题和创建PR来修复bug或添加新功能。所有贡献都是受欢迎的 :)


更多关于golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang支持超时和动态优先级的并发限制插件库concurrency-limiter的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 并发限制器:支持超时和动态优先级的 concurrency-limiter

在 Go 语言中,管理并发是一个常见需求。concurrency-limiter 是一个支持超时和动态优先级的并发限制插件库,可以帮助开发者更好地控制并发执行的任务。

基本概念

concurrency-limiter 提供了以下核心功能:

  • 限制最大并发数
  • 支持任务优先级
  • 支持超时控制
  • 动态调整并发限制

安装

go get github.com/your-repo/concurrency-limiter

基本使用示例

package main

import (
	"context"
	"fmt"
	"time"
	"github.com/your-repo/concurrency-limiter"
)

func main() {
	// 创建一个最大并发数为3的限流器
	limiter := concurrency_limiter.NewLimiter(3)

	// 模拟10个任务
	for i := 0; i < 10; i++ {
		taskID := i
		go func() {
			// 获取许可,如果没有可用许可会阻塞
			token, err := limiter.Acquire(context.Background())
			if err != nil {
				fmt.Printf("Task %d failed to acquire token: %v\n", taskID, err)
				return
			}
			defer token.Release() // 确保释放许可

			// 执行任务
			fmt.Printf("Task %d is running\n", taskID)
			time.Sleep(time.Second)
			fmt.Printf("Task %d completed\n", taskID)
		}()
	}

	// 等待所有任务完成
	time.Sleep(5 * time.Second)
}

带超时的使用示例

func main() {
	limiter := concurrency_limiter.NewLimiter(2)
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	for i := 0; i < 5; i++ {
		taskID := i
		go func() {
			token, err := limiter.Acquire(ctx)
			if err != nil {
				fmt.Printf("Task %d timed out waiting for token\n", taskID)
				return
			}
			defer token.Release()

			fmt.Printf("Task %d started\n", taskID)
			time.Sleep(time.Second)
			fmt.Printf("Task %d finished\n", taskID)
		}()
	}

	time.Sleep(3 * time.Second)
}

带优先级的使用示例

func main() {
	// 创建支持优先级的限流器
	limiter := concurrency_limiter.NewPriorityLimiter(3)

	for i := 0; i < 10; i++ {
		taskID := i
		priority := i % 3 // 优先级0-2
		go func() {
			token, err := limiter.AcquireWithPriority(context.Background(), priority)
			if err != nil {
				fmt.Printf("Task %d failed to acquire token\n", taskID)
				return
			}
			defer token.Release()

			fmt.Printf("Task %d (priority %d) started\n", taskID, priority)
			time.Sleep(time.Second)
			fmt.Printf("Task %d completed\n", taskID)
		}()
	}

	time.Sleep(5 * time.Second)
}

动态调整并发限制

func main() {
	limiter := concurrency_limiter.NewLimiter(2)

	// 初始限制为2
	for i := 0; i < 5; i++ {
		go func(id int) {
			token, _ := limiter.Acquire(context.Background())
			defer token.Release()
			fmt.Printf("Task %d running with limit %d\n", id, limiter.GetLimit())
			time.Sleep(time.Second)
		}(i)
	}

	time.Sleep(1 * time.Second)
	
	// 动态调整限制为4
	limiter.SetLimit(4)
	fmt.Println("Limit increased to 4")

	time.Sleep(3 * time.Second)
}

高级功能:带权重的并发限制

func main() {
	// 创建带权重的限流器,总权重限制为10
	limiter := concurrency_limiter.NewWeightedLimiter(10)

	for i := 0; i < 5; i++ {
		taskID := i
		weight := i + 1 // 权重1-5
		go func() {
			token, err := limiter.AcquireWithWeight(context.Background(), weight)
			if err != nil {
				fmt.Printf("Task %d failed to acquire token\n", taskID)
				return
			}
			defer token.Release()

			fmt.Printf("Task %d (weight %d) started\n", taskID, weight)
			time.Sleep(2 * time.Second)
			fmt.Printf("Task %d completed\n", taskID)
		}()
	}

	time.Sleep(10 * time.Second)
}

性能考虑

  1. 对于高并发场景,考虑使用 TryAcquire 非阻塞方法
  2. 合理设置超时时间避免长时间阻塞
  3. 动态调整限制时要考虑当前正在执行的任务

总结

concurrency-limiter 提供了灵活的并发控制机制,通过超时、优先级和动态调整等功能,可以满足各种复杂的并发控制需求。在实际应用中,可以根据具体场景选择合适的限制策略和参数配置。

注意:上述代码示例中的 concurrency_limiter 是一个假设的库名,实际使用时需要替换为真实的库名或实现。

回到顶部