Python中异步编程与多worker并发处理的问题
现在在写一个服务端的程序,主要有三个操作,1.是等待网络请求接收数据(使用 socket 而不是 http ),2.是对数据转格式,第三个把整理好格式的数据写入数据库中。现在是想把 1,2,3 都写到一个协程中,然后开多个 worker 来进行处理,怎样可以使协程开出多个 worker 来应对多并发呢?目前打算使用 asyncio,如果不支持可以考虑 tornado,求各位看看有什么解决方法
Python中异步编程与多worker并发处理的问题
在Python里搞异步编程和多worker并发,这俩经常被混为一谈,但其实路子完全不一样。核心区别在于:异步(asyncio)是单线程里靠“协作式”任务切换来模拟并发,而多worker(多进程/多线程)是真·并行。
1. 异步编程 (asyncio)
这玩意儿适合I/O密集型任务,比如一堆网络请求。它在一个线程里跑,用async/await语法。当一个任务在等I/O(比如等数据库返回)时,事件循环就把CPU让给其他就绪的任务。代码看起来是顺序的,但其实是“假装”在同时干多件事。
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['http://example.com', 'http://example.org']
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks) # 这里“并发”发起请求
for r in results:
print(len(r))
# 运行
asyncio.run(main())
2. 多Worker并发
对于CPU密集型任务(比如计算圆周率),你得用多进程(multiprocessing)来绕过GIL,或者用多线程(threading)处理一些虽然受GIL限制但涉及阻塞I/O的场景。这是实打实的多个系统线程或进程同时跑。
from concurrent.futures import ProcessPoolExecutor
import math
def compute_pi(n):
# 模拟一个耗时的CPU计算
s = 0.0
for i in range(n):
s += (-1)**i / (2*i + 1)
return s * 4
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交多个任务到进程池
futures = [executor.submit(compute_pi, 10**6) for _ in range(4)]
results = [f.result() for f in futures]
print(results)
怎么选?
简单说:I/O堵得多用asyncio,CPU算得猛用multiprocessing,别瞎混用。
Twisted
我有个类似的做法,我是这样解决的。
因为我是多个线程共享一个数据库连接,每个线程都 execute,最后一起提交数据库。因此我在 savedata 中加了个锁
import threading
lock = threading.Lock()
from concurrent.futures import ThreadPoolExecutor
max_workers=64
sock_pool =ThreadPoolExecutor(max_workers=max_workers) #注意这 3 个 max_workers 不必都相同的
chgdata_pool = ThreadPoolExecutor(max_workers=max_workers)
chgdata_future = []
savedata_pool = ThreadPoolExecutor(max_workers=200)
savedata_future = []
def sock(参数):
接收 shock 数据的代码
chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托一个清洗数据的函数 chgdata
其它代码
def chgdata(参数):
清洗数据的代码
savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托一个保存数据的函数 savedata
其它代码
def savedata(参数):
保存语句生成
lock.acquire() #加个互斥锁
保存到数据库
lock.release() #释放锁
其它代码
if name == ‘main’:
执行 sock()之前的代码
sock_future = sock_pool.submit(sock,参数) for 参数 in 列表] ## 多个 sock 接收数据
for f in sock_future:
f.result()
for f in chgdata_future:
f.result()
for f in saveda’ta_future:
f.result()
conn.commit() #。这步你可以视你的实际需求放在 savadata 中。
每个 sock 接收数据后传递给 chgdata,不必等待 chgdata。每个 chgdata 清洗数据后传递给 savedata,不必等待 savedata。
这应该是楼主想要的效果

