golang基于CAS实现高性能可调整信号量插件库semaphore的使用
golang基于CAS实现高性能可调整信号量插件库semaphore的使用
简介
semaphore是一个基于CAS实现的高性能、可调整大小的Golang信号量库,具有以下特点:
- 支持加权获取/释放
- 支持通过context取消操作
- 支持创建后动态调整信号量限制
- 比基于channel的信号量实现更快
使用方法
初始化
import "github.com/marusama/semaphore/v2"
...
sem := semaphore.New(5) // 创建限制为5的信号量
获取信号量
sem.Acquire(ctx, n) // 使用context获取n个信号量
sem.TryAcquire(n) // 尝试获取n个信号量(非阻塞方式)
...
ctx := context.WithTimeout(context.Background(), time.Second)
sem.Acquire(ctx, n) // 带超时的获取n个信号量
释放信号量
sem.Release(n) // 释放n个信号量
调整信号量限制
sem.SetLimit(new_limit) // 设置新的信号量限制
完整示例
package main
import (
"context"
"fmt"
"time"
"github.com/marusama/semaphore/v2"
)
func main() {
// 创建限制为3的信号量
sem := semaphore.New(3)
// 启动5个goroutine模拟并发任务
for i := 0; i < 5; i++ {
go func(id int) {
// 获取1个信号量
if err := sem.Acquire(context.Background(), 1); err != nil {
fmt.Printf("goroutine %d: failed to acquire semaphore: %v\n", id, err)
return
}
defer sem.Release(1) // 确保释放信号量
fmt.Printf("goroutine %d: acquired semaphore\n", id)
time.Sleep(time.Second) // 模拟工作
fmt.Printf("goroutine %d: work done\n", id)
}(i)
}
// 等待所有goroutine完成
time.Sleep(5 * time.Second)
// 动态调整信号量限制为5
sem.SetLimit(5)
fmt.Println("semaphore limit changed to 5")
}
性能基准测试
在MacBook Pro (2017) 3.1GHz Core i5 CPU和8GB DDR3内存,macOS High Sierra,go1.11.4 darwin/amd64环境下测试:
// 本库性能:
BenchmarkSemaphore_Acquire_Release_under_limit_simple-4 20000000 98.6 ns/op 96 B/op 1 allocs/op
BenchmarkSemaphore_Acquire_Release_under_limit-4 1000000 1593 ns/op 960 B/op 10 allocs/op
BenchmarkSemaphore_Acquire_Release_over_limit-4 100000 20760 ns/op 9600 B/op 100 allocs/op
// 与其他实现对比:
// golang.org/x/sync/semaphore:
BenchmarkXSyncSemaphore_Acquire_Release_under_limit_simple-4 50000000 34.9 ns/op 0 B/op 0 allocs/op
BenchmarkXSyncSemaphore_Acquire_Release_under_limit-4 1000000 1103 ns/op 0 B/op 0 allocs/op
BenchmarkXSyncSemaphore_Acquire_Release_over_limit-4 30000 65927 ns/op 15985 B/op 299 allocs/op
// github.com/abiosoft/semaphore:
BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit_simple-4 10000000 208 ns/op 0 B/op 0 allocs/op
BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit-4 500000 3147 ns/op 0 B/op 0 allocs/op
BenchmarkAbiosoftSemaphore_Acquire_Release_over_limit-4 50000 37148 ns/op 0 B/op 0 allocs/op
要重新运行这些基准测试,可以检出benchmarks
分支并运行go test -bench=. -benchmem ./bench/...
更多关于golang基于CAS实现高性能可调整信号量插件库semaphore的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html
更多关于golang基于CAS实现高性能可调整信号量插件库semaphore的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
Golang基于CAS实现高性能可调整信号量库semaphore
信号量概述
信号量(Semaphore)是一种常用的并发控制机制,用于限制对共享资源的并发访问数量。在Golang中,我们可以基于CAS(Compare-And-Swap)原子操作实现高性能的信号量。
基于CAS的信号量实现
下面是一个基于CAS实现的高性能可调整信号量的完整代码示例:
package semaphore
import (
"sync/atomic"
)
// Semaphore 基于CAS实现的可调整信号量
type Semaphore struct {
permits int32 // 总许可数
channel chan struct{} // 用于阻塞和唤醒goroutine的通道
}
// NewSemaphore 创建一个新的信号量
func NewSemaphore(permits int) *Semaphore {
if permits <= 0 {
permits = 1 // 默认最少1个许可
}
return &Semaphore{
permits: int32(permits),
channel: make(chan struct{}, permits),
}
}
// Acquire 获取一个许可,如果没有可用许可则阻塞
func (s *Semaphore) Acquire() {
s.channel <- struct{}{}
atomic.AddInt32(&s.permits, -1)
}
// TryAcquire 尝试获取一个许可,立即返回是否成功
func (s *Semaphore) TryAcquire() bool {
select {
case s.channel <- struct{}{}:
atomic.AddInt32(&s.permits, -1)
return true
default:
return false
}
}
// Release 释放一个许可
func (s *Semaphore) Release() {
<-s.channel
atomic.AddInt32(&s.permits, 1)
}
// AvailablePermits 返回当前可用许可数
func (s *Semaphore) AvailablePermits() int {
return int(atomic.LoadInt32(&s.permits))
}
// DrainPermits 获取并返回所有当前可用许可
func (s *Semaphore) DrainPermits() int {
available := s.AvailablePermits()
for i := 0; i < available; i++ {
s.Acquire()
}
return available
}
// ReducePermits 减少信号量的总许可数
func (s *Semaphore) ReducePermits(reduction int) {
if reduction <= 0 {
return
}
atomic.AddInt32(&s.permits, -int32(reduction))
for i := 0; i < reduction; i++ {
s.Acquire()
}
}
// IncreasePermits 增加信号量的总许可数
func (s *Semaphore) IncreasePermits(increase int) {
if increase <= 0 {
return
}
atomic.AddInt32(&s.permits, int32(increase))
for i := 0; i < increase; i++ {
s.Release()
}
}
使用示例
package main
import (
"fmt"
"sync"
"time"
"./semaphore" // 替换为你的semaphore包路径
)
func worker(id int, sem *semaphore.Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
sem.Acquire()
fmt.Printf("Worker %d acquired semaphore\n", id)
time.Sleep(time.Second) // 模拟工作
sem.Release()
fmt.Printf("Worker %d released semaphore\n", id)
}
func main() {
const (
totalWorkers = 10
initialPermits = 3
)
sem := semaphore.NewSemaphore(initialPermits)
var wg sync.WaitGroup
// 启动10个worker,但只有3个能同时运行
for i := 1; i <= totalWorkers; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
// 等待2秒后增加许可数
time.Sleep(2 * time.Second)
fmt.Println("\nIncreasing permits to 5...")
sem.IncreasePermits(2)
// 等待2秒后减少许可数
time.Sleep(2 * time.Second)
fmt.Println("\nReducing permits to 2...")
sem.ReducePermits(3)
wg.Wait()
fmt.Println("All workers completed")
}
性能优化说明
-
CAS原子操作:使用
atomic
包提供的原子操作来保证permits
字段的线程安全,避免了锁的开销。 -
通道缓冲:使用带缓冲的channel来实现阻塞和唤醒机制,比基于条件变量的实现更高效。
-
动态调整:支持运行时动态调整信号量的许可数量,适用于需要弹性控制并发度的场景。
-
非阻塞尝试:提供
TryAcquire
方法实现非阻塞获取许可,适用于需要快速失败(fast-fail)的场景。
适用场景
- 限制数据库连接池并发数
- 控制API请求并发度
- 资源池管理
- 限流保护系统
这个实现结合了CAS的高性能和channel的简洁性,同时提供了动态调整信号量大小的能力,适合大多数需要并发控制的场景。