Python中如何实现多线程和多进程环境下的Redis限流器?
soulomoon/python-throttle
在假如多线程和多进程测试前一切都是好好的, 感觉好开心(^・ω・^ )
但是加入多线程,多进程测试后发现出现了 race condition😢
但是写的时候我已经好好地去尝试规避这个问题了,找不到原因😫
race condition 的问题出现在 test_limiter 中(;´༎ຶД༎ຶ`)

Python中如何实现多线程和多进程环境下的Redis限流器?
手机上看的,看测试大概了解了思路,不成熟的想法
如果你真要这么做,用 incr 保证线程安全
另外,你没有考虑到滑动窗口的问题
最后 token bucket 了解下?
核心思路:
在Python的多线程/多进程环境下实现Redis限流器,关键在于利用Redis的原子操作(如INCR + EXPIRE)或更精确的令牌桶/漏桶算法,并通过分布式锁或Redis的原子性保证并发安全。以下是两种常见方案的代码示例:
方案1:固定窗口计数器(简单场景)
适用于对精度要求不高的限流,使用INCR和EXPIRE实现:
import redis
import time
class RedisRateLimiter:
def __init__(self, redis_client, key, max_requests, window_seconds):
self.redis = redis_client
self.key = key
self.max_requests = max_requests
self.window = window_seconds
def is_allowed(self):
# 原子操作:计数+过期时间设置
current = self.redis.incr(self.key)
if current == 1:
self.redis.expire(self.key, self.window)
return current <= self.max_requests
# 使用示例(多线程/进程共享同一Redis客户端)
redis_client = redis.Redis(host='localhost', port=6379)
limiter = RedisRateLimiter(redis_client, "api_limit:user1", 100, 60)
# 在多个线程/进程中调用
if limiter.is_allowed():
print("请求通过")
else:
print("请求被限流")
方案2:令牌桶算法(平滑限流)
使用Redis的LIST存储令牌,通过BRPOP实现阻塞获取令牌:
import redis
import threading
class TokenBucketLimiter:
def __init__(self, redis_client, key, refill_rate, capacity):
self.redis = redis_client
self.key = key
self.refill_rate = refill_rate # 令牌/秒
self.capacity = capacity
self._init_bucket()
def _init_bucket(self):
# 初始化桶满令牌
if not self.redis.exists(self.key):
self.redis.lpush(self.key, *['token'] * self.capacity)
def acquire_token(self, timeout=1):
# 阻塞获取令牌(BRPOP是原子操作)
token = self.redis.brpop(self.key, timeout=timeout)
if token:
# 异步补充令牌(可通过后台线程或Redis定时任务实现)
self._refill_bucket()
return True
return False
def _refill_bucket(self):
# 简化的补充逻辑:实际需根据时间间隔计算补充数量
current_size = self.redis.llen(self.key)
if current_size < self.capacity:
self.redis.lpush(self.key, 'token')
# 多线程测试示例
def worker(limiter, id):
if limiter.acquire_token():
print(f"线程{id}获取令牌成功")
else:
print(f"线程{id}被限流")
redis_client = redis.Redis()
limiter = TokenBucketLimiter(redis_client, "token_bucket:api", refill_rate=10, capacity=50)
threads = [threading.Thread(target=worker, args=(limiter, i)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
关键点说明
- 原子性保障:所有限流操作必须使用Redis的原子命令(如
INCR、BRPOP、Lua脚本),避免并发竞争。 - 多进程支持:确保不同进程连接同一Redis实例,方案与多线程完全相同。
- 时间窗口同步:依赖Redis服务器时间,避免客户端时间不同步问题。
- 高性能建议:使用连接池管理Redis连接,Lua脚本合并多个操作减少网络往返。
总结:根据业务精度需求选择固定窗口或令牌桶方案,核心是依赖Redis的原子操作。
不好意思 我图放错了 0 0。
这里我有两种 implementaion, 一个用 ordered set, 一个用 incr 写在 luascript,redis 官方是这么推荐的 0。
都用 expire。
token bucket 要单开 put token 的进程吧?
python<br> lua_incr = """<br> local current<br> current = redis.call("incr",KEYS[1])<br> if tonumber(current) == 1 then<br> redis.call("expire",KEYS[1],KEYS[2])<br> end<br> return current-1<br> """<br>
这个是我改过的 lua script,原版的在[这里]( https://redis.io/commands/incr)
我的理解,incr 不需要放在 lua script 里面来保证单线程吧? incr 本身是有返回值的, 比较下返回值和 threshold 应该就可以了
另外,粗略算的话,token bucket 不需要另外开进程,可以在消耗时顺便添加 token (当然了,这种实现需要带时间戳)
incr 放进去是为了和 expire 一起,保证 key 不会因为没有 set expire 而 leak ,现在我怀疑 redis-py run script 的特点,因为是通过 register 到远端,然后再通过 sha1 作为 key 执行,可能是多个 instance 同时执行了同一 script,获取到了相同的返回值。。。
还有一个 sliding log 的 implementaion,pipeline 看 redis-py 的简介 0 0 理论上是一个 multi exce 的行为,0 0, 也是 fail。这样的:
def add_key(self, key, expired):
“”“use ordered set for counting keys get_set manner
“””
now = time.time()
with self.redis.pipeline() as session:
session.zremrangebyscore(key, 0, now - expired)
session.zrange(key, 0, -1)
session.zadd(key, now, uuid.uuid4().hex)
session.expire(key, expired)
result = session.execute()
return len(result[1])
*多个 instance 接受了同一个 script 的执行结果。。
发现即使是简单如
def add_key(self, key, expired):
return self.redis.incr(key)
也会有同样的问题
who’s to blame…
发现问题了, 是我写 unittest 的问题,interleaving 多个不同时限的 limiter 到相同的 key 中,当然会出现不同的结果 ozn
好开心解决了,原来不是我 implementaion 的问题是我的测试的问题,看来要好好学习测试, 最后谢谢 的回答。

