Python3 如何实现发送 Request 并异步处理 Response 的 IPC 通信?

用 NodeJs 的的 Rest API 的话,对端收到了请求先返回响应然后不 return 的话继续执行函数接下去的代码:
app.get(’/url’, function(req, res) {
// Do something…
res.json(some_json_response);
// 继续干活
};
Python 正在查资料中,找了一会儿还没弄明白好一点的解决方案。。求助。。
Python3 如何实现发送 Request 并异步处理 Response 的 IPC 通信?

3 回复

asyncio 配合 aiohttp 就行。核心是开两个异步任务:一个发请求,一个用队列收响应。下面是个完整例子:

import asyncio
import aiohttp
from typing import Optional

class AsyncRequestHandler:
    def __init__(self):
        self.response_queue = asyncio.Queue()
        self.session: Optional[aiohttp.ClientSession] = None

    async def fetch(self, url: str):
        """发送请求并等待响应"""
        if not self.session:
            self.session = aiohttp.ClientSession()
        
        try:
            async with self.session.get(url) as response:
                data = await response.text()
                await self.response_queue.put((url, data, response.status))
        except Exception as e:
            await self.response_queue.put((url, f"Error: {e}", None))

    async def process_responses(self):
        """从队列处理响应"""
        while True:
            url, data, status = await self.response_queue.get()
            print(f"URL: {url}")
            print(f"Status: {status}")
            print(f"Data length: {len(data) if data else 0}")
            print("-" * 40)
            self.response_queue.task_done()

    async def run(self, urls):
        """主运行函数"""
        # 启动响应处理器
        processor = asyncio.create_task(self.process_responses())
        
        # 并发发送所有请求
        tasks = [asyncio.create_task(self.fetch(url)) for url in urls]
        await asyncio.gather(*tasks)
        
        # 等待队列处理完成
        await self.response_queue.join()
        processor.cancel()
        
        if self.session:
            await self.session.close()

# 使用示例
async def main():
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/404"
    ]
    
    handler = AsyncRequestHandler()
    await handler.run(urls)

if __name__ == "__main__":
    asyncio.run(main())

这个模式把请求和响应解耦了:fetch() 只管发请求和塞结果到队列,process_responses() 专门从队列里取出来处理。用 asyncio.Queue 做 IPC 通信的通道,安全又简单。

要扩展的话,可以加多个消费者协程,或者用 callbacks 注册处理函数。不过基本思路就是这样:队列当消息总线,协程之间异步通信。

总结:用 asyncio.Queue 做消息通道就行。


aiohttp 应该就是你想要的。
但是不用想太多,和 nodejs 的肯定有区别。

<iframe src="https://www.youtube.com/embed/idLtMISlgy8" class="embedded_video" allowfullscreen="" type="text/html" id="ytplayer" frameborder="0"></iframe>
科普视频
回到顶部