Python多进程异步调用apply_async,子任务不执行的原因排查与解决方法
问题:为啥多传一个 queue 参数,子任务函数 perform_task(), 没有执行?
代码如下:
from multiprocessing import Pool, Queue
from time import time, sleep
import random
def perform_task(id, queue):
print(" [{}号] 子进程 [开始] 执行任务".format(id))
begin = time()
sleep(random.random() * 5)
print(" [{}号] 子进程 [结束] 执行任务".format(id))
end = time()
cost = end - begin
res = " [{}号] 子进程执行任务耗时:{}".format(id, cost)
queue.put(res)
print(res)
if name == “main”:
# 进程结果集
queue = Queue()
# 进程池中进程最大数量
pool_count = 5
# 创建进程池
pool = Pool(pool_count)
print(“进程池准备就绪, 多进程开始执行任务,等待结束…”)
#
for i in range(pool_count):
pool.apply_async(perform_task, args=(i, queue) )
# 关闭进程池,使其不再接受新的任务
pool.close()
# 主进程阻塞机制
pool.join()
print(“所有进程池中的任务完成”)
正常结果应该是:
进程池准备就绪, 多进程开始执行任务,等待结束…
[ 0 号] 子进程 [开始] 执行任务
[ 1 号] 子进程 [开始] 执行任务
[ 2 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [开始] 执行任务
[ 4 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [结束] 执行任务
[ 3 号] 子进程执行任务耗时:0.24634218215942383
[ 4 号] 子进程 [结束] 执行任务
[ 4 号] 子进程执行任务耗时:0.41087770462036133
[ 2 号] 子进程 [结束] 执行任务
[ 2 号] 子进程执行任务耗时:0.8377587795257568
[ 1 号] 子进程 [结束] 执行任务
[ 1 号] 子进程执行任务耗时:2.044529914855957
[ 0 号] 子进程 [结束] 执行任务
[ 0 号] 子进程执行任务耗时:2.7406675815582275
所有进程池中的任务完成
Python多进程异步调用apply_async,子任务不执行的原因排查与解决方法
求大佬指点
Note that with asynchronous programming you don’t need to manually deal with result queues - apply_async returns a AsyncResult instance which can be used to get the result: result.get(). This uses an underlying result (out-) queue and so you simply need to return in your target function. Also if you use result.get() and you passed a Queue instance as an argument to the target function it will raise a RuntimeError
multiprocessing 中的 Queue 不能被序列化。
目的应该是这样吧Python<br>#! /usr/bin/python3<br><br>import time<br>import random<br>import multiprocessing<br><br>queue = multiprocessing.Queue()<br><br>def perform_task(id):<br> print("{0}# process start".format(id))<br> begin = time.time()<br> time.sleep(random.random()*5)<br> print("{0}# process end".format(id))<br> <br> res = "{0}# process time : {1}".format(id, time.time()-begin)<br> print(res)<br> <br> queue.put(res)<br> <br> return<br> <br>if __name__ == "__main__":<br> poolCount = 5<br> pool = multiprocessing.Pool(poolCount)<br> <br> for i in range(poolCount):<br> pool.apply_async(perform_task, args=(i, ))<br> <br> pool.close()<br> pool.join()<br> <br> print("End tasking, print results:")<br> <br> while True:<br> res = queue.get()<br> print(res)<br> if queue.empty():<br> break<br><br>
执行结果:<br>0# process start<br>1# process start<br>2# process start<br>3# process start<br>4# process start<br>0# process end<br>0# process time : 0.5990872383117676<br>1# process end<br>1# process time : 2.662280559539795<br>3# process end<br>3# process time : 3.903242826461792<br>2# process end<br>2# process time : 4.440236330032349<br>4# process end<br>4# process time : 4.543649435043335<br>End tasking, print results:<br>0# process time : 0.5990872383117676<br>1# process time : 2.662280559539795<br>3# process time : 3.903242826461792<br>2# process time : 4.440236330032349<br>4# process time : 4.543649435043335<br>
为啥我看到函数名就直觉是蟒蛇呢?我基本没怎么学过蟒蛇,同时百分百确定没用过这个函数。


