Python中asyncio & aiohttp初入坑,向大佬们请教一些问题

需求是服务端单向向客户端(浏览器)推送消息。由于会出现同一用户同时登录着多个客户端的情况,希望能把消息同时推送给所有的终端。所以我是这么做的:

async def handler(request):
    admin = Admin(request)
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'][admin.id].add(ws)
    try:
        while True:
            msg = await admin.get_unread_msg(admin.id)
            for ws in request.app['websockets'][admin.id]:
                await ws.send_json({'data': msg})
    finally:
        request.app['websockets'][admin.id].discard(ws)
        await ws.close()
    return ws

question:这么做有没有问题 = =

然后消息队列用的是 redis + aioredis,上面的 get_unread_msg 我是这么做的:

async def get_unread_msg(self, admin_id):
    while True:
        data = await self.redis.lpop(f'msgs:{admin_id}')
        if data:
            return data
        await asyncio.sleep(0.1)

初始化 redis:

async def setup_redis(app):
    redis_url = app['config']['REDIS_URL']
    app['redis'] = await aioredis.create_redis_pool(redis_url)
    yield
    app['redis'].close()
    await app['redis'].wait_closed()

question:我在这里使用 blpop 会阻塞其它请求,这是为什么 = =

至于哪里来的其它请求,是我写了另一个请求用来添加消息:

async def add_msg(request):
    redis = request.app['redis']
    msg_id = await redis.incr('msg_id')
    import random
    msg = {
        'id': msg_id,
        'text': f'hello~{msg_id}'
    }
    await redis.rpush('msgs', json.dumps(msg))

这里会把消息都写进一个大列表里,所以还有一个后台任务做消息分发:

[@register_background_tasks](/user/register_background_tasks)
async def distribute_msgs(app):
    redis = app['redis']
    while True:
        data = await redis.lpop('msgs')
        if data:
            json_data = json.loads(data)
            to_admin = redis.rpush(f'msgs:{json_data["admin_id"]}', data)
            await asyncio.gather(to_sa, to_admin)
        await asyncio.sleep(0.1)

这里换成 blpop 也会阻塞请求= =

感谢大佬们读到这里,最后一个问题。。

如果我用 gunicorn 开启了多个 worker,会出现同一用户的多个请求发送到了不同的进程上,就没法保证所有连接都收到消息了。这种情况该怎么办?


Python中asyncio & aiohttp初入坑,向大佬们请教一些问题

1 回复

o(╥﹏╥)o 路过的大佬请伸出你们的援手~

回到顶部