Python中如何实现多线程和多进程环境下的Redis限流器?

soulomoon/python-throttle 在假如多线程和多进程测试前一切都是好好的, 感觉好开心(^・ω・^ )
但是加入多线程,多进程测试后发现出现了 race condition😢
但是写的时候我已经好好地去尝试规避这个问题了,找不到原因😫
race condition 的问题出现在 test_limiter 中(;´༎ຶД༎ຶ`)

IMAGE ALT TEXT HERE


Python中如何实现多线程和多进程环境下的Redis限流器?

9 回复

手机上看的,看测试大概了解了思路,不成熟的想法

如果你真要这么做,用 incr 保证线程安全
另外,你没有考虑到滑动窗口的问题
最后 token bucket 了解下?


核心思路:
在Python的多线程/多进程环境下实现Redis限流器,关键在于利用Redis的原子操作(如INCR + EXPIRE)或更精确的令牌桶/漏桶算法,并通过分布式锁或Redis的原子性保证并发安全。以下是两种常见方案的代码示例:


方案1:固定窗口计数器(简单场景)

适用于对精度要求不高的限流,使用INCREXPIRE实现:

import redis
import time

class RedisRateLimiter:
    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = key
        self.max_requests = max_requests
        self.window = window_seconds

    def is_allowed(self):
        # 原子操作:计数+过期时间设置
        current = self.redis.incr(self.key)
        if current == 1:
            self.redis.expire(self.key, self.window)
        return current <= self.max_requests

# 使用示例(多线程/进程共享同一Redis客户端)
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(redis_client, "api_limit:user1", 100, 60)

# 在多个线程/进程中调用
if limiter.is_allowed():
    print("请求通过")
else:
    print("请求被限流")

方案2:令牌桶算法(平滑限流)

使用Redis的LIST存储令牌,通过BRPOP实现阻塞获取令牌:

import redis
import threading

class TokenBucketLimiter:
    def __init__(self, redis_client, key, refill_rate, capacity):
        self.redis = redis_client
        self.key = key
        self.refill_rate = refill_rate  # 令牌/秒
        self.capacity = capacity
        self._init_bucket()

    def _init_bucket(self):
        # 初始化桶满令牌
        if not self.redis.exists(self.key):
            self.redis.lpush(self.key, *['token'] * self.capacity)

    def acquire_token(self, timeout=1):
        # 阻塞获取令牌(BRPOP是原子操作)
        token = self.redis.brpop(self.key, timeout=timeout)
        if token:
            # 异步补充令牌(可通过后台线程或Redis定时任务实现)
            self._refill_bucket()
            return True
        return False

    def _refill_bucket(self):
        # 简化的补充逻辑:实际需根据时间间隔计算补充数量
        current_size = self.redis.llen(self.key)
        if current_size < self.capacity:
            self.redis.lpush(self.key, 'token')

# 多线程测试示例
def worker(limiter, id):
    if limiter.acquire_token():
        print(f"线程{id}获取令牌成功")
    else:
        print(f"线程{id}被限流")

redis_client = redis.Redis()
limiter = TokenBucketLimiter(redis_client, "token_bucket:api", refill_rate=10, capacity=50)
threads = [threading.Thread(target=worker, args=(limiter, i)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

关键点说明

  1. 原子性保障:所有限流操作必须使用Redis的原子命令(如INCRBRPOPLua脚本),避免并发竞争。
  2. 多进程支持:确保不同进程连接同一Redis实例,方案与多线程完全相同。
  3. 时间窗口同步:依赖Redis服务器时间,避免客户端时间不同步问题。
  4. 高性能建议:使用连接池管理Redis连接,Lua脚本合并多个操作减少网络往返。

总结:根据业务精度需求选择固定窗口或令牌桶方案,核心是依赖Redis的原子操作。


不好意思 我图放错了 0 0。
这里我有两种 implementaion, 一个用 ordered set, 一个用 incr 写在 luascript,redis 官方是这么推荐的 0。
都用 expire。
token bucket 要单开 put token 的进程吧?

python<br> lua_incr = """<br> local current<br> current = redis.call("incr",KEYS[1])<br> if tonumber(current) == 1 then<br> redis.call("expire",KEYS[1],KEYS[2])<br> end<br> return current-1<br> """<br>
这个是我改过的 lua script,原版的在[这里]( https://redis.io/commands/incr)



我的理解,incr 不需要放在 lua script 里面来保证单线程吧? incr 本身是有返回值的, 比较下返回值和 threshold 应该就可以了

另外,粗略算的话,token bucket 不需要另外开进程,可以在消耗时顺便添加 token (当然了,这种实现需要带时间戳)

incr 放进去是为了和 expire 一起,保证 key 不会因为没有 set expire 而 leak ,现在我怀疑 redis-py run script 的特点,因为是通过 register 到远端,然后再通过 sha1 作为 key 执行,可能是多个 instance 同时执行了同一 script,获取到了相同的返回值。。。
还有一个 sliding log 的 implementaion,pipeline 看 redis-py 的简介 0 0 理论上是一个 multi exce 的行为,0 0, 也是 fail。这样的:
def add_key(self, key, expired):
“”“use ordered set for counting keys get_set manner
“””
now = time.time()
with self.redis.pipeline() as session:
session.zremrangebyscore(key, 0, now - expired)
session.zrange(key, 0, -1)
session.zadd(key, now, uuid.uuid4().hex)
session.expire(key, expired)
result = session.execute()
return len(result[1])

*多个 instance 接受了同一个 script 的执行结果。。

发现即使是简单如
def add_key(self, key, expired):
return self.redis.incr(key)
也会有同样的问题

who’s to blame…

发现问题了, 是我写 unittest 的问题,interleaving 多个不同时限的 limiter 到相同的 key 中,当然会出现不同的结果 ozn
好开心解决了,原来不是我 implementaion 的问题是我的测试的问题,看来要好好学习测试, 最后谢谢 的回答。

回到顶部