1 回复
更多关于Golang中关于限流器的问题求解答的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html
在Go语言中实现带取消令牌恢复功能的限流器,可以通过context和令牌桶算法结合实现。以下是一个完整的实现示例:
package main
import (
"context"
"fmt"
"sync"
"time"
)
type RateLimiter struct {
tokens int
maxTokens int
refillRate int
lastRefillTime time.Time
mu sync.Mutex
cancelFuncs map[context.CancelFunc]bool
}
func NewRateLimiter(maxTokens, refillRate int) *RateLimiter {
return &RateLimiter{
tokens: maxTokens,
maxTokens: maxTokens,
refillRate: refillRate,
lastRefillTime: time.Now(),
cancelFuncs: make(map[context.CancelFunc]bool),
}
}
func (rl *RateLimiter) refill() {
now := time.Now()
elapsed := now.Sub(rl.lastRefillTime)
tokensToAdd := int(elapsed.Seconds()) * rl.refillRate
if tokensToAdd > 0 {
rl.tokens = min(rl.tokens+tokensToAdd, rl.maxTokens)
rl.lastRefillTime = now
}
}
func (rl *RateLimiter) Wait(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
rl.mu.Lock()
rl.refill()
if rl.tokens > 0 {
rl.tokens--
rl.mu.Unlock()
return nil
}
// 创建可取消的等待上下文
waitCtx, cancel := context.WithCancel(context.Background())
rl.cancelFuncs[cancel] = true
rl.mu.Unlock()
// 等待令牌恢复或外部取消
select {
case <-time.After(time.Second / time.Duration(rl.refillRate)):
rl.mu.Lock()
delete(rl.cancelFuncs, cancel)
rl.mu.Unlock()
cancel()
continue
case <-ctx.Done():
rl.mu.Lock()
delete(rl.cancelFuncs, cancel)
rl.mu.Unlock()
cancel()
return ctx.Err()
case <-waitCtx.Done():
continue
}
}
}
}
func (rl *RateLimiter) RestoreTokens(tokens int) {
rl.mu.Lock()
defer rl.mu.Unlock()
rl.refill()
rl.tokens = min(rl.tokens+tokens, rl.maxTokens)
// 取消所有等待的请求
for cancel := range rl.cancelFuncs {
cancel()
delete(rl.cancelFuncs, cancel)
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
// 使用示例
func main() {
limiter := NewRateLimiter(10, 1) // 最大10个令牌,每秒补充1个
// 模拟并发请求
var wg sync.WaitGroup
for i := 0; i < 15; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
start := time.Now()
err := limiter.Wait(ctx)
elapsed := time.Since(start)
if err != nil {
fmt.Printf("请求%d: 超时或取消 (%v)\n", id, err)
} else {
fmt.Printf("请求%d: 获得令牌,等待时间: %v\n", id, elapsed)
}
}(i)
}
// 模拟令牌恢复
time.Sleep(2 * time.Second)
fmt.Println("恢复5个令牌...")
limiter.RestoreTokens(5)
wg.Wait()
}
这个实现提供了以下关键功能:
- 令牌桶算法:基础的限流算法,支持突发请求
- 上下文取消:通过context支持请求超时和手动取消
- 令牌恢复机制:
RestoreTokens方法允许外部恢复令牌 - 等待队列管理:当令牌不足时,请求会等待,并在令牌恢复时被唤醒
输出示例:
请求0: 获得令牌,等待时间: 0s
请求1: 获得令牌,等待时间: 0s
...
请求9: 获得令牌,等待时间: 0s
恢复5个令牌...
请求10: 获得令牌,等待时间: 2.1s
请求11: 获得令牌,等待时间: 2.1s
...
请求13: 获得令牌,等待时间: 2.1s
请求14: 超时或取消 (context deadline exceeded)
这个实现确保了在调用RestoreTokens时,所有等待的请求都能立即获得令牌继续执行,同时保持了线程安全性。

