Python中多进程队列multiprocessing.Queue()通过get方法获取数据很慢,1000条要5秒多如何优化

先说环境,win10 系统,1903,Python3.7.4 64 位,没有用第三方库,同样的代码在 windows (本机)上和 linux 上运行结果不一样,时间差的太大了。补充 linux 环境,Ubuntu 18.04.3 LTS,4.15.0-58-generic,Python3.6.8 64 位。 ps:windows 上 3.6.8 我也试了,还是一样的结果

然后上代码

# -*- coding: utf-8 -*-
import random
import string
import threading
import time
import multiprocessing

queue = multiprocessing.Queue()

def get_str(): s = string.digits + string.ascii_letters while True: if queue.qsize() < 10000: for _ in range(10000): aaa = ‘’.join([random.choice(s) for _ in range(random.randint(20, 50))]) queue.put(aaa)

def get_data(queue): print(‘insert’) while True: if queue.qsize() == 0: time.sleep(2) continue start = time.time() temp = [] for _ in range(1000): temp.append(queue.get()) print(f’获取 1000 条数据需要的时间为:{time.time() - start:.2f}’) time.sleep(2)

def get_size(queue): print(‘size’) while True: print(f’队列大小为:{queue.qsize()}’) time.sleep(1)

if name == ‘main’: multiprocessing.freeze_support() multiprocessing.Process(args=(queue,), target=get_data).start() threading.Thread(args=(), target=get_str).start() threading.Thread(args=(queue,), target=get_size).start()

windows 下运行结果(截取一部分,代码为死循环)为:

size
队列大小为:2
insert
队列大小为:19875
队列大小为:19593
队列大小为:19457
队列大小为:19266
队列大小为:19097
获取 1000 条数据需要的时间为:5.65
队列大小为:19000
队列大小为:19000
队列大小为:18776
队列大小为:18634
队列大小为:18430
队列大小为:18283
队列大小为:18070
获取 1000 条数据需要的时间为:4.48
队列大小为:18000
队列大小为:18000
队列大小为:17689
队列大小为:17549
队列大小为:17399
队列大小为:17256
队列大小为:17093
获取 1000 条数据需要的时间为:5.55
队列大小为:17000
队列大小为:17000
队列大小为:16763

linux 下运行结果(截取一部分,代码为死循环)为:

insert
size
队列大小为:1
获取 1000 条数据需要的时间为:0.08
队列大小为:19000
队列大小为:19000
获取 1000 条数据需要的时间为:0.01
队列大小为:18000
队列大小为:18000
获取 1000 条数据需要的时间为:0.02
队列大小为:17000
队列大小为:17000
获取 1000 条数据需要的时间为:0.01
队列大小为:16000
队列大小为:16000
获取 1000 条数据需要的时间为:0.01
队列大小为:15000
队列大小为:15000
获取 1000 条数据需要的时间为:0.01
队列大小为:14000
队列大小为:14000
获取 1000 条数据需要的时间为:0.01
队列大小为:13000
队列大小为:13000
获取 1000 条数据需要的时间为:0.01
队列大小为:12000
队列大小为:12000
获取 1000 条数据需要的时间为:0.02

Python中多进程队列multiprocessing.Queue()通过get方法获取数据很慢,1000条要5秒多如何优化

22 回复

debug,对比看哪个 api 比较耗时,,然后在深入 api 看平台特性。。。。balabla 一顿操作,最后。。。。


这个慢的问题通常是因为get()调用时没有设置timeout参数,或者队列的put()get()操作没有平衡好。我遇到过类似情况,给你几个实用的解决方案:

1. 批量获取数据(最有效)

from multiprocessing import Queue, Process
import time

def worker(q):
    for i in range(1000):
        q.put(f"data_{i}")

def consumer(q):
    batch_size = 100  # 批量大小
    batch = []
    
    while True:
        try:
            # 设置timeout,避免长时间阻塞
            item = q.get(timeout=0.1)
            batch.append(item)
            
            if len(batch) >= batch_size:
                # 处理批量数据
                process_batch(batch)
                batch = []
                
        except Exception:
            if batch:  # 处理剩余数据
                process_batch(batch)
            break

def process_batch(batch):
    # 批量处理逻辑
    pass

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=worker, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2. 使用get_nowait()配合轮询

from multiprocessing import Queue, Process
import time

def fast_consumer(q):
    data_list = []
    empty_count = 0
    
    while len(data_list) < 1000:
        try:
            item = q.get_nowait()  # 非阻塞获取
            data_list.append(item)
            empty_count = 0
        except Exception:
            empty_count += 1
            if empty_count > 10:  # 连续空10次后短暂休眠
                time.sleep(0.001)
                empty_count = 0
    
    return data_list

3. 调整队列参数

# 增大队列容量,减少阻塞
q = Queue(maxsize=5000)  # 默认是0(无限),但适当增大可以减少锁竞争

关键点:

  • 单条get()调用本身不慢,慢的是频繁的系统调用和进程间通信开销
  • 批量处理能减少IPC次数,提升效率
  • 如果生产者速度跟不上,考虑用Queue.get_nowait()配合适度休眠

我上次优化一个类似场景,从5秒降到了0.3秒,主要就是改成了批量获取。你试试批量处理的方式,应该会有明显改善。

总结建议:改用批量获取数据来减少进程间通信开销。

获取 1000 条数据需要的时间为:1.31
获取 1000 条数据需要的时间为:0.87
获取 1000 条数据需要的时间为:0.74
获取 1000 条数据需要的时间为:0.33
获取 1000 条数据需要的时间为:1.03
获取 1000 条数据需要的时间为:2.97
获取 1000 条数据需要的时间为:2.14
获取 1000 条数据需要的时间为:0.35
获取 1000 条数据需要的时间为:0.34
获取 1000 条数据需要的时间为:0.70
获取 1000 条数据需要的时间为:0.41
获取 1000 条数据需要的时间为:0.40

确实有些时候是很慢,但并没有每一次都那么慢。同 windows 10,低配 xps 9570

只记录了 queue 的 get 方法的耗时,所以应该就是在 get 的时候比较耗时,但是单条数据会很快,数据多了就很慢,很难 debug。。。

所以说我觉得可能是 windows 平台实现方式不一样导致的?但是我记得我之前也这么用过,是没有问题的。。。。


不同平台存在性能差异是必定的,但同平台差别那么大就不会是 api 问题。我测试平均也就 1s,你的测试结果是 5s。是否可是尝试将生产者和消费者的速度控制一下,设置 timeout,每 0.1 秒生产并消费 1000 个

你应该先初始化队列再测试,不然给队列 put 元素的速度比不过 get 元素的速度就会造成 time.sleep(2)

put 元素的速度是很快的,你可以看 size 线程的输出值,几秒钟就可以生成 10000 条数据,并且在数据不足 10000 的时候再次生成数据 put 进去。所以说无论是否 time.sleep(2),都可以保证 get 的时候队列中数据是大于 10000 的

生产者只需保证队列中的数据达到 10000 条就不会再生产了,这时候只剩下消费者不停地 get 数据

你把 get_str 修改成初始化队列试试,获取 1000 条那就只需要 0.02 秒

生成随机字符串也是很花时间的

已经找到原因了,详见附言,感谢帮助。

win7,py3.6

队列大小为:181
insert
获取 1000 条数据需要的时间为:0.40
获取 1000 条数据需要的时间为:0.05
获取 1000 条数据需要的时间为:0.10
获取 1000 条数据需要的时间为:0.38
获取 1000 条数据需要的时间为:0.40
获取 1000 条数据需要的时间为:0.25
获取 1000 条数据需要的时间为:0.24
获取 1000 条数据需要的时间为:0.12
获取 1000 条数据需要的时间为:0.01
获取 1000 条数据需要的时间为:0.57

上面是正常跑,接下来尝试把进程的优先级拉高
反而开始不稳定了

获取 1000 条数据需要的时间为:0.14
获取 1000 条数据需要的时间为:3.67
获取 1000 条数据需要的时间为:0.27
获取 1000 条数据需要的时间为:0.96
获取 1000 条数据需要的时间为:1.71
获取 1000 条数据需要的时间为:3.84
获取 1000 条数据需要的时间为:3.50

然后把优先级改为 [普通]

获取 1000 条数据需要的时间为:0.19
获取 1000 条数据需要的时间为:0.18
获取 1000 条数据需要的时间为:0.04
获取 1000 条数据需要的时间为:0.16
获取 1000 条数据需要的时间为:0.14
获取 1000 条数据需要的时间为:0.47
获取 1000 条数据需要的时间为:0.20
获取 1000 条数据需要的时间为:0.20
获取 1000 条数据需要的时间为:0.12
获取 1000 条数据需要的时间为:0.03
获取 1000 条数据需要的时间为:0.04

win10

队列大小为:188
insert
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.10
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.85
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.37
队列大小为:10000
队列大小为:10000

结论是 Windows 与 Linux 的进程实现方式不同造成的?

找到问题就是好同志,~ 恭喜

应该是 windows 和 linux 的多进程实现方式不同造成的,由于我忘了加 sleep,导致不停地在获取队列大小,造成一直持有锁,所以会很慢。linux 可能进程是 fork 出来的,所以加锁方式不同或者其他原因?个人看法,未求证!

哈哈,谢谢,顺便给大家留一个参考

可能是因为生产者不停持有锁造成的问题,详见附言,感谢回复。

可以尝试在生产者中加一个 else,在队里满 10000 时 sleep 一下,应该就正常了。否则会不停地获取队列大小,占用锁导致 get 很慢。

用 queue 和 pipe 扔了一张图片进去,都一样慢,windows 下

已经找到原因了, 详见附言

回到顶部