上一章開頭咱們說,要鏈接以前flask系列文章中的用戶,結果篇幅不夠,沒有實現。html
今天咱們把它實現一下。話說,不一樣模塊之間,該如何聯繫在一塊兒,一般都是mysql、redis、rabbitmq還有RPC等,之因此着重講redis,由於我太喜歡這個內存數據庫了。small stong simple。這就跟我喜歡flask、tornado而不太喜歡django和twisted同樣(之後咱們着重講tornado,源碼比較簡單直觀,哈哈,雖然本身用着twisted,但真不喜歡twisted這種大型庫,好多虛類的繼承,太過於注重設計模式。好了,廢話說到這。)python
這篇文章的主要目的就以下圖所示,不但在http和tcp之間共享數據,並且能在http端發送數據到tcpserver服務器,而後下發到tcpclient。mysql
在twisted中訪問redis,就是找一個redis版的twisted庫就行了,在redis官網,有推薦,其源碼在這 https://github.com/fiorix/txredisapi 我一直使用,很穩定,操做也極其簡單。react
咱們就用這個庫,利用以前flask系列中,login的時候,返回的token進行驗證,把http模塊和tcp模塊聯繫到一塊兒。服務器端代碼以下:jquery
frontTCP.pygit
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor, task, defer import struct import json from twisted.python import log import sys import time import txredisapi as redis log.startLogging(sys.stdout) REDIS_HOST = 'localhost' REDIS_PORT = 6380 REDIS_DB = 4 REDIS_PASSWORD = 'dahai123' redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123') @defer.inlineCallbacks def check_token(phone_number, token): token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token') if token != token_in_redis: defer.returnValue(False) else: defer.returnValue(True) class Chat(Protocol): def __init__(self, factory): self.factory = factory self.phone_number = None self.state = "VERIFY" self.version = 0 self.last_heartbeat_time = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat, 5: self.handle_heartbeat } self._data_buffer = bytes() def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): log.msg("[%s]:斷線" % self.phone_number.encode('utf-8')) if self.phone_number in self.factory.users: del self.factory.users[self.phone_number] def dataReceived(self, data): """ 接受到數據之後的操做 """ self._data_buffer += data while True: length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer): return content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4, 5]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) if self.state == "DATA": self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12: return def handle_heartbeat(self, content): """ 處理心跳包 """ self.last_heartbeat_time = int(time.time()) @defer.inlineCallbacks def handle_verify(self, content): """ 驗證函數 """ content = json.loads(content) phone_number = content.get('phone_number') token = content.get('token') result = yield check_token(phone_number, token) if not result: send_content = json.dumps({'code': 0}) self.send_content(send_content, 101, [phone_number]) length = 12 + len(send_content) version = self.version command_id = 101 header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) return if phone_number in self.factory.users: log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8')) self.factory.users[phone_number].connectionLost("") self.factory.users.pop(phone_number) log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.factory.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根據command_id來分配函數 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 單播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 組播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 廣播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.factory.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.factory.users.keys(): self.factory.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self) def check_users_online(self): for key, value in self.users.items(): if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4: log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8')) value.transport.abortConnection() cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) reactor.listenTCP(8124, cf) reactor.run()
上一章有點地方有錯誤,此次在這一塊兒更正了。好比users,在每一個Protocol裏面不保存,直接存儲在Factory裏面,每次引用的時候,直接去取就能夠了;還有,若是通過驗證以前,若是驗證錯誤,users裏面是沒有鏈接的值的,只能self.transport.write(),不能經過send_content()來發送。github
好了,上面2處修改了,而且在此基礎上,加了一個訪問redis的函數,很是簡單,跟http訪問同樣,就是要注意異步化的問題。web
再看看客戶端的代碼,客戶端這邊要獲取token,首先要引用flask restful api 系列中,咱們模擬的client.py,此次我也把它引用進來了,先登陸,獲取token,拿到token之後,再用tcpclient進行驗證,其實這個在生產環境中也這麼作的。ajax
下面是frontClient.py的代碼。redis
# coding:utf-8 from twisted.internet import reactor, task from twisted.internet.protocol import Protocol, ClientFactory import struct from twisted.python import log import sys import json from client import API_1_1 log.startLogging(sys.stdout) class EchoClient(Protocol): def __init__(self): self.command_func_dict = { 101: self.handle_verify_s, 102: self.handle_single_chat_s, 103: self.handle_group_chat_s, 104: self.handle_broadcast_chat_s } self.version = 0 self.state = "VERIFY" self.phone_number = "" def connectionMade(self): log.msg("New connection", self.transport.getPeer()) def dataReceived(self, data): length, self.version, command_id = struct.unpack('!3I', data[:12]) content = data[12:length] if self.state == "VERIFY" and command_id == 101: self.handle_verify_s(content) else: self.handle_data(command_id, content) def handle_data(self, command_id, pack_data): self.command_func_dict[command_id](pack_data) def connectionLost(self, reason): log.msg("connection lost") def handle_verify_s(self, pack_data): """ 接受驗證結果 """ content = json.loads(pack_data) code = content.get('code') if code == 1: log.msg('驗證經過') else: log.msg('驗證沒有經過,請從新輸入,程序暫停') reactor.stop() self.state = "Data" def handle_single_chat_s(self, pack_data): """ 接受單聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_group_chat_s(self, pack_data): """ 接受組聊 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def handle_broadcast_chat_s(self, pack_data): """ 接受廣播 """ content = json.loads(pack_data) chat_from = content.get('chat_from') chat_content = content.get('chat_content') log.msg("[羣聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8'))) def send_verify(self, phone_number, token): """ 發送驗證 """ content = json.dumps(dict(phone_number=phone_number, token=token)) self.send_data(content, 1) def send_single_chat(self, chat_from, chat_to, chat_content): """ 發送單聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 2) def send_group_chat(self, chat_from, chat_to, chat_content): """ 發送組聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) self.send_data(content, 3) def send_broadcast_chat(self, chat_from, chat_content): """ 發送羣聊內容 """ content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_data(content, 4) def send_data(self, send_content, command_id): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) def send_heartbeat(self): """ 發送心跳包 """ length = 12 version = self.version command_id = 5 header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack) class EchoClientFactory(ClientFactory): def __init__(self): self.p = EchoClient() def startedConnecting(self, connector): log.msg("Started to connect") def buildProtocol(self, addr): log.msg("Connected.") return self.p def clientConnectionFailed(self, connector, reason): log.msg("Lost connection. Reason:", reason) def clientConnectionLost(self, connector, reason): log.msg("Connection failed. Reason:", reason) if __name__ == '__main__': api = API_1_1() chat_from = sys.argv[1] chat_password = sys.argv[2] u = api.login(chat_from, chat_password) token = api.token cf = EchoClientFactory() chat_from = sys.argv[1] all_phone_numbers = ['13565208554', '13764408552', '1390854961g'] all_phone_numbers.remove(chat_from) import random task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat) task_send_heartbeat.start(2, now=False) reactor.callLater(10, cf.p.send_verify, chat_from, token) reactor.callLater(20, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,這是10秒的時候發送') reactor.callLater(30, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,這是20秒的時候發送') reactor.connectTCP('192.168.5.60', 8124, cf) reactor.run()
分別把以前項目中的帳號拉出來運行一下吧。客戶端的認證函數也改變了,先引用以前的api客戶端,直接獲取正確的token,把token拿來發給tcp服務器端,tcp服務器端再到redis裏面去找,若是正確,就驗證經過,不然,返回code=0給客戶端,這時候服務器端的記錄當前客戶端狀態仍是未驗證經過,所以下面的客戶端再發其餘請求,服務器端所有丟棄。這跟http的思想是同樣的。
這是一個客戶端的調試結果,看,一切都正常。
yudahai@yudahaiPC:tcpserver$ python frontClient.py 13565208554 123456 2016-06-24 13:28:36+0800 [-] Log opened. 2016-06-24 13:28:36+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fe377d25440> 2016-06-24 13:28:36+0800 [-] Started to connect 2016-06-24 13:28:36+0800 [Uninitialized] Connected. 2016-06-24 13:28:36+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124) 2016-06-24 13:28:46+0800 [EchoClient,client] 驗證經過 2016-06-24 13:28:53+0800 [EchoClient,client] [組聊][13764408552]:你好,這是10秒的時候發送 2016-06-24 13:29:03+0800 [EchoClient,client] [組聊][13764408552]:你好,這是20秒的時候發送
其實利用redis把http模塊和tcp模塊集合起來比較簡單,但難的地方在於設計思想,如何很好的經過redis把幾個模塊聯繫起來。
上面其實本質上講的就是如何經過redis來共享狀態。
下面咱們再深刻一下,經過純web端發送命令到後臺,而後後臺接受到之後,經過redis來作消息系統,原本這部分應該是rabbitmq的事,畢竟rabbitmq是專門作消息系統的。但簡單的消息系統能夠用redis作,redis中有個list模型,每一個消息發送的時候從左邊push進來,接受的時候從右邊pop,這樣就是一個簡單的消息系統。這邊用redis先作一個簡單的,主要可讓你們很是直觀的看到twisted如何做爲消費者客戶端運行的,下一章講rabbitmq的時候,就更簡單了。
好了,廢話少說,如今以前flask restful api那一個系列的項目中,加一個web頁面,進入web頁面,只有一個按鈕,輸入內容,發送一次,就廣播一次。
下面是代碼和具體的頁面。
先是原來的flask項目中,咱們增長一個web藍圖,這個我在flask restful api的第七篇 http://www.cnblogs.com/yueerwanwan0204/p/5522749.html 中講過,增長相應的文件夾web,而後在裏面添加2個文件__init__.py,view.py。
結構圖下下所示:
編輯原來的run.py文件,添加藍圖指向
from app_1_0 import api as api_1_0_blueprint app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1000') from api_1_1 import api as api_1_1_blueprint app.register_blueprint(api_1_1_blueprint, url_prefix='/api/v1100') from web import web as web_blueprint app.register_blueprint(web_blueprint, url_prefix='/web')
而後在web/__init__.py下面添加藍圖對象
# coding:utf-8 from flask import Blueprint web = Blueprint('web', __name__) from . import view
web/view.py就跟簡單了,就渲染一個頁面,同時具備get和post方法
# coding:utf-8 from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app from app.model import User, db_session import json from . import web @web.teardown_request def handle_teardown_request(exception): db_session.remove() @web.route('/send-command', methods=['GET', 'POST']) def send_command(): if request.method == 'GET': users = User.query.all() return render_template('web/send-command.html', users=users) else: data = request.get_json() command_id = data.get('command_id') chat_from = '13764408552' chat_to = data.get('chat_to') chat_content = data.get('content') print data if not chat_to or not chat_content or not command_id: return jsonify({'code': 0, 'message': '信息不完整'}) send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) current_app.redis.lpush('front_tcp', send_data) return jsonify({'code': 1, 'message': '發送成功'})
get的時候,就渲染;post的時候,接受頁面上傳的數據。
頁面就一個頁面,主要使用ajax上傳,因爲本人好長時間沒有開發html了,因此頁面醜了一點,js代碼也醜了一點,可是能用,之後有空,我優化一下,你們先看吧,功能達到了。
templates/web/send-command.html的代碼以下:
<!DOCTYPE html> <html lang="zh_CN"> <head> <meta charset="UTF-8"> <title>發送命令</title> <script src="../../static/js/jquery-2.1.4.min.js"></script> </head> <body> <div> <select id="single_object"> {% for user in users %} <option value="{{ user.phone_number }}">{{ user.phone_number }}</option> {% endfor %} </select> <input type="text" name="single_content"> <button id="single_chat">單聊</button> </div> <br> <br> <br> <br> <div> <select multiple id="group_object"> {% for user in users %} <option value="{{ user.phone_number }}">{{ user.phone_number }}</option> {% endfor %} </select> <input type="text" name="group_content"> <button id="group_chat">組聊</button> </div> <br> <br> <br> <br> <div> <input type="text" name="broadcast_content"> <button id="broadcast_chat">羣聊</button> </div> <script> var baseurl = '/web/'; $(function(){ $("#single_chat").click(function(){ var chat_to = []; chat_to.push($("#single_object option:selected").val()); var content = $("input[name=single_content]").val(); console.log("chat_to:" + chat_to + " content:" + content); $.ajax({ type: "POST", url: baseurl + "send-command", data: JSON.stringify({chat_to:chat_to, content:content, command_id:102}), dataType: "json", contentType: "application/json", success: function(data){ if (data["code"] == 1){ $("input[name=single_content]").val(""); console.log(data["message"]); }else { console.log(data["message"]); } } }); }); $("#group_chat").click(function(){ var chat_tos = []; var chat_to = $("#group_object option:selected").each(function(){ chat_tos.push($(this).val()); }); var content = $("input[name=group_content]").val(); console.log("chat_to:" + chat_tos + " content:" + content); $.ajax({ type: "POST", url: baseurl + "send-command", data: JSON.stringify({chat_to:chat_tos, content:content, command_id:103}), dataType: "json", contentType: "application/json", success: function(data){ if (data["code"] == 1){ $("input[name=group_content]").val(""); console.log(data["message"]); }else { console.log(data["message"]); } } }); }); $("#broadcast_chat").click(function(){ var chat_to = []; {% for user in users %} chat_to.push("{{ user.phone_number }}"); {% endfor %} var content = $("input[name=broadcast_content]").val(); console.log("content:" + content); $.ajax({ type: "POST", url: baseurl + "send-command", data: JSON.stringify({chat_to:chat_to, content:content, command_id:104}), dataType: "json", contentType: "application/json", success: function(data){ if (data["code"] == 1){ $("input[name=broadcast_content]").val(""); console.log(data["message"]); }else { console.log(data["message"]); } } }); }); }); </script> </body> </html>
效果有點醜,
這樣就能夠直接發送了,發送到http服務器端,http服務器再把數據打包成json格式,發送到frontTCP端,那天然,frontTCP須要增長一點代碼,以前的Protocol不變,只是在Factory裏面增長2個函數,再增長一個循環任務,不停的接受redis的消息。
frontTCP新增代碼以下:
class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self) def check_users_online(self): for key, value in self.users.items(): if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4: log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8')) value.transport.abortConnection() @defer.inlineCallbacks def receive_from_mq(self): data = yield redis_store.rpop('front_tcp') if data: log.msg("接受到來自消息隊列的消息:", data) self.process_data_from_mq(data) def process_data_from_mq(self, data): loads_data = json.loads(data) command_id = loads_data.get('command_id') phone_numbers = loads_data.get('chat_to') chat_from = loads_data.get('chat_from') chat_content = loads_data.get('chat_content') content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(content, command_id, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = 1100 command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8')) cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq) task_receive_data_from_mq.start(0.1, now=False) reactor.listenTCP(8124, cf) reactor.run()
receive_from_mq就是接受來之redis的消息,異步化一下,而後建一個循環任務task_receive_data_from_mq,這個循環任務,每0.1秒觸發一次(之後rabbitmq也是這樣),
若是隊列消息裏面有數據,就處理,不然繼續循環。
process_data_from_mq這是拿到具體的data,而後處理的過程,基本就解包、打包,而後發送。
send_content這就是發送函數,我基本就把Protocol裏面的發送函數從新抄了一遍,之後咱們會作一個虛類,而後具體的處理函數來繼承它,此次我就直接抄了。
好了,整個過程就這樣,咱們來運行一下,啓動2個客戶端,看看客戶端接受狀況吧。
yudahai@yudahaiPC:tcpserver$ python frontClient.py 13764408552 123456 2016-06-24 16:01:09+0800 [-] Log opened. 2016-06-24 16:01:09+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd9a7eff3b0> 2016-06-24 16:01:09+0800 [-] Started to connect 2016-06-24 16:01:09+0800 [Uninitialized] Connected. 2016-06-24 16:01:09+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124) 2016-06-24 16:01:10+0800 [EchoClient,client] 驗證經過 2016-06-24 16:01:19+0800 [EchoClient,client] [單聊][13764408552]:ddddddddd 2016-06-24 16:01:32+0800 [EchoClient,client] [組聊][13764408552]:fghytjhnuyjuyjmuikiuk 2016-06-24 16:01:41+0800 [EchoClient,client] [羣聊][13764408552]:ffffffffffffffffffffffffffffff 2016-06-24 16:18:26+0800 [EchoClient,client] [組聊][13764408552]:你好,這是web組聊 2016-06-24 16:18:27+0800 [EchoClient,client] [羣聊][13764408552]:你好,這是web羣聊
看,是否是全接受到了?
這章就講到這,主要講到了若是經過redis把不一樣的模塊聯繫在一塊兒,其實本質上就是把客戶端的狀態在模塊之間共享;以後咱們講了如何經過redis作一個簡單的消息隊列,這個實際上是rabbitmq的特性,之因此要先講一下,就是用最簡單的方式來預熱一下,由於rabbitmq的應用很廣,可能一會兒接受不了。還有就是把上一章的一些小bug解決掉。至於異步化,這個概念稍微有點大(好吧,我也不是研究特別的深,之後我會專門抽出一章講這個內容)。