在1.9版本以後,Django實現了對Channels的支持,他所使用的是WebSocket通訊,解決了實時通訊的問題,並且在使用WebSocket進行通訊的同時依舊可以支持HTTP通訊。web
在此結構中必須有硬性要求,具體以下:django
新的目錄以下: |-- channels_example | |--channels_example | |-- __init__.py | |-- settings.py | |-- urls.py | |-- wsgi.py | |-- routing.py #必須 | |-- consumer.py #必須 | |-- asgi.py | |-- manage.py
INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'channels', ]
CHANNEL_LAYERS = { "default": { "BACKEND": "asgiref.inmemory.ChannelLayer", "ROUTING": "channels_example.routing.channel_routing", }, }
須要注意的是 ROUTING 參數,他是用來指定WebSocket表單的位置,當有WebSocket請求訪問時,就會根據這個路徑找到相應表單,調用相應的函數進行處理。
channels_example.routing 就是咱們剛纔建好的routing,py文件,裏面的channel_routing咱們下面會進行填充。json
from channels.routing import route import consumers channel_routing = [ route('websocket.connect', consumers.ws_connect), route('websocket.disconnect', consumers.ws_disconnect), # route('websocket.receive', consumers.ws_message), route('websocket.receive', consumers.ws_message_uuid), ]
from django.http import HttpResponse from channels.handler import AsgiHandler #message.reply_channel 一個客戶端通道的對象 #message.reply_channel.send(chunk) 用來惟一返回這個客戶端 #一個管道大概會持續30s def ws_connect(message): auth = True if not auth: reply = json.dumps({'error': error}) message.reply_channel.send({'text': reply, 'close': True}) else: reply = "{}" message.reply_channel.send({'text': reply}) print(">>> %s connected" % str(message)) def ws_disconnect(message): print("<<< %s disconnected" % str(message)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Empty: continue message_queue.task_done() def ws_message_uuid(message): task = Task.create(message) if task: message_queue.put(task)
Tornado在websocket模塊中提供了一個WebSocketHandler類。這個類提供了和已鏈接的客戶端通訊的WebSocket事件和方法的鉤子。當一個新的WebSocket鏈接打開時,open方法被調用,而on_message和on_close方法分別在鏈接接收到新的消息和客戶端關閉時被調用。跨域
此外,WebSocketHandler類還提供了write_message方法用於向客戶端發送消息,close方法用於關閉鏈接。瀏覽器
class EchoHandler(tornado.websocket.WebSocketHandler): def open(self): self.write_message('connected!') def on_message(self, message): self.write_message(message)
正如你在咱們的EchoHandler實現中所看到的,open方法只是使用WebSocketHandler基類提供的write_message方法向客戶端發送字符串"connected!"。每次處理程序從客戶端接收到一個新的消息時調用on_message方法,咱們的實現中將客戶端提供的消息原樣返回給客戶端。這就是所有!讓咱們經過一個完整的例子看看實現這個協議是如何簡單的吧。安全
當一個WebSocket鏈接創建後被調用。服務器
當客戶端發送消息message過來時被調用,注意此方法必須被重寫。websocket
當WebSocket鏈接關閉後被調用。網絡
向客戶端發送消息messagea,message能夠是字符串或字典(字典會被轉爲json字符串)。若binary爲False,則message以utf8編碼發送;二進制模式(binary=True)時,可發送任何字節碼。session
關閉WebSocket鏈接。
判斷源origin,對於符合條件(返回判斷結果爲True)的請求源origin容許其鏈接,不然返回403。能夠重寫此方法來解決WebSocket的跨域請求(如始終return True)。
#coding=utf-8 import uuid import os from works.actions import work import hashlib import json import Queue from threading import Thread import numpy as np import cv2 import base64 import jwt import tornado.gen from handlers.base_handler import BaseWebSocket from config import MEDIA_ROOT import time message_queue = Queue.PriorityQueue() def work_loop(): while True: task = message_queue.get() iuuid = task.uuid offset_top = task.offset_top image_data = task.image_data channel = task.channel zoom = task.zoom rType = task.rType responseType = task.responseType print(">>> len: %d | current offset: %d" % (message_queue.qsize(), offset_top)) filename = str(uuid.uuid1()) + '.jpg' filepath = os.path.join(MEDIA_ROOT, filename) with open(filepath, 'wb') as f: f.write(image_data.decode("base64")) if zoom != 1.0: im = cv2.imread(filepath) if im is None: continue osize = im.shape[1], im.shape[0] size = int(im.shape[1] * zoom), int(im.shape[0] * zoom) im = cv2.resize(im, size) cv2.imwrite(filepath, im) try: reply = work(filepath, use_crop=False, result=rType,responseType=responseType) except Exception as e: print("!!!!!! %s -> %s caused error" % (iuuid, filename)) print(e) cmd = u"cp %s %s" % (filepath, os.path.join(MEDIA_ROOT, 'rb_' + filename)) os.system(cmd.encode('utf-8')) continue if responseType == 'url': # rtn_url = 'http://101.236.17.104:3389/upload/' + 'rb_' + filename rtn_url = 'http://192.168.0.254:8000/upload/' + 'rb_' + filename reply = {'url': rtn_url, 'uuid': iuuid} reply['uuid'] = iuuid channel.write_message({'text': json.dumps(reply)}) print '%s end time:' % channel, time.time() class BrowserWebSocket(BaseWebSocket): '''瀏覽器websocket服務器''' def open(self): '''新的WebSocket鏈接打開時被調用''' # message = {} # remote_ip = self.request.remote_ip # message['query_string']=self.get_argument('query_string') # message['remote_ip']=remote_ip # auth, error = verify_auth_token(message) auth = True error = 'error' if not auth: reply = json.dumps({'error': error}) self.write_message({'text': reply, 'close': True}) else: reply = "{}" self.write_message({'text': reply}) print(">>> %s connected" % self.request.remote_ip) def on_message(self, message): '''鏈接收到新消息時被調用''' print '%s start time:'%self,time.time() task = Task.create(message,self) if task: message_queue.put(task) @tornado.gen.coroutine def on_messages(self, message): '''鏈接收到新消息時被調用''' task = Task.create(message,self) if task: message_queue.put(task) def on_close(self): '''客戶端關閉時被調用''' print("<<< %s disconnected" % str(self.request.remote_ip)) # with message_queue.mutex: # message_queue.queue.clear() while not message_queue.empty(): try: message_queue.get(False) except Queue.Empty: continue message_queue.task_done() def check_origin(self, origin): '''容許WebSocket的跨域請求''' return True class Task(object): def __init__(self, uuid, offset_top, image_data, channel, zoom, rType, responseType, *args): self.uuid = uuid self.offset_top = int(float(offset_top)) self.image_data = image_data self.channel = channel self.zoom = zoom self.rType = rType self.responseType = responseType @classmethod def create(clz, message,sel): # data = message.get('text') data = message try: params = json.loads(data[:150]) image_data = data[150:] image_data = image_data.replace(" ", "+") params['image_data'] = image_data params['channel'] = sel # add Type if params.get('responseType') is None: params['responseType'] = 'url' # request type if params.get('rType') is None: params['rType'] = 'rl' task = Task(**params) except ValueError as e: task = None print(">>>message data error!") print(e) return task def __cmp__(self, other): return cmp(self.offset_top, other.offset_top) def verify_auth_token(message): '''token 驗證''' token = message.get('query_string') secret_key = 'aoiakai' try: payload = jwt.decode(token, secret_key, algorithms=['HS256']) if payload.get('ip') != message.get('remote_ip'): return False, 'ip mismatch' except jwt.ExpiredSignatureError as e: print(e) return False, 'token expired' except Exception as e: print(e) return False, 'enter correct token' return True, '' work_thread = Thread(target=work_loop) work_thread.daemon = True work_thread.start()