Python中如何正确使用协程与多进程

用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码


import asyncio
import time
import socket
from multiprocessing import Process

loop = asyncio.get_event_loop() size = 0

class ServerProtocol(asyncio.DatagramProtocol):

def connection_made(self, transport):
    self.transport = transport

def datagram_received(self, data, addr, args=None):
    global size
    data = data.decode()
    message = data
    index = data.find('\n')
    if index > 0:
        filename = data[0:index]
        data = data[index+1::]
        size += len(data)
    task = self.WriteToFile(filename, data)
    asyncio.run_coroutine_threadsafe(task, loop)

async def WriteToFile(self, f, data):
    await asyncio.sleep(1)
    return True

async def print_size(): global size while True: await asyncio.sleep(1) print(size)

def start(sock): listen = loop.create_datagram_endpoint( ServerProtocol, sock = sock ) transport, protocol = loop.run_until_complete(listen)

task = print_size()
asyncio.run_coroutine_threadsafe(task, loop)
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
transport.close()
loop.close()
sock.close()

if name == ‘main’: print(“Starting UDP server”) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind((‘0.0.0.0’, 9873)) for i in range(1): t = Process(target=start, args=(sock,)) t.deamon = True t.start() start(sock)


报错信息

Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)
handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)>
Traceback (most recent call last):
  File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File “/usr/local/python36/lib/python3.6/asyncio/events.py”, line 145, in _run self._callback(*self._args) File “/usr/local/python36/lib/python3.6/asyncio/selector_events.py”, line 267, in _add_reader (handle, None)) File “/usr/local/python36/lib/python3.6/selectors.py”, line 412, in register self._epoll.register(key.fd, epoll_events) FileExistsError: [Errno 17] File exists


Python中如何正确使用协程与多进程

4 回复

这。。。咋没人呢


在Python里混用协程和多进程,主要是为了把CPU密集和I/O密集的任务分开处理。协程(asyncio)擅长处理大量I/O等待,而多进程(multiprocessing)能真正利用多核CPU。关键是要用对方法,让它们各司其职。

最常见且推荐的方式是使用 ProcessPoolExecutor 配合 asyncio。别直接用 multiprocessing.Process 在协程里创建进程,容易出问题。下面这个例子演示了标准做法:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

# 这是一个CPU密集型的计算函数
def cpu_bound_task(n):
    # 模拟一个耗时的CPU计算,比如计算斐波那契数列
    if n <= 1:
        return n
    else:
        return cpu_bound_task(n-1) + cpu_bound_task(n-2)

# 这是一个I/O密集型的协程函数
async def io_bound_task(task_id):
    print(f'IO任务 {task_id} 开始')
    await asyncio.sleep(2)  # 模拟I/O等待,比如网络请求
    print(f'IO任务 {task_id} 结束')
    return f'IO结果-{task_id}'

async def main():
    # 1. 创建进程池执行器,通常进程数设为CPU核心数
    with ProcessPoolExecutor(max_workers=4) as executor:
        # 获取当前的事件循环
        loop = asyncio.get_running_loop()

        # 2. 提交CPU密集型任务到进程池
        # run_in_executor 会返回一个 asyncio.Future 对象
        cpu_future = loop.run_in_executor(executor, cpu_bound_task, 35)
        print("CPU任务已提交到进程池,正在异步计算...")

        # 3. 同时运行多个I/O密集型协程任务
        io_tasks = [io_bound_task(i) for i in range(3)]
        io_results = await asyncio.gather(*io_tasks, cpu_future)
        # gather会等待所有任务完成,结果顺序与传入顺序一致
        # 前三个是io_results,最后一个是cpu_future的结果
        cpu_result = io_results[-1]
        io_results = io_results[:-1]

        print(f"所有I/O任务结果: {io_results}")
        print(f"CPU任务结果: {cpu_result}")

if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    print(f"总耗时: {time.time() - start:.2f}秒")

代码要点解析:

  1. 分工明确cpu_bound_task 是纯计算函数,用 def 定义,交给进程池。io_bound_task 是协程函数,用 async def 定义,处理I/O。
  2. 核心方法loop.run_in_executor(executor, func, *args)。这是连接事件循环和进程池的桥梁。它把同步函数 func 放到 executor(这里是进程池)里执行,并返回一个 asyncio.Future,这样协程就能 await 它而不阻塞事件循环。
  3. 并发执行asyncio.gather 同时并发执行多个协程任务,并把进程池任务返回的 Future 也一起 gather 进去,实现真正的“同时等待”。
  4. 进程池配置max_workers 一般设为 os.cpu_count() 或稍小一点的值,避免过度切换。

另一种场景:在子进程中运行协程事件循环 如果你的CPU密集型任务内部也需要处理异步I/O,那可以把整个协程事件循环放到子进程里。但这更复杂一些:

import asyncio
from multiprocessing import Process

async def complex_async_work(n):
    # 一个既包含计算又包含I/O的复杂协程任务
    result = n * n  # 一些计算
    await asyncio.sleep(1)  # 一些I/O
    return result

def run_async_in_process(n):
    # 这个函数会在子进程中运行,它负责启动事件循环
    result = asyncio.run(complex_async_work(n))
    print(f"子进程计算结果: {result}")

async def main():
    # 在主进程的协程中创建子进程
    p = Process(target=run_async_in_process, args=(5,))
    p.start()
    p.join()

if __name__ == '__main__':
    asyncio.run(main())

总结一下:

  • 简单场景(计算+I/O分离):用 ProcessPoolExecutor + run_in_executor,这是最清晰、最不容易出错的方式。
  • 复杂场景(子进程内也需要异步):把 asyncio.run() 放到 multiprocessing.Process 的目标函数里。

简单建议:用进程池处理CPU活,用协程处理I/O等。

把 fork 搞明白

一多进城就 multiprocessing
有没有想过要看 multiprocessing 的源码?

感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码

回到顶部