Python多进程异步调用apply_async,子任务不执行的原因排查与解决方法

问题:为啥多传一个 queue 参数,子任务函数 perform_task(), 没有执行?

代码如下:

from multiprocessing import Pool, Queue
from time import time, sleep
import random


def perform_task(id, queue):
print(" [{}号] 子进程 [开始] 执行任务".format(id))
begin = time()
sleep(random.random() * 5)
print(" [{}号] 子进程 [结束] 执行任务".format(id))
end = time()
cost = end - begin
res = " [{}号] 子进程执行任务耗时:{}".format(id, cost)
queue.put(res)
print(res)



if name == “main”:
# 进程结果集
queue = Queue()

# 进程池中进程最大数量
pool_count = 5

# 创建进程池
pool = Pool(pool_count)
print(“进程池准备就绪, 多进程开始执行任务,等待结束…”)

#
for i in range(pool_count):
pool.apply_async(perform_task, args=(i, queue) )
# 关闭进程池,使其不再接受新的任务
pool.close()
# 主进程阻塞机制
pool.join()
print(“所有进程池中的任务完成”)


正常结果应该是:

进程池准备就绪, 多进程开始执行任务,等待结束…
[ 0 号] 子进程 [开始] 执行任务
[ 1 号] 子进程 [开始] 执行任务
[ 2 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [开始] 执行任务
[ 4 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [结束] 执行任务
[ 3 号] 子进程执行任务耗时:0.24634218215942383
[ 4 号] 子进程 [结束] 执行任务
[ 4 号] 子进程执行任务耗时:0.41087770462036133
[ 2 号] 子进程 [结束] 执行任务
[ 2 号] 子进程执行任务耗时:0.8377587795257568
[ 1 号] 子进程 [结束] 执行任务
[ 1 号] 子进程执行任务耗时:2.044529914855957
[ 0 号] 子进程 [结束] 执行任务
[ 0 号] 子进程执行任务耗时:2.7406675815582275
所有进程池中的任务完成
Python多进程异步调用apply_async,子任务不执行的原因排查与解决方法


6 回复

求大佬指点


问题分析: apply_async 不执行子任务通常是因为主进程提前结束,导致进程池被回收。apply_async 是异步提交任务,主进程需要调用 close()join()get() 来等待子进程完成。

解决方案: 确保主进程等待所有异步任务完成。以下是两种常用方法:

方法1:使用 join() 等待所有任务

from multiprocessing import Pool
import time

def task(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        results = []
        for i in range(10):
            # 提交异步任务
            result = pool.apply_async(task, (i,))
            results.append(result)
        
        # 关键:关闭进程池并等待所有任务完成
        pool.close()
        pool.join()
        
        # 获取结果
        for res in results:
            print(res.get())

方法2:使用 get() 立即等待结果

from multiprocessing import Pool

def task(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        results = []
        for i in range(10):
            result = pool.apply_async(task, (i,))
            # 立即调用 get() 会阻塞直到任务完成
            results.append(result.get())
        
        print(results)

其他可能原因:

  1. 未使用 if __name__ == '__main__': - 在Windows和macOS上必须使用,否则会引发递归创建进程错误。
  2. 任务函数定义错误 - 确保函数可序列化且参数正确。
  3. 异常被静默吞没 - 使用 get(timeout) 或检查 ready()/successful() 来捕获异常。

一句话总结: 确保主进程通过 close()/join()get() 等待异步任务完成。

Note that with asynchronous programming you don’t need to manually deal with result queues - apply_async returns a AsyncResult instance which can be used to get the result: result.get(). This uses an underlying result (out-) queue and so you simply need to return in your target function. Also if you use result.get() and you passed a Queue instance as an argument to the target function it will raise a RuntimeError

multiprocessing 中的 Queue 不能被序列化。
目的应该是这样吧
Python<br>#! /usr/bin/python3<br><br>import time<br>import random<br>import multiprocessing<br><br>queue = multiprocessing.Queue()<br><br>def perform_task(id):<br> print("{0}# process start".format(id))<br> begin = time.time()<br> time.sleep(random.random()*5)<br> print("{0}# process end".format(id))<br> <br> res = "{0}# process time : {1}".format(id, time.time()-begin)<br> print(res)<br> <br> queue.put(res)<br> <br> return<br> <br>if __name__ == "__main__":<br> poolCount = 5<br> pool = multiprocessing.Pool(poolCount)<br> <br> for i in range(poolCount):<br> pool.apply_async(perform_task, args=(i, ))<br> <br> pool.close()<br> pool.join()<br> <br> print("End tasking, print results:")<br> <br> while True:<br> res = queue.get()<br> print(res)<br> if queue.empty():<br> break<br><br>
执行结果:
<br>0# process start<br>1# process start<br>2# process start<br>3# process start<br>4# process start<br>0# process end<br>0# process time : 0.5990872383117676<br>1# process end<br>1# process time : 2.662280559539795<br>3# process end<br>3# process time : 3.903242826461792<br>2# process end<br>2# process time : 4.440236330032349<br>4# process end<br>4# process time : 4.543649435043335<br>End tasking, print results:<br>0# process time : 0.5990872383117676<br>1# process time : 2.662280559539795<br>3# process time : 3.903242826461792<br>2# process time : 4.440236330032349<br>4# process time : 4.543649435043335<br>

为啥我看到函数名就直觉是蟒蛇呢?我基本没怎么学过蟒蛇,同时百分百确定没用过这个函数。

回到顶部