golang基于channel和context实现带超时的信号量模式插件库semaphore的使用

Golang基于channel和context实现带超时的信号量模式插件库semaphore的使用

信号量模式实现

semaphore是一个实现了带超时的锁操作(lock/unlock)的信号量模式的Golang库。它提供了API来控制多个goroutine对共享资源的访问或限制吞吐量。

基本用法

releaser, err := semaphore.Acquire(breaker.BreakByTimeout(time.Second))
if err != nil {
    // 超时处理
}
defer releaser.Release()

快速开始示例

下面是一个HTTP请求限流的完整示例:

package main

import (
	"context"
	"log"
	"net/http"
	"time"
	
	"github.com/kamilsk/semaphore"
	"github.com/kamilsk/semaphore/breaker"
)

func main() {
	// 创建一个容量为1000的信号量
	limiter := semaphore.New(1000)

	http.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
		// 尝试获取信号量,设置1秒超时
		if _, err := limiter.Acquire(
			breaker.BreakByContext(
				context.WithTimeout(req.Context(), time.Second),
			),
		); err != nil {
			// 获取信号量失败,返回429状态码
			http.Error(rw, http.StatusText(http.StatusTooManyRequests), http.StatusTooManyRequests)
			return
		}
		defer limiter.Release() // 确保释放信号量

		// 处理请求
		rw.Write([]byte("Hello, World!"))
	})

	log.Fatal(http.ListenAndServe(":8080", http.DefaultServeMux))
}

集成方式

该库使用SemVer进行版本控制,可以通过go modules或dep来管理版本。

使用go modules安装最新版本(v5):

go get -u github.com/kamilsk/semaphore/v5

使用dep安装:

dep ensure -add github.com/kamilsk/semaphore@v5.0.0-rc1

实际应用示例

并行执行控制台命令

semaphore也可以用于控制并行执行的命令数量:

# 创建容量为2的信号量
$ semaphore create 2

# 添加要执行的命令
$ semaphore add -- docker build
$ semaphore add -- vagrant up
$ semaphore add -- ansible-playbook

# 等待所有命令完成,设置1分钟超时
$ semaphore wait --timeout=1m --notify

这个库为Golang中的并发控制提供了简单而强大的工具,特别适合需要限制资源访问或控制吞吐量的场景。


更多关于golang基于channel和context实现带超时的信号量模式插件库semaphore的使用的实战教程也可以访问 https://www.itying.com/category-94-b0.html

1 回复

更多关于golang基于channel和context实现带超时的信号量模式插件库semaphore的使用的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


Golang 基于 Channel 和 Context 实现带超时的信号量模式

信号量(Semaphore)是一种常用的并发控制机制,用于限制同时访问资源的协程数量。下面我将介绍如何使用 Go 的 channel 和 context 实现一个带超时功能的信号量库。

基本实现

package semaphore

import (
	"context"
	"time"
)

// Semaphore 信号量结构体
type Semaphore struct {
	sem chan struct{}
}

// New 创建一个新的信号量,size 为并发限制数
func New(size int) *Semaphore {
	return &Semaphore{
		sem: make(chan struct{}, size),
	}
}

// Acquire 获取信号量,如果无法获取会阻塞
func (s *Semaphore) Acquire() {
	s.sem <- struct{}{}
}

// Release 释放信号量
func (s *Semaphore) Release() {
	<-s.sem
}

// TryAcquire 尝试获取信号量,如果无法获取立即返回 false
func (s *Semaphore) TryAcquire() bool {
	select {
	case s.sem <- struct{}{}:
		return true
	default:
		return false
	}
}

// AcquireWithTimeout 带超时的获取信号量
func (s *Semaphore) AcquireWithTimeout(ctx context.Context) error {
	select {
	case s.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

使用示例

package main

import (
	"context"
	"fmt"
	"semaphore" // 假设上面的代码放在 semaphore 包中
	"time"
)

func main() {
	// 创建一个最大并发数为3的信号量
	sem := semaphore.New(3)

	// 使用普通方式获取信号量
	for i := 0; i < 10; i++ {
		go func(id int) {
			sem.Acquire()
			defer sem.Release()
			
			fmt.Printf("Worker %d started\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done\n", id)
		}(i)
	}

	time.Sleep(2 * time.Second)
	fmt.Println("---")

	// 使用带超时的信号量
	for i := 0; i < 10; i++ {
		go func(id int) {
			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
			defer cancel()
			
			if err := sem.AcquireWithTimeout(ctx); err != nil {
				fmt.Printf("Worker %d failed to acquire semaphore: %v\n", id, err)
				return
			}
			defer sem.Release()
			
			fmt.Printf("Worker %d started with timeout\n", id)
			time.Sleep(time.Second)
			fmt.Printf("Worker %d done with timeout\n", id)
		}(i + 10)
	}

	time.Sleep(3 * time.Second)
}

高级功能扩展

我们可以扩展信号量功能,添加更多实用方法:

// 获取当前信号量可用数量
func (s *Semaphore) Available() int {
	return cap(s.sem) - len(s.sem)
}

// 获取信号量容量
func (s *Semaphore) Capacity() int {
	return cap(s.sem)
}

// 带权重的信号量获取
func (s *Semaphore) AcquireWithWeight(n int) {
	for i := 0; i < n; i++ {
		s.Acquire()
	}
}

// 带权重的信号量释放
func (s *Semaphore) ReleaseWithWeight(n int) {
	for i := 0; i < n; i++ {
		s.Release()
	}
}

使用注意事项

  1. 确保释放信号量:使用 defer 确保在函数退出时释放信号量
  2. 避免死锁:不要在持有信号量的情况下再次尝试获取
  3. 合理设置超时:长时间运行的作业需要设置合理的超时时间
  4. 监控信号量使用:可以通过 Available() 方法监控系统负载

性能考虑

  1. channel 实现的信号量在 Go 中性能很好,因为 channel 操作是 Go 运行时优化的核心操作
  2. 对于极高并发的场景,可以考虑使用 sync.Mutex 和 atomic 操作实现更轻量级的信号量
  3. 信号量大小应根据实际资源限制合理设置

这个实现提供了基本的信号量功能,并支持超时控制,可以满足大多数并发控制场景的需求。

回到顶部