Python中多进程编程常见问题与解决方案
问题背景
现在有一个 Python 程序,这个程序里面 import abc 模块,该是用 C++ 编写的,编写的时候还使用了 OpenMP 多线程库。直接通过 python xxx.py --options OPTIONS 来运行,可以感受到多线程带来性能上的提升。但是这个 abc 模块的编写似乎有一点问题,在某些参数条件下会崩溃,这个崩溃不是抛出异常,而是整个 Python 程序(解释器)的崩溃,没有任何错误提示。OPTIONS 最终也是由 __main__ 函数负责解析,调用里面的函数 func(args...) 来完成。
我们需要测试不同的参数组合,并且不想因为某一组参数导致整个测试程序的崩溃。所以我们就另外编写了一个 Python 程序,里面 from xxx import func,然后想要通过派生一个新的进程来执行 func,防止因为崩溃导致主进程的崩溃。
multiprocessing 方式
首先想到的是使用 multiprocessing 方式,相关的代码如下:
from multiprocessing import Process
from xxx import func
p = Process(target=func, args=(OPTIONS)) # OPTIONS 为想要传入的参数
p.start()
p.join(300) # 设置主进程阻塞 300 秒
while p.is_alive():
p.terminate()
处理 IPC 回收数据
设定超过 300 秒就算失败。现在的问题是,使用这种方式执行 func 后,看起来 func 的执行失去了并行能力,执行其中的一个运算会卡住,但是这个卡住是真卡住了还是因为没有运算完,这个无从得知。
subprocess 方式
其次想到的就是利用 subprocess 来做,相关代码如下:
import subprocess
from xxx import func
p = subprocess.Popen(‘python xxx.py --options OPTIONS’, shell=True)
处理超时以及从 STDOUT 回收数据
利用这种方式 xxx.py 可以正常的并行执行,但是想要回收数据,只能通过 STDOUT 或者写入文件来进行。
问题
为什么通过 multiprocess.Process 执行的函数多进程会看起来丧失并行能力?
Python中多进程编程常见问题与解决方案
子进程在的 python 解释器都挂了还能指望 join 能正常工作么?
多进程编程确实容易踩坑,我遇到过几个典型问题:
1. 全局变量不共享
每个进程有自己的内存空间,修改全局变量只在当前进程有效。用multiprocessing.Value或Array共享数据:
from multiprocessing import Process, Value, Array
def worker(v, arr):
v.value += 1
arr[0] = 999
if __name__ == '__main__':
val = Value('i', 0) # 共享整数
arr = Array('i', [1, 2, 3]) # 共享数组
p = Process(target=worker, args=(val, arr))
p.start()
p.join()
print(val.value) # 输出: 1
print(arr[:]) # 输出: [999, 2, 3]
2. 进程池任务卡住
Pool.map遇到异常会静默失败。用Pool.apply_async配合超时:
from multiprocessing import Pool
import time
def task(x):
if x == 3:
raise ValueError("故意出错")
return x * x
if __name__ == '__main__':
with Pool(2) as pool:
results = []
for i in range(5):
r = pool.apply_async(task, (i,))
results.append(r)
for r in results:
try:
print(r.get(timeout=2))
except Exception as e:
print(f"任务失败: {e}")
3. 僵尸进程问题
子进程结束后要join,否则可能变成僵尸进程。用Process的daemon属性或上下文管理器:
from multiprocessing import Process
import time
def worker():
time.sleep(1)
# 方法1:daemon进程
p = Process(target=worker, daemon=True)
p.start()
# daemon进程会自动结束
# 方法2:with语句
with Process(target=worker) as p:
p.start()
p.join() # 自动等待
4. Windows平台的特殊问题
Windows没有fork,必须用if __name__ == '__main__':保护入口代码,否则会递归创建进程。
5. 进程间通信
优先用Queue而不是Pipe,Queue自带锁更安全:
from multiprocessing import Process, Queue
def producer(q):
q.put([42, None, 'hello'])
def consumer(q):
print(q.get())
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start(); p2.start()
p1.join(); p2.join()
总结:多进程的核心是处理好进程隔离和通信。
你明显不会写异步处理的程序。
py 下没试过。。。。 php 下用异步的类 exec 调用可以开 n 个 php 程序,具体 api 是 proc_open 。。。。 py 下肯定有这样的 api
通过 spawn 方式的 Process 不是重新开一个新的 Python 解释器来做吗?不过似乎 Python 2 不支持这种定义?
没有经验,希望多多指教。
查到 Python 下可以用 pool.apply_async 似乎是同样的工作,周一去试试。
其实我就想让主进程调用完了就阻塞的。
如果你自己看过 multiprocess.Process 的源代码,他内部是有子进程和父进程的通讯的,而你的子程序可能因为某些原因导致了通讯未能正常进行,所以就表选出卡住了的状态,而一个正常运行的 python 解析器是肯定可以保证通讯正常进行的
另外,实际上 multiprocess.Process 内部依然使用了 Popen 来实现,所以你自己模仿实现一个也没啥毛病
好的 谢谢


