Python中半同步半异步模式的工作原理与实现疑问?

主进程通过 epoll 循环监听 socket,有新连接就分出去同步进程。进程数是根据 CPU 核心数创建的,毕竟创建多了 CPU 切换、创建销毁等影响性能。我尝试过多种方式,fork,进程池,等都不行。
请问有大佬遇到过吗?能贴下实现代码吗
Python中半同步半异步模式的工作原理与实现疑问?

14 回复

没看明白,是“不行”在哪里?


半同步半异步(Half-Sync/Half-Async)模式的核心思想是将并发任务的异步执行与同步服务分离。上层用同步方式写业务逻辑(简单),底层用异步I/O处理高并发(高效),中间通过一个队列解耦。

直接看一个用asyncio + 线程池实现的例子:

import asyncio
import concurrent.futures
from queue import Queue, Empty
import time

class HalfSyncHalfAsync:
    def __init__(self):
        # 异步事件循环
        self.loop = asyncio.new_event_loop()
        # 同步层队列(线程安全)
        self.task_queue = Queue()
        # 线程池处理同步任务
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
        
    async def async_layer(self):
        """异步层:模拟I/O密集型操作"""
        while True:
            try:
                # 从队列获取同步层提交的任务
                task_data = await self.loop.run_in_executor(
                    None, self.task_queue.get, True, 0.1
                )
                print(f"[Async] 收到任务: {task_data}")
                
                # 模拟异步处理(如网络请求)
                await asyncio.sleep(0.5)
                result = f"处理完成: {task_data}"
                
                # 将结果返回给同步层(通过回调或队列)
                print(f"[Async] {result}")
                
            except Empty:
                continue
            except Exception as e:
                print(f"[Async] 错误: {e}")

    def sync_layer(self):
        """同步层:用户以同步方式调用"""
        for i in range(5):
            # 同步调用(阻塞直到任务提交)
            task = f"任务-{i}"
            self.task_queue.put(task)
            print(f"[Sync] 提交: {task}")
            time.sleep(0.2)  # 模拟同步操作间隔

    def run(self):
        # 启动异步层任务
        async_task = asyncio.ensure_future(self.async_layer(), loop=self.loop)
        
        # 在另一个线程运行同步层
        with self.executor:
            # 提交同步层执行
            future = self.executor.submit(self.sync_layer)
            future.result()  # 等待同步层完成
            
        # 清理
        self.loop.stop()
        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        self.loop.close()

if __name__ == "__main__":
    hsla = HalfSyncHalfAsync()
    hsla.run()

工作原理:

  1. 同步层:用户代码像写普通同步程序一样调用sync_layer(),任务被放入队列后立即返回。
  2. 异步层:在事件循环中运行async_layer(),从队列取任务并用asyncio非阻塞处理。
  3. 队列桥接:线程安全的队列连接两层,避免直接混用同步/异步代码。

关键点:

  • 同步层不用关心await,适合写复杂业务逻辑
  • 异步层专注高效I/O调度,用asyncio管理并发
  • 通过run_in_executor让队列操作不阻塞事件循环

一句话总结:用队列隔离同步业务和异步I/O,各司其职。

进程数和 CPU 数不同步,我现在的做法有两种 1、无限 fork,2、用进程池

有点莫名其妙的,首先 epoll 不是应该用线程吗,除非你有隔离资源的需求;其次,进程数也不用和 CPU 数同步吧,现在 CPU 都是时钟分片,进程和线程比 CPU 数量多才能保持高 CPU 利用率

Py 有 GIL,楼主如果用进程池的话,任务如何调度、中断、恢复呢?总不可能做成任务队列吧?那样如果有长时间占用 CPU 资源的任务,后面的任务就等着排队了,要么就是做成 Go 那样的 Goroutine

队列是可以用 asyncio 来处理的。。。我相信 epoll 也有方案,只不过我没用过。

看看 gunicorn 的代码, 或许有用

无论是 PY2 还是 PY3 只要尝试长时间占用 CPU 资源就一定会被解析器强行剥夺,释放 GIL 让其他线程运行,我记得 PY2 是按照虚拟机指令条数,PY3 是按照时间片,不可能存在“有长时间占用 CPU 资源的任务,后面的任务就等着排队了”的情况

没看懂说的 新的连接 fork 和 进程数是根据 cpu 数量创建的 这两句矛盾了啊 意思是上限等于 cpu 数量么 用 select 肯定可以的 epoll 没试过 感觉 epoll 和创建进程不搭。。

我说的是楼主如果用进程池做的话,进程数:任务数= M:N,你说的是使用线程,并且线程数:任务数 = 1:1

每个进程都有独立的 GIL 锁

线程有 GIL 锁,所以才不考虑的。和 CPU 核心数同步,是为了减少切换和创建销毁造成的开销。最好是能绑定 CPU 核心。

是这样的:主进程不断循环用 epoll 监听 socket,有新链接,就分出去工作进程去非堵塞执行逻辑,写入写出等,遇到 eagain 就通过管道传给主进程再次循环。我是用 psutil 获取 CPU 核心,期望的效果是:创建 CPU 核心数*2 的进程。例如 epollin,epollout 事件触发后,马上放进 work 进程。一来减少切换和创建销毁的开销,二来兼顾 IO 等待时避免 CPU 空转。现在就卡在如何创建和复用有限进程

跨进程访问好像很费劲 我记得 multiprocessing 提供了一个代理用来访问其他进程的对象 不知道对你有用不

通过管道的方式能传递连接吗这个还真太清楚。。

回到顶部