Python中关于queue模块的使用问题

本来写一段代码,文件 IO 比较高,所以打算用多进程跑,子进程跑完的结果,通过放入 queue 中,最后再父进程中合并一下,但是子进程代码跑完就卡死了不动了。研究半天,最后把 from Multprocessing import Queue 换成了 from multiprocessing import Manager 中的 queue,然后代码可以跑完了,想了想是因为 Queue 只支持子进程通信,而 Manager 中的 queue 支持父子进程通信吗?


Python中关于queue模块的使用问题
5 回复

Manager 本身就是 python 用来实现共享内存的,Manager 中的 queue 当然是共享的啦,既然共享自然可以通信


queue模块是Python标准库中实现多生产者、多消费者队列的核心工具,主要用于线程间安全通信。它提供了三种主要队列类型:FIFO队列Queue、LIFO队列LifoQueue和优先级队列PriorityQueue。下面是一个典型的生产者-消费者模式的完整示例:

import threading
import queue
import time
import random

def producer(q, id):
    """生产者函数:生成数据并放入队列"""
    for i in range(5):
        item = f'产品-{id}-{i}'
        time.sleep(random.uniform(0.1, 0.5))  # 模拟生产时间
        q.put(item)
        print(f'生产者{id} 生产了: {item}')
    q.put(None)  # 发送结束信号

def consumer(q, id):
    """消费者函数:从队列取出并处理数据"""
    while True:
        item = q.get()
        if item is None:  # 收到结束信号
            q.put(item)   # 将信号放回队列供其他消费者使用
            break
        time.sleep(random.uniform(0.2, 0.8))  # 模拟处理时间
        print(f'消费者{id} 处理了: {item}')
        q.task_done()  # 标记任务完成

def main():
    # 创建队列
    q = queue.Queue(maxsize=3)  # 限制队列容量为3
    
    # 创建生产者和消费者线程
    producers = [
        threading.Thread(target=producer, args=(q, i))
        for i in range(2)
    ]
    
    consumers = [
        threading.Thread(target=consumer, args=(q, i))
        for i in range(3)
    ]
    
    # 启动所有线程
    for t in producers + consumers:
        t.start()
    
    # 等待生产者完成
    for t in producers:
        t.join()
    
    # 等待队列清空
    q.join()
    
    # 发送结束信号给消费者
    for _ in consumers:
        q.put(None)
    
    # 等待消费者完成
    for t in consumers:
        t.join()
    
    print("所有任务完成!")

if __name__ == '__main__':
    main()

关键点说明:

  1. q.put(item)q.get() 是基本的入队出队操作,默认阻塞直到操作完成
  2. maxsize 参数控制队列容量,0表示无限(默认)
  3. q.task_done()q.join() 配合使用可以跟踪任务完成状态
  4. None 作为结束信号是常见模式
  5. 队列本身是线程安全的,无需额外加锁

对于简单场景,直接使用Queue类就够了。

但是 multiprocessing 中的 Queue 不支持父子进程之间通信吗

Multprocessing.Queue 应该是用 pipe 实现的,如果父进程不及时把内容取走貌似会导致管道阻塞,至于 Multprocessing.Manager 是使用了类似于 tcp 通讯的方式,有后台线程持续读取 pipe 的消息,所以没事

原来是这样,多谢指点

回到顶部