本節內容 :html
1.Socket實現多鏈接處理linux
根據上一篇文章多socket的基本介紹,咱們大體能明白怎麼去創建一個客戶端和服務器通信,可是你會發現,若是客戶端斷開,服務器也會跟着短了,若是咱們想要客戶端斷開,服務器還能夠爲下一個客戶端服務,該如何實現呢?緩存
1 conn,addr = server.accept() #接收並創建與客戶端的連接,一直阻塞直到客戶端連進來
咱們想要實現多鏈接處理,上面這句代碼很關鍵。只要咱們同一個客戶端斷開鏈接後,它還能夠循環監聽,那麼多鏈接就實現了。服務器
1 import socket 2 3 server = socket.socket() #獲取socket實例 4 5 server.bind(('localhost',9999)) #綁定 ip + port 6 7 server.listen() # 開始監聽 8 9 while True: #第一層循環用來持續接收新連接 10 print('waiting for new conn...') 11 conn,addr = server.accept() #等待新連接 12 print('new conn:',addr) 13 while True: #第二層循環用來保持和連到的連接持續通信 14 data = conn.recv(1024) 15 if not data: 16 print('client is off...') 17 break #斷開已創建的通信,回到等新連接的循環 18 print('recv data:',data) 19 conn.send(data.upper()) 20 server.close()
注意了, 此時服務器端依然只能同時爲一個客戶服務,其客戶來了,得排隊(鏈接掛起),要想同時處理多個連接,請繼續日後看。網絡
2. Socket實現簡單的ssh多線程
出了能夠發送一些簡單消息,咱們還能夠乾點更高級的,好比來作一個簡單的ssh(不明白的能夠百度這是幹嗎的!),它就是經過客戶端連上服務器後,讓服務器執行命令,並把結果返回給客戶端。併發
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客戶端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() #接收客戶端發送的命令 14 data = os.popen(cmd).read() #用poen內置函數執行指令,並讀取結果 15 print(data) 16 if not data: #若是沒有內容,說明沒有此指令 17 data = "mei you ci ming ling" 18 conn.send(data.encode("utf-8")) #將執行的結果返回給客戶端 19 break 20 21 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") #輸入指令 8 if len(cmd) == 0: #用長短來判斷用戶是否輸入爲空 9 continue 10 elif cmd == 'exit': #判斷是否爲退出 11 print("退出!") 12 break 13 client.send(cmd.encode("utf-8")) #發送指令須要編碼成bytes類型 14 data = client.recv(6094).decode() 15 print(data)
很好,咱們已經實現了一個簡單的ssh,但你若是嘗試的指令夠多,就會發現,程序仍是有問題的app
在開始解決上面問題3以前,咱們要考慮,客戶端要循環接收服務器端的大量數據返回直到一條命令的結果所有返回爲止, 但問題是客戶端知道服務器端返回的數據有多大麼?答案是不知道,那既然不知道服務器的要返回多大的數據,那客戶端怎麼知道要循環接收多少次呢?答案是不知道,擦,那咋辦? 總不能靠猜吧?呵呵。。。 固然不能,那隻能讓服務器在發送數據以前主動告訴客戶端,要發送多少數據給客戶端,而後再開始發送數據,yes, 機智如我,搞起。ssh
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客戶端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() 14 if cmd == 'exit': 15 print('client is off...') 16 break 17 data = os.popen(cmd).read() 18 print(data) 19 if not data: 20 data = "cmd exec success,has no output" 21 conn.send(str(len(data)).encode()) #數據發送前,先告訴客戶端數據的大小 22 conn.sendall(data.encode("utf-8")) #發送端也有最大數據量限制,因此這裏用sendall,至關於重複循環調用conn.send,直至數據發送完畢 23 24 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") 8 if len(cmd) == 0: 9 continue 10 elif cmd == 'exit': 11 client.send(cmd.encode("utf-8")) 12 print("退出!") 13 break 14 client.send(cmd.encode("utf-8")) 15 res_return_size = client.recv(1024) #接收這條命令執行結果的大小 16 #data = client.recv(6094).decode() 17 print(res_return_size)
若是你夠幸運,會發現程序執行會報錯,客戶端本想只接服務器端命令的執行結果,但實際上卻連命令結果也跟着接收了一部分。這又是爲何呢?異步
這裏就引入了一個重要的概念,「粘包」, 即服務器端你調用時 ’send ’ 2次,但你send調用時,數據其實並無馬上被髮送給客戶端,而是放到了系統的socket發送緩衝區裏,等緩衝區滿了、或者數據等待超時了,數據纔會被send到客戶端,這樣就把好幾回的小數據拼成一個大數據,統一發送到客戶端了,這麼作的目地是爲了提升io利用效率,一次性發送總比連發好幾回效率高嘛。 但也帶來一個問題,就是「粘包」,即2次或屢次的數據粘在了一塊兒統一發送了。就是報錯的狀況 。
咱們在這裏必需要想辦法把粘包分開, 由於不分開,咱們就沒辦法取出來服務器端返回的命令執行結果的大小呀。so ,那怎麼分開呢?首先咱們是沒辦法讓緩衝區強制刷新把數據發給客戶端的。 能作的,只有一個。讓緩衝區超時,超時了,系統就不會等緩衝區滿了,會直接把數據發走,由於不能一個勁的等後面的數據呀,等過久,會形成數據延遲了,那但是極很差的。如何讓緩衝區超時呢?
答案就是:
1 import os 2 import socket 3 4 server = socket.socket() 5 server.bind(('localhost',7777)) 6 server.listen() 7 8 while True: 9 print("等待接收。。") 10 conn,addr = server.accept() 11 print("接收到客戶端:",addr) 12 while True: 13 cmd = conn.recv(1024).decode() 14 if cmd == 'exit': 15 print('client is off...') 16 break 17 data = os.popen(cmd).read() 18 print(data) 19 if not data: 20 data = "cmd exec success,has no output" 21 conn.send(str(len(data)).encode()) 22 conn.recv(1024) #接收客戶端回執信息,不作處理,防粘包 23 conn.sendall(data.encode("utf-8")) 24 25 server.close()
1 import socket 2 3 client = socket.socket() 4 client.connect(('localhost',7777)) 5 while True: 6 7 cmd = input(">>>") 8 if len(cmd) == 0: 9 continue 10 elif cmd == 'exit': 11 client.send(cmd.encode("utf-8")) 12 print("退出!") 13 break 14 client.send(cmd.encode("utf-8")) 15 16 res_return_size = client.recv(1024) #接收這條命令執行結果的大小 17 print("getting cmd result , ", res_return_size) 18 total_rece_size = int(res_return_size) 19 print("total size:",res_return_size) 20 client.send("準備好接收了,發吧loser".encode("utf-8")) #發送回執信息 21 received_size = 0 #已接收到的數據 22 cmd_res = b'' 23 f = open("test_copy.html","wb")#把接收到的結果存下來,一會看看收到的數據 對不對 24 while received_size != total_rece_size: #表明還沒收完 25 data = client.recv(1024) 26 received_size += len(data) #爲何不是直接1024,還判斷len幹嗎,注意,實際收到的data有可能比1024少 27 cmd_res += data 28 else: 29 print("數據收完了",received_size) 30 #print(cmd_res.decode()) 31 f.write(cmd_res) #把接收到的結果存下來,一會看看收到的數據 對不對 32 #print(data.decode()) #命令執行結果 33 34 client.close()
3.SocketServer實現真正的併發
SocketServer內部使用 IO多路複用 以及 「多線程」 和 「多進程」 ,從而實現併發處理多個客戶端請求的Socket服務端。即:每一個客戶端請求鏈接到服務器時,Socket服務端都會在服務器是建立一個「線程」或者「進程」 專門負責處理當前客戶端的全部請求。
ThreadingTCPServer
ThreadingTCPServer實現的Soket服務器內部會爲每一個client建立一個 「線程」,該線程用來和客戶端進行交互。
一、ThreadingTCPServer基礎
使用ThreadingTCPServer:
1 import socketserver 2 3 class MyTCPHandler(socketserver.BaseRequestHandler): 4 5 """ 6 The request handler class for our server. 7 8 It is instantiated once per connection to the server, and must 9 override the handle() method to implement communication to the 10 client. 11 """ 12 def handle(self): 13 # self.request is the TCP socket connected to the client 14 while True: 15 try: 16 self.data = self.request.recv(1024).strip() 17 print("{} wrote:".format(self.client_address[0])) 18 print(self.data) 19 # just send back the same data, but upper-cased 20 self.request.sendall(self.data.upper()) 21 except ConnectionResetError as e: 22 print("error:",e) 23 break 24 25 if __name__ == "__main__": 26 HOST, PORT = "localhost", 9999 27 28 # Create the server, binding to localhost on port 9999 29 server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) 30 31 # Activate the server; this will keep running until you 32 # interrupt the program with Ctrl-C 33 server.serve_forever()
1 import socket 2 #客戶端沒什麼特別,跟以前都同樣 3 client = socket.socket() 4 client.connect(("localhost",9999)) 5 6 while True: 7 data = input(">>>").strip() 8 client.send(data.encode("utf-8")) 9 recv_data = client.recv(1024).decode() 10 print(recv_data)
二、ThreadingTCPServer源碼剖析
ThreadingTCPServer的類圖關係以下:
內部調用流程爲:
ThreadingTCPServer相關源碼:
class BaseServer: """Base class for server classes. Methods for the caller: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - allow_reuse_address Instance variables: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def server_activate(self): """Called by constructor to activate the server. May be overridden. """ pass def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: while not self.__shutdown_request: # XXX: Consider using another file descriptor or # connecting to the socket to wake this up instead of # polling. Polling reduces our responsiveness to a # shutdown request and wastes cpu at all other times. r, w, e = _eintr_retry(select.select, [self], [], [], poll_interval) if self in r: self._handle_request_noblock() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() # The distinction between handling, getting, processing and # finishing a request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls # select, get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process # or create a new thread to finish the request # - finish_request() instantiates the request handler class; # this constructor will handle the request all by itself def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock() def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request) def handle_timeout(self): """Called if no new request arrives within self.timeout. Overridden by ForkingMixIn. """ pass def verify_request(self, request, client_address): """Verify the request. May be overridden. Return True if we should proceed with this request. """ return True def process_request(self, request, client_address): """Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """Called to clean-up the server. May be overridden. """ pass def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """Called to shutdown and close an individual request.""" self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" pass def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print '-'*40 print 'Exception happened during processing of request from', print client_address import traceback traceback.print_exc() # XXX But this goes to stderr! print '-'*40 BaseServer
class TCPServer(BaseServer): """Base class for various socket-based server classes. Defaults to synchronous IP stream (i.e., TCP). Methods for the caller: - __init__(server_address, RequestHandlerClass, bind_and_activate=True) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you don't use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - request_queue_size (only for stream sockets) - allow_reuse_address Instance variables: - server_address - RequestHandlerClass - socket """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close() def fileno(self): """Return socket file number. Interface required by select(). """ return self.socket.fileno() def get_request(self): """Get the request and client address from the socket. May be overridden. """ return self.socket.accept() def shutdown_request(self, request): """Called to shutdown and close an individual request.""" try: #explicitly shutdown. socket.close() merely releases #the socket and waits for GC to perform the actual close. request.shutdown(socket.SHUT_WR) except socket.error: pass #some platforms may raise ENOTCONN here self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" request.close()
class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" # Decides how threads will act upon termination of the # main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) self.shutdown_request(request) except: self.handle_error(request, client_address) self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
ForkingTCPServer
ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別爲請求者創建 「線程」 和 「進程」。
基本使用:
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): """ The request handler class for our server. It is instantiated once per connection to the server, and must override the handle() method to implement communication to the client. """ def handle(self): # self.request is the TCP socket connected to the client while True: try: self.data = self.request.recv(1024).strip() print("{} wrote:".format(self.client_address[0])) print(self.data) # just send back the same data, but upper-cased self.request.sendall(self.data.upper()) except ConnectionResetError as e: print("error:",e) break if __name__ == "__main__": HOST, PORT = "localhost", 9999 # Create the server, binding to localhost on port 9999 server = socketserver.ForkingTCPServer((HOST, PORT), MyTCPHandler) # Activate the server; this will keep running until you # interrupt the program with Ctrl-C server.serve_forever()
import socket client = socket.socket() client.connect(("localhost",9999)) while True: data = input(">>>").strip() client.send(data.encode("utf-8")) recv_data = client.recv(1024).decode() print(recv_data)