Python中关于queue模块的使用问题
本来写一段代码,文件 IO 比较高,所以打算用多进程跑,子进程跑完的结果,通过放入 queue 中,最后再父进程中合并一下,但是子进程代码跑完就卡死了不动了。研究半天,最后把 from Multprocessing import Queue 换成了 from multiprocessing import Manager 中的 queue,然后代码可以跑完了,想了想是因为 Queue 只支持子进程通信,而 Manager 中的 queue 支持父子进程通信吗?
Python中关于queue模块的使用问题
Manager 本身就是 python 用来实现共享内存的,Manager 中的 queue 当然是共享的啦,既然共享自然可以通信
queue模块是Python标准库中实现多生产者、多消费者队列的核心工具,主要用于线程间安全通信。它提供了三种主要队列类型:FIFO队列Queue、LIFO队列LifoQueue和优先级队列PriorityQueue。下面是一个典型的生产者-消费者模式的完整示例:
import threading
import queue
import time
import random
def producer(q, id):
"""生产者函数:生成数据并放入队列"""
for i in range(5):
item = f'产品-{id}-{i}'
time.sleep(random.uniform(0.1, 0.5)) # 模拟生产时间
q.put(item)
print(f'生产者{id} 生产了: {item}')
q.put(None) # 发送结束信号
def consumer(q, id):
"""消费者函数:从队列取出并处理数据"""
while True:
item = q.get()
if item is None: # 收到结束信号
q.put(item) # 将信号放回队列供其他消费者使用
break
time.sleep(random.uniform(0.2, 0.8)) # 模拟处理时间
print(f'消费者{id} 处理了: {item}')
q.task_done() # 标记任务完成
def main():
# 创建队列
q = queue.Queue(maxsize=3) # 限制队列容量为3
# 创建生产者和消费者线程
producers = [
threading.Thread(target=producer, args=(q, i))
for i in range(2)
]
consumers = [
threading.Thread(target=consumer, args=(q, i))
for i in range(3)
]
# 启动所有线程
for t in producers + consumers:
t.start()
# 等待生产者完成
for t in producers:
t.join()
# 等待队列清空
q.join()
# 发送结束信号给消费者
for _ in consumers:
q.put(None)
# 等待消费者完成
for t in consumers:
t.join()
print("所有任务完成!")
if __name__ == '__main__':
main()
关键点说明:
q.put(item)和q.get()是基本的入队出队操作,默认阻塞直到操作完成maxsize参数控制队列容量,0表示无限(默认)q.task_done()与q.join()配合使用可以跟踪任务完成状态None作为结束信号是常见模式- 队列本身是线程安全的,无需额外加锁
对于简单场景,直接使用Queue类就够了。
但是 multiprocessing 中的 Queue 不支持父子进程之间通信吗
Multprocessing.Queue 应该是用 pipe 实现的,如果父进程不及时把内容取走貌似会导致管道阻塞,至于 Multprocessing.Manager 是使用了类似于 tcp 通讯的方式,有后台线程持续读取 pipe 的消息,所以没事
原来是这样,多谢指点

