Python3 如何实现发送 Request 并异步处理 Response 的 IPC 通信?
用 NodeJs 的的 Rest API 的话,对端收到了请求先返回响应然后不 return 的话继续执行函数接下去的代码:
app.get(’/url’, function(req, res) {
// Do something…
res.json(some_json_response);
// 继续干活
};
Python 正在查资料中,找了一会儿还没弄明白好一点的解决方案。。求助。。
Python3 如何实现发送 Request 并异步处理 Response 的 IPC 通信?
3 回复
用 asyncio 配合 aiohttp 就行。核心是开两个异步任务:一个发请求,一个用队列收响应。下面是个完整例子:
import asyncio
import aiohttp
from typing import Optional
class AsyncRequestHandler:
def __init__(self):
self.response_queue = asyncio.Queue()
self.session: Optional[aiohttp.ClientSession] = None
async def fetch(self, url: str):
"""发送请求并等待响应"""
if not self.session:
self.session = aiohttp.ClientSession()
try:
async with self.session.get(url) as response:
data = await response.text()
await self.response_queue.put((url, data, response.status))
except Exception as e:
await self.response_queue.put((url, f"Error: {e}", None))
async def process_responses(self):
"""从队列处理响应"""
while True:
url, data, status = await self.response_queue.get()
print(f"URL: {url}")
print(f"Status: {status}")
print(f"Data length: {len(data) if data else 0}")
print("-" * 40)
self.response_queue.task_done()
async def run(self, urls):
"""主运行函数"""
# 启动响应处理器
processor = asyncio.create_task(self.process_responses())
# 并发发送所有请求
tasks = [asyncio.create_task(self.fetch(url)) for url in urls]
await asyncio.gather(*tasks)
# 等待队列处理完成
await self.response_queue.join()
processor.cancel()
if self.session:
await self.session.close()
# 使用示例
async def main():
urls = [
"https://httpbin.org/get",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404"
]
handler = AsyncRequestHandler()
await handler.run(urls)
if __name__ == "__main__":
asyncio.run(main())
这个模式把请求和响应解耦了:fetch() 只管发请求和塞结果到队列,process_responses() 专门从队列里取出来处理。用 asyncio.Queue 做 IPC 通信的通道,安全又简单。
要扩展的话,可以加多个消费者协程,或者用 callbacks 注册处理函数。不过基本思路就是这样:队列当消息总线,协程之间异步通信。
总结:用 asyncio.Queue 做消息通道就行。
<iframe src="https://www.youtube.com/embed/idLtMISlgy8" class="embedded_video" allowfullscreen="" type="text/html" id="ytplayer" frameborder="0"></iframe>
科普视频


