如何优雅地通知Python的multiprocessing.Pool中的进程退出?
目的是想在主进程中随时终止子进程的执行 目前的代码长这样 有个问题是如果我 Ctrl-C 的话 所有的子进程虽然会正常退出 但是主进程会一直挂起在 pool.join()上 求解决方案~
#!/usr/bin/env python2
from future import print_function
import time
import signal
import logging
from multiprocessing import Manager
from multiprocessing.pool import Pool
from multiprocessing.queues import Queue
def Fn(n, q, ns):
if ns.done:
return
try:
q.put(n)
finally:
return
def main():
handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(processes=5)
signal.signal(signal.SIGINT, handler)
total = 10000
manager = Manager()
q = manager.Queue()
ns = manager.Namespace()
ns.done = False
for n in range(total):
pool.apply_async(Fn, args=(n, q, ns))
pool.close()
try:
received = 0
while received != total:
n = q.get(60)
print(‘get {} from queue’.format(n))
received += 1
except KeyboardInterrupt:
ns.done = True
pass
finally:
pool.join()
if name == ‘main’:
main()
如何优雅地通知Python的multiprocessing.Pool中的进程退出?
openstack 里缩水的代码, 稍微简化了下
https://github.com/lolizeppelin/simpleservice/blob/master/simpleservice/base.py
看懂了就知道怎么处理了
import multiprocessing
import time
import signal
import sys
class GracefulExitPool:
def __init__(self, processes=None):
self.pool = multiprocessing.Pool(processes=processes)
self.shutdown_event = multiprocessing.Event()
def worker(self, task):
"""工作函数,定期检查退出信号"""
if self.shutdown_event.is_set():
print(f"进程 {multiprocessing.current_process().name} 收到退出信号")
return None
# 模拟工作
time.sleep(0.5)
return f"处理任务: {task}"
def apply_async_with_check(self, task):
"""包装apply_async,添加退出检查"""
return self.pool.apply_async(self._worker_wrapper, (task,))
def _worker_wrapper(self, task):
"""包装器函数,用于进程内部"""
if self.shutdown_event.is_set():
return None
return self.worker(task)
def shutdown(self):
"""优雅关闭池"""
print("发送关闭信号...")
self.shutdown_event.set()
# 等待当前任务完成
self.pool.close()
self.pool.join()
print("所有进程已优雅退出")
# 使用示例
if __name__ == "__main__":
pool = GracefulExitPool(processes=4)
try:
# 提交任务
results = []
for i in range(10):
result = pool.apply_async_with_check(i)
results.append(result)
# 模拟运行一段时间后需要退出
time.sleep(2)
# 获取部分结果
for r in results[:5]:
print(r.get(timeout=1))
except KeyboardInterrupt:
print("\n收到中断信号")
finally:
# 优雅关闭
pool.shutdown()
核心方法:
- 使用Event信号:
multiprocessing.Event()作为全局退出标志,所有工作进程定期检查这个标志 - 包装工作函数:在真正的工作逻辑外包一层检查退出的包装器
- close() + join()组合:先阻止新任务提交,再等待现有任务完成
关键点:
- 不要用
terminate(),它会强制杀死进程,可能导致资源泄漏 - 工作函数中需要定期检查退出标志(比如在循环中或任务间隙)
- 配合
try-finally确保无论是否异常都能执行清理
一句话总结:用Event信号配合close/join实现优雅退出。
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read(1)
LOG.info(‘Parent process has died unexpectedly, exiting’)
if self.launcher:
self.launcher.stop()
sys.exit(1)
#2 这段代码在主进程创建 pipe, 然后设置进程退出时的 close fd, 通过这种方式来通知子进程 read 返回吧
但是我现在是 hang 在了主进程的 join 呢 而且目前来看 子进程(应该)是都正常退出了 主进程没有返回
pipe 是主进程退出 子进程也收到能退出 是个退出保险
主进程里确认子进程退出用 waitpid
啥叫“应该” 子进程是否结束是可以看到
multiprocessing 我记得默认是用 socket 来父子进程通信的
join 里应该是取了 socket 数据 并 wait 子进程结束
好好看看处理信号的部分就知道怎么让子进程 exit 了
当然如果你代码是 win 上的当我上面的的都没说
#5 我的意思是 ##应该正常##退出了 子进程是肯定退出了 ps 看过 至于是不是异常退出的 并没有确定
如果不用 Pool 手动创建管理的话 waitpid 应该是 OK 的
但是用 Pool 的话 惯用法应该不会去 os.getpid 然后手动 waitpid 吧
你代码有问题 子进程是不是正常退出的都不知道 直接 try 包一层都好啊
子进程有信号处理没
主进程收到 ctrl c 信号以后 给所有子进程发终止信号不就行了
打日志 好歹你要知道子进程怎么退出的
#7 不是子进程正不正常退出的问题 这只是一个描述问题 不用纠结 我用 try 能保证它退出 并且我上面贴的代码就是这样了
现在的问题是 子进程全部退出了 父进程仍然在 pool.join()没有返回 这个问题
#8 按照我的理解 如果用 C 的做法实现 pool.join 子进程无论怎么正常 /异常退出 父进程都应该能够通过 waitpid 感知到子进程退出 所有退出之后 join 就可以返回了 所以 python 现在这个现象让我很不理解
看了下, 和父子进程一点关系都没…
自进程
ns.done 没有捕获异常只是小问题
主要在这 3 有问题
manager = Manager()
q = manager.Queue()
ns = manager.Namespace()
要解决得慢慢折腾里面代码 我随便弄了下不想弄了, 折腾 multiprocessing 不如自己写多进程代码还好控一点
大致搞定了 给你代码弄蒙了
一开始叫你看信号是没错的,你信号用错了…
except KeyboardInterrupt 这是不对的,你注册正确的拦截信号以后,是不会收到这个错误的
信号要处理 2 次,一次是在 fork 前,就是 multiprocessing 创建任务之前,拦截 SIGINT,拦截执行内容
def empty(signo, frame):
print 'do nothing!!!'
这里的目的是让 multiprocessing 里的代码不会因为收到 SIGINT 抛出异常
第二次处理时在 fork 后,拦截内容
def stop(signo, frame):
print ‘stoped’
ns.done = True
这里拦截到信号以后设置 ns.done
顺便…这个 pool 用的有点问题 Fn 返回后还会生成新的 Fn 塞进去…具体你看看怎么停掉 pool 我就不看了
我印象中 setDaemon=True
也就是设为守护进程就可以随着父进程的关闭而关闭了。。
参考 Samba 和 ctdb 代码吧

