上一章,咱們講到,用redis共享數據,以及用redis中的隊列來實現一個簡單的消息傳遞。其實在真實的過程當中,不該該用redis來傳遞,最好用專業的消息隊列,咱們python中,用到最普遍的就是rabbitmq,雖然它是用erlang開發的,但真的很是好用,通過無數次驗證。若是你們不會安裝rabbitmq,請看我這篇文章,http://www.cnblogs.com/yueerwanwan0204/p/5319474.html 這篇文章講解了怎麼安裝rabbitmq以及簡單的使用它。html
咱們把上一章的圖再稍微修改一下,前端
其實在真實的項目中,也這樣,通常來講,利用redis在不一樣模塊之間共享數據,利用rabbitmq來進行消息傳遞。咱們這個項目只作到從web到flask,再到rabbitmq,傳遞給tcpserver,再下放給具體的tcpclient客戶端;其實還能夠反向傳遞,即從tcp的client到tcp服務器,再到rabbitmq,到前端tcp或者前端http,可是這個前端tcp或者http要基於循環模式的,flask確定不行。咱們從下一章開始講tornado,用tornado來接受,而且作一個websocket,就能夠下放下去。python
好了,說了這麼多,咱們來看一下代碼,首先,tcpserver這塊,咱們以前用redis的隊列作消息隊列,如今修改一下,修改的大概代碼以下:react
import pika from pika.adapters import twisted_connection RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' class RabbitMQ(object): _connection = None _channel_receive_from_http = None @staticmethod @defer.inlineCallbacks def init_mq(ip_address, port): credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) parameters = pika.ConnectionParameters(credentials=credentials) cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) RabbitMQ._connection = yield cc.connectTCP(ip_address, port) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def set_channel_receive_from_back(user_factory): """ 設置rabbitmq消息接受隊列的channel,而且作好循環任務 """ RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel() yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp') queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True) l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory) l.start(0.5) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def read_from_mq(queue_object, chat_factory): """ 讀取接受到的消息隊列消息,而且處理 """ ch, method, properties, body = yield queue_object.get() if body: log.msg('Accept data from http successful!') chat_factory.process_data_from_mq(body) defer.returnValue(1) defer.returnValue(0)
首先,你們要注意一下,因爲twisted是異步的,因此不能採用原先阻塞的函數,鏈接或者接受或者發送消息,全部跟rabbitmq的鏈接,發送,接受,都要異步化,即都要返回defer對象。由於鏈接rabbitmq的本質,其實就是socket的網絡行爲,任何網絡行爲都有可能被阻塞,一旦阻塞,異步的效率會極其低下。(之後咱們寫tornado也是這樣,必定要返回future對象)。web
我看到網上還有不少博客,在接受rabbitmq的消息的時候,竟然開了另一個進程或者線程,有時候這麼作,程序運行起來沒問題,但涉及到異步的時候,仍是會影響效率。都已經用異步的代碼了,就不該該大量使用多進程或者多線程。多進程或者多線程,會讓cpu調度頻繁切換,大量併發的時候,嚴重影響效率。redis
詳細看上面的代碼,簡單的解釋一下,json
init_mq就是初始化消息隊列,先加入用戶名,密碼,返回一個相似與token的東西,而後用twisted客戶端來鏈接rabbitmq,其實就是socket行爲,返回一個connection。flask
set_channel_receive_from_back設置channel,其實就是定義一個管道,我從這個管道接受東西。接受並讀取的過程其實就是寫一個循環任務,這個循環任務每0.5秒執行一次,你也能夠寫小一點,0.1秒執行一次,具體的看你須要設置。api
read_from_mq就是真正的讀取並處理的函數,我這邊在read_from_mq中,加了一個參數,就是這個工廠對象,由於接受的時候,一個工廠,就產生一個接受函數。而後讀取到消息之後,把消息傳遞到這個工廠對象的處理方法中,整個環節就完整了。服務器
RabbitMQ的3個方法全是靜態方法,因此我沒有生成RabbitMQ對象,直接使用這個類自己就能夠了。因此在運行的時候,又加了以下代碼。
cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq) task_receive_data_from_mq.start(0.1, now=False) reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT) reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf) reactor.listenTCP(8124, cf) reactor.run()
看見我加的代碼沒有,一個init_mq,一個set_channel_receive_from_back。一個初始化消息隊列,初始化好之後,再設置channel,而且開始接受消息。
整個tcpserver這塊就算完成了,下面是整個tcpserver的代碼
# coding:utf-8 from twisted.internet.protocol import Factory, Protocol from twisted.internet import reactor, task, defer, protocol import struct import json from twisted.python import log import sys import time import txredisapi as redis import pika from pika.adapters import twisted_connection log.startLogging(sys.stdout) REDIS_HOST = 'localhost' REDIS_PORT = 6380 REDIS_DB = 4 REDIS_PASSWORD = 'dahai123' RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123') @defer.inlineCallbacks def check_token(phone_number, token): token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token') if token != token_in_redis: defer.returnValue(False) else: defer.returnValue(True) class RabbitMQ(object): _connection = None _channel_receive_from_http = None @staticmethod @defer.inlineCallbacks def init_mq(ip_address, port): credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) parameters = pika.ConnectionParameters(credentials=credentials) cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters) RabbitMQ._connection = yield cc.connectTCP(ip_address, port) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def set_channel_receive_from_back(user_factory): """ 設置rabbitmq消息接受隊列的channel,而且作好循環任務 """ RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel() yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp') queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True) l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory) l.start(0.5) defer.returnValue(1) @staticmethod @defer.inlineCallbacks def read_from_mq(queue_object, chat_factory): """ 讀取接受到的消息隊列消息,而且處理 """ ch, method, properties, body = yield queue_object.get() if body: log.msg('Accept data from http successful!') chat_factory.process_data_from_mq(body) defer.returnValue(1) defer.returnValue(0) class Chat(Protocol): def __init__(self, factory): self.factory = factory self.phone_number = None self.state = "VERIFY" self.version = 0 self.last_heartbeat_time = 0 self.command_func_dict = { 1: self.handle_verify, 2: self.handle_single_chat, 3: self.handle_group_chat, 4: self.handle_broadcast_chat, 5: self.handle_heartbeat } self._data_buffer = bytes() def connectionMade(self): log.msg("New connection, the info is:", self.transport.getPeer()) def connectionLost(self, reason): log.msg("[%s]:斷線" % self.phone_number.encode('utf-8')) if self.phone_number in self.factory.users: del self.factory.users[self.phone_number] def dataReceived(self, data): """ 接受到數據之後的操做 """ self._data_buffer += data while True: length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12]) if length > len(self._data_buffer): return content = self._data_buffer[12:length] if command_id not in [1, 2, 3, 4, 5]: return if self.state == "VERIFY" and command_id == 1: self.handle_verify(content) if self.state == "DATA": self.handle_data(command_id, content) self._data_buffer = self._data_buffer[length:] if len(self._data_buffer) < 12: return def handle_heartbeat(self, content): """ 處理心跳包 """ self.last_heartbeat_time = int(time.time()) @defer.inlineCallbacks def handle_verify(self, content): """ 驗證函數 """ content = json.loads(content) phone_number = content.get('phone_number') token = content.get('token') result = yield check_token(phone_number, token) if not result: send_content = json.dumps({'code': 0}) self.send_content(send_content, 101, [phone_number]) length = 12 + len(send_content) version = self.version command_id = 101 header = [length, version, command_id] header_pack = struct.pack('!3I', *header) self.transport.write(header_pack + send_content) return if phone_number in self.factory.users: log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8')) self.factory.users[phone_number].connectionLost("") self.factory.users.pop(phone_number) log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),)) self.phone_number = phone_number self.factory.users[phone_number] = self self.state = "DATA" send_content = json.dumps({'code': 1}) self.send_content(send_content, 101, [phone_number]) def handle_data(self, command_id, content): """ 根據command_id來分配函數 """ self.command_func_dict[command_id](content) def handle_single_chat(self, content): """ 單播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(send_content, 102, [chat_to]) def handle_group_chat(self, content): """ 組播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_to = content.get('chat_to') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = chat_to self.send_content(send_content, 103, phone_numbers) def handle_broadcast_chat(self, content): """ 廣播 """ content = json.loads(content) chat_from = content.get('chat_from') chat_content = content.get('chat_content') send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) phone_numbers = self.factory.users.keys() self.send_content(send_content, 104, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = self.version command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.factory.users.keys(): self.factory.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8')) class ChatFactory(Factory): def __init__(self): self.users = {} def buildProtocol(self, addr): return Chat(self) def check_users_online(self): for key, value in self.users.items(): if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4: log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8')) value.transport.abortConnection() @defer.inlineCallbacks def receive_from_mq(self): data = yield redis_store.rpop('front_tcp') if data: log.msg("接受到來自消息隊列的消息:", data) self.process_data_from_mq(data) def process_data_from_mq(self, data): loads_data = json.loads(data) command_id = loads_data.get('command_id') phone_numbers = loads_data.get('chat_to') chat_from = loads_data.get('chat_from') chat_content = loads_data.get('chat_content') content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content)) self.send_content(content, command_id, phone_numbers) def send_content(self, send_content, command_id, phone_numbers): """ 發送函數 """ length = 12 + len(send_content) version = 1100 command_id = command_id header = [length, version, command_id] header_pack = struct.pack('!3I', *header) for phone_number in phone_numbers: if phone_number in self.users.keys(): self.users[phone_number].transport.write(header_pack + send_content) else: log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8')) cf = ChatFactory() task1 = task.LoopingCall(cf.check_users_online) task1.start(3, now=False) task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq) task_receive_data_from_mq.start(0.1, now=False) reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT) reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf) reactor.listenTCP(8124, cf) reactor.run()
下面是web方面的代碼,web也是,以前用redis很簡單的作,如今換到rabbitmq,因爲這個例子很簡單,因此我就在request過程當中初始化rabbitmq了,整個代碼就很是簡單了,就是一個發送函數而已。
# coding:utf-8 from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app from app.model import User, db_session import json from . import web import pika RABBITMQ_HOST = 'localhost' RABBITMQ_PORT = 5672 RABBITMQ_USERNAME = 'rabbitmq01' RABBITMQ_PASSWORD = 'rabbitmq01' @web.teardown_request def handle_teardown_request(exception): db_session.remove() @web.route('/send-command', methods=['GET', 'POST']) def send_command(): if request.method == 'GET': users = User.query.all() return render_template('web/send-command.html', users=users) else: data = request.get_json() command_id = data.get('command_id') chat_from = '13764408552' chat_to = data.get('chat_to') chat_content = data.get('content') if not chat_to or not chat_content or not command_id: return jsonify({'code': 0, 'message': '信息不完整'}) send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content)) # current_app.redis.lpush('front_tcp', send_data) credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials, port=RABBITMQ_PORT)) channel = connection.channel() channel.queue_declare(queue='front_tcp') channel.basic_publish(exchange='', routing_key='front_tcp', body=send_data) print "send json_data to front_tcp, the data is ", send_data connection.close() return jsonify({'code': 1, 'message': '發送成功'})
全部代碼更換完成,看一下具體效果吧
web上先發送一個消息。
隨便啓動一個客戶端,看看接受吧。
看見沒有,整個過程就所有打通了。
總結:整個twisted就講到這了,你們能夠看到,twisted我也不是特別熟悉,因此我一共就用了5章把它講完。從下一章開始,我開始講tornado,利用tornado作tcpserver,tcpclient,websocket服務器,由於tornado的源碼比較好讀,因此我重點也會放在tornado上。最近我在看reactjs,屆時我會用稍微好看一點的圖形界面,來作websocket頁面,tornado這個庫真正作到small strong smart,我一直喜歡小而精的庫。總之,我重點會放在tornado上,但願你們到時候會喜歡。