python manage.py startapp threewall
配置 settings.py
# URL prefix used when referring to static assets.
STATIC_URL = '/static/'

# Additional directory searched for static assets: <BASE_DIR>/static.
STATICFILES_DIRS = (
    os.path.join(BASE_DIR, 'static'),
)
pip install -U channels==2.0.2 channels_redis==2.1.1
pip install pyCryptodome
pip install django-simpleui
INSTALLED_APPS = [
    # Admin add-ons, listed ahead of django.contrib.admin.
    'simpleui',
    'import_export',
    # Django built-in apps.
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    # Project apps.
    'app',
    'threewall',
    # WebSocket support.
    'channels',
]
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import threewall.routing

# Root protocol router for the project.
application = ProtocolTypeRouter({
    # (http -> django views is added by default)
    # Plain HTTP requests do not need to be listed here by hand; the framework
    # loads them automatically.
    'websocket': AuthMiddlewareStack(
        URLRouter(threewall.routing.websocket_urlpatterns)
    ),
})
緊接着,咱們須要在 Django 的配置文件中繼續配置 Channels 的 asgi 應用和通道層的信息:
import os

# The WSGI entry point is left commented out; the ASGI application below is
# the one configured for this project.
# WSGI_APPLICATION = 'wechatDemo.wsgi.application'

# Dotted path to the ASGI application created in wechatDemo/routing.py.
ASGI_APPLICATION = "wechatDemo.routing.application"

# Channel layer backed by channels_redis. The Redis address defaults to
# 127.0.0.1:6379 and can be overridden per deployment through the
# CHANNELS_REDIS_HOST / CHANNELS_REDIS_PORT environment variables.
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [(
                os.environ.get('CHANNELS_REDIS_HOST', '127.0.0.1'),
                int(os.environ.get('CHANNELS_REDIS_PORT', '6379')),
            )],
        },
    }
}
Windows 安裝 Redis 地址:https://www.jianshu.com/p/e16d23e358c0
from django.contrib.auth.models import User
from django.db import models
from django.utils.crypto import random
from django.utils.html import format_html


# Create your models here.
def rename(newname):
    """Return a decorator that sets the decorated function's ``__name__``.

    The function object itself is returned unchanged apart from its name.
    NOTE(review): presumably used so the admin changelist shows ``newname``
    as the column header for the decorated method; confirm against the admin.
    """
    def decorator(fn):
        setattr(fn, "__name__", newname)
        return fn

    return decorator


# Check-in table
class checkin(models.Model):
    """One check-in record, keyed informally by the WeChat ``openid``."""

    headimgurl = models.URLField(default="", max_length=256, blank=True, null=True)
    openid = models.CharField(verbose_name="openid", max_length=225, default="", blank=True)
    nickname = models.CharField(verbose_name="暱稱", max_length=225, default="", blank=True)
    sex = models.CharField(verbose_name="性別", max_length=225, default="", blank=True)
    language = models.CharField(verbose_name="語言", max_length=225, default="", blank=True)
    city = models.CharField(verbose_name="城市", max_length=225, default="", blank=True)
    # Set once when the row is created.
    createTime = models.DateTimeField(verbose_name="簽到時間", auto_now_add=True)
    # Refreshed on every save().
    lastTime = models.DateTimeField(verbose_name="修改時間", auto_now=True)

    class Meta:
        verbose_name_plural = "簽到表"

    @rename("模板頭像")
    def showheadimgurl(self):
        """Return an HTML ``<img>`` thumbnail (50px wide) for ``headimgurl``."""
        avatar_url = self.headimgurl
        return format_html("<img src='{}' style='width:50px'/>", avatar_url)

    def __str__(self):
        return self.nickname
from django.contrib import admin

from threewall import models


# Register your models here.
@admin.register(models.checkin)
class orderAdmin(admin.ModelAdmin):
    """Admin configuration for the ``checkin`` model."""

    # Columns of the changelist; "showheadimgurl" is a model method.
    list_display = (
        "showheadimgurl",
        "openid",
        "nickname",
        "sex",
        "language",
        "city",
        "createTime",
        "lastTime",
    )
    # Columns that link to the edit page.
    list_display_links = ("openid", "nickname")
    # Fields covered by the admin search box.
    search_fields = ('nickname', "openid")
    # Rows per changelist page.
    list_per_page = 50
"""Views for the WeChat check-in wall.

* ``dwall`` renders the wall page with avatars and an initial QR signature.
* ``checkin`` is the OAuth-guarded page a user reaches by scanning the QR code.
* ``signatureCheck`` starts the periodic push of fresh QR signatures.
"""
import base64
import datetime
import json
import logging
import os
import threading
import time
import uuid
from functools import wraps

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from Crypto.Cipher import AES
from django.conf import settings
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import redirect, render
from django.views.decorators.csrf import csrf_exempt
from wechatpy import WeChatClient, WeChatPay
from wechatpy.oauth import WeChatOAuth
from wechatpy.pay import dict_to_xml

from threewall import models

logger = logging.getLogger(__name__)

# Create your views here.
# Official-account AppID
AppID = "xxx"
# Official-account AppSecret
AppSecret = "xxx"
# Secret key used to encrypt the QR-code timestamp
key = "xxxx"
# Service-account API client
client = WeChatClient(AppID, AppSecret)
# Message channel used to push events to the wall page
channel_layer = get_channel_layer()

# Channel-layer group the wall page listens on.
_WALL_GROUP = "chat_roomName"
# At most this many check-ins are loaded for the wall.
_WALL_FETCH_LIMIT = 200
# The avatar list is padded with placeholders up to this many entries.
_WALL_MIN_TILES = 199
_PLACEHOLDER_AVATAR = "/static/3dwall/img/a.png"
# A scanned signature older than this many seconds is rejected.
_QR_TTL_SECONDS = 10
# Interval between pushed signatures.
_QR_REFRESH_SECONDS = 5

# Guards the one-time start of the background signature broadcaster.
_broadcaster_lock = threading.Lock()
_broadcaster_started = False


# pip install pyCryptodome
def add_to_16(value):
    """Return ``value`` as bytes, NUL-padded to a multiple of 16 bytes.

    Padding is computed on the UTF-8 encoded bytes, so non-ASCII text is
    aligned correctly as well. ``bytes`` input is accepted and padded as-is.
    """
    data = value.encode('utf-8') if isinstance(value, str) else bytes(value)
    remainder = len(data) % 16
    if remainder:
        data += b'\0' * (16 - remainder)
    return data


def _make_signature():
    """Return the current Unix time, AES-encrypted and base64-encoded as str."""
    # NOTE(review): ECB mode with a NUL-padded key offers no integrity
    # protection; kept because already-issued QR codes rely on this format.
    cipher = AES.new(add_to_16(key), AES.MODE_ECB)
    encrypted = cipher.encrypt(add_to_16(str(time.time())))
    return str(base64.encodebytes(encrypted), encoding='utf-8')


def _broadcast(message):
    """Send ``message`` to every consumer in the wall group."""
    async_to_sync(channel_layer.group_send)(
        _WALL_GROUP, {'type': 'chat_message', 'message': message}
    )


def dwall(request):
    """Render the wall page.

    Context: ``checkUserInfo`` (avatar dicts, padded with placeholders up to
    199 entries), ``CurPersonNum`` (number of check-ins loaded, at most 200)
    and ``encrypted_text`` (the initial QR signature).
    """
    checkins = models.checkin.objects.values("headimgurl")[0:_WALL_FETCH_LIMIT]
    checkUserInfo = [{"headimgurl": item["headimgurl"]} for item in checkins]
    CurPersonNum = len(checkUserInfo)

    # Compute the shortfall once. Re-reading the list length while appending
    # makes the loop stop after adding only about half the placeholders.
    missing = _WALL_MIN_TILES - len(checkUserInfo)
    if missing > 0:
        checkUserInfo.extend(
            {"headimgurl": _PLACEHOLDER_AVATAR} for _ in range(missing)
        )

    return render(request, "3dwall.html", {
        "checkUserInfo": checkUserInfo,
        "CurPersonNum": CurPersonNum,
        "encrypted_text": _make_signature(),
    })


def getWeChatOAuth(redirect_url):
    """Build a ``snsapi_userinfo`` OAuth helper that returns to ``redirect_url``."""
    return WeChatOAuth(AppID, AppSecret, redirect_url, 'snsapi_userinfo')


# Authorization decorator
def oauth(method):
    """Require WeChat OAuth user info in the session before calling ``method``.

    * No ``user_info`` in the session and no ``code`` in the query string:
      redirect to the WeChat authorization URL.
    * ``code`` present: exchange it, store the user info in the session and
      call ``method``. If the exchange fails, respond with HTTP 403 instead of
      calling ``method`` without user info.
    """
    @wraps(method)
    def warpper(request, *args, **kwargs):
        if request.session.get('user_info', None) is None:
            code = request.GET.get('code', None)
            wechat_oauth = getWeChatOAuth(request.get_raw_uri())
            if not code:
                return redirect(wechat_oauth.authorize_url)
            try:
                wechat_oauth.fetch_access_token(code)
                user_info = wechat_oauth.get_user_info()
            except Exception:
                # The code in the request is invalid, or the exchange failed.
                logger.exception("WeChat OAuth code exchange failed")
                return HttpResponse(status=403)
            # Suggested: persist this in a user table as well.
            request.session['user_info'] = user_info
        return method(request, *args, **kwargs)

    return warpper


@oauth
def checkin(request):
    """Validate the scanned QR signature and record the user's check-in.

    Renders ``checkerror.html`` when no signature is given, ``expired.html``
    when it cannot be decoded or is older than 10 seconds, ``isfollow.html``
    when the user does not follow the account, and ``checkin.html`` on success.
    """
    signature = request.GET.get("signature", None)
    if signature is None:
        return render(request, "checkerror.html")

    try:
        aes = AES.new(add_to_16(key), AES.MODE_ECB)
        # A "+" in the query string arrives as a space; restore it first.
        raw = base64.decodebytes(signature.replace(' ', '+').encode(encoding='utf-8'))
        decrypted_text = str(aes.decrypt(raw), encoding='utf-8').replace('\0', '')
        issued_at = float(decrypted_text)
    except (ValueError, TypeError):
        # Bad base64, misaligned ciphertext, undecodable bytes or a
        # non-numeric payload: treat it like an expired QR code.
        logger.warning("Rejected check-in signature: %s", signature)
        return render(request, "expired.html")

    # The QR code has expired.
    if issued_at < time.time() - _QR_TTL_SECONDS:
        return render(request, "expired.html")

    session_user = request.session.get('user_info')
    user_info = client.user.get(session_user["openid"])
    if user_info["subscribe"] == 0:
        return render(request, "isfollow.html")

    headimgurl = user_info["headimgurl"]
    nickname = user_info["nickname"]
    ischeck = models.checkin.objects.filter(openid=user_info["openid"]).first()
    if ischeck is None:
        models.checkin.objects.create(
            headimgurl=headimgurl,
            openid=user_info["openid"],
            nickname=nickname,
            sex=user_info["sex"],
            language=user_info["language"],
            city=user_info["country"] + "-" + user_info["province"] + "-" + user_info["city"],
        )
    else:
        # Saving an existing row again, with no field changes.
        ischeck.save()

    _broadcast({"type": "message", "headimgurl": headimgurl, "nickname": nickname})
    return render(request, "checkin.html")


def _signature_broadcast_loop():
    """Push a fresh QR signature to the wall group every 5 seconds, forever."""
    while True:
        time.sleep(_QR_REFRESH_SECONDS)
        try:
            _broadcast({"type": "signatureCheck", "signatureCheck": _make_signature()})
        except Exception:
            # Keep the loop alive; a failed push must not stop later ones.
            logger.exception("Failed to broadcast QR signature")


@csrf_exempt
def signatureCheck(request):
    """Start the periodic QR-signature broadcast and return immediately.

    The broadcast runs in a daemon thread started at most once per process, so
    repeated calls neither block a worker nor start duplicate broadcasters.
    """
    global _broadcaster_started
    with _broadcaster_lock:
        if not _broadcaster_started:
            threading.Thread(
                target=_signature_broadcast_loop,
                name="qr-signature-broadcaster",
                daemon=True,
            ).start()
            _broadcaster_started = True
    return HttpResponse("ok")
import datetime
import json
import multiprocessing
import time
import uuid

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


class ChatConsumer(WebsocketConsumer):
    """Synchronous WebSocket consumer that relays messages within one room group."""

    def connect(self):
        """Join the room group named in the URL and accept the connection."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = 'chat_%s' % self.room_name

        # The channel-layer methods are asynchronous, so they are wrapped
        # with `async_to_sync` in this synchronous consumer.
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name,
            self.channel_name
        )

        # Accept the connection.
        self.accept()

    def disconnect(self, close_code):
        """Remove this connection from the room group."""
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name,
            self.channel_name
        )

    def receive(self, text_data=None, bytes_data=None):
        """Relay a text frame's ``message`` field to the room group.

        Binary frames, invalid JSON, payloads without a ``message`` key and
        non-string messages are ignored rather than raised, so one bad frame
        does not tear down the connection.
        """
        if text_data is None:
            return
        try:
            message = json.loads(text_data)['message']
        except (ValueError, KeyError, TypeError):
            return
        if not isinstance(message, str):
            return

        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name,
            {
                # `type` names the handler method on this class. Underscores
                # and '.' are both mapped to underscores by Channels, so
                # 'chat.message' would work too.
                'type': 'chat_message',
                'message': message + str(datetime.datetime.today())
            }
        )

    def chat_message(self, event):
        """Forward a group event's message to this WebSocket client as JSON."""
        message = event['message']

        # Send message to WebSocket
        self.send(text_data=json.dumps({
            'message': message
        }))
from django.urls import path

from threewall import consumers

# Routes mapping each WebSocket URL to its consumer.
# NOTE(review): the consumer class is passed directly, matching the Channels
# 2.x release this tutorial installs; Channels 3+ expects
# `consumers.ChatConsumer.as_asgi()` here. Confirm before upgrading.
websocket_urlpatterns = [
    path('ws/chat/<str:room_name>/', consumers.ChatConsumer),
]
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter

import threewall.routing

# Root protocol router for the project.
application = ProtocolTypeRouter({
    # (http -> django views is added by default)
    # Plain HTTP requests do not need to be listed here by hand; the framework
    # loads them automatically.
    'websocket': AuthMiddlewareStack(
        URLRouter(threewall.routing.websocket_urlpatterns)
    ),
})
注:須要靜態資源,請私下聯繫