Golang中关于限流器的问题求解答

Golang中关于限流器的问题求解答 Stack Overflow 上的问题:速率限制取消令牌恢复

标签: algorithm, go

1 回复

更多关于Golang中关于限流器的问题求解答的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


在Go语言中实现带取消令牌恢复功能的限流器,可以通过context和令牌桶算法结合实现。以下是一个完整的实现示例:

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type RateLimiter struct {
    tokens         int
    maxTokens      int
    refillRate     int
    lastRefillTime time.Time
    mu             sync.Mutex
    cancelFuncs    map[context.CancelFunc]bool
}

func NewRateLimiter(maxTokens, refillRate int) *RateLimiter {
    return &RateLimiter{
        tokens:         maxTokens,
        maxTokens:      maxTokens,
        refillRate:     refillRate,
        lastRefillTime: time.Now(),
        cancelFuncs:    make(map[context.CancelFunc]bool),
    }
}

func (rl *RateLimiter) refill() {
    now := time.Now()
    elapsed := now.Sub(rl.lastRefillTime)
    tokensToAdd := int(elapsed.Seconds()) * rl.refillRate
    
    if tokensToAdd > 0 {
        rl.tokens = min(rl.tokens+tokensToAdd, rl.maxTokens)
        rl.lastRefillTime = now
    }
}

func (rl *RateLimiter) Wait(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            rl.mu.Lock()
            rl.refill()
            
            if rl.tokens > 0 {
                rl.tokens--
                rl.mu.Unlock()
                return nil
            }
            
            // 创建可取消的等待上下文
            waitCtx, cancel := context.WithCancel(context.Background())
            rl.cancelFuncs[cancel] = true
            rl.mu.Unlock()
            
            // 等待令牌恢复或外部取消
            select {
            case <-time.After(time.Second / time.Duration(rl.refillRate)):
                rl.mu.Lock()
                delete(rl.cancelFuncs, cancel)
                rl.mu.Unlock()
                cancel()
                continue
            case <-ctx.Done():
                rl.mu.Lock()
                delete(rl.cancelFuncs, cancel)
                rl.mu.Unlock()
                cancel()
                return ctx.Err()
            case <-waitCtx.Done():
                continue
            }
        }
    }
}

func (rl *RateLimiter) RestoreTokens(tokens int) {
    rl.mu.Lock()
    defer rl.mu.Unlock()
    
    rl.refill()
    rl.tokens = min(rl.tokens+tokens, rl.maxTokens)
    
    // 取消所有等待的请求
    for cancel := range rl.cancelFuncs {
        cancel()
        delete(rl.cancelFuncs, cancel)
    }
}

func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}

// 使用示例
func main() {
    limiter := NewRateLimiter(10, 1) // 最大10个令牌,每秒补充1个
    
    // 模拟并发请求
    var wg sync.WaitGroup
    for i := 0; i < 15; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            defer cancel()
            
            start := time.Now()
            err := limiter.Wait(ctx)
            elapsed := time.Since(start)
            
            if err != nil {
                fmt.Printf("请求%d: 超时或取消 (%v)\n", id, err)
            } else {
                fmt.Printf("请求%d: 获得令牌,等待时间: %v\n", id, elapsed)
            }
        }(i)
    }
    
    // 模拟令牌恢复
    time.Sleep(2 * time.Second)
    fmt.Println("恢复5个令牌...")
    limiter.RestoreTokens(5)
    
    wg.Wait()
}

这个实现提供了以下关键功能:

  1. 令牌桶算法:基础的限流算法,支持突发请求
  2. 上下文取消:通过context支持请求超时和手动取消
  3. 令牌恢复机制RestoreTokens方法允许外部恢复令牌
  4. 等待队列管理:当令牌不足时,请求会等待,并在令牌恢复时被唤醒

输出示例:

请求0: 获得令牌,等待时间: 0s
请求1: 获得令牌,等待时间: 0s
...
请求9: 获得令牌,等待时间: 0s
恢复5个令牌...
请求10: 获得令牌,等待时间: 2.1s
请求11: 获得令牌,等待时间: 2.1s
...
请求13: 获得令牌,等待时间: 2.1s
请求14: 超时或取消 (context deadline exceeded)

这个实现确保了在调用RestoreTokens时,所有等待的请求都能立即获得令牌继续执行,同时保持了线程安全性。

回到顶部