Python中异步asyncio的问题请教
想写一个分析日志的工具,用 asyncio 处理的时候,想大家帮忙看一下哪里有问题,实际处理日志的时候,readlines 的时候,一个 650m 的文件需要 3 秒多,然后往字典里面加的时候需要 8 秒多,所以一个文件要 12 秒多,
现在的效果是,多个文件消耗的时间就是 12*n,就是说并没有提升,不是说 await 的时候,会把当前执行的内容挂起,然后执行下一个任务么,可能是我哪里有问题,麻烦大家给看看,代码如下。
import asyncio
import re
import time
from pathlib import Path
ip_find = re.compile(r'((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))).){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))')
ip_database = dict()
async def ip_address(log):
"""分析 ip"""
with open(log) as f:
log_data = f.readlines()
try:
for log_ips in log_data:
ip_search = ip_find.search(log_ips)
if ip_search:
ip_database[ip_search.group(0)] = ip_database.get(ip_search.group(0), 0) + 1
except Exception as e:
print(e)
async def generator(log):
await ip_address(log)
if __name__ == '__main__':
path = Path(r"D:\anlysis_log")
start = time()
loop = asyncio.get_event_loop()
tasks = [generator(x) for x in path.iterdir()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
Python中异步asyncio的问题请教
asyncio 挂起的是 io 等待,可不包括正则啊
我无法理解你的问题
用 aiofiles 或者将读取文件的操作放到线程池 /进程池里面
- asyncio 的任务是显式的,有 await 才会切回到 event loop,不是在 function 前面加 async 就行。可以试试 https://github.com/Tinche/aiofiles
- 这里的瓶颈在磁盘 IO,asyncio 作用应该不会太明显;作用更大的地方是多个 IO 并行执行的时候,这里在读磁盘的时候就去那里执行 CPU 操作,另一个任务可能在等网络请求返回
你没理解 1 楼的意思,他是说你的程序消耗的时间主要在正则查找这里,也就是这条语句:ip_search = ip_find.search(log_ips)。
好的谢谢,我看下这个库
我原来理解的是如果 await 的函数在坐 io 等待,就会切换 event loop,就是我在 ip_address 中等待读取文件,这个时间应该切换到下一个 event,等待读取后再继续操作,现在看来理解是有问题的,我去看看 aiofiles 库,多谢。
但是上面文件读取也是在耗时啊,多少会有点儿提升的把
asyncio 好像不支持异步的读取文件,即使设置为非阻塞读取模式,所以是不会引起切换的。
[Asyncio Wiki]( https://github.com/python/asyncio/wiki/ThirdParty#filesystem)
看到了 感觉不适合用 python 写了
问题不在异 asyncio 不支持异步文件读取。run_in_executor 之类的很容易把同步代码封装成异步代码。
瓶颈在#4 说的磁盘 IO,你就一个磁盘。正则表达式那块儿可以用 concurrent.futures.ProcessPoolExecutor + asyncio.get_event_loop().run_in_executor() 解决,前提是你有多核处理器
正则那儿可以用 pcre with jit 加速。换个语言也快不到哪儿去
现在用 aiofiles 是异步读取的文件,用 async for 去异步执行正则匹配的时候,print 的代码也看到是异步的,但是超级慢,比去掉 async 快了 15 倍 以上 ,现在我晕了已经,不知道咋回事。
没错的 前面有 r
贴代码啊
那么你连 r 是什么意思都没明白,r是拒绝 Python 字符串转义啊,并不会影响正则的意思。
你要说 r 让 “.” 不生效(具有正则表达式含义),那么 “\d” “(” “)” “{” “}” “[0-5]” 也不会生效啊。
>>> import re
>>> re.compile(r’((25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d))).){3}(25[0-5]|2[0-4]\d|((1\d{2})|([1-9]?\d)))’).search(“3|3|3|
3”)
<re.Match object; span=(0, 7), match=‘3|3|3|3’>
另外,不要做干净正则,做个脏正则就行了,
((?:\d{1,3}.){3}\d{1,3})
在反复出现没搜索到的情况(就是说 [不是] 一行单一个 IP 结束的情况)
受教了,是我想偏了,而日志中又没出现这种情况,让我以为没问题
就是把读取文件改成 aiofiles, for 改成 async for
我看了 debug 之后发现,每次进入 async for 的时候,都会 wait,重新回到 await ip_address(log) 中,然后到 event loop 去看有没有其他等待的 event,然后导致每一行在循环的时候都去 await 一次,结果特别慢,我个人理解,不知道对不对,请指教,async for 不是这样用的么?
代码如下async def ip_address(log):
“”“分析 ip”""
async with open(log) as f:
try:
async for log_ips in f:
ip_search = ip_find.search(log_ips)
if ip_search:
ip_database[ip_search.group(0)] = ip_database.get(ip_search.group(0), 0) + 1
except Exception as e:
print(e)
加 async 并不会让你的同步代码变成异步
好好了解一下异步风格里的”让出执行”权吧
看看下面的文章。划重点: asyncio.gather(), futures, ProcessPoolExecutor。理解 futures 你就知道怎么用同步风格写异步代码
https://blog.konpat.me/python-turn-sync-functions-to-async/
https://docs.python.org/3/library/asyncio-eventloop.html#id14
好的我仔细看下 第一次用 有点儿懵


