在前面的學習中咱們其實已經能夠經過socket
模塊來創建咱們的服務端,而且還介紹了關於TCP協議的粘包問題。可是還有一個很是大的問題就是咱們所編寫的Server端是不支持併發性服務的,在咱們以前的代碼中只能加入一個通訊循環來進行排隊式的單窗口一對一服務。那麼這一篇文章將主要介紹如何使用socketserver
模塊來創建具備併發性的Server端。python
咱們先看它的一段代碼,對照代碼來看功能。windows
#!/usr/bin/env python3 # _*_ coding:utf-8 _*_ # ==== 使用socketserver建立支持多併發性的服務器 TCP協議 ==== import socketserver class MyServer(socketserver.BaseRequestHandler): """自定義類""" def handle(self): """handle處理請求""" print("雙向連接通道創建完成:", self.request) # 對於TCP協議來講,self.request至關於雙向連接通道conn,即accept()的第一部分 print("客戶端的信息是:", self.client_address) # 對於TCP協議來講,至關於accept()的第二部分,即客戶端的ip+port while 1: # 開始內層通訊循環 try: # # bug修復:針對windows環境 data = self.request.recv(1024) if not data: break # bug修復:針對類UNIX環境 print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data)) self.request.sendall(data.upper()) # #sendall是重複調用send. except Exception as e: break self.request.close() # 當出現異常狀況下必定要關閉連接 if __name__ == '__main__': s1 = socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer) # 公網服務器綁定 0.0.0.0 私網測試爲 127.0.0.1 s1.serve_forever() # 啓動服務
1.導入
socketserver
模塊服務器2.建立一個新的類,並繼承
socketserver.BaseRequestHandler
類網絡3.覆寫
handle
方法,對於TCP協議來講,self.request
至關於雙向連接通道conn
,self.client_address
至關於被服務方的ip和port信息,也就是addr
,而整個handle
方法至關於連接循環。多線程4.寫入收發邏輯規則併發
5.防止客戶端發送空的消息已致雙方卡死app
6.防止客戶端忽然斷開已致服務端崩潰dom
7.粘包優化(可選)socket
8.實例化
socketserver.ThreadingTCPServer
類,並傳入IP+port,以及剛寫好的類名tcp9.使用
socketserver.ThreadingTCPServer
實例化對象中的server_forever( )
方法啓動服務它實際上是這樣的:
咱們不用管連接循環,由於在執行
handle
方法以前內部已經幫咱們作好了。當咱們使用serve_forever()
方法的時候便開始監聽連接描述符對象,一旦有連接請求就建立一個子線程來處理該連接。
基於UDP協議的socketserver
服務端與基於TCP協議的socketserver
服務端截然不同,可是仍是有幾點不太同樣的地方。
對TCP來講:
self.request = 雙向連接通道(conn)
對UDP來講:
self.request = (client_data_byte,udp的套接字對象)
#!/usr/bin/env python3 # _*_ coding:utf-8 _*_ # ==== 使用socketserver建立支持多併發性的服務器 UDP協議 ==== import socketserver class MyServer(socketserver.BaseRequestHandler): """自定義類""" def handle(self): """handle處理請求""" # 因爲UDP是基於消息的協議,故根本不用通訊循環 data = self.request[0] # 對於UDP協議來講,self.request實際上是個元組。第一個元素是消息內容主題(Bytes類型),至關於recvfrom()的第一部分 server = self.request[1] # 第二個元素是服務端自己,即本身 print("客戶端的信息是:", self.client_address) # 對於UDP協議來講,至關於recvfrom()的第二部分,即客戶端的ip+port print("收到客戶機[{0}]的消息:[{1}]".format(self.client_address, data)) server.sendto(data.upper(),self.client_address) if __name__ == '__main__': s1 = socketserver.ThreadingUDPServer(("0.0.0.0", 6666), MyServer) # 公網服務器綁定 0.0.0.0 私網測試爲 127.0.0.1 s1.serve_forever() # 啓動服務
好了,接下來咱們開始剖析socketserver
模塊中的源碼部分。在Pycharm下使用CTRL+鼠標左鍵
,能夠進入源碼進行查看。
咱們在查看源碼前必定要首先要明白兩點:
socketserver
類分爲兩部分,其一是server
類主要是負責處理連接方面,另外一類是request
類主要負責處理通訊方面。
好了,請在腦子裏記住這個概念。咱們來看一些socketserver
模塊的實現用了哪些其餘的基礎模塊。
注意,接下來的源碼註釋部分我並無在源代碼中修改,也請讀者不要修改源代碼的任何內容。
import socket # 這模塊挺熟悉吧 import selectors # 這個是一個多線程模塊,主要支持I/O多路複用。 import os # 老朋友了 import sys # 老朋友 import threading # 多線程模塊 from io import BufferedIOBase # 讀寫相關的模塊 from time import monotonic as time # 老朋友time模塊
好了,讓咱們接着往下走。能夠看到一個變量__all__
,是否是以爲很熟悉?就是咱們使用 from xxx import xxx
能導入進的東西全是被__all__
控制的,咱們看一下它包含了哪些內容。
__all__ = ["BaseServer", "TCPServer", "UDPServer", "ThreadingUDPServer", "ThreadingTCPServer", "BaseRequestHandler", "StreamRequestHandler", "DatagramRequestHandler", "ThreadingMixIn"] # 這個是咱們本來的 __all__ 中的值。 if hasattr(os, "fork"): __all__.extend(["ForkingUDPServer","ForkingTCPServer", "ForkingMixIn"]) if hasattr(socket, "AF_UNIX"): __all__.extend(["UnixStreamServer","UnixDatagramServer", "ThreadingUnixStreamServer", "ThreadingUnixDatagramServer"]) # 上面兩個if判斷是給__all__添加內容的,os.fork()這個方法是建立一個新的進程,而且只在類UNIX平臺下才有效,Windows平臺下是無效的,因此這裏對於Windows平臺來講就from socketserver import xxx 確定少了三個類,這三個類的做用咱們接下來會聊到。而關於socket中的AF_UNIX來講咱們其實已經學習過了,是基於文件的socket家族。這在Windows上也是不支持的,只有在類UNIX平臺下才有效。因此Windows平臺下的導入又少了4個類。 # poll/select have the advantage of not requiring any extra file descriptor, # contrarily to epoll/kqueue (also, they require a single syscall). if hasattr(selectors, 'PollSelector'): _ServerSelector = selectors.PollSelector else: _ServerSelector = selectors.SelectSelector # 這兩個if仍是作I/O多路複用使用的,Windows平臺下的結果是False,而類Unix平臺下的該if結果爲True,這關乎I/O多路複用的性能選擇。究竟是select仍是poll或者epoll。
咱們接着向下看源碼,會看到許許多多的類。先關掉它來假設本身是解釋器一行一行往下走會去執行那個部分。首先是一條if
判斷
if hasattr(os, "fork"): class ForkingMixIn: pass # 這裏我本身省略了 # 咱們能夠看見這條代碼是接下來執行的,它意思仍是若是在類Unix環境下,則會去建立該類。若是在Windows平臺下則不會建立該類
繼續走,其實這種if
判斷再建立類的地方還有兩處。我這裏所有列出來:
if hasattr(os, "fork"): class ForkingUDPServer(ForkingMixIn, UDPServer): pass class ForkingTCPServer(ForkingMixIn, TCPServer): pass if hasattr(socket, 'AF_UNIX'): class UnixStreamServer(TCPServer): address_family = socket.AF_UNIX class UnixDatagramServer(UDPServer): address_family = socket.AF_UNIX class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): passclass ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
好了,說完了大致粗略的一個流程,咱們該來研究這裏面的類都有什麼做用,這裏能夠查看每一個類的文檔信息。大體以下:
前面已經說過,socketserver
模塊中主要分爲兩大類,咱們就依照這個來進行劃分。
socketserver模塊源碼內部class功能一覽 | |
---|---|
處理連接相關 | |
BaseServer | 基礎連接類 |
TCPServer | TCP協議類 |
UDPServer | UDP協議類 |
UnixStreamServer | 文件形式字節流類 |
UnixDatagramServer | 文件形式數據報類 |
處理通訊相關 | |
BaseRequestHandler | 基礎請求處理類 |
StreamRequestHandler | 字節流請求處理類 |
DatagramRequestHandler | 數據報請求處理類 |
多線程相關 | |
ThreadingMixIn | 線程方式 |
ThreadingUDPServer | 多線程UDP協議服務類 |
ThreadingTCPServer | 多線程TCP協議服務類 |
多進程相關 | |
ForkingMixIn | 進程方式 |
ForkingUDPServer | 多進程UDP協議服務類 |
ForkingTCPServer | 多進程TCP協議服務類 |
他們的繼承關係以下:
ForkingUDPServer(ForkingMixIn, UDPServer)
ForkingTCPServer(ForkingMixIn, TCPServer)
ThreadingUDPServer(ThreadingMixIn, UDPServer)
ThreadingTCPServer(ThreadingMixIn, TCPServer)
StreamRequestHandler(BaseRequestHandler)
DatagramRequestHandler(BaseRequestHandler)
處理連接相關
處理通訊相關
多線程相關
總繼承關係(處理通訊相關的不在其中,而且不包含多進程)
最後補上一個多進程的繼承關係,就不放在總繼承關係中了,容易圖形形成混亂。
多進程相關
有了繼承關係咱們能夠來模擬實例化的過程,咱們以TCP協議爲準:
socketserver.ThreadingTCPServer(("0.0.0.0", 6666), MyServer)
咱們點進(選中上面代碼的ThradingTCPServe
r部分,CTRL+鼠標左鍵
)源碼部分,查找其 __init__
方法:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
看來沒有,那麼就找第一父類有沒有,咱們點進去能夠看到第一父類ThreadingMixIn
也沒有__init__
方法,看上面的繼承關係圖能夠看出是普通多繼承,那麼就是廣度優先的查找順序。咱們來看第二父類TCPServer
中有沒有,看來第二父類中是有__init__
方法的,咱們詳細來看。
class TCPServer(BaseServer): """註釋全被我刪了,影響視線""" address_family = socket.AF_INET # 基於網絡的套接字家族 socket_type = socket.SOCK_STREAM # TCP(字節流)協議 request_queue_size = 5 # 消息隊列最大爲5,能夠理解爲backlog,即半連接池的大小 allow_reuse_address = False # 端口重用默認關閉 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) # 能夠看見,上面先是調用了父類的__init__方法,而後又實例化出了一個socket對象!因此咱們先不着急往下看,先看其父類中的__init__方法。 if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise
來看一下,BaseServer
類中的__init__
方法。
class BaseServer: """註釋依舊全被我刪了""" timeout = None # 這個變量能夠理解爲超時時間,先不着急說他。先看 __init__ 方法 def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address # 即咱們傳入的 ip+port ("0.0.0.0", 6666) self.RequestHandlerClass = RequestHandlerClass # 即咱們傳入的自定義類 MyServer self.__is_shut_down = threading.Event() # 這裏能夠看到執行了該方法,這裏先不詳解,由於它是一個事件鎖,因此不用管 self.__shutdown_request = False
在BaseServer
中執行了thrading
模塊下的Event()
方法。我這裏仍是提一嘴這個方法是幹嗎用的,它會去控制線程的啓動順序,這裏實例化出的self.__is_shut_down
其實就是一把鎖,沒什麼深究的,接下來的文章中我也會寫到。咱們繼續往下看,如今是該回到TCPServer
的__init__
方法中來了。
class TCPServer(BaseServer): """註釋全被我刪了,影響視線""" address_family = socket.AF_INET # 基於網絡的套接字家族 socket_type = socket.SOCK_STREAM # TCP(字節流)協議 request_queue_size = 5 # 消息隊列最大爲5,能夠理解爲backlog,即半連接池的大小 allow_reuse_address = False # 端口重用默認關閉 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 看這裏!!!! """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: # 在建立完socket對象後就會進行該判斷。默認參數bind_and_activate就是爲True try: self.server_bind() # 如今進入該方法查看細節 self.server_activate() except: self.server_close() raise
好了,須要找這個self.bind()
方法,仍是從頭開始找。實例自己沒有,第一父類ThreadingMixIn
也沒有,因此如今咱們看的是TCPServer
的server_bind()
方法:
def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: # 這裏的變量對應 TCPServer.__init__ 上面定義的類方法,端口重用這個。因爲是False,因此咱們直接往下執行。 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) # 綁定 ip+port 即 ("0.0.0.0", 6666) self.server_address = self.socket.getsockname() # 獲取socket的名字 其實仍是 ("0.0.0.0", 6666)
如今咱們該看TCPServer
下的server_activate()
方法了。
def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) # 其實就是監聽半連接池,backlog爲5
這個時候沒有任何異常會拋出的,因此咱們已經跑完了整個實例化的流程。並將其賦值給s1
如今咱們看一下s1
的__dict__
字典,再接着進行源碼分析。
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}
咱們接着來看下一條代碼。
s1.serve_forever()
仍是老規矩,因爲s1
是ThreadingTCPServer
類的實例對象,因此咱們去一層層的找serve_forever()
,最後在BaseServer
類中找到了。
def serve_forever(self, poll_interval=0.5): """註釋被我刪了""" self.__is_shut_down.clear() # 上面說過了那個Event鎖,控制子線程的啓動順序。這裏的clear()表明清除,這個不是重點,往下看。 try: # XXX: Consider using another file descriptor or connecting to the # socket to wake this up instead of polling. Polling reduces our # responsiveness to a shutdown request and wastes cpu at all other # times. with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ)# 這裏是設置了一個監聽類型爲讀取事件。也就是說當有請求來的時候當前socket對象就會發生反應。 while not self.__shutdown_request: # 爲False,會執行,注意!下面都是死循環了!!! ready = selector.select(poll_interval) # 設置最大監聽時間爲0.5s # bpo-35017: shutdown() called during select(), exit immediately. if self.__shutdown_request: # BaseServer類中的類方法,爲False,因此不執行這個。 break if ready: # 表明有連接請求會執行下面的方法 self._handle_request_noblock() # 這兒是比較重要的一個點。咱們先來看。 self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set() # 這裏是一個釋放鎖的行爲
若是有連接請求,則會執行self._handle_request_noblock()
方法,它在哪裏呢?恰好這個方法就在BaseServer
中serve_forever()
方法的正下方第4個方法的位置。
def _handle_request_noblock(self): """註釋被我刪了""" try: request, client_address = self.get_request() # 這裏的這個方法在TCPServer中,它的return值是 self.socket.accept(),就是就是返回了元組而後被解壓賦值了。其實到這一步三次握手監聽已經開啓了。 except OSError: return if self.verify_request(request, client_address): # 這個是驗證ip和port,返回的始終是True try: self.process_request(request, client_address) # request 雙向連接通道,client_address客戶端ip+port。如今咱們來找這個方法。 except Exception: self.handle_error(request, client_address) self.shutdown_request(request) except: self.shutdown_request(request) raise else: self.shutdown_request(request)
如今開始查找self.process_request(request, client_address)
該方法,仍是先從實例對象自己找,找不到去第一父類找。他位於第一父類ThreadingMixIn
中。
def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) # 建立子線程!!看這裏! t.daemon = self.daemon_threads # ThreadingMixIn的類屬性,爲False if not t.daemon and self.block_on_close: # 第一個值爲False,第二個值爲True。他們都是ThreadingMixIn的類屬性 if self._threads is None: # 會執行 self._threads = [] # 建立了空列表 self._threads.append(t) # 將當前的子線程添加至空列表中 t.start() # 開始當前子線程的運行,即運行self.process_request_thread方法
咱們能夠看到,這裏的target
參數中指定了一個方法self.process_request_thread
,其實意思就是說當這個線程t
在start
的時候會去執行該方法。咱們看一下它都作了什麼,這個方法仍是在ThreadingMixIn
類中。
def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) # 能夠看到又執行該方法了,這裏我再標註一下,別弄頭暈了。request 雙向連接通道,client_address客戶端ip+port。 except Exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) # 它不會關閉這個線程,而是將其設置爲wait()狀態。
看self.finish_request()
方法,它在BaseServer
類中
def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) # 這裏是幹嗎?其實就是在進行實例化!
self.RequestHandlerClass(request, client_address, self)
,咱們找到self
的__dict__
字典,看看這個究竟是什麼東西
{'server_address': ('0.0.0.0', 6666), 'RequestHandlerClass': <class '__main__.MyServer'>, '_BaseServer__is_shut_down': <threading.Event object at 0x000002A96A0208E0>, '_BaseServer__shutdown_request': False, 'socket': <socket.socket fd=716, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 6666)>}
能夠看到,它就是咱們傳入的那個類,即自定義的MyServer
類。咱們把request,client_address,以及整個是實例self傳給了MyServer的__init__
方法。可是咱們的MyServer類沒有__init__
,怎麼辦呢?去它父類BaseRequestHandler
裏面找唄。
class BaseRequestHandler: """註釋被我刪了""" def __init__(self, request, client_address, server): self.request = request # request 雙向連接通道 self.client_address = client_address # 客戶端ip+port self.server = server # 即 實例對象自己。上面的__dict__就是它的__dict__ self.setup() # 鉤子函數,咱們能夠本身寫一個類而後繼承`BaseRequestHandler`並覆寫其setup方法便可。 try: self.handle() # 看,自動執行handle finally: self.finish() # 鉤子函數 def setup(self): pass def handle(self): pass def finish(self): pass
如今咱們知道了,爲何必定要覆寫handle
方法了吧。
實例化過程圖解
server_forever()啓動服務圖解
在不少時候,咱們的TCP服務端爲了防止網絡泛洪能夠設置一個三次握手驗證機制。那麼這個驗證機制的實現其實也是很是簡單的,咱們的思路在於進入通訊循環以前,客戶端和服務端先走一次連接認證,只有經過認證的客戶端纔可以繼續和服務端進行連接。
下面就來看一下具體的實現步驟。
#_*_coding:utf-8_*_ __author__ = 'Linhaifeng' from socket import * import hmac,os secret_key=b'linhaifeng bang bang bang' def conn_auth(conn): ''' 認證客戶端連接 :param conn: :return: ''' print('開始驗證新連接的合法性') msg=os.urandom(32) # 新方法,生成32位隨機Bytes類型的值 conn.sendall(msg) h=hmac.new(secret_key,msg) digest=h.digest() respone=conn.recv(len(digest)) return hmac.compare_digest(respone,digest) # 對比結果爲True或者爲False def data_handler(conn,bufsize=1024): if not conn_auth(conn): print('該連接不合法,關閉') conn.close() return print('連接合法,開始通訊') while True: data=conn.recv(bufsize) if not data:break conn.sendall(data.upper()) def server_handler(ip_port,bufsize,backlog=5): ''' 只處理連接 :param ip_port: :return: ''' tcp_socket_server=socket(AF_INET,SOCK_STREAM) tcp_socket_server.bind(ip_port) tcp_socket_server.listen(backlog) while True: conn,addr=tcp_socket_server.accept() print('新鏈接[%s:%s]' %(addr[0],addr[1])) data_handler(conn,bufsize) if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 server_handler(ip_port,bufsize)
#_*_coding:utf-8_*_ __author__ = 'Linhaifeng' from socket import * import hmac,os secret_key=b'linhaifeng bang bang bang' def conn_auth(conn): ''' 驗證客戶端到服務器的連接 :param conn: :return: ''' msg=conn.recv(32) # 拿到隨機位數 h=hmac.new(secret_key,msg) # 摻鹽 digest=h.digest() conn.sendall(digest) def client_handler(ip_port,bufsize=1024): tcp_socket_client=socket(AF_INET,SOCK_STREAM) tcp_socket_client.connect(ip_port) conn_auth(tcp_socket_client) while True: data=input('>>: ').strip() if not data:continue if data == 'quit':break tcp_socket_client.sendall(data.encode('utf-8')) respone=tcp_socket_client.recv(bufsize) print(respone.decode('utf-8')) tcp_socket_client.close() if __name__ == '__main__': ip_port=('127.0.0.1',9999) bufsize=1024 client_handler(ip_port,bufsize)
到這裏已經很簡單了,服務器將隨機數給客戶機發過去,客戶機收到後也用自家的鹽與隨機數加料,再使用digest()
將它轉化爲字節,直接發送了回來而後客戶端經過hmac.compare_digest()