如何优雅地通知Python的multiprocessing.Pool中的进程退出?

目的是想在主进程中随时终止子进程的执行 目前的代码长这样 有个问题是如果我 Ctrl-C 的话 所有的子进程虽然会正常退出 但是主进程会一直挂起在 pool.join()上 求解决方案~

#!/usr/bin/env python2

from future import print_function

import time import signal import logging from multiprocessing import Manager from multiprocessing.pool import Pool from multiprocessing.queues import Queue

def Fn(n, q, ns): if ns.done: return try: q.put(n) finally: return

def main(): handler = signal.signal(signal.SIGINT, signal.SIG_IGN) pool = Pool(processes=5) signal.signal(signal.SIGINT, handler)

total = 10000 manager = Manager() q = manager.Queue() ns = manager.Namespace() ns.done = False

for n in range(total): pool.apply_async(Fn, args=(n, q, ns)) pool.close() try: received = 0 while received != total: n = q.get(60) print(‘get {} from queue’.format(n)) received += 1 except KeyboardInterrupt: ns.done = True pass finally: pool.join()

if name == ‘main’: main()


如何优雅地通知Python的multiprocessing.Pool中的进程退出?

16 回复

openstack 里缩水的代码, 稍微简化了下
https://github.com/lolizeppelin/simpleservice/blob/master/simpleservice/base.py

看懂了就知道怎么处理了


import multiprocessing
import time
import signal
import sys

class GracefulExitPool:
    def __init__(self, processes=None):
        self.pool = multiprocessing.Pool(processes=processes)
        self.shutdown_event = multiprocessing.Event()
        
    def worker(self, task):
        """工作函数,定期检查退出信号"""
        if self.shutdown_event.is_set():
            print(f"进程 {multiprocessing.current_process().name} 收到退出信号")
            return None
            
        # 模拟工作
        time.sleep(0.5)
        return f"处理任务: {task}"
    
    def apply_async_with_check(self, task):
        """包装apply_async,添加退出检查"""
        return self.pool.apply_async(self._worker_wrapper, (task,))
    
    def _worker_wrapper(self, task):
        """包装器函数,用于进程内部"""
        if self.shutdown_event.is_set():
            return None
        return self.worker(task)
    
    def shutdown(self):
        """优雅关闭池"""
        print("发送关闭信号...")
        self.shutdown_event.set()
        
        # 等待当前任务完成
        self.pool.close()
        self.pool.join()
        print("所有进程已优雅退出")

# 使用示例
if __name__ == "__main__":
    pool = GracefulExitPool(processes=4)
    
    try:
        # 提交任务
        results = []
        for i in range(10):
            result = pool.apply_async_with_check(i)
            results.append(result)
        
        # 模拟运行一段时间后需要退出
        time.sleep(2)
        
        # 获取部分结果
        for r in results[:5]:
            print(r.get(timeout=1))
        
    except KeyboardInterrupt:
        print("\n收到中断信号")
    finally:
        # 优雅关闭
        pool.shutdown()

核心方法:

  1. 使用Event信号multiprocessing.Event()作为全局退出标志,所有工作进程定期检查这个标志
  2. 包装工作函数:在真正的工作逻辑外包一层检查退出的包装器
  3. close() + join()组合:先阻止新任务提交,再等待现有任务完成

关键点:

  • 不要用terminate(),它会强制杀死进程,可能导致资源泄漏
  • 工作函数中需要定期检查退出标志(比如在循环中或任务间隙)
  • 配合try-finally确保无论是否异常都能执行清理

一句话总结:用Event信号配合close/join实现优雅退出。

def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read(1)

LOG.info(‘Parent process has died unexpectedly, exiting’)

if self.launcher:
self.launcher.stop()
sys.exit(1)

#2 这段代码在主进程创建 pipe, 然后设置进程退出时的 close fd, 通过这种方式来通知子进程 read 返回吧
但是我现在是 hang 在了主进程的 join 呢 而且目前来看 子进程(应该)是都正常退出了 主进程没有返回

pipe 是主进程退出 子进程也收到能退出 是个退出保险

主进程里确认子进程退出用 waitpid

啥叫“应该” 子进程是否结束是可以看到

multiprocessing 我记得默认是用 socket 来父子进程通信的
join 里应该是取了 socket 数据 并 wait 子进程结束

好好看看处理信号的部分就知道怎么让子进程 exit 了

当然如果你代码是 win 上的当我上面的的都没说

#5 我的意思是 ##应该正常##退出了 子进程是肯定退出了 ps 看过 至于是不是异常退出的 并没有确定
如果不用 Pool 手动创建管理的话 waitpid 应该是 OK 的
但是用 Pool 的话 惯用法应该不会去 os.getpid 然后手动 waitpid 吧

你代码有问题 子进程是不是正常退出的都不知道 直接 try 包一层都好啊

子进程有信号处理没
主进程收到 ctrl c 信号以后 给所有子进程发终止信号不就行了

打日志 好歹你要知道子进程怎么退出的

#7 不是子进程正不正常退出的问题 这只是一个描述问题 不用纠结 我用 try 能保证它退出 并且我上面贴的代码就是这样了

现在的问题是 子进程全部退出了 父进程仍然在 pool.join()没有返回 这个问题

#8 按照我的理解 如果用 C 的做法实现 pool.join 子进程无论怎么正常 /异常退出 父进程都应该能够通过 waitpid 感知到子进程退出 所有退出之后 join 就可以返回了 所以 python 现在这个现象让我很不理解

看了下, 和父子进程一点关系都没…

自进程
ns.done 没有捕获异常只是小问题

主要在这 3 有问题
manager = Manager()
q = manager.Queue()
ns = manager.Namespace()

要解决得慢慢折腾里面代码 我随便弄了下不想弄了, 折腾 multiprocessing 不如自己写多进程代码还好控一点


大致搞定了 给你代码弄蒙了

一开始叫你看信号是没错的,你信号用错了…

except KeyboardInterrupt 这是不对的,你注册正确的拦截信号以后,是不会收到这个错误的

信号要处理 2 次,一次是在 fork 前,就是 multiprocessing 创建任务之前,拦截 SIGINT,拦截执行内容
def empty(signo, frame):
print 'do nothing!!!'
这里的目的是让 multiprocessing 里的代码不会因为收到 SIGINT 抛出异常

第二次处理时在 fork 后,拦截内容
def stop(signo, frame):
print ‘stoped’
ns.done = True

这里拦截到信号以后设置 ns.done

顺便…这个 pool 用的有点问题 Fn 返回后还会生成新的 Fn 塞进去…具体你看看怎么停掉 pool 我就不看了

我印象中 setDaemon=True
也就是设为守护进程就可以随着父进程的关闭而关闭了。。

参考 Samba 和 ctdb 代码吧

回到顶部