Python中如何将Django作为WebSocket服务器端的最佳实践

Python中如何将Django作为WebSocket服务器端的最佳实践

5 回复

对于将Django作为WebSocket服务器端,最佳实践是使用Django Channels库。它让Django原生支持WebSocket、HTTP2、聊天协议等。

首先安装Channels:

pip install channels channels-redis

配置settings.py

INSTALLED_APPS = [
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',  # 添加Channels
]

# 设置ASGI应用
ASGI_APPLICATION = 'your_project.asgi.application'

# 配置通道层(使用Redis作为后端)
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

创建asgi.py文件:

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from your_app import routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            routing.websocket_urlpatterns
        )
    ),
})

创建路由文件routing.py

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/some_path/$', consumers.YourConsumer.as_asgi()),
]

创建消费者(处理WebSocket连接):

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class YourConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = "test_room"
        self.room_group_name = f"chat_{self.room_name}"
        
        # 加入房间组
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        
        await self.accept()
    
    async def disconnect(self, close_code):
        # 离开房间组
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )
    
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        
        # 发送消息到房间组
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )
    
    async def chat_message(self, event):
        message = event['message']
        
        # 发送消息到WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

最后运行Django时使用Daphne服务器:

daphne your_project.asgi:application

总结:用Channels库配合异步消费者来处理WebSocket。

感谢,看了下 channels 的文档,太强大啦~~!!!

dwebsocket

之前搜索到这个帖子,实现了一下,并且写了篇详细的文档,有需要的看下吧
https://mp.weixin.qq.com/s/hqaPrPS7w3D-9SeegQAB2Q

回到顶部