twsited(5)--不一樣模塊用rabbitmq傳遞消息

  上一章,咱們講到,用redis共享數據,以及用redis中的隊列來實現一個簡單的消息傳遞。其實在真實的過程當中,不該該用redis來傳遞,最好用專業的消息隊列,咱們python中,用到最普遍的就是rabbitmq,雖然它是用erlang開發的,但真的很是好用,通過無數次驗證。若是你們不會安裝rabbitmq,請看我這篇文章,http://www.cnblogs.com/yueerwanwan0204/p/5319474.html   這篇文章講解了怎麼安裝rabbitmq以及簡單的使用它。html

  咱們把上一章的圖再稍微修改一下,前端

  其實在真實的項目中,也這樣,通常來講,利用redis在不一樣模塊之間共享數據,利用rabbitmq來進行消息傳遞。咱們這個項目只作到從web到flask,再到rabbitmq,傳遞給tcpserver,再下放給具體的tcpclient客戶端;其實還能夠反向傳遞,即從tcp的client到tcp服務器,再到rabbitmq,到前端tcp或者前端http,可是這個前端tcp或者http要基於循環模式的,flask確定不行。咱們從下一章開始講tornado,用tornado來接受,而且作一個websocket,就能夠下放下去。python

  好了,說了這麼多,咱們來看一下代碼,首先,tcpserver這塊,咱們以前用redis的隊列作消息隊列,如今修改一下,修改的大概代碼以下:react

import pika
from pika.adapters import twisted_connection

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


class RabbitMQ(object):
    _connection = None
    _channel_receive_from_http = None

    @staticmethod
    @defer.inlineCallbacks
    def init_mq(ip_address, port):
        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        parameters = pika.ConnectionParameters(credentials=credentials)
        cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
        RabbitMQ._connection = yield cc.connectTCP(ip_address, port)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def set_channel_receive_from_back(user_factory):
        """
        設置rabbitmq消息接受隊列的channel,而且作好循環任務
        """
        RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel()
        yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp')
        queue_object, consumer_tag = yield 
RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True)
        l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory)
        l.start(0.5)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def read_from_mq(queue_object, chat_factory):
        """
        讀取接受到的消息隊列消息,而且處理
        """
        ch, method, properties, body = yield queue_object.get()

        if body:
            log.msg('Accept data from http successful!')
            chat_factory.process_data_from_mq(body)
            defer.returnValue(1)
        defer.returnValue(0)

  首先,你們要注意一下,因爲twisted是異步的,因此不能採用原先阻塞的函數,鏈接或者接受或者發送消息,全部跟rabbitmq的鏈接,發送,接受,都要異步化,即都要返回defer對象。由於鏈接rabbitmq的本質,其實就是socket的網絡行爲,任何網絡行爲都有可能被阻塞,一旦阻塞,異步的效率會極其低下。(之後咱們寫tornado也是這樣,必定要返回future對象)。web

  我看到網上還有不少博客,在接受rabbitmq的消息的時候,竟然開了另一個進程或者線程,有時候這麼作,程序運行起來沒問題,但涉及到異步的時候,仍是會影響效率。都已經用異步的代碼了,就不該該大量使用多進程或者多線程。多進程或者多線程,會讓cpu調度頻繁切換,大量併發的時候,嚴重影響效率。redis

  詳細看上面的代碼,簡單的解釋一下,json

  init_mq就是初始化消息隊列,先加入用戶名,密碼,返回一個相似與token的東西,而後用twisted客戶端來鏈接rabbitmq,其實就是socket行爲,返回一個connection。flask

  set_channel_receive_from_back設置channel,其實就是定義一個管道,我從這個管道接受東西。接受並讀取的過程其實就是寫一個循環任務,這個循環任務每0.5秒執行一次,你也能夠寫小一點,0.1秒執行一次,具體的看你須要設置。api

  read_from_mq就是真正的讀取並處理的函數,我這邊在read_from_mq中,加了一個參數,就是這個工廠對象,由於接受的時候,一個工廠,就產生一個接受函數。而後讀取到消息之後,把消息傳遞到這個工廠對象的處理方法中,整個環節就完整了。服務器

  RabbitMQ的3個方法全是靜態方法,因此我沒有生成RabbitMQ對象,直接使用這個類自己就能夠了。因此在運行的時候,又加了以下代碼。

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)

reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT)

reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf)

reactor.listenTCP(8124, cf)
reactor.run()

  看見我加的代碼沒有,一個init_mq,一個set_channel_receive_from_back。一個初始化消息隊列,初始化好之後,再設置channel,而且開始接受消息。

  整個tcpserver這塊就算完成了,下面是整個tcpserver的代碼

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task, defer, protocol
import struct
import json
from twisted.python import log
import sys
import time
import txredisapi as redis
import pika
from pika.adapters import twisted_connection
log.startLogging(sys.stdout)

REDIS_HOST = 'localhost'
REDIS_PORT = 6380
REDIS_DB = 4
REDIS_PASSWORD = 'dahai123'

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123')


@defer.inlineCallbacks
def check_token(phone_number, token):
    token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token')
    if token != token_in_redis:
        defer.returnValue(False)
    else:
        defer.returnValue(True)


class RabbitMQ(object):
    _connection = None
    _channel_receive_from_http = None

    @staticmethod
    @defer.inlineCallbacks
    def init_mq(ip_address, port):
        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        parameters = pika.ConnectionParameters(credentials=credentials)
        cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
        RabbitMQ._connection = yield cc.connectTCP(ip_address, port)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def set_channel_receive_from_back(user_factory):
        """
        設置rabbitmq消息接受隊列的channel,而且作好循環任務
        """
        RabbitMQ._channel_receive_from_http = yield RabbitMQ._connection.channel()
        yield RabbitMQ._channel_receive_from_http.queue_declare(queue='front_tcp')
        queue_object, consumer_tag = yield RabbitMQ._channel_receive_from_http.basic_consume(queue='front_tcp', no_ack=True)
        l = task.LoopingCall(RabbitMQ.read_from_mq, queue_object, user_factory)
        l.start(0.5)
        defer.returnValue(1)

    @staticmethod
    @defer.inlineCallbacks
    def read_from_mq(queue_object, chat_factory):
        """
        讀取接受到的消息隊列消息,而且處理
        """
        ch, method, properties, body = yield queue_object.get()

        if body:
            log.msg('Accept data from http successful!')
            chat_factory.process_data_from_mq(body)
            defer.returnValue(1)
        defer.returnValue(0)


class Chat(Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:斷線" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.factory.users:
            del self.factory.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據之後的操做
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                return

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)

            if self.state == "DATA":
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                return

    def handle_heartbeat(self, content):
        """
        處理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    @defer.inlineCallbacks
    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        token = content.get('token')

        result = yield check_token(phone_number, token)

        if not result:
            send_content = json.dumps({'code': 0})
            self.send_content(send_content, 101, [phone_number])
            length = 12 + len(send_content)
            version = self.version
            command_id = 101
            header = [length, version, command_id]
            header_pack = struct.pack('!3I', *header)
            self.transport.write(header_pack + send_content)
            return

        if phone_number in self.factory.users:
            log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8'))
            self.factory.users[phone_number].connectionLost("")
            self.factory.users.pop(phone_number)

        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.factory.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.factory.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.factory.users.keys():
                self.factory.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8'))
                value.transport.abortConnection()

    @defer.inlineCallbacks
    def receive_from_mq(self):
        data = yield redis_store.rpop('front_tcp')
        if data:
            log.msg("接受到來自消息隊列的消息:", data)
            self.process_data_from_mq(data)

    def process_data_from_mq(self, data):
        loads_data = json.loads(data)
        command_id = loads_data.get('command_id')
        phone_numbers = loads_data.get('chat_to')
        chat_from = loads_data.get('chat_from')
        chat_content = loads_data.get('chat_content')

        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(content, command_id, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = 1100
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線." % phone_number.encode('utf-8'))

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)

reactor.callLater(0.1, RabbitMQ.init_mq, RABBITMQ_HOST, RABBITMQ_PORT)

reactor.callLater(0.5, RabbitMQ.set_channel_receive_from_back, cf)

reactor.listenTCP(8124, cf)
reactor.run()
View Code

  

  下面是web方面的代碼,web也是,以前用redis很簡單的作,如今換到rabbitmq,因爲這個例子很簡單,因此我就在request過程當中初始化rabbitmq了,整個代碼就很是簡單了,就是一個發送函數而已。

# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session
import json
from . import web
import pika

RABBITMQ_HOST = 'localhost'
RABBITMQ_PORT = 5672
RABBITMQ_USERNAME = 'rabbitmq01'
RABBITMQ_PASSWORD = 'rabbitmq01'


@web.teardown_request
def handle_teardown_request(exception):
    db_session.remove()


@web.route('/send-command', methods=['GET', 'POST'])
def send_command():
    if request.method == 'GET':
        users = User.query.all()
        return render_template('web/send-command.html', users=users)
    else:
        data = request.get_json()
        command_id = data.get('command_id')
        chat_from = '13764408552'
        chat_to = data.get('chat_to')
        chat_content = data.get('content')

        if not chat_to or not chat_content or not command_id:
            return jsonify({'code': 0, 'message': '信息不完整'})

        send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        # current_app.redis.lpush('front_tcp', send_data)

        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST, credentials=credentials, port=RABBITMQ_PORT))
        channel = connection.channel()
        channel.queue_declare(queue='front_tcp')

        channel.basic_publish(exchange='',
                              routing_key='front_tcp',
                              body=send_data)

        print "send json_data to front_tcp, the data is ", send_data

        connection.close()

        return jsonify({'code': 1, 'message': '發送成功'})

  全部代碼更換完成,看一下具體效果吧

  web上先發送一個消息。

  隨便啓動一個客戶端,看看接受吧。

  看見沒有,整個過程就所有打通了。

  總結:整個twisted就講到這了,你們能夠看到,twisted我也不是特別熟悉,因此我一共就用了5章把它講完。從下一章開始,我開始講tornado,利用tornado作tcpserver,tcpclient,websocket服務器,由於tornado的源碼比較好讀,因此我重點也會放在tornado上。最近我在看reactjs,屆時我會用稍微好看一點的圖形界面,來作websocket頁面,tornado這個庫真正作到small strong smart,我一直喜歡小而精的庫。總之,我重點會放在tornado上,但願你們到時候會喜歡。

相關文章
相關標籤/搜索