Python中如何优雅地实现一个队列工作者(queue worker)
利用 python 做了个队列消费者(器) 伪代码:
while True:
job = beanstalk.watch('test')
body = job.body
if (body.type == 'sms'):
sendsms(body.params,job)
elif (body.type == 'email'):
sendemail(body.params,job)
怎么可以更优雅的让他动态 fork 出另外一个线程去执行,而让这个 worker 的主入口继续获取别的队列内容。
Python中如何优雅地实现一个队列工作者(queue worker)
9 回复
while job = beanstalk.watch(‘test’):
fork do
blablabla…
any way, fork 出来的是进程
在Python里实现队列工作者,用queue.Queue配合线程是最直接的方式。下面是个完整示例:
import threading
import queue
import time
import random
class QueueWorker:
def __init__(self, num_workers=3):
self.task_queue = queue.Queue()
self.workers = []
self.running = False
# 创建工作线程
for i in range(num_workers):
worker = threading.Thread(target=self._worker_loop, daemon=True)
self.workers.append(worker)
def _worker_loop(self):
while self.running:
try:
# 获取任务,timeout用于优雅退出
task = self.task_queue.get(timeout=1)
if task is None: # 退出信号
self.task_queue.task_done()
break
# 执行任务
self._process_task(task)
self.task_queue.task_done()
except queue.Empty:
continue
def _process_task(self, task):
"""实际处理任务的方法"""
print(f"处理任务: {task}")
time.sleep(random.uniform(0.1, 0.5)) # 模拟处理时间
def start(self):
"""启动工作者"""
self.running = True
for worker in self.workers:
worker.start()
print(f"启动了 {len(self.workers)} 个工作线程")
def stop(self):
"""停止工作者"""
self.running = False
# 发送退出信号
for _ in self.workers:
self.task_queue.put(None)
# 等待所有任务完成
self.task_queue.join()
# 等待线程结束
for worker in self.workers:
worker.join()
print("所有工作线程已停止")
def add_task(self, task):
"""添加任务到队列"""
self.task_queue.put(task)
print(f"添加任务: {task}")
# 使用示例
if __name__ == "__main__":
worker = QueueWorker(num_workers=2)
worker.start()
# 添加一些任务
for i in range(10):
worker.add_task(f"任务-{i}")
time.sleep(2) # 等待任务处理
worker.stop()
这个实现有几个关键点:
- 用
queue.Queue保证线程安全的任务队列 - 每个工作线程不断从队列取任务执行
task_done()和join()配合确保所有任务完成- 用
None作为退出信号实现优雅关闭
如果你需要更高级的功能,可以考虑用concurrent.futures.ThreadPoolExecutor或者multiprocessing模块。对于生产环境,Celery是更成熟的选择。
简单说就是:用Queue加线程,注意优雅关闭。
#2 描述有点问题,总之目标就是建立一个队列内容获取的东西,一旦获取就赶紧丢给真正执行这个队列的负责 [人] ,然后继续获取其他队列中数据。
都用队列了,多开几个进程呗,上层拿一个 supervisor 做个守护进程好了,搞那么麻烦干嘛。
理论上性能最好的方式是设置 SO_REUSEPORT,每个进程都去监听 beanstalk 的端口即可。不过需要手动改下客户端源码。
想错了,又不是服务端,SO_REUSEPORT 都不用设置,直接多进程消费即可。
我擦,和我以前写过的消息队列居然是一样的,我也用的 beanstalk。。
这个可以写得很复杂,简单点就做个反射调用就可以了。。


