羣聊服務端python
需求分析:bootstrap
1. 羣聊服務端需支持啓動和中止(清理資源);服務器
2. 能夠接收客戶端的鏈接; 接收客戶端發來的數據框架
3. 能夠將每條信息分發到全部客戶端dom
1) 先搭架子:socket
#TCP Server import threading,logging,time,random,datetime DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self): pass def start(self): pass def stop(self): pass def _accept(self): pass def _recv(self): #接收數據,TODO 分發 pass
2)基礎功能:性能
#TCP Server import threading,logging,time,random,datetime,socket DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket self.addr = (ip,port) self.sock = socket.socket() def start(self): # 綁定ip地址/端口,啓動監聽 self.sock.bind(self.addr) self.sock.listen() # accept默認阻塞 threading.Thread(target=self._accept,name='accept').start() def stop(self): pass def _accept(self):# 接收傳入的鏈接 conn,client = self.sock.accept() # recv默認阻塞 threading.Thread(target=self._recv, args=(conn,),name='recv').start() def _recv(self,conn): #接收數據,TODO 分發 data = conn.recv(1024) cs = ChatServer() cs.start()
3)功能完善測試
3.1 循環接收全部鏈接,將接收數據原文分發給全部客戶端ui
#TCP Server import threading,logging,time,random,datetime,socket DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket self.addr = (ip,port) self.sock = socket.socket() self.clients = {} # def start(self): # 綁定ip地址/端口,啓動監聽 self.sock.bind(self.addr) self.sock.listen() # accept默認阻塞 threading.Thread(target=self._accept,name='accept').start() def stop(self): pass def _accept(self):# 接收傳入的鏈接 conn,client = self.sock.accept() self.clients[client] = conn # (ip,port)二元組 # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)> # client = ('127.0.0.1', 11688) logging.info("{}-{}".format(conn,client)) # recv 默認阻塞 threading.Thread(target=self._recv, args=(conn,),name='recv').start() def _recv(self,conn): # 循環接收數據,TODO 分發 while True: data = conn.recv(1024) logging.info(data.decode()) msg = "ACK {}".format(data.decode()) for c in self.clients.values(): c.send(msg.encode()) cs = ChatServer() cs.start() e = threading.Event() def showthreads(): while not e.wait(5): logging.info(threading.enumerate()) showthreads() #運行結果: [15:55:39] [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>]
#檢查服務端狀態 C:\Users\zhangsan>netstat -an | find "9999" TCP 127.0.0.1:9999 0.0.0.0:0 LISTENING
客戶端鏈接:線程
發送消息:"hello"
#服務端運行狀態變化 [15:55:54] [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(accept, started 7776)>] [15:55:58] [accept,7776] <socket.socket fd=404, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11863)>-('127.0.0.1', 11863) [15:55:59] [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>] #accept線程不見了,先不關心 [15:56:04] [MainThread,7304] [<_MainThread(MainThread, started 7304)>, <Thread(recv, started 6788)>] [15:56:07] [recv,6788] hello
客戶端斷開鏈接:
客戶端主動斷開鏈接,服務器拋出了ConnectionAbortedError異常:
#異常 [15:58:57] [recv,6788] Exception in thread recv: Traceback (most recent call last): File "C:\Users\zhangpeng\AppData\Local\Programs\Python\Python35\lib\threading.py", line 914, in _bootstrap_inner self.run() File "C:\Users\zhangpeng\AppData\Local\Programs\Python\Python35\lib\threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "C:/python/test.py", line 35, in _recv data = conn.recv(1024) ConnectionAbortedError: [WinError 10053] 你的主機中的軟件停止了一個已創建的鏈接。
3.2 修復accept線程不能循環接收鏈接問題
客戶端鏈接:
服務器代碼:
#TCP Server import threading,logging,time,random,datetime,socket DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self,ip='127.0.0.1',port=9999): # 初始化socket self.addr = (ip,port) self.sock = socket.socket() self.clients = {} # def start(self): # 綁定ip地址/端口,啓動監聽 self.sock.bind(self.addr) self.sock.listen() # accept默認阻塞 threading.Thread(target=self._accept,name='accept').start() def stop(self): pass def _accept(self):# 接收傳入的鏈接 while True: #修復accept循環接收數據 conn,client = self.sock.accept() self.clients[client] = conn # (ip,port)二元組 # conn = <socket.socket fd=264, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999),raddr=('127.0.0.1', 11688)> # client = ('127.0.0.1', 11688) logging.info("{}-{}".format(conn,client)) # recv 默認阻塞 threading.Thread(target=self._recv, args=(conn,),name='recv').start() def _recv(self,conn): # 循環接收數據,TODO 分發 while True: data = conn.recv(1024) logging.info(data.decode()) msg = "ACK {}".format(data.decode()) for c in self.clients.values(): c.send(msg.encode()) cs = ChatServer() cs.start() e = threading.Event() def showthreads(): while not e.wait(5): logging.info(threading.enumerate()) showthreads() #運行結果 [16:03:56] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>] [16:04:01] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(accept, started 660)>] [16:04:04] [accept,660] <socket.socket fd=408, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11988)>-('127.0.0.1', 11988) [16:04:06] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>] #成功啓動recv線程,接收數據 [16:04:11] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>] [16:04:12] [recv,8320] client1 [16:04:15] [accept,660] <socket.socket fd=248, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 11991)>-('127.0.0.1', 11991) [16:04:16] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>] #又新增一個客戶端和recv線程。 [16:04:19] [recv,7200] client2 [16:04:21] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>] [16:04:26] [MainThread,944] [<_MainThread(MainThread, started 944)>, <Thread(recv, started 8320)>, <Thread(accept, started 660)>, <Thread(recv, started 7200)>]
3.3 完善清理資源:
#TCP Server import threading,logging,time,random,datetime,socket DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self,ip='127.0.0.1',port=9999): self.addr = (ip,port) self.sock = socket.socket() self.event = threading.Event() self.clients = {} # def start(self): self.sock.bind(self.addr) self.sock.listen() threading.Thread(target=self._accept,name='accept').start() def stop(self): # 完善清理工做 for c in self.clients.values(): c.close() self.sock.close() self.event.wait(1) self.event.set() def _accept(self): while not self.event.is_set(): conn,client = self.sock.accept() self.clients[client] = conn logging.info("{}-{}".format(conn,client)) threading.Thread(target=self._recv, args=(conn,),name='recv').start() def _recv(self,conn): while not self.event.is_set(): data = conn.recv(1024) logging.info(data.decode()) msg = "ACK {}".format(data.decode()) for c in self.clients.values(): c.send(msg.encode()) cs = ChatServer() cs.start() e = threading.Event() def showthreads(): while not e.wait(5): logging.info(threading.enumerate()) showthreads() e.wait(30) cs.stop()
3.4 添加Server端主動斷開和Client端通知斷開機制,修復處理客戶端主動斷開引起的異常
客戶端發送"quit"測試主動斷開功能:
服務端代碼:
#TCP Server import threading,logging,time,random,datetime,socket DATEFMT="%H:%M:%S" FORMAT = "[%(asctime)s]\t [%(threadName)s,%(thread)d] %(message)s" logging.basicConfig(level=logging.INFO,format=FORMAT,datefmt=DATEFMT) class ChatServer: def __init__(self,ip='127.0.0.1',port=9999): self.addr = (ip,port) self.sock = socket.socket() self.event = threading.Event() self.clients = {} # def start(self): self.sock.bind(self.addr) self.sock.listen() threading.Thread(target=self._accept,name='accept').start() def stop(self): for c in self.clients.values(): c.close() self.sock.close() self.event.wait(3) self.event.set() def _accept(self): while not self.event.is_set(): conn,client = self.sock.accept() self.clients[client] = conn logging.info("{}-{}".format(conn,client)) # recv 默認阻塞 threading.Thread(target=self._recv, args=(conn,client),name='recv').start() def _recv(self,conn,client): while not self.event.is_set(): try: data = conn.recv(1024) except Exception as e: logging.info(e) data = 'quit'.encode() logging.info(data.decode()) # Client通知退出機制 if data.decode() == 'quit' or data.decode == '': # logging.info(data.decode()) conn.send('Disconnect!!!'.encode()) self.clients.pop(client) conn.close() break msg = "ACK {}".format(data.decode()) for c in self.clients.values(): c.send(msg.encode()) cs = ChatServer() cs.start() e = threading.Event() def showthreads(): while not e.wait(5): logging.info(threading.enumerate()) threading.Thread(target=showthreads,daemon=True,name='showthreads').start() while True: # Sever控制檯退出方式 cmd = input('>>> ').strip() if cmd == 'quit': cs.stop() break #運行結果: >>> [17:32:33] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>] [17:32:38] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>] [17:32:43] [accept,4388] <socket.socket fd=360, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13415)>-('127.0.0.1', 13415) [17:32:43] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>] [17:32:47] [accept,4388] <socket.socket fd=384, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9999), raddr=('127.0.0.1', 13417)>-('127.0.0.1', 13417) [17:32:48] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>] [17:32:51] [recv,5556] test1 [17:32:53] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>] [17:32:55] [recv,8248] test2 [17:32:58] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>, <Thread(recv, started 8248)>] [17:33:00] [recv,8248] quit [17:33:03] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(recv, started 5556)>, <Thread(accept, started 4388)>] [17:33:07] [recv,5556] quit [17:33:08] [showthreads,8732] [<_MainThread(MainThread, started 3464)>, <Thread(showthreads, started daemon 8732)>, <Thread(accept, started 4388)>]
其它方法:
socket.recv(bufsize[,flags]) 獲取數據。默認是阻塞的方式
socket.recvfrom(bufsize[,flags]) 獲取數據,返回一個二元組(bytes,address)(可用於udp)
socket.recv_into(buffer[,nbytes[,flags]]) 獲取到nbytes的數據後,存儲到buffer中。若是nbytes沒有指定或0,將buffer大小的數據存入buffer中。返回接收的字節數。
socket.recvfrom_into(buffer[,nbytes[,flags]]) 獲取數據,返回一個二元組(bytes,address)到buffer中
socket.send(bytes[,flags]) TCP發送數據
socket.sendall(bytes[,flags]) TCP發送所有數據,成功返回None
socket.sendto(string[,flag],address) UDP發送數據
socket.sendfile(file,offset=0,count=None) 發送一個文件直到EOF,使用高性能的os.sendfile機制,返回發送的字節數。若是win下不支持sendfile,或者不是普通文件,使用send()發送文件。offset告訴起始位置。3.5版本開始
socket.makefile(mode='r', buffering=None, *, encoding=None, errors=None, newline=None)
建立一個與該套接字相關聯的文件對象。
socket.getpeername() 返回鏈接套接字的遠程地址。返回值一般是元祖(ipaddr,port)
socket.getsockname() 返回套接字本身的地址。一般是一個元祖(ipaddr,port)
socket.setblocking(flag) 若是flag爲0,則將套接字設爲非阻塞模式,不然將套接字設爲阻塞模式(默認值)。非阻塞模式下,若是調用recv()沒有發現任何數據,或send()調用沒法當即發送數據,那麼將引發socket.error異常。
socket.settimeout(value) 設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如connect())
socket.setsockopt(level,optname,value) 設置套接字選項的值。好比緩衝區大小。全部SO_* 開頭的常量,不一樣系統、不一樣版本都不盡相同
4) 總結
從需求分析,到創建框架,完善基本功能,測試/修改,最終雖然完成了一個看似基本功能已經實現的羣聊服務端,
但以上的全部例子只是測試,練習底層的socket使用,生產環境中通常都是使用封裝過的socket,且程序還有不少異常沒有處理。
conn.close() 服務端主動和客戶端斷開
sock.close() 服務端主動關閉服務端socket
recv,send,close,均可能在操做過程當中出現異常,客戶端主動斷開服務端也會拋ConnectionAbortedError異常,若是服務端不處理這個異常,客戶端下次鏈接,服務端就不能正常recv數據。