twisted(3)--再談twisted

  上一章,咱們直接寫了一個小例子來從總體講述twisted運行的大體過程,今天咱們首先深刻一些概念,在逐漸明白這些概念之後,咱們會修改昨天寫的例子。前端

  先看下面一張圖:python

  這個系列的第一篇文章,咱們已經爲你們展現了一張twisted的原理圖,那張圖,由於咱們沒有捕獲任何socket事件,因此只有一個圈。這張圖上面的大圈表明捕獲socket事件,這個也是twisted最主要的功能,它已經爲咱們作了。而且提供了2個函數,transport.write寫入事件,dataReceived讀取事件。下面的小圈子,也就是咱們本身的代碼,好比咱們昨天的驗證、單聊、組聊等功能。你們必定要時時刻刻記住這張圖,編寫twisted代碼的時候,腦子裏印着這張圖,這就跟咱們之前寫c代碼的時候,必定要記住內存模型同樣。react

  回到這個大圈,transport.write和dataReceived實際上是通過不少層封裝的函數,它們本質上仍是操做select模型中的寫文件描述符(write_fd)、讀文件描述符(read_fd),對應twisted的基礎類就是IWriteDescriptor和IReadDescriptor,若是咱們比較熟悉select模型,咱們都知道,每次新來一個鏈接,都是創建write_fd、read_fd、error_fd,select不停的輪詢這些fd,當其中任何一個知足條件時,觸發相應的事件,這些全部的東西,twisted都已經幫咱們作好了,並且異步化了。咱們接受到事件,只管處理就行了。web

  再看下面一個圖,
redis

  仔細看上面這個圖,再對比以前的圖,twisted在socket這塊所有爲咱們作好。json

  下面咱們再講一下transport這個對象,這個對象在每一個Protocol裏面都會產生一個,它表明一個鏈接,這個鏈接能夠是socket,也能夠是unix的pipe,twisted已經爲咱們封裝好,通常不會本身去新建它。一般咱們會用它來發送數據(write)、獲取鏈接另外一方的信息(getPeer)。flask

  再看一下dataReceived這個函數,就是每次接到數據之後觸發事件,上面說了,就是每次循環,select檢查這些fd,fd被寫入就觸發。這時候你們想一想,若是循環被阻塞,在這個data裏面會有不少數據,按照咱們昨天的程序,只會處理第一個數據,其餘的可能被丟棄掉了。緩存

  咱們昨天的例子,把客戶端運行代碼稍微修改一下,在第10秒的時候,同時發送2個數據(粘包),看看服務器運行狀況。服務器

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊')
    reactor.callLater(10, cf.p.send_single_chat, chat_from, random.choice(all_phone_numbers), '你好,這是單聊')
    # reactor.callLater(11, cf.p.send_group_chat, chat_from, [random.choice(all_phone_numbers), random.choice(all_phone_numbers)], '你好,這是組聊')
    # reactor.callLater(12, cf.p.send_broadcast_chat, chat_from, '你好,這是羣聊')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  客戶端代碼已經更改,運行一下,看看服務器結果。網絡

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:23:55+0800 [-] Log opened.
2016-06-22 10:23:55+0800 [-] ChatFactory starting on 8124
2016-06-22 10:23:55+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f382d908638>
2016-06-22 10:24:02+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 47834)
2016-06-22 10:24:05+0800 [Chat,0,127.0.0.1] 歡迎, 000001!
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:24:12+0800 [Chat,0,127.0.0.1] Phone_number:000002 不在線,不能聊天.

  果真,只處理了一個數據,後面一個直接丟棄掉了。

  一般來講,咱們都會爲每一個Protocol申請一段內存,每次接受到數據之後,先存放到這段內存中,而後再集中處理,這樣,即便循環被blocking住或者客戶端粘包,咱們也能正確處理。新的代碼以下:

  

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
import struct
import json
from twisted.python import log
import sys
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據之後的操做
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)


reactor.listenTCP(8124, ChatFactory())
reactor.run()

  咱們在構造函數裏面,加入了一個字段,這個字段就是self._data_buffer,在每次接受到數據之後,都循環處理這段內存。再看看運行結果,有什麼不一樣。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 10:40:42+0800 [-] Log opened.
2016-06-22 10:40:42+0800 [-] ChatFactory starting on 8124
2016-06-22 10:40:42+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f96860e0680>
2016-06-22 10:40:57+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 48010)
2016-06-22 10:41:00+0800 [Chat,0,127.0.0.1] 歡迎, 000001!
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000004 不在線,不能聊天.
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] 你好,這是單聊
2016-06-22 10:41:07+0800 [Chat,0,127.0.0.1] Phone_number:000003 不在線,不能聊天.

  是否是正確了?接受數據,咱們先講到這,下面咱們講開發tcpserver必定要處理的問題,異常斷線

異常斷線

  異常斷線的處理在tcpserver開發過程當中必不可少,不少時候,尤爲是無線、3G、4G網絡,信號很差的時候就斷線,因爲是網絡問題,沒有通過tcp結束的4次握手,服務器不可能及時檢查到此事件,這時候就有可能出錯。一般咱們會採起一種心跳包機制,即客戶端每隔一段時間就向服務器端發送一個心跳包,服務器端每隔一段時間就檢測一下,若是發現客戶端連續2次或者屢次沒有發送心跳包,就認爲客戶端已經掉線,再採起措施。

  好了,說了這麼多,先要從新部署一下程序,我把一個客戶端發在個人另一臺筆記本上,先鏈接好,而後拔掉網線,再從服務器端發送一組數據過去,看看會發生什麼。

  首先,咱們把000002放在筆記本上,000001在服務器端,在10秒和20秒的時候,分別發送一個單聊給000002,看看服務器端和000002的狀況。

000001的運行代碼修改以下:

if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random
    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送')

    reactor.connectTCP('127.0.0.1', 8124, cf)

    reactor.run()

  10秒和20秒,分別發送數據到服務器端,而000002端,在10秒和20秒的中間,拔掉網線,咱們看看發生了什麼狀況。

  首先,服務器端的運行結果以下:

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 11:40:02+0800 [-] Log opened.
2016-06-22 11:40:02+0800 [-] ChatFactory starting on 8124
2016-06-22 11:40:02+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7f9c39f89638>
2016-06-22 11:41:26+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 57150)
2016-06-22 11:41:29+0800 [Chat,0,192.168.5.15] 歡迎, 000002!
2016-06-22 11:41:41+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '127.0.0.1', 49526)
2016-06-22 11:41:44+0800 [Chat,1,127.0.0.1] 歡迎, 000001!
2016-06-22 11:41:51+0800 [Chat,1,127.0.0.1] 你好,這是10秒的時候發送
2016-06-22 11:42:01+0800 [Chat,1,127.0.0.1] 你好,這是20秒的時候發送

  它在000002中斷了之後,並無發現000002已經中斷,仍是照樣write下去,其實本質上,它仍是把數據發到了write_fd上,而後就是底層的事了。

  而000002客戶端的結果比較有意思。

2016-06-22 11:41:26+0800 [-] Log opened.
2016-06-22 11:41:26+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e75db7680>
2016-06-22 11:41:26+0800 [-] Started to connect
2016-06-22 11:41:26+0800 [Uninitialized] Connected.
2016-06-22 11:41:26+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 11:41:29+0800 [EchoClient,client] 驗證經過
2016-06-22 11:41:51+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送
2016-06-22 11:44:27+0800 [EchoClient,client] [單聊][000001]:你好,這是20秒的時候發送

  你們注意到沒有,竟然仍是收到了,可是看時間,時間和原來的是不對的。我後來把網線從新插上去,而後就接受到了。twisted把write_fd的數據從新發送給了客戶端,由於客戶端沒有任何改變,ip和端口都是原來的,網絡狀況沒有改變,因此再次就鏈接上來。

  咱們再試一下另一種狀況,也是移動端常常遇到的狀況,就是切換網絡,好比從4G切換到無線網,看看會發生什麼。

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 13:09:34+0800 [-] Log opened.
2016-06-22 13:09:34+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd8a0408680>
2016-06-22 13:09:34+0800 [-] Started to connect
2016-06-22 13:09:34+0800 [Uninitialized] Connected.
2016-06-22 13:09:34+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 13:09:37+0800 [EchoClient,client] 驗證經過
2016-06-22 13:09:54+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送

  客戶端再也收不到了,這也是真實狀況。一般來講,用戶切換網絡的時候,都會更改網絡信息,這時候移動客戶端再也收不到這個信息了,並且服務器端也不會報錯(之後要爲咱們作消息確認機制埋下伏筆。)

  既然收不到了,咱們就解決這個問題,上面說了,增長心跳包機制,客戶端每隔一段時間發送一次心跳包,服務器端收到心跳包之後,記錄最近一次接受到的時間。每隔一段時間,服務器總體輪詢一次,若是發現某一個客戶端很長時間沒有接受到心跳包,就斷定它爲斷線,這時候主動切斷這個客戶端。

  心跳包的command_id也要加上,直接爲5吧,內容爲空。只是心跳包,沒有必要寫內容了。

  新代碼以下:
  frontTCP.py

# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task
import struct
import json
from twisted.python import log
import sys
import time
log.startLogging(sys.stdout)


class Chat(Protocol):
    def __init__(self, users):
        self.users = users
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:斷線" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.users:
            del self.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到數據之後的操做
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                break

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)
            else:
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                break

    def handle_heartbeat(self, content):
        """
        處理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    def handle_verify(self, content):
        """
        驗證函數
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        if phone_number in self.users:
            log.msg("電話號碼<%s>存在老的鏈接." % phone_number.encode('utf-8'))
            self.users[phone_number].connectionLost("")
        log.msg("歡迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根據command_id來分配函數
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        單播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        log.msg(chat_content.encode('utf-8'))
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        組播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        廣播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在線,不能聊天." % phone_number.encode('utf-8'))


class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self.users)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]沒有檢測到心跳包,主動切斷" % key.encode('utf-8'))
                value.transport.abortConnection()

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

reactor.listenTCP(8124, cf)
reactor.run()

  就像上面所說的,加了一個接受心跳包的檢測的函數,handle_heartbeat,每次來一個心跳包,就把它相應的last_heartbeat_time變換一下,這樣,總體輪詢檢測的時候,我只要判斷最後一次鏈接時間和當前鏈接時間之差,就能夠判斷它是否是異常斷線了。

  這裏看我異常斷線的處理,transport.abortConnection(),從字面意思上,直接丟棄這個鏈接,它會調用Protocol的connectionLost,並且它無論那個fd裏面有沒有數據,所有丟棄。這個咱們之後用netstat分析鏈接的時候,會進一步說明這個函數,如今只要記住,它會強行中斷這個鏈接,刪除任何緩存在裏面的數據便可。

 

  frontClient.py

# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
log.startLogging(sys.stdout)


class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受驗證結果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('驗證經過')
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受單聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[單聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受組聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[組聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受廣播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[羣聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number):
        """
        發送驗證
        """
        content = json.dumps(dict(phone_number=phone_number))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        發送單聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        發送組聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        發送羣聊內容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        發送函數
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)

    def send_heartbeat(self):
        """
        發送心跳包
        """
        length = 12
        version = self.version
        command_id = 5
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack)


class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)


if __name__ == '__main__':
    cf = EchoClientFactory()
    chat_from = sys.argv[1]
    all_phone_numbers = ['000001', '000002', '000003', '000004']
    all_phone_numbers.remove(chat_from)
    import random

    task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat)
    task_send_heartbeat.start(2, now=False)

    reactor.callLater(3, cf.p.send_verify, chat_from)
    reactor.callLater(10, cf.p.send_single_chat, chat_from, '000002', '你好,這是10秒的時候發送')
    reactor.callLater(20, cf.p.send_single_chat, chat_from, '000002', '你好,這是20秒的時候發送')

    reactor.connectTCP('192.168.5.60', 8124, cf)

    reactor.run()

  這邊就添加了一個心跳包發送程序,每隔2秒發送一個心跳包。

  我在000002的客戶端在10秒和20秒之間,拔掉了網線,看看調試效果,

  先看服務器端的調試結果。

/usr/bin/python2.7 /home/yudahai/PycharmProjects/blog01/tcpserver/frontTCP.py
2016-06-22 15:15:23+0800 [-] Log opened.
2016-06-22 15:15:23+0800 [-] ChatFactory starting on 8124
2016-06-22 15:15:23+0800 [-] Starting factory <__main__.ChatFactory instance at 0x7ff3c3615758>
2016-06-22 15:15:53+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.15', 39774)
2016-06-22 15:15:54+0800 [__main__.ChatFactory] New connection, the info is: IPv4Address(TCP, '192.168.5.60', 36084)
2016-06-22 15:15:56+0800 [Chat,0,192.168.5.15] 歡迎, 000002!
2016-06-22 15:15:57+0800 [Chat,1,192.168.5.60] 歡迎, 000001!
2016-06-22 15:16:04+0800 [Chat,1,192.168.5.60] 你好,這是10秒的時候發送
2016-06-22 15:16:11+0800 [-] [000002]沒有檢測到心跳包,主動切斷
2016-06-22 15:16:11+0800 [-] [000002]:斷線
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] 你好,這是20秒的時候發送
2016-06-22 15:16:14+0800 [Chat,1,192.168.5.60] Phone_number:000002 不在線,不能聊天.

  看見沒有,已經能主動檢測到了。

  再看一下客戶端000002的調試結果

yudahai@yu-sony:~/PycharmProjects/flask001$ python frontClient.py 000002
2016-06-22 15:15:53+0800 [-] Log opened.
2016-06-22 15:15:53+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>
2016-06-22 15:15:53+0800 [-] Started to connect
2016-06-22 15:15:53+0800 [Uninitialized] Connected.
2016-06-22 15:15:53+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-22 15:15:56+0800 [EchoClient,client] 驗證經過
2016-06-22 15:16:04+0800 [EchoClient,client] [單聊][000001]:你好,這是10秒的時候發送
2016-06-22 15:24:27+0800 [EchoClient,client] connection lost
2016-06-22 15:24:27+0800 [EchoClient,client] Connection failed. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion.
    ]
2016-06-22 15:24:27+0800 [EchoClient,client] Stopping factory <__main__.EchoClientFactory instance at 0x7f4e3e3d56c8>

  比較有意思,15:16我中斷了鏈接,沒有接受到,這時候服務器主動切斷網絡,再鏈接上來的時候,它已經接受到消息,本身被中斷了,其實客戶端應該有個斷線重連機制,不過這是客戶端的事,主要看你的業務需求。

  

  到這,利用心跳包來檢測異常網絡狀況就完成了,若是你有更好的方案,歡迎你們跟我討論,畢竟我不是專門作tcpserver的,不少東西可能沒有研究到。

  下一章,咱們研究twisted鏈接redis,把一些很狀態轉移到redis中,這樣,其餘模塊就能共享這個狀態了,這在物聯網中,用到尤爲多,好比設備在線斷線狀態、報警狀態等,前端web能夠直接拿來使用了;之後咱們還會講rabbitmq在twisted中的應用。

相關文章
相關標籤/搜索