Python中如何使用Django Channels实时tail某个文件

我写了一个 consumer 在前端的 websocket 请求过来后就开始对某个日志实时输出

class LogConsumer(WebsocketConsumer):
def disconnect(self, close_code):
    print('关闭')
    if getattr(self, 'f', None):
        self.f.terminate()

def receive(self, text_data):
    print(text_data)
    log_file = 'xxxxx.log'

    if not os.path.exists(log_file):
        self.send(json.dumps({
            'key': 0,
            'data': '日志不存在'
        }))
        return
    self.f = subprocess.Popen(
        'tail -f -n 40 {}'.format(log_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
    )
    p = select.poll()
    p.register(self.f.stdout)
    line_number = 1
    while True:
        if p.poll(1):
            msg = self.f.stdout.readline().decode("utf-8")
            if not msg:
                raise Exception
            for m in str(msg).splitlines():
                self.send(json.dumps({'key': line_number,
                                        'data': m + '\n'
                                        }))
                line_number += 1

这样有一个问题就是会导致这个 consumer 处于 blocking 状态,导致 disconnect 无法被触发,进而无法释放进程

有什么好的解决办法吗?


Python中如何使用Django Channels实时tail某个文件

6 回复

两个线程或进程,一个 websocket,一个 subprocess。中间用 channels layer 连接。


要在Django Channels里实时tail文件,核心是结合异步文件监控和WebSocket。这里给个能跑起来的例子。

首先,确保你的requirements.txt有这些包:

Django>=4.0
channels>=4.0
channels-redis  # 如果用Redis做通道层
watchfiles      # 用于跨平台文件监控

1. 设置Channels和路由asgi.py里:

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from your_app import routing  # 替换成你的应用名

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            routing.websocket_urlpatterns
        )
    ),
})

routing.py里:

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/tail/(?P<file_path>.+)/$', consumers.TailFileConsumer.as_asgi()),
]

2. 核心Consumer逻辑consumers.py里:

import asyncio
import os
from channels.generic.websocket import AsyncWebsocketConsumer
from watchfiles import awatch

class TailFileConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.file_path = self.scope['url_route']['kwargs']['file_path']
        if not os.path.exists(self.file_path):
            await self.close(code=4004)
            return
        await self.accept()
        self.task = asyncio.create_task(self.tail_file())

    async def disconnect(self, close_code):
        if hasattr(self, 'task'):
            self.task.cancel()

    async def tail_file(self):
        """异步监控文件变化并发送新内容"""
        try:
            # 先发送现有内容(可选)
            with open(self.file_path, 'r') as f:
                await self.send(text_data=f.read())

            # 监控文件变化
            async for changes in awatch(self.file_path):
                for change_type, _ in changes:
                    if change_type == 1:  # 1表示文件被修改
                        with open(self.file_path, 'r') as f:
                            f.seek(0, 2)  # 跳到文件末尾
                            new_content = f.read()
                            if new_content:
                                await self.send(text_data=new_content)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            await self.send(text_data=f"Error: {str(e)}")
            await self.close()

3. 前端WebSocket客户端 简单写个HTML示例:

<!DOCTYPE html>
<html>
<body>
    <pre id="output"></pre>
    <script>
        const filePath = encodeURIComponent('/var/log/your_app.log');
        const ws = new WebSocket(`ws://${window.location.host}/ws/tail/${filePath}/`);
        
        ws.onmessage = function(event) {
            document.getElementById('output').textContent += event.data;
            window.scrollTo(0, document.body.scrollHeight);
        };
        
        ws.onclose = function() {
            document.getElementById('output').textContent += '\n[连接已关闭]';
        };
    </script>
</body>
</html>

关键点说明:

  • watchfiles.awatch()做异步文件监控,比轮询效率高
  • Consumer连接时先发现有内容,然后监控变化
  • 文件路径通过WebSocket URL参数传递,记得做安全验证
  • 生产环境要加文件权限检查、大小限制和错误处理

一句话总结:用异步文件监控+WebSocket实现实时文件流推送。

同遇到这个问题 请问楼主如何解决

想了了个歪招,后端不能使用 while True 一直占用进程,前端 ws 握手连接成功后,使用定时器向后端 send message,后端每收到一次 message 去 stdout.readline 下

#3 我用了个比较丑陋的方法,见 append,没用到 channel

可以看看这个,正好实现了一样的功能
https://ops-coffee.cn/s/r5SpyTjRl0jJeAuYE4Q_-Q

回到顶部