Python中如何使用Django Channels实时tail某个文件
我写了一个 consumer 在前端的 websocket 请求过来后就开始对某个日志实时输出
class LogConsumer(WebsocketConsumer):
def disconnect(self, close_code):
print('关闭')
if getattr(self, 'f', None):
self.f.terminate()
def receive(self, text_data):
print(text_data)
log_file = 'xxxxx.log'
if not os.path.exists(log_file):
self.send(json.dumps({
'key': 0,
'data': '日志不存在'
}))
return
self.f = subprocess.Popen(
'tail -f -n 40 {}'.format(log_file),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True,
)
p = select.poll()
p.register(self.f.stdout)
line_number = 1
while True:
if p.poll(1):
msg = self.f.stdout.readline().decode("utf-8")
if not msg:
raise Exception
for m in str(msg).splitlines():
self.send(json.dumps({'key': line_number,
'data': m + '\n'
}))
line_number += 1
这样有一个问题就是会导致这个 consumer 处于 blocking 状态,导致 disconnect 无法被触发,进而无法释放进程
有什么好的解决办法吗?
Python中如何使用Django Channels实时tail某个文件
6 回复
两个线程或进程,一个 websocket,一个 subprocess。中间用 channels layer 连接。
要在Django Channels里实时tail文件,核心是结合异步文件监控和WebSocket。这里给个能跑起来的例子。
首先,确保你的requirements.txt有这些包:
Django>=4.0
channels>=4.0
channels-redis # 如果用Redis做通道层
watchfiles # 用于跨平台文件监控
1. 设置Channels和路由
在asgi.py里:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from your_app import routing # 替换成你的应用名
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
routing.websocket_urlpatterns
)
),
})
在routing.py里:
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/tail/(?P<file_path>.+)/$', consumers.TailFileConsumer.as_asgi()),
]
2. 核心Consumer逻辑
在consumers.py里:
import asyncio
import os
from channels.generic.websocket import AsyncWebsocketConsumer
from watchfiles import awatch
class TailFileConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.file_path = self.scope['url_route']['kwargs']['file_path']
if not os.path.exists(self.file_path):
await self.close(code=4004)
return
await self.accept()
self.task = asyncio.create_task(self.tail_file())
async def disconnect(self, close_code):
if hasattr(self, 'task'):
self.task.cancel()
async def tail_file(self):
"""异步监控文件变化并发送新内容"""
try:
# 先发送现有内容(可选)
with open(self.file_path, 'r') as f:
await self.send(text_data=f.read())
# 监控文件变化
async for changes in awatch(self.file_path):
for change_type, _ in changes:
if change_type == 1: # 1表示文件被修改
with open(self.file_path, 'r') as f:
f.seek(0, 2) # 跳到文件末尾
new_content = f.read()
if new_content:
await self.send(text_data=new_content)
except asyncio.CancelledError:
pass
except Exception as e:
await self.send(text_data=f"Error: {str(e)}")
await self.close()
3. 前端WebSocket客户端 简单写个HTML示例:
<!DOCTYPE html>
<html>
<body>
<pre id="output"></pre>
<script>
const filePath = encodeURIComponent('/var/log/your_app.log');
const ws = new WebSocket(`ws://${window.location.host}/ws/tail/${filePath}/`);
ws.onmessage = function(event) {
document.getElementById('output').textContent += event.data;
window.scrollTo(0, document.body.scrollHeight);
};
ws.onclose = function() {
document.getElementById('output').textContent += '\n[连接已关闭]';
};
</script>
</body>
</html>
关键点说明:
- 用
watchfiles.awatch()做异步文件监控,比轮询效率高 - Consumer连接时先发现有内容,然后监控变化
- 文件路径通过WebSocket URL参数传递,记得做安全验证
- 生产环境要加文件权限检查、大小限制和错误处理
一句话总结:用异步文件监控+WebSocket实现实时文件流推送。
同遇到这个问题 请问楼主如何解决
想了了个歪招,后端不能使用 while True 一直占用进程,前端 ws 握手连接成功后,使用定时器向后端 send message,后端每收到一次 message 去 stdout.readline 下
#3 我用了个比较丑陋的方法,见 append,没用到 channel
可以看看这个,正好实现了一样的功能
https://ops-coffee.cn/s/r5SpyTjRl0jJeAuYE4Q_-Q

