Python中如何正确使用协程与多进程
用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码
import asyncio
import time
import socket
from multiprocessing import Process
loop = asyncio.get_event_loop()
size = 0
class ServerProtocol(asyncio.DatagramProtocol):
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr, args=None):
global size
data = data.decode()
message = data
index = data.find('\n')
if index > 0:
filename = data[0:index]
data = data[index+1::]
size += len(data)
task = self.WriteToFile(filename, data)
asyncio.run_coroutine_threadsafe(task, loop)
async def WriteToFile(self, f, data):
await asyncio.sleep(1)
return True
async def print_size():
global size
while True:
await asyncio.sleep(1)
print(size)
def start(sock):
listen = loop.create_datagram_endpoint(
ServerProtocol,
sock = sock
)
transport, protocol = loop.run_until_complete(listen)
task = print_size()
asyncio.run_coroutine_threadsafe(task, loop)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
sock.close()
if name == ‘main’:
print(“Starting UDP server”)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((‘0.0.0.0’, 9873))
for i in range(1):
t = Process(target=start, args=(sock,))
t.deamon = True
t.start()
start(sock)
报错信息
Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)
handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)>
Traceback (most recent call last):
File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
key = self._selector.get_key(fd)
File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key
raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File “/usr/local/python36/lib/python3.6/asyncio/events.py”, line 145, in _run
self._callback(*self._args)
File “/usr/local/python36/lib/python3.6/asyncio/selector_events.py”, line 267, in _add_reader
(handle, None))
File “/usr/local/python36/lib/python3.6/selectors.py”, line 412, in register
self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists
Python中如何正确使用协程与多进程
这。。。咋没人呢
在Python里混用协程和多进程,主要是为了把CPU密集和I/O密集的任务分开处理。协程(asyncio)擅长处理大量I/O等待,而多进程(multiprocessing)能真正利用多核CPU。关键是要用对方法,让它们各司其职。
最常见且推荐的方式是使用 ProcessPoolExecutor 配合 asyncio。别直接用 multiprocessing.Process 在协程里创建进程,容易出问题。下面这个例子演示了标准做法:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
# 这是一个CPU密集型的计算函数
def cpu_bound_task(n):
# 模拟一个耗时的CPU计算,比如计算斐波那契数列
if n <= 1:
return n
else:
return cpu_bound_task(n-1) + cpu_bound_task(n-2)
# 这是一个I/O密集型的协程函数
async def io_bound_task(task_id):
print(f'IO任务 {task_id} 开始')
await asyncio.sleep(2) # 模拟I/O等待,比如网络请求
print(f'IO任务 {task_id} 结束')
return f'IO结果-{task_id}'
async def main():
# 1. 创建进程池执行器,通常进程数设为CPU核心数
with ProcessPoolExecutor(max_workers=4) as executor:
# 获取当前的事件循环
loop = asyncio.get_running_loop()
# 2. 提交CPU密集型任务到进程池
# run_in_executor 会返回一个 asyncio.Future 对象
cpu_future = loop.run_in_executor(executor, cpu_bound_task, 35)
print("CPU任务已提交到进程池,正在异步计算...")
# 3. 同时运行多个I/O密集型协程任务
io_tasks = [io_bound_task(i) for i in range(3)]
io_results = await asyncio.gather(*io_tasks, cpu_future)
# gather会等待所有任务完成,结果顺序与传入顺序一致
# 前三个是io_results,最后一个是cpu_future的结果
cpu_result = io_results[-1]
io_results = io_results[:-1]
print(f"所有I/O任务结果: {io_results}")
print(f"CPU任务结果: {cpu_result}")
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
print(f"总耗时: {time.time() - start:.2f}秒")
代码要点解析:
- 分工明确:
cpu_bound_task是纯计算函数,用def定义,交给进程池。io_bound_task是协程函数,用async def定义,处理I/O。 - 核心方法:
loop.run_in_executor(executor, func, *args)。这是连接事件循环和进程池的桥梁。它把同步函数func放到executor(这里是进程池)里执行,并返回一个asyncio.Future,这样协程就能await它而不阻塞事件循环。 - 并发执行:
asyncio.gather同时并发执行多个协程任务,并把进程池任务返回的Future也一起gather进去,实现真正的“同时等待”。 - 进程池配置:
max_workers一般设为os.cpu_count()或稍小一点的值,避免过度切换。
另一种场景:在子进程中运行协程事件循环 如果你的CPU密集型任务内部也需要处理异步I/O,那可以把整个协程事件循环放到子进程里。但这更复杂一些:
import asyncio
from multiprocessing import Process
async def complex_async_work(n):
# 一个既包含计算又包含I/O的复杂协程任务
result = n * n # 一些计算
await asyncio.sleep(1) # 一些I/O
return result
def run_async_in_process(n):
# 这个函数会在子进程中运行,它负责启动事件循环
result = asyncio.run(complex_async_work(n))
print(f"子进程计算结果: {result}")
async def main():
# 在主进程的协程中创建子进程
p = Process(target=run_async_in_process, args=(5,))
p.start()
p.join()
if __name__ == '__main__':
asyncio.run(main())
总结一下:
- 简单场景(计算+I/O分离):用
ProcessPoolExecutor+run_in_executor,这是最清晰、最不容易出错的方式。 - 复杂场景(子进程内也需要异步):把
asyncio.run()放到multiprocessing.Process的目标函数里。
简单建议:用进程池处理CPU活,用协程处理I/O等。
把 fork 搞明白
一多进城就 multiprocessing
有没有想过要看 multiprocessing 的源码?
感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码

