django實時通信--channels2.x使用

1、背景

  在最近的項目中的一個需求是消息實時推送消息以及通知功能,項目使用django寫的因此決定採用django-channels來實現websocket進行實時通信。目前官方已經更新到2.1版本,相對於老的channels 1.x版本有了很大變化,不管是使用方式仍是功能,其中最大的變化莫過於2.x版本中帶來的asyncio特性,可以使用異步處理模式。本文內容將介紹channels2版本使用,因爲項目django是1.11,其中也遇到了一些坑,好比在channels在處理一次請求後hang住而後報錯,後面修改了下django1.11版本的一點源碼得以解決,2.0版本應該不會有問題。html

2、channels介紹

  channels是以django插件的形式存在,它不只能處理http請求,還提供對websocket、MQTT等長鏈接支持。不只如此,channels在保留了原生django的同步和易用的特性上還帶來了異步處理方式(channels2.X版本),而且將django自帶的認證系統以及session集成到模塊中,擴展性很是強。官方文檔:https://channels.readthedocs.io/en/latest/index.htmlpython

 

3、安裝以及安裝需求

  channels2.0最低django版本要求是1.11+,python3.5+。筆者的版本是django1.11,直接安裝可能有問題,如下是測試經過的版本。nginx

筆者的相關版本以下:web

Django==1.11.10
channels==2.1.4
channels-redis==2.3.1
asgiref==2.1.6
asgi-redis==1.4.3

若是django版本比較高直接採用pip安裝:redis

pip3 install channels
pip3 install channels-redis  #可選的,官方推薦若是使用redis做爲channel layer

redis安裝能夠參考博客:https://www.cnblogs.com/wdliu/p/9360286.htmldjango

4、開始使用

1、配置settings.py

  筆者採用的redis做爲channel layer(關於其介紹請移步至https://channels.readthedocs.io/en/latest/topics/channel_layers.html),它是實現消息推送的核心,在項目的settings.py中:json

註冊channles app:服務器

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'cmdb',
    'channels', #註冊app
]

配置channels layer:websocket

ASGI_APPLICATION = 'devops.routing.application'
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('10.1.210.33', 6379)], #需修改
        },
    },
}

 

2、路由配置

在項目settings文件同級目錄中新增routing.pysession

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import deploy.routing

application = ProtocolTypeRouter({
    'websocket': AuthMiddlewareStack(
        URLRouter(
            deploy.routing.websocket_urlpatterns# 指明路由文件是devops/routing.py
        )
    ),
})

最後在app裏配置路由和對應的消費者,筆者這裏是devops下的routing.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd
from django.conf.urls import url

from . import consumers

websocket_urlpatterns = [
    url(r'^ws/deploy/(?P<service_name>[^/]+)/$', consumers.DeployResult), #consumers.DeployResult 是該路由的消費者
]

項目目錄結構以下:

 

 

3、編寫webscoket消息處理方法(消費者)

首先說明,消費者是Channels代碼的基本單元,當一個新的Socket進入的時候,Channels會根據路由表找到正確的消費者,如下代碼中每一個方法均可以看做一個消費者,他們消費不一樣的event,好比剛剛接受鏈接時候connect方法進行消費處理並接受鏈接,關閉websocket時候使用disconnect進行消費處理。

deploy/consumers.py:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author:wd

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class DeployResult(AsyncWebsocketConsumer):
    async def connect(self):
        self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
        self.chat_group_name = 'chat_%s' % self.service_uid
        # 收到鏈接時候處理,
        await self.channel_layer.group_add(
            self.chat_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # 關閉channel時候處理
        await self.channel_layer.group_discard(
            self.chat_group_name,
            self.channel_name
        )

    # 收到消息
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        print("收到消息--》",message)
        # 發送消息到組
        await self.channel_layer.group_send(
            self.chat_group_name,
            {
                'type': 'client.message',
                'message': message
            }
        )

    # 處理客戶端發來的消息
    async def client_message(self, event):
        message = event['message']
        print("發送消息。。。",message)
        # 發送消息到 WebSocket
        await self.send(text_data=json.dumps({
            'message': message
        }))

以上代碼部分說明:

1.self.scope是單個鏈接傳入的詳細信息,其中包含了請求的session、以及django認證系統中的用戶信息等;

2.async...await 是python3.5以後的新異步特性,基於asyncio模塊;

4、發起webscoket請求

利用js發起websocket請求

function InitWebSocket() {
            var websocket = new WebSocket( 
                'ws://' + window.location.host + '/ws/deploy/tasks/' );

            websocket.onmessage = function (e) {
                var data = JSON.parse(e.data);
                var message = '\n' + data['message'];
                document.querySelector('#deploy-res').innerText += (message + '\n');
            };
        }

 

5、發送消息到channel

不管是消息的推送或者消息的接受,都是通過channel layer進行傳輸,如下是發送消息示例,

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync


channel_layer = get_channel_layer()
def send_channel_msg(channel_name, msg):
     """
        send msg to channel
        :param channel_name: 
        :param msg: 
        :return: 
        """
    async_to_sync(channel_layer.group_send)(channel_name,
                                                 {"type": "deploy.run", "text": msg})

6、生產部署

大多數django的應用部署方式都採用的是nginx+uwsgi進行部署,當django集成channels時候,因爲uwsgi不能處理websocket請求,因此咱們須要asgi服務器來處理websocket請求,官方推薦使用daphne。下一篇文章將介紹nginx+supervisor+daphne+uwsgi進行生產部署。

相關文章
相關標籤/搜索