twisted(2)--聊天系統

  咱們今天要作一個聊天系統,這樣能夠和咱們以前flask api那系列文章結合起來;其次,聊天系統最能表明tcpserver,之後能夠套用各類模型,好比咱們公司作的物聯網,其實就是把聊天系統簡化一下。python

  twisted官方網站已經爲咱們提供了一個很是好的例子,咱們研究一下,而後在此基礎上進行修改便可(這方面確實要比tornado作得好,不過tornado在閱讀源碼方面又有很大優點,之後咱們作一個tornado版的)react

from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor

class Chat(LineReceiver):
    def __init__(self, users):
        self.users = users
        self.name = None
        self.state = "GETNAME"

    def connectionMade(self):
        self.sendLine("What's your name?")

    def connectionLost(self, reason):
        if self.name in self.users:
            del self.users[self.name]

    def lineReceived(self, line):
        if self.state == "GETNAME":
            self.handle_GETNAME(line)
        else:
            self.handle_CHAT(line)

    def handle_GETNAME(self, name):
        if name in self.users:
            self.sendLine("Name taken, please choose another.")
            return
        self.sendLine("Welcome, %s!" % (name,))
        self.name = name
        self.users[name] = self
        self.state = "CHAT"

    def handle_CHAT(self, message):
        message = "<%s> %s" % (self.name, message)
        for name, protocol in self.users.iteritems():
            if protocol != self:
                protocol.sendLine(message)


class ChatFactory(Factory):
    def __init__(self):
        self.users = {} # maps user names to Chat instances

    def buildProtocol(self, addr):
        return Chat(self.users)

reactor.listenTCP(8123, ChatFactory())
reactor.run()

  代碼很是簡單,每一個用戶鏈接上來的時候,都新建一個Chat對象,Chat類中,包含各類對單個鏈接的操做方法,其實看名字均可以看出來他們的做用,redis

  構造函數__init__中定義了3個變量,users是一個字典,包含全部當前鏈接的對象,key是它的name,value是Chat對象自己,表明本身這個鏈接;name標識這個鏈接名稱,必定要明瞭,惟一,咱們之後會用客戶的電話號碼做爲它的name;state有點意思,它表明一個狀態,當這個鏈接沒有經過驗證的時候,是一個狀態,驗證過之後,又是一個狀態。其實state之後還會繼續擴展,好比說,在不少時候,會有不少垃圾鏈接進來,一般一個鏈接上來,在必定時間內尚未經過驗證,就能夠abort掉。數據庫

  connectionMade看名字也知道,鏈接建立好之後,觸發的函數。json

  connectionLost看名字意思,鏈接丟失之後,觸發的函數,這個函數之後能夠擴展到redis記錄鏈接狀態。flask

  lineReceived這個是一個鏈接用的最多的函數,就是數據接受到之後,觸發的函數,下面2個函數就是在此基礎上構建而成的。api

  handle_GETNAME和handle_CHAT的運用跟鏈接的state有關,當state在未驗證狀態時,調用handle_GETNAME函數;當已經驗證過期,調用handle_CHAT。服務器

  再看看factory類,其中users就不用說了,記錄每一個鏈接的變量。網絡

  buildProtocol,新建一個鏈接之後,觸發的函數,它調用了Chat的構造函數,新建一個Chat對象。dom

  其實Chat繼承LineReceive,而LineReceive繼承Protocol的。真實的鏈接是transport,因此咱們這個例子中沒有展現出來transport,只有sendLine這樣的函數,我下面本身寫例子的時候,會加上去;Protocol其實就是整個鏈接連上來之後,加上一些這個鏈接當前的狀態,再加上一些基本操做方法組成的;Factory就是全部Protocol組成的一個工廠類,每新加入或者減小一個Protocol對象時,都能在Factory裏面表現出來。

  整個代碼分析完畢,官方例子就能夠直接運行了,看看運行結果吧。

  用telnet模擬一個客戶端,就能夠很好的操做了。

  

  以上全是官方的例子,咱們要引入本身的項目。

  首先,數據模型,官方例子很簡單,直接把str格式的數據發送出去,在測試的時候沒問題,但正式項目中絕對不可能。一般每一個數據,都會由2部分組成,一個header做爲頭,一個content做爲內容。其實就是模擬http。header中,一般有數據長度、版本號、數據類型id等,這個都不是必須的,要根據你實際項目來。content做爲真實數據內容,通常都用json數據格式,固然,若是你追求效率,也能夠用google protor buf或者facebook的數據模式,均可以(不少公司都用的google protor buf模式,解析速度比較快,咱們這爲了簡單,就用json格式)。

  

  上面是咱們數據格式,綠色段就是header,藍色段就是content。我上面就說了,這只是隨便寫的一個項目,在真實項目中,要根據你的需求來選擇,極可能要保留字段。這邊稍微解釋一下command_id,其實這個就相似於http中的url,http根據url代表它的做用;咱們這一樣根據command_id標示它的做用,由於在整個過程當中,不但有聊天,還有驗證過程,之後還可能有廣播,組播等各類功能。咱們就根據command_id來判斷這個數據的做用(其實寫到這,你們徹底能夠看出來,咱們基本就是跟http學的,現實過程當中也這樣,幾乎都在模仿http),而響應之類的,就是服務器主動推送給客戶端的command_id,這也是跟http不一樣的地方,不少時候,咱們都是主動推送給客戶端。

  好了,既然已經這樣規定,咱們再詳細規定一下command_id吧,就像http的url同樣。

  

  咱們先比較簡單的設定一下,之後要是有改動,再改變。

  咱們重寫tcpserver,代碼以下:

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
import struct
import json
from twisted.python import log
import sys
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat
        }

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據之後的操做
        """
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]

        if command_id not in [1, 2, 3, 4]:
            return

        if self.state == "VERIFY" and command_id == 1:
            self.handle_verify(content)
        else:
            self.handle_data(command_id, content)

    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8124, ChatFactory())
reactor.run()

 

  代碼修改的比較多,

  首先,直接從Protocol繼承了,這樣比從LineReceive繼承更直觀一點;command_func_dict表明command_id和其處理函數的一一對應字典;

  其次,dataReceived是主要的接受函數,接受到數據之後,先解析header,根據header裏面的length截取數據,再根據command_id來把數據送個它的處理函數。若是command_id爲1,就進入驗證函數;若是爲其餘,就進入其餘數據處理函數,不過要先驗證經過,才能用其餘函數處理。這就跟http同樣。(這邊之後要重寫的,你們想象一下,若是我一個客戶端鏈接,同時發送2個數據,按照上面代碼,只能處理一個數據,另一個就丟棄了。)

  最後,send_content爲總的發送函數,先把header頭組建好,而後加上數據,就發送了。這邊可能遇到發送的客戶端不在線,要先檢測一下(之後還會遇到各類意外斷線狀況,服務器端無法及時檢測到,這個之後再講。)

  服務器端是否是很簡單?再寫一個客戶端代碼,客戶端若是用GUI方式寫的話,篇幅太長了,咱們這就用最簡單的方式,模擬客戶端操做。下面是客戶端代碼。

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
log.startLogging(sys.stdout)


class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受驗證結果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('驗證經過')
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受單聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受組聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受廣播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[羣聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number):
        """
        發送驗證
        """
        content = json.dumps(dict(phone_number=phone_number))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        發送單聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        發送組聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        發送羣聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)


class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)


if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊')
    reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,這是組聊')
    reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,這是羣聊')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  客戶端比較簡單,主要是幾個發送函數,基本都是以send_開頭,就是主動發送消息以及驗證的;接受從服務器的處理函數,基本以handle_開頭。跟服務器端同樣,接受到數據之後,先解析header,根據header裏面的length截取數據,再根據command_id來把數據送個它的處理函數。

  這邊弄了個定時任務,第3秒開始驗證;第10秒隨機發送一個單聊;第11秒隨機發送一個組聊;第12秒發送一個羣聊。

  咱們開3個客戶端,看看結果吧。

yudahai@yudahaiPC:tcpserver$ python frontClient.py 000001
2016-06-21 17:33:17+0800 [-] Log opened.
2016-06-21 17:33:17+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fa325b41680>
2016-06-21 17:33:17+0800 [-] Started to connect
2016-06-21 17:33:17+0800 [Uninitialized] Connected.
2016-06-21 17:33:17+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:20+0800 [EchoClient,client] 驗證經過
2016-06-21 17:33:29+0800 [EchoClient,client] [羣聊][000001]:你好,這是羣聊
2016-06-21 17:33:29+0800 [EchoClient,client] [單聊][000002]:你好,這是單聊
2016-06-21 17:33:30+0800 [EchoClient,client] [組聊][000002]:你好,這是組聊
2016-06-21 17:33:31+0800 [EchoClient,client] [羣聊][000002]:你好,這是羣聊
2016-06-21 17:33:38+0800 [EchoClient,client] [單聊][000003]:你好,這是單聊
2016-06-21 17:33:39+0800 [EchoClient,client] [組聊][000003]:你好,這是組聊
2016-06-21 17:33:40+0800 [EchoClient,client] [羣聊][000003]:你好,這是羣聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000002
2016-06-21 17:33:19+0800 [-] Log opened.
2016-06-21 17:33:19+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f23f9a48680>
2016-06-21 17:33:19+0800 [-] Started to connect
2016-06-21 17:33:19+0800 [Uninitialized] Connected.
2016-06-21 17:33:19+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:22+0800 [EchoClient,client] 驗證經過
2016-06-21 17:33:27+0800 [EchoClient,client] [單聊][000001]:你好,這是單聊
2016-06-21 17:33:29+0800 [EchoClient,client] [羣聊][000001]:你好,這是羣聊
2016-06-21 17:33:31+0800 [EchoClient,client] [羣聊][000002]:你好,這是羣聊
2016-06-21 17:33:40+0800 [EchoClient,client] [羣聊][000003]:你好,這是羣聊
yudahai@yudahaiPC:tcpserver$ python frontClient.py 000003
2016-06-21 17:33:28+0800 [-] Log opened.
2016-06-21 17:33:28+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7ff3067dc680>
2016-06-21 17:33:28+0800 [-] Started to connect
2016-06-21 17:33:28+0800 [Uninitialized] Connected.
2016-06-21 17:33:28+0800 [Uninitialized] New connection IPv4Address(TCP, '127.0.0.1', 8124)
2016-06-21 17:33:31+0800 [EchoClient,client] 驗證經過
2016-06-21 17:33:40+0800 [EchoClient,client] [羣聊][000003]:你好,這是羣聊

  這就是3個客戶端的結果,是否是你指望的值?

  再看看服務器端的調試結果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-21 17:23:01+0800 [-] Log opened.
2016-06-21 17:23:01+0800 [-] ChatFactory starting on 8124
2016-06-21 17:23:01+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f08b0ec8638>
2016-06-21 17:23:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59802)
2016-06-21 17:23:29+0800 [Chat,0,127.0.0.1] 歡迎, 000001!
2016-06-21 17:23:36+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天.
2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天.
2016-06-21 17:23:37+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在線,不能聊天.
2016-06-21 17:33:17+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59926)
2016-06-21 17:33:19+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59928)
2016-06-21 17:33:20+0800 [Chat,1,127.0.0.1] 歡迎, 000001!
2016-06-21 17:33:22+0800 [Chat,2,127.0.0.1] 歡迎, 000002!
2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在線,不能聊天.
2016-06-21 17:33:28+0800 [Chat,1,127.0.0.1] Phone_number:000004 不在線,不能聊天.
2016-06-21 17:33:28+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 59930)
2016-06-21 17:33:30+0800 [Chat,2,127.0.0.1] Phone_number:000003 不在線,不能聊天.
2016-06-21 17:33:31+0800 [Chat,3,127.0.0.1] 歡迎, 000003!

  不在線的時候,都打印出來了。

  其實整個例子仍是比較簡單的,可是不少地方還很是不完善,這個要在咱們接下來的系列中,慢慢完善。

  好比:若是一個客戶端同時發送2個數據,上面的代碼就只處理了一個,另一個就丟棄掉了;還有,咱們的程序考慮的是正常的上線、離線,若是客戶端由於網絡問題,忽然斷線,沒有發生tcp結束的4次握手,服務器端是不知道的,這時候如何保證服務器端知道客戶端在線不在線?還有,twisted如何異步訪問數據庫、redis、rabbitmq等,這個咱們之後都會慢慢講。

相關文章
相關標籤/搜索