# Django settings excerpt: enable Channels and back its channel layer with Redis.

# NOTE(review): the '...' entries presumably stand in for the project's other
# apps, elided from this excerpt -- confirm before using as-is.
INSTALLED_APPS = [
    '...',
    'channels',
    '...',
]

# Dotted path of the ASGI application object.
ASGI_APPLICATION = 'server.routing.application'

# Redis endpoint comes from the environment: the host falls back to an empty
# string and the port to 6379 when the variables are unset.
_redis_host = os.getenv('REDIS_SERVER_HOST', '')
_redis_port = int(os.getenv('REDIS_SERVER_PORT', '6379'))

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [(_redis_host, _redis_port)],
        },
    },
}
# -*- coding: utf-8 -*-
# Root ASGI routing: WebSocket connections pass through the query-string
# authentication middleware before reaching the device URL routes.
from channels.routing import ProtocolTypeRouter, URLRouter

from device.consumers import QueryAuthMiddleware  # WebSocket middleware
import device.routing  # URL routes

# Build the WebSocket stack from the inside out: URL router first, then the
# authentication middleware wrapped around it.
_websocket_router = URLRouter(device.routing.websocket_urlpatterns)
_websocket_stack = QueryAuthMiddleware(_websocket_router)

application = ProtocolTypeRouter({
    'websocket': _websocket_stack,
})
class QueryAuthMiddleware:
    """WebSocket authentication middleware driven by a query-string token.

    The ``token`` query parameter is expected to be base64-encoded JSON with
    the keys ``name``, ``password`` and ``time`` (``%Y%m%d%H%M%S``).  The token
    is accepted when its time is within ``MAX_CLOCK_SKEW_SECONDS`` of the
    server clock and the password matches the stored device.

    The wrapped application is always called with a copy of the scope that has
    an extra ``user`` key: the matching ``DeviceInfoModel`` instance on
    success, ``None`` on any authentication failure.
    """

    # Largest accepted difference (seconds) between token time and server time.
    MAX_CLOCK_SKEW_SECONDS = 5 * 60

    def __init__(self, inner):
        # Store the ASGI application we were passed
        self.inner = inner

    def __call__(self, scope):
        """Authenticate the connection and delegate to the inner application.

        Args:
            scope: The connection scope mapping.  ``query_string`` (bytes) and
                ``client`` (host, port) are read from it; both may be absent.

        Returns:
            Whatever ``self.inner`` returns for the scope extended with
            ``user``.
        """
        device = None  # stays None unless every check below passes
        device_name = None
        try:
            # Decoding happens inside the try so that malformed bytes reject
            # the connection instead of crashing the middleware.
            raw_query = scope.get("query_string") or b""
            query = parse.parse_qs(raw_query.decode())
            token = query.get('token', [None])[0]
            if token is None:
                raise ValueError('token is missing from query string')

            login_info = json.loads(base64.b64decode(token).decode())
            device_name = login_info['name']

            # "client" is optional in the scope; a missing peer address must
            # not turn an otherwise valid login into a failure.
            client = scope.get('client') or (None, None)
            logger.info('Device {} connect from {} port {}'.format(
                device_name, client[0], client[1]))

            # Validate the token time.
            # NOTE(review): both values are naive datetimes, so the device
            # clock and the server clock must use the same timezone.
            gen_time = datetime.strptime(login_info['time'], '%Y%m%d%H%M%S')
            now_time = datetime.now()
            skew = abs(gen_time - now_time).total_seconds()
            if skew > self.MAX_CLOCK_SKEW_SECONDS:
                raise ValueError('Device {} time validate fail: token time: {}, server time: {}'.format(
                    device_name,
                    gen_time.strftime('%Y-%m-%d %H:%M:%S'),
                    now_time.strftime('%Y-%m-%d %H:%M:%S')))

            # Validate the password against the stored device.
            candidate = DeviceInfoModel.objects.get(name=device_name)
            if not candidate.check_password(login_info['password']):
                raise ValueError('Device {} password validate fail'.format(device_name))

            # Only now is the device considered authenticated.
            device = candidate
        except DeviceInfoModel.DoesNotExist:
            logger.info('Device {} can not find in DB'.format(device_name))
        except Exception as e:
            # Any other failure (bad base64/JSON, missing key, stale time,
            # wrong password) rejects the connection; ``device`` stays None.
            logger.error('Connect error {}'.format(e))
        finally:
            # Database connections must be closed manually inside middleware:
            # http://channels.readthedocs.io/en/latest/topics/authentication.html#custom-authentication
            close_old_connections()

        # Returned after the try/finally (not from inside ``finally``) so that
        # non-``Exception`` errors are not silently swallowed.
        return self.inner(dict(scope, user=device))
""" ASGI entrypoint. Configures Django and then runs the application defined in the ASGI_APPLICATION setting. """ import os import django from channels.routing import get_default_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings") django.setup() application = get_default_application()
import threading
import time
import json
import tarfile
import base64
import hashlib
import os
import pickle
from io import BytesIO
from datetime import datetime
from urllib.parse import quote

import websocket

# Image uploaded as the TEST payload.  Raw string: the original literal relied
# on the invalid escape sequence "\w" being passed through unchanged.
IMAGE_PATH = r'D:\software\websocket\websocket\face\18032811042400\0.jpg'

# Device endpoint; "{}" receives the percent-encoded login token.
SERVER_URL_TEMPLATE = "ws://xx.xx.xx.xx:8000/push/device/?token={}"

# Credentials of the test device.
DEVICE_NAME = '0000000000'
DEVICE_PASSWORD = '0000000000'

# Seconds to keep the connection open after sending, to receive replies.
LINGER_SECONDS = 100


def on_message(ws, message):
    """Print each message received from the server, decoded from JSON."""
    print('on_message')
    msg = json.loads(message)
    print(msg)


def on_error(ws, error):
    """Print any error reported by the WebSocket client."""
    print(error)


def on_close(ws, close_status_code=None, close_msg=None):
    """Report that the connection was closed.

    The two extra parameters are optional so the callback works both with
    websocket-client releases that call ``on_close(ws)`` and with those that
    call ``on_close(ws, close_status_code, close_msg)``.
    """
    print("### closed ###")


def on_open(ws):
    """Start a background thread that uploads one image as a TEST command."""

    def run(*args):
        try:
            with open(IMAGE_PATH, 'rb') as f:
                encoded_image = base64.b64encode(f.read()).decode()
            info = {
                'command': 'TEST',
                'content': {
                    'timestamp': datetime.now().timestamp(),
                    'image': encoded_image,
                },
            }
            ws.send(json.dumps(info))
            # Keep the socket open for a while so replies can arrive.
            time.sleep(LINGER_SECONDS)
        finally:
            # Always close, even when reading or sending failed; otherwise
            # run_forever() would keep the process alive indefinitely.
            ws.close()
            print("thread terminating...")

    threading.Thread(target=run).start()


def _build_login_info(name, password):
    """Return the login payload: name, MD5 hex digest of password, current time."""
    return {
        'name': name,
        # The hash scheme is part of the existing login protocol.
        'password': hashlib.md5(password.encode('utf8')).hexdigest(),
        'time': datetime.now().strftime('%Y%m%d%H%M%S'),
    }


def main():
    """Connect to the server as the test device and run until the socket closes."""
    websocket.enableTrace(True)
    login_info = _build_login_info(DEVICE_NAME, DEVICE_PASSWORD)
    b64_token = base64.b64encode(json.dumps(login_info).encode()).decode()
    # Standard base64 may contain "+", "/" and "="; percent-encode the token
    # so a "+" is not turned into a space when the query string is parsed.
    url = SERVER_URL_TEMPLATE.format(quote(b64_token, safe=''))
    print(login_info)
    print('Connect {}'.format(url))
    ws = websocket.WebSocketApp(url,
                                on_open=on_open,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.run_forever()


if __name__ == "__main__":
    main()