在 uwsgi/gunicorn 多进程环境中,如何用 Python 获取 os.popen() 和 subprocess.Popen() 的回调信息?

有一个任务大约耗时 1~3 秒钟左右,所以考虑先阻塞请求( 1~3 秒的等待可以接受),等任务执行完,返回完成信息之后,在进行下一步。由于 uwsgi/gunicorn 多个 worker, 须要获取 os.popen()、subprocess.Poen() 方法 fork 出的进程 id、以及状态返回值,异常的时候父进程还要能 kill 子进程。

在单进程的情况下,按上面的思路可以行的通。但是多进程的时候,就各种信息捕获异常。想要在多 woker 的环境中,阻塞请求,还能优雅的能获取子进程的回调信息呢?还是上面的思路本身有问题?
在 uwsgi/gunicorn 多进程环境中,如何用 Python 获取 os.popen() 和 subprocess.Popen() 的回调信息?


3 回复

在uWSGI/Gunicorn多进程环境里,用os.popen()subprocess.Popen()获取回调信息确实要注意进程隔离问题。直接在主进程里开子进程,worker重启时子进程会变孤儿进程。

简单粗暴的解法是用subprocess.Popen()配合communicate()同步获取输出:

import subprocess

def run_command(cmd):
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    stdout, stderr = proc.communicate()
    return {
        'returncode': proc.returncode,
        'stdout': stdout,
        'stderr': stderr
    }

但这样会阻塞worker。要实时获取输出的话,得用异步方式:

import subprocess
import select

def run_command_async(cmd):
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1
    )
    
    outputs = []
    while True:
        reads = [proc.stdout.fileno(), proc.stderr.fileno()]
        ret = select.select(reads, [], [])
        
        for fd in ret[0]:
            if fd == proc.stdout.fileno():
                line = proc.stdout.readline()
                if line:
                    outputs.append(('stdout', line.strip()))
            if fd == proc.stderr.fileno():
                line = proc.stderr.readline()
                if line:
                    outputs.append(('stderr', line.strip()))
        
        if proc.poll() is not None:
            break
    
    # 收尾
    for line in proc.stdout:
        outputs.append(('stdout', line.strip()))
    for line in proc.stderr:
        outputs.append(('stderr', line.strip()))
    
    return {
        'returncode': proc.returncode,
        'outputs': outputs
    }

更稳当的做法是把耗时命令扔到Celery之类的任务队列里,避免阻塞worker进程。用消息队列解耦最靠谱。


不知道你用的是哪一个 py2 还是 py3。可以看一下这个链接 https://pymotw.com/3/subprocess/index.html,针对 python3 的 subprocess。这个包和 py2 中有一些不一样,可以看一下 subprocess.run 这个函数的源码,它是通过封装 subprocess.Popen 来的,里面包含了各种异常处理, 可以参考一下.

回到顶部