Python中关于queue模块的使用问题
本来写一段代码,文件 IO 比较高,所以打算用多进程跑,子进程跑完的结果,通过放入 queue 中,最后再父进程中合并一下,但是子进程代码跑完就卡死了不动了。研究半天,最后把 from Multprocessing import Queue 换成了 from multiprocessing import Manager 中的 queue,然后代码可以跑完了,想了想是因为 Queue 只支持子进程通信,而 Manager 中的 queue 支持父子进程通信吗?
Python中关于queue模块的使用问题
5 回复
Manager 本身就是 python 用来实现共享内存的,Manager 中的 queue 当然是共享的啦,既然共享自然可以通信
queue模块是Python标准库中实现多生产者、多消费者队列的核心工具,主要用于线程间安全通信。它提供了三种主要队列类型:FIFO队列Queue、LIFO队列LifoQueue和优先级队列PriorityQueue。下面是一个典型的生产者-消费者模式的完整示例:
import threading
import queue
import time
import random
def producer(q, id):
"""生产者函数:生成数据并放入队列"""
for i in range(5):
item = f'产品-{id}-{i}'
time.sleep(random.uniform(0.1, 0.5)) # 模拟生产时间
q.put(item)
print(f'生产者{id} 生产了: {item}')
q.put(None) # 发送结束信号
def consumer(q, id):
"""消费者函数:从队列取出并处理数据"""
while True:
item = q.get()
if item is None: # 收到结束信号
q.put(item) # 将信号放回队列供其他消费者使用
break
time.sleep(random.uniform(0.2, 0.8)) # 模拟处理时间
print(f'消费者{id} 处理了: {item}')
q.task_done() # 标记任务完成
def main():
# 创建队列
q = queue.Queue(maxsize=3) # 限制队列容量为3
# 创建生产者和消费者线程
producers = [
threading.Thread(target=producer, args=(q, i))
for i in range(2)
]
consumers = [
threading.Thread(target=consumer, args=(q, i))
for i in range(3)
]
# 启动所有线程
for t in producers + consumers:
t.start()
# 等待生产者完成
for t in producers:
t.join()
# 等待队列清空
q.join()
# 发送结束信号给消费者
for _ in consumers:
q.put(None)
# 等待消费者完成
for t in consumers:
t.join()
print("所有任务完成!")
if __name__ == '__main__':
main()
关键点说明:
q.put(item)和q.get()是基本的入队出队操作,默认阻塞直到操作完成maxsize参数控制队列容量,0表示无限(默认)q.task_done()与q.join()配合使用可以跟踪任务完成状态None作为结束信号是常见模式- 队列本身是线程安全的,无需额外加锁
对于简单场景,直接使用Queue类就够了。
但是 multiprocessing 中的 Queue 不支持父子进程之间通信吗
Multprocessing.Queue 应该是用 pipe 实现的,如果父进程不及时把内容取走貌似会导致管道阻塞,至于 Multprocessing.Manager 是使用了类似于 tcp 通讯的方式,有后台线程持续读取 pipe 的消息,所以没事
原来是这样,多谢指点

