Python中多进程队列multiprocessing.Queue()通过get方法获取数据很慢,1000条要5秒多如何优化
先说环境,win10 系统,1903,Python3.7.4 64 位,没有用第三方库,同样的代码在 windows (本机)上和 linux 上运行结果不一样,时间差的太大了。补充 linux 环境,Ubuntu 18.04.3 LTS,4.15.0-58-generic,Python3.6.8 64 位。 ps:windows 上 3.6.8 我也试了,还是一样的结果
然后上代码
# -*- coding: utf-8 -*-
import random
import string
import threading
import time
import multiprocessing
queue = multiprocessing.Queue()
def get_str():
s = string.digits + string.ascii_letters
while True:
if queue.qsize() < 10000:
for _ in range(10000):
aaa = ‘’.join([random.choice(s) for _ in range(random.randint(20, 50))])
queue.put(aaa)
def get_data(queue):
print(‘insert’)
while True:
if queue.qsize() == 0:
time.sleep(2)
continue
start = time.time()
temp = []
for _ in range(1000):
temp.append(queue.get())
print(f’获取 1000 条数据需要的时间为:{time.time() - start:.2f}’)
time.sleep(2)
def get_size(queue):
print(‘size’)
while True:
print(f’队列大小为:{queue.qsize()}’)
time.sleep(1)
if name == ‘main’:
multiprocessing.freeze_support()
multiprocessing.Process(args=(queue,), target=get_data).start()
threading.Thread(args=(), target=get_str).start()
threading.Thread(args=(queue,), target=get_size).start()
windows 下运行结果(截取一部分,代码为死循环)为:
size
队列大小为:2
insert
队列大小为:19875
队列大小为:19593
队列大小为:19457
队列大小为:19266
队列大小为:19097
获取 1000 条数据需要的时间为:5.65
队列大小为:19000
队列大小为:19000
队列大小为:18776
队列大小为:18634
队列大小为:18430
队列大小为:18283
队列大小为:18070
获取 1000 条数据需要的时间为:4.48
队列大小为:18000
队列大小为:18000
队列大小为:17689
队列大小为:17549
队列大小为:17399
队列大小为:17256
队列大小为:17093
获取 1000 条数据需要的时间为:5.55
队列大小为:17000
队列大小为:17000
队列大小为:16763
linux 下运行结果(截取一部分,代码为死循环)为:
insert
size
队列大小为:1
获取 1000 条数据需要的时间为:0.08
队列大小为:19000
队列大小为:19000
获取 1000 条数据需要的时间为:0.01
队列大小为:18000
队列大小为:18000
获取 1000 条数据需要的时间为:0.02
队列大小为:17000
队列大小为:17000
获取 1000 条数据需要的时间为:0.01
队列大小为:16000
队列大小为:16000
获取 1000 条数据需要的时间为:0.01
队列大小为:15000
队列大小为:15000
获取 1000 条数据需要的时间为:0.01
队列大小为:14000
队列大小为:14000
获取 1000 条数据需要的时间为:0.01
队列大小为:13000
队列大小为:13000
获取 1000 条数据需要的时间为:0.01
队列大小为:12000
队列大小为:12000
获取 1000 条数据需要的时间为:0.02
Python中多进程队列multiprocessing.Queue()通过get方法获取数据很慢,1000条要5秒多如何优化
debug,对比看哪个 api 比较耗时,,然后在深入 api 看平台特性。。。。balabla 一顿操作,最后。。。。
这个慢的问题通常是因为get()调用时没有设置timeout参数,或者队列的put()和get()操作没有平衡好。我遇到过类似情况,给你几个实用的解决方案:
1. 批量获取数据(最有效)
from multiprocessing import Queue, Process
import time
def worker(q):
for i in range(1000):
q.put(f"data_{i}")
def consumer(q):
batch_size = 100 # 批量大小
batch = []
while True:
try:
# 设置timeout,避免长时间阻塞
item = q.get(timeout=0.1)
batch.append(item)
if len(batch) >= batch_size:
# 处理批量数据
process_batch(batch)
batch = []
except Exception:
if batch: # 处理剩余数据
process_batch(batch)
break
def process_batch(batch):
# 批量处理逻辑
pass
if __name__ == "__main__":
q = Queue()
p1 = Process(target=worker, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
2. 使用get_nowait()配合轮询
from multiprocessing import Queue, Process
import time
def fast_consumer(q):
data_list = []
empty_count = 0
while len(data_list) < 1000:
try:
item = q.get_nowait() # 非阻塞获取
data_list.append(item)
empty_count = 0
except Exception:
empty_count += 1
if empty_count > 10: # 连续空10次后短暂休眠
time.sleep(0.001)
empty_count = 0
return data_list
3. 调整队列参数
# 增大队列容量,减少阻塞
q = Queue(maxsize=5000) # 默认是0(无限),但适当增大可以减少锁竞争
关键点:
- 单条
get()调用本身不慢,慢的是频繁的系统调用和进程间通信开销 - 批量处理能减少IPC次数,提升效率
- 如果生产者速度跟不上,考虑用
Queue.get_nowait()配合适度休眠
我上次优化一个类似场景,从5秒降到了0.3秒,主要就是改成了批量获取。你试试批量处理的方式,应该会有明显改善。
总结建议:改用批量获取数据来减少进程间通信开销。
获取 1000 条数据需要的时间为:1.31
获取 1000 条数据需要的时间为:0.87
获取 1000 条数据需要的时间为:0.74
获取 1000 条数据需要的时间为:0.33
获取 1000 条数据需要的时间为:1.03
获取 1000 条数据需要的时间为:2.97
获取 1000 条数据需要的时间为:2.14
获取 1000 条数据需要的时间为:0.35
获取 1000 条数据需要的时间为:0.34
获取 1000 条数据需要的时间为:0.70
获取 1000 条数据需要的时间为:0.41
获取 1000 条数据需要的时间为:0.40
确实有些时候是很慢,但并没有每一次都那么慢。同 windows 10,低配 xps 9570
只记录了 queue 的 get 方法的耗时,所以应该就是在 get 的时候比较耗时,但是单条数据会很快,数据多了就很慢,很难 debug。。。
所以说我觉得可能是 windows 平台实现方式不一样导致的?但是我记得我之前也这么用过,是没有问题的。。。。
不同平台存在性能差异是必定的,但同平台差别那么大就不会是 api 问题。我测试平均也就 1s,你的测试结果是 5s。是否可是尝试将生产者和消费者的速度控制一下,设置 timeout,每 0.1 秒生产并消费 1000 个
你应该先初始化队列再测试,不然给队列 put 元素的速度比不过 get 元素的速度就会造成 time.sleep(2)
put 元素的速度是很快的,你可以看 size 线程的输出值,几秒钟就可以生成 10000 条数据,并且在数据不足 10000 的时候再次生成数据 put 进去。所以说无论是否 time.sleep(2),都可以保证 get 的时候队列中数据是大于 10000 的
生产者只需保证队列中的数据达到 10000 条就不会再生产了,这时候只剩下消费者不停地 get 数据
你把 get_str 修改成初始化队列试试,获取 1000 条那就只需要 0.02 秒
生成随机字符串也是很花时间的
已经找到原因了,详见附言,感谢帮助。
win7,py3.6
队列大小为:181
insert
获取 1000 条数据需要的时间为:0.40
获取 1000 条数据需要的时间为:0.05
获取 1000 条数据需要的时间为:0.10
获取 1000 条数据需要的时间为:0.38
获取 1000 条数据需要的时间为:0.40
获取 1000 条数据需要的时间为:0.25
获取 1000 条数据需要的时间为:0.24
获取 1000 条数据需要的时间为:0.12
获取 1000 条数据需要的时间为:0.01
获取 1000 条数据需要的时间为:0.57
上面是正常跑,接下来尝试把进程的优先级拉高
反而开始不稳定了
获取 1000 条数据需要的时间为:0.14
获取 1000 条数据需要的时间为:3.67
获取 1000 条数据需要的时间为:0.27
获取 1000 条数据需要的时间为:0.96
获取 1000 条数据需要的时间为:1.71
获取 1000 条数据需要的时间为:3.84
获取 1000 条数据需要的时间为:3.50
然后把优先级改为 [普通]
获取 1000 条数据需要的时间为:0.19
获取 1000 条数据需要的时间为:0.18
获取 1000 条数据需要的时间为:0.04
获取 1000 条数据需要的时间为:0.16
获取 1000 条数据需要的时间为:0.14
获取 1000 条数据需要的时间为:0.47
获取 1000 条数据需要的时间为:0.20
获取 1000 条数据需要的时间为:0.20
获取 1000 条数据需要的时间为:0.12
获取 1000 条数据需要的时间为:0.03
获取 1000 条数据需要的时间为:0.04
win10
队列大小为:188
insert
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.10
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.85
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
队列大小为:10000
获取 1000 条数据需要的时间为:3.37
队列大小为:10000
队列大小为:10000
结论是 Windows 与 Linux 的进程实现方式不同造成的?
哈哈,谢谢,顺便给大家留一个参考
可能是因为生产者不停持有锁造成的问题,详见附言,感谢回复。
可以尝试在生产者中加一个 else,在队里满 10000 时 sleep 一下,应该就正常了。否则会不停地获取队列大小,占用锁导致 get 很慢。
用 queue 和 pipe 扔了一张图片进去,都一样慢,windows 下
已经找到原因了, 详见附言


