Python中如何优雅地实现一个队列工作者(queue worker)

利用 python 做了个队列消费者(器) 伪代码:

while True:
    job = beanstalk.watch('test')
    body = job.body
    if (body.type == 'sms'):
        sendsms(body.params,job)
    elif (body.type == 'email'):
        sendemail(body.params,job)

怎么可以更优雅的让他动态 fork 出另外一个线程去执行,而让这个 worker 的主入口继续获取别的队列内容。


Python中如何优雅地实现一个队列工作者(queue worker)

9 回复

while job = beanstalk.watch(‘test’):
fork do
blablabla…

any way, fork 出来的是进程


在Python里实现队列工作者,用queue.Queue配合线程是最直接的方式。下面是个完整示例:

import threading
import queue
import time
import random

class QueueWorker:
    def __init__(self, num_workers=3):
        self.task_queue = queue.Queue()
        self.workers = []
        self.running = False
        
        # 创建工作线程
        for i in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=True)
            self.workers.append(worker)
    
    def _worker_loop(self):
        while self.running:
            try:
                # 获取任务,timeout用于优雅退出
                task = self.task_queue.get(timeout=1)
                if task is None:  # 退出信号
                    self.task_queue.task_done()
                    break
                    
                # 执行任务
                self._process_task(task)
                self.task_queue.task_done()
                
            except queue.Empty:
                continue
    
    def _process_task(self, task):
        """实际处理任务的方法"""
        print(f"处理任务: {task}")
        time.sleep(random.uniform(0.1, 0.5))  # 模拟处理时间
    
    def start(self):
        """启动工作者"""
        self.running = True
        for worker in self.workers:
            worker.start()
        print(f"启动了 {len(self.workers)} 个工作线程")
    
    def stop(self):
        """停止工作者"""
        self.running = False
        
        # 发送退出信号
        for _ in self.workers:
            self.task_queue.put(None)
        
        # 等待所有任务完成
        self.task_queue.join()
        
        # 等待线程结束
        for worker in self.workers:
            worker.join()
        print("所有工作线程已停止")
    
    def add_task(self, task):
        """添加任务到队列"""
        self.task_queue.put(task)
        print(f"添加任务: {task}")

# 使用示例
if __name__ == "__main__":
    worker = QueueWorker(num_workers=2)
    worker.start()
    
    # 添加一些任务
    for i in range(10):
        worker.add_task(f"任务-{i}")
    
    time.sleep(2)  # 等待任务处理
    
    worker.stop()

这个实现有几个关键点:

  1. queue.Queue保证线程安全的任务队列
  2. 每个工作线程不断从队列取任务执行
  3. task_done()join()配合确保所有任务完成
  4. None作为退出信号实现优雅关闭

如果你需要更高级的功能,可以考虑用concurrent.futures.ThreadPoolExecutor或者multiprocessing模块。对于生产环境,Celery是更成熟的选择。

简单说就是:用Queue加线程,注意优雅关闭。

concurrent.future

话说,你到底想要在线程中执行还是进程中执行啊? fork 不是用来创建新进程的吗?

#2 描述有点问题,总之目标就是建立一个队列内容获取的东西,一旦获取就赶紧丢给真正执行这个队列的负责 [人] ,然后继续获取其他队列中数据。

都用队列了,多开几个进程呗,上层拿一个 supervisor 做个守护进程好了,搞那么麻烦干嘛。

理论上性能最好的方式是设置 SO_REUSEPORT,每个进程都去监听 beanstalk 的端口即可。不过需要手动改下客户端源码。

想错了,又不是服务端,SO_REUSEPORT 都不用设置,直接多进程消费即可。

我擦,和我以前写过的消息队列居然是一样的,我也用的 beanstalk。。

这个可以写得很复杂,简单点就做个反射调用就可以了。。

回到顶部