golang基于CAS实现高性能可调整信号量插件库semaphore的使用

golang基于CAS实现高性能可调整信号量插件库semaphore的使用

简介

semaphore是一个基于CAS实现的高性能、可调整大小的Golang信号量库,具有以下特点:

  • 支持加权获取/释放
  • 支持通过context取消操作
  • 支持创建后动态调整信号量限制
  • 比基于channel的信号量实现更快

使用方法

初始化

import "github.com/marusama/semaphore/v2"
...
sem := semaphore.New(5) // 创建限制为5的信号量

获取信号量

sem.Acquire(ctx, n)     // 使用context获取n个信号量
sem.TryAcquire(n)       // 尝试获取n个信号量(非阻塞方式)
...
ctx := context.WithTimeout(context.Background(), time.Second)
sem.Acquire(ctx, n)     // 带超时的获取n个信号量

释放信号量

sem.Release(n)          // 释放n个信号量

调整信号量限制

sem.SetLimit(new_limit) // 设置新的信号量限制

完整示例

package main

import (
	"context"
	"fmt"
	"time"
	
	"github.com/marusama/semaphore/v2"
)

func main() {
	// 创建限制为3的信号量
	sem := semaphore.New(3)
	
	// 启动5个goroutine模拟并发任务
	for i := 0; i < 5; i++ {
		go func(id int) {
			// 获取1个信号量
			if err := sem.Acquire(context.Background(), 1); err != nil {
				fmt.Printf("goroutine %d: failed to acquire semaphore: %v\n", id, err)
				return
			}
			defer sem.Release(1) // 确保释放信号量
			
			fmt.Printf("goroutine %d: acquired semaphore\n", id)
			time.Sleep(time.Second) // 模拟工作
			fmt.Printf("goroutine %d: work done\n", id)
		}(i)
	}
	
	// 等待所有goroutine完成
	time.Sleep(5 * time.Second)
	
	// 动态调整信号量限制为5
	sem.SetLimit(5)
	fmt.Println("semaphore limit changed to 5")
}

性能基准测试

在MacBook Pro (2017) 3.1GHz Core i5 CPU和8GB DDR3内存,macOS High Sierra,go1.11.4 darwin/amd64环境下测试:

// 本库性能:
BenchmarkSemaphore_Acquire_Release_under_limit_simple-4     20000000  98.6 ns/op  96 B/op  1 allocs/op
BenchmarkSemaphore_Acquire_Release_under_limit-4            1000000   1593 ns/op  960 B/op 10 allocs/op
BenchmarkSemaphore_Acquire_Release_over_limit-4             100000    20760 ns/op 9600 B/op 100 allocs/op

// 与其他实现对比:

// golang.org/x/sync/semaphore:
BenchmarkXSyncSemaphore_Acquire_Release_under_limit_simple-4 50000000 34.9 ns/op 0 B/op 0 allocs/op
BenchmarkXSyncSemaphore_Acquire_Release_under_limit-4        1000000  1103 ns/op 0 B/op 0 allocs/op
BenchmarkXSyncSemaphore_Acquire_Release_over_limit-4         30000    65927 ns/op 15985 B/op 299 allocs/op

// github.com/abiosoft/semaphore:
BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit_simple-4 10000000 208 ns/op 0 B/op 0 allocs/op
BenchmarkAbiosoftSemaphore_Acquire_Release_under_limit-4         500000   3147 ns/op 0 B/op 0 allocs/op
BenchmarkAbiosoftSemaphore_Acquire_Release_over_limit-4          50000    37148 ns/op 0 B/op 0 allocs/op

要重新运行这些基准测试,可以检出benchmarks分支并运行go test -bench=. -benchmem ./bench/...


更多关于golang基于CAS实现高性能可调整信号量插件库semaphore的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于CAS实现高性能可调整信号量插件库semaphore的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang基于CAS实现高性能可调整信号量库semaphore

信号量概述

信号量(Semaphore)是一种常用的并发控制机制,用于限制对共享资源的并发访问数量。在Golang中,我们可以基于CAS(Compare-And-Swap)原子操作实现高性能的信号量。

基于CAS的信号量实现

下面是一个基于CAS实现的高性能可调整信号量的完整代码示例:

package semaphore

import (
	"sync/atomic"
)

// Semaphore 基于CAS实现的可调整信号量
type Semaphore struct {
	permits int32     // 总许可数
	channel chan struct{} // 用于阻塞和唤醒goroutine的通道
}

// NewSemaphore 创建一个新的信号量
func NewSemaphore(permits int) *Semaphore {
	if permits <= 0 {
		permits = 1 // 默认最少1个许可
	}
	return &Semaphore{
		permits: int32(permits),
		channel: make(chan struct{}, permits),
	}
}

// Acquire 获取一个许可,如果没有可用许可则阻塞
func (s *Semaphore) Acquire() {
	s.channel <- struct{}{}
	atomic.AddInt32(&s.permits, -1)
}

// TryAcquire 尝试获取一个许可,立即返回是否成功
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.channel <- struct{}{}:
		atomic.AddInt32(&s.permits, -1)
		return true
	default:
		return false
	}
}

// Release 释放一个许可
func (s *Semaphore) Release() {
	<-s.channel
	atomic.AddInt32(&s.permits, 1)
}

// AvailablePermits 返回当前可用许可数
func (s *Semaphore) AvailablePermits() int {
	return int(atomic.LoadInt32(&s.permits))
}

// DrainPermits 获取并返回所有当前可用许可
func (s *Semaphore) DrainPermits() int {
	available := s.AvailablePermits()
	for i := 0; i < available; i++ {
		s.Acquire()
	}
	return available
}

// ReducePermits 减少信号量的总许可数
func (s *Semaphore) ReducePermits(reduction int) {
	if reduction <= 0 {
		return
	}
	atomic.AddInt32(&s.permits, -int32(reduction))
	for i := 0; i < reduction; i++ {
		s.Acquire()
	}
}

// IncreasePermits 增加信号量的总许可数
func (s *Semaphore) IncreasePermits(increase int) {
	if increase <= 0 {
		return
	}
	atomic.AddInt32(&s.permits, int32(increase))
	for i := 0; i < increase; i++ {
		s.Release()
	}
}

使用示例

package main

import (
	"fmt"
	"sync"
	"time"
	
	"./semaphore" // 替换为你的semaphore包路径
)

func worker(id int, sem *semaphore.Semaphore, wg *sync.WaitGroup) {
	defer wg.Done()
	
	sem.Acquire()
	fmt.Printf("Worker %d acquired semaphore\n", id)
	time.Sleep(time.Second) // 模拟工作
	sem.Release()
	fmt.Printf("Worker %d released semaphore\n", id)
}

func main() {
	const (
		totalWorkers = 10
		initialPermits = 3
	)
	
	sem := semaphore.NewSemaphore(initialPermits)
	var wg sync.WaitGroup
	
	// 启动10个worker,但只有3个能同时运行
	for i := 1; i <= totalWorkers; i++ {
		wg.Add(1)
		go worker(i, sem, &wg)
	}
	
	// 等待2秒后增加许可数
	time.Sleep(2 * time.Second)
	fmt.Println("\nIncreasing permits to 5...")
	sem.IncreasePermits(2)
	
	// 等待2秒后减少许可数
	time.Sleep(2 * time.Second)
	fmt.Println("\nReducing permits to 2...")
	sem.ReducePermits(3)
	
	wg.Wait()
	fmt.Println("All workers completed")
}

性能优化说明

  1. CAS原子操作:使用atomic包提供的原子操作来保证permits字段的线程安全,避免了锁的开销。

  2. 通道缓冲:使用带缓冲的channel来实现阻塞和唤醒机制,比基于条件变量的实现更高效。

  3. 动态调整:支持运行时动态调整信号量的许可数量,适用于需要弹性控制并发度的场景。

  4. 非阻塞尝试:提供TryAcquire方法实现非阻塞获取许可,适用于需要快速失败(fast-fail)的场景。

适用场景

  1. 限制数据库连接池并发数
  2. 控制API请求并发度
  3. 资源池管理
  4. 限流保护系统

这个实现结合了CAS的高性能和channel的简洁性,同时提供了动态调整信号量大小的能力,适合大多数需要并发控制的场景。

回到顶部