在最近的項目中的一個需求是消息實時推送消息以及通知功能,項目使用django寫的因此決定採用django-channels來實現websocket進行實時通信。目前官方已經更新到2.1版本,相對於老的channels 1.x版本有了很大變化,不管是使用方式仍是功能,其中最大的變化莫過於2.x版本中帶來的asyncio特性,可以使用異步處理模式。本文內容將介紹channels2版本使用,因爲項目django是1.11,其中也遇到了一些坑,好比在channels在處理一次請求後hang住而後報錯,後面修改了下django1.11版本的一點源碼得以解決,2.0版本應該不會有問題。html
channels是以django插件的形式存在,它不只能處理http請求,還提供對websocket、MQTT等長鏈接支持。不只如此,channels在保留了原生django的同步和易用的特性上還帶來了異步處理方式(channels2.X版本),而且將django自帶的認證系統以及session集成到模塊中,擴展性很是強。官方文檔:https://channels.readthedocs.io/en/latest/index.htmlpython
channels2.0最低django版本要求是1.11+,python3.5+。筆者的版本是django1.11,直接安裝可能有問題,如下是測試經過的版本。nginx
筆者的相關版本以下:web
Django==1.11.10 channels==2.1.4 channels-redis==2.3.1 asgiref==2.1.6 asgi-redis==1.4.3
若是django版本比較高直接採用pip安裝:redis
pip3 install channels pip3 install channels-redis #可選的,官方推薦若是使用redis做爲channel layer
redis安裝能夠參考博客:https://www.cnblogs.com/wdliu/p/9360286.htmldjango
筆者採用的redis做爲channel layer(關於其介紹請移步至https://channels.readthedocs.io/en/latest/topics/channel_layers.html),它是實現消息推送的核心,在項目的settings.py中:json
註冊channles app:服務器
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'cmdb', 'channels', #註冊app ]
配置channels layer:websocket
ASGI_APPLICATION = 'devops.routing.application' CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('10.1.210.33', 6379)], #需修改 }, }, }
在項目settings文件同級目錄中新增routing.pysession
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter import deploy.routing application = ProtocolTypeRouter({ 'websocket': AuthMiddlewareStack( URLRouter( deploy.routing.websocket_urlpatterns# 指明路由文件是devops/routing.py ) ), })
最後在app裏配置路由和對應的消費者,筆者這裏是devops下的routing.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from django.conf.urls import url from . import consumers websocket_urlpatterns = [ url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是該路由的消費者 ]
項目目錄結構以下:
首先說明,消費者是Channels代碼的基本單元,當一個新的Socket進入的時候,Channels會根據路由表找到正確的消費者,如下代碼中每一個方法均可以看做一個消費者,他們消費不一樣的event,好比剛剛接受鏈接時候connect方法進行消費處理並接受鏈接,關閉websocket時候使用disconnect進行消費處理。
deploy/consumers.py:
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # Author:wd from channels.generic.websocket import AsyncWebsocketConsumer import json class DeployResult(AsyncWebsocketConsumer): async def connect(self): self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"] self.chat_group_name = 'chat_%s' % self.service_uid # 收到鏈接時候處理, await self.channel_layer.group_add( self.chat_group_name, self.channel_name ) await self.accept() async def disconnect(self, close_code): # 關閉channel時候處理 await self.channel_layer.group_discard( self.chat_group_name, self.channel_name ) # 收到消息 async def receive(self, text_data): text_data_json = json.loads(text_data) message = text_data_json['message'] print("收到消息--》",message) # 發送消息到組 await self.channel_layer.group_send( self.chat_group_name, { 'type': 'client.message', 'message': message } ) # 處理客戶端發來的消息 async def client_message(self, event): message = event['message'] print("發送消息。。。",message) # 發送消息到 WebSocket await self.send(text_data=json.dumps({ 'message': message }))
以上代碼部分說明:
1.self.scope是單個鏈接傳入的詳細信息,其中包含了請求的session、以及django認證系統中的用戶信息等;
2.async...await 是python3.5以後的新異步特性,基於asyncio模塊;
利用js發起websocket請求
function InitWebSocket() { var websocket = new WebSocket( 'ws://' + window.location.host + '/ws/deploy/tasks/' ); websocket.onmessage = function (e) { var data = JSON.parse(e.data); var message = '\n' + data['message']; document.querySelector('#deploy-res').innerText += (message + '\n'); }; }
不管是消息的推送或者消息的接受,都是通過channel layer進行傳輸,如下是發送消息示例,
from channels.layers import get_channel_layer from asgiref.sync import async_to_sync channel_layer = get_channel_layer() def send_channel_msg(channel_name, msg): """ send msg to channel :param channel_name: :param msg: :return: """ async_to_sync(channel_layer.group_send)(channel_name, {"type": "deploy.run", "text": msg})
大多數django的應用部署方式都採用的是nginx+uwsgi進行部署,當django集成channels時候,因爲uwsgi不能處理websocket請求,因此咱們須要asgi服務器來處理websocket請求,官方推薦使用daphne。下一篇文章將介紹nginx+supervisor+daphne+uwsgi進行生產部署。