[Python 網絡編程] TCP編程/羣聊服務端 (二)

 

羣聊服務端python

 

需求分析:bootstrap

1. 羣聊服務端需支持啓動和中止(清理資源);服務器

2. 能夠接收客戶端的鏈接; 接收客戶端發來的數據框架

3. 能夠將每條信息分發到全部客戶端dom

 

1) 先搭架子:socket

#TCP Server
import threading,logging,time,random,datetime
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self):
        pass

    def start(self):
        pass

    def stop(self):
        pass

    def _accept(self):
        pass

    def _recv(self):
        #接收數據,TODO 分發
        pass

  

2)基礎功能:性能

#TCP Server
import threading,logging,time,random,datetime,socket
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
        self.addr = (ip,port)
        self.sock = socket.socket()


    def start(self): # 綁定ip地址/端口,啓動監聽
        self.sock.bind(self.addr)
        self.sock.listen()
        # accept默認阻塞
        threading.Thread(target=self._accept,name='accept').start()

    def stop(self):
        pass

    def _accept(self):# 接收傳入的鏈接
        conn,client = self.sock.accept()
        # recv默認阻塞
        threading.Thread(target=self._recv, args=(conn,),name='recv').start()

    def _recv(self,conn): #接收數據,TODO 分發
        data = conn.recv(1024)


cs = ChatServer()
cs.start()

  

3)功能完善測試

3.1 循環接收全部鏈接,將接收數據原文分發給全部客戶端ui

#TCP Server
import threading,logging,time,random,datetime,socket
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
        self.addr = (ip,port)
        self.sock = socket.socket()

        self.clients = {} #


    def start(self): # 綁定ip地址/端口,啓動監聽
        self.sock.bind(self.addr)
        self.sock.listen()
        # accept默認阻塞
        threading.Thread(target=self._accept,name='accept').start()

    def stop(self):
        pass

    def _accept(self):# 接收傳入的鏈接
        conn,client = self.sock.accept()
        self.clients[client] = conn # (ip,port)二元組
        # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)>
        # client = ('127.0.0.1', 11688)
        logging.info("{}-{}".format(conn,client))
        # recv 默認阻塞
        threading.Thread(target=self._recv, args=(conn,),name='recv').start()

    def _recv(self,conn): # 循環接收數據,TODO 分發
        while True:
            data = conn.recv(1024)
            logging.info(data.decode())
            msg = "ACK {}".format(data.decode())
            for c in self.clients.values():
                c.send(msg.encode())



cs = ChatServer()
cs.start()

e = threading.Event()
def showthreads():
    while not e.wait(5):
        logging.info(threading.enumerate())

showthreads()

#運行結果:
[15:55:39]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>]

  

#檢查服務端狀態
C:\Users\zhangsan>netstat -an | find "9999"
  TCP    127.0.0.1:9999         0.0.0.0:0              LISTENING

  

客戶端鏈接:線程

發送消息:"hello"

#服務端運行狀態變化
[15:55:54]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>]
[15:55:58]	 [accept,7776] <socket.socket fd=404, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11863)>-('127.0.0.1', 11863)
[15:55:59]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>] #accept線程不見了,先不關心
[15:56:04]	 [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>]
[15:56:07]	 [recv,6788] hello

  

客戶端斷開鏈接:

客戶端主動斷開鏈接,服務器拋出了ConnectionAbortedError異常:

#異常
[15:58:57]	 [recv,6788] 
Exception in thread recv:
Traceback (most recent call last):
  File "C:\Users\zhangpeng\AppData\Local\Programs\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner
    self.run()
  File "C:\Users\zhangpeng\AppData\Local\Programs\Python\Python35\lib\threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "C:/python/test.py", line 35, in _recv
    data = conn.recv(1024)
ConnectionAbortedError: [WinError 10053] 你的主機中的軟件停止了一個已創建的鏈接。

  

3.2 修復accept線程不能循環接收鏈接問題

客戶端鏈接:

服務器代碼:

#TCP Server
import threading,logging,time,random,datetime,socket
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket
        self.addr = (ip,port)
        self.sock = socket.socket()

        self.clients = {} #


    def start(self): # 綁定ip地址/端口,啓動監聽
        self.sock.bind(self.addr)
        self.sock.listen()
        # accept默認阻塞
        threading.Thread(target=self._accept,name='accept').start()

    def stop(self):
        pass

    def _accept(self):# 接收傳入的鏈接
        while True: #修復accept循環接收數據
            conn,client = self.sock.accept()
            self.clients[client] = conn # (ip,port)二元組
            # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)>
            # client = ('127.0.0.1', 11688)
            logging.info("{}-{}".format(conn,client))
            # recv 默認阻塞
            threading.Thread(target=self._recv, args=(conn,),name='recv').start()

    def _recv(self,conn): # 循環接收數據,TODO 分發
        while True:
            data = conn.recv(1024)
            logging.info(data.decode())
            msg = "ACK {}".format(data.decode())
            for c in self.clients.values():
                c.send(msg.encode())


cs = ChatServer()
cs.start()

e = threading.Event()
def showthreads():
    while not e.wait(5):
        logging.info(threading.enumerate())

showthreads()



#運行結果
[16:03:56]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>]
[16:04:01]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>]
[16:04:04]	 [accept,660] <socket.socket fd=408, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11988)>-('127.0.0.1', 11988)
[16:04:06]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>] #成功啓動recv線程,接收數據
[16:04:11]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>]
[16:04:12]	 [recv,8320] client1
[16:04:15]	 [accept,660] <socket.socket fd=248, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11991)>-('127.0.0.1', 11991)
[16:04:16]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>] #又新增一個客戶端和recv線程。
[16:04:19]	 [recv,7200] client2
[16:04:21]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>]
[16:04:26]	 [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>]

 

3.3 完善清理資源:

#TCP Server
import threading,logging,time,random,datetime,socket
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.addr = (ip,port)
        self.sock = socket.socket()
        self.event = threading.Event()

        self.clients = {} #


    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self._accept,name='accept').start()

    def stop(self): # 完善清理工做
        for c in self.clients.values():
            c.close()
        self.sock.close()
        self.event.wait(1)
        self.event.set()

    def _accept(self):
        while not self.event.is_set():
            conn,client = self.sock.accept()
            self.clients[client] = conn
            logging.info("{}-{}".format(conn,client))
            
            threading.Thread(target=self._recv, args=(conn,),name='recv').start()

    def _recv(self,conn):
        while not self.event.is_set():
            data = conn.recv(1024)
            logging.info(data.decode())
            msg = "ACK {}".format(data.decode())
            for c in self.clients.values():
                c.send(msg.encode())


cs = ChatServer()
cs.start()

e = threading.Event()
def showthreads():
    while not e.wait(5):
        logging.info(threading.enumerate())

showthreads()


e.wait(30)
cs.stop()

  

3.4 添加Server端主動斷開和Client端通知斷開機制,修復處理客戶端主動斷開引起的異常

客戶端發送"quit"測試主動斷開功能:

服務端代碼:

#TCP Server
import threading,logging,time,random,datetime,socket
DATEFMT="%H:%M:%S"
FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s"
logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT)

class ChatServer:
    def __init__(self,ip='127.0.0.1',port=9999):
        self.addr = (ip,port)
        self.sock = socket.socket()
        self.event = threading.Event()

        self.clients = {} #


    def start(self):
        self.sock.bind(self.addr)
        self.sock.listen()
        threading.Thread(target=self._accept,name='accept').start()

    def stop(self):
        for c in self.clients.values():
            c.close()
        self.sock.close()
        self.event.wait(3)
        self.event.set()

    def _accept(self):
        while not self.event.is_set():
            conn,client = self.sock.accept()
            self.clients[client] = conn
            logging.info("{}-{}".format(conn,client))
            # recv 默認阻塞
            threading.Thread(target=self._recv, args=(conn,client),name='recv').start()

    def _recv(self,conn,client):
        while not self.event.is_set():
            try:
                data = conn.recv(1024)
            except Exception as e:
                logging.info(e)
                data = 'quit'.encode()

            logging.info(data.decode())
            # Client通知退出機制
            if data.decode() == 'quit' or data.decode == '':
                # logging.info(data.decode())
                conn.send('Disconnect!!!'.encode())

                self.clients.pop(client)
                conn.close()
                break

            msg = "ACK {}".format(data.decode())
            for c in self.clients.values():
                c.send(msg.encode())



cs = ChatServer()
cs.start()

e = threading.Event()
def showthreads():
    while not e.wait(5):
        logging.info(threading.enumerate())

threading.Thread(target=showthreads,daemon=True,name='showthreads').start()

while True: # Sever控制檯退出方式
    cmd = input('>>> ').strip()
    if cmd == 'quit':
        cs.stop()
        break

#運行結果:
>>> [17:32:33]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
[17:32:38]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
[17:32:43]	 [accept,4388] <socket.socket fd=360, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13415)>-('127.0.0.1', 13415)
[17:32:43]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>]
[17:32:47]	 [accept,4388] <socket.socket fd=384, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13417)>-('127.0.0.1', 13417)
[17:32:48]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
[17:32:51]	 [recv,5556] test1
[17:32:53]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
[17:32:55]	 [recv,8248] test2
[17:32:58]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>]
[17:33:00]	 [recv,8248] quit
[17:33:03]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>]
[17:33:07]	 [recv,5556] quit
[17:33:08]	 [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]

  

其它方法:

socket.recv(bufsize[,flags])  獲取數據。默認是阻塞的方式

socket.recvfrom(bufsize[,flags])  獲取數據,返回一個二元組(bytes,address)(可用於udp)

socket.recv_into(buffer[,nbytes[,flags]])  獲取到nbytes的數據後,存儲到buffer中。若是nbytes沒有指定或0,將buffer大小的數據存入buffer中。返回接收的字節數。

socket.recvfrom_into(buffer[,nbytes[,flags]])  獲取數據,返回一個二元組(bytes,address)到buffer中

socket.send(bytes[,flags])  TCP發送數據

socket.sendall(bytes[,flags])  TCP發送所有數據,成功返回None

socket.sendto(string[,flag],address)  UDP發送數據

socket.sendfile(file,offset=0,count=None)  發送一個文件直到EOF,使用高性能的os.sendfile機制,返回發送的字節數。若是win下不支持sendfile,或者不是普通文件,使用send()發送文件。offset告訴起始位置。3.5版本開始

 

socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None) 

建立一個與該套接字相關聯的文件對象。

 

socket.getpeername()  返回鏈接套接字的遠程地址。返回值一般是元祖(ipaddr,port)

socket.getsockname()  返回套接字本身的地址。一般是一個元祖(ipaddr,port)

socket.setblocking(flag)  若是flag爲0,則將套接字設爲非阻塞模式,不然將套接字設爲阻塞模式(默認值)。非阻塞模式下,若是調用recv()沒有發現任何數據,或send()調用沒法當即發送數據,那麼將引發socket.error異常。

socket.settimeout(value)  設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如connect())

socket.setsockopt(level,optname,value)  設置套接字選項的值。好比緩衝區大小。全部SO_* 開頭的常量,不一樣系統、不一樣版本都不盡相同

 

 

4) 總結

從需求分析,到創建框架,完善基本功能,測試/修改,最終雖然完成了一個看似基本功能已經實現的羣聊服務端,

但以上的全部例子只是測試,練習底層的socket使用,生產環境中通常都是使用封裝過的socket,且程序還有不少異常沒有處理。

 

conn.close() 服務端主動和客戶端斷開

sock.close() 服務端主動關閉服務端socket

 

recv,send,close,均可能在操做過程當中出現異常,客戶端主動斷開服務端也會拋ConnectionAbortedError異常,若是服務端不處理這個異常,客戶端下次鏈接,服務端就不能正常recv數據。

相關文章
相關標籤/搜索