socketserver
socketserver內部使用IO多路複用以及「多線程」和「多進程」,從而實現併發處理多個客戶端請求的scoket服務端。即,每一個客戶端請求鏈接到服務器時,socket服務端都會在服務器是建立一個「線程」或「進程」專門負責處理當前客戶端的全部請求。html
ThradingTCPServer
ThradingTCPServer實現的socket服務器內部會爲每一個client建立一個「線程」,該線程用來客戶端進行交互python
一、ThreadingTCPServer基礎程序員
使用ThreadingTCPServer:服務器
- 建立一個繼承自 SocketServer.BaseRequestHandler 的類
- 類中必須定義一個名稱爲 handle 的方法
- 啓動ThreadingTCPServer
使用多線程
server端併發
import socketserver class MyServer(socketserver.BaseRequestHandler): # $ 必須繼承BaseRequestHandler print('Myserver start'.center(50, "-")) def handle(self): # $ 必須有handle方法 print('New connection:', self.client_address) while True: data = self.request.recv(1024) if not data: break print( 'recv Client {} data {}'.format( self.client_address, data.decode())) self.request.send(data) if __name__ == '__main__': server = socketserver.ThreadingTCPServer( ('127.0.0.1', 8009), MyServer) # $ 實現多線程的socket server.serve_forever() # $ 當前鏈接斷開不會出現關閉或報錯,能夠與其餘客戶端繼續鏈接
client端:app
import socket ip_port = ('127.0.0.1',8009) sk = socket.socket() sk.connect(ip_port) while True: raw = input('client input >> ').strip() sk.send(raw.encode("utf-8")) msg = sk.recv(1024) print(msg.decode("utf-8")) sk.close()
輸出異步
------------------Myserver start------------------ New connection: ('127.0.0.1', 13560) recv Client ('127.0.0.1', 13560) data hello New connection: ('127.0.0.1', 13561) recv Client ('127.0.0.1', 13561) data world ---------------client1: client input >> hello hello client input >> ---------------client2: client input >> world world client input >>
socketserver分析
socketserver中包含了兩種類,一種爲服務類(server class),一種爲請求處理類(request handle class)。前者提供了許多方法:像綁定,監聽,運行…… (也就是創建鏈接的過程) 後者則專一於如何處理用戶所發送的數據(也就是事務邏輯)。socket
服務類繼承層次關係ide
BaseServer不直接對外服務。
TCPServer針對TCP套接字流
UDPServer針對UDP數據報套接字
UnixStreamServer和UnixDatagramServer針對UNIX域套接字
ThreadingTCPServer 源碼分析
BaseServer
class BaseServer: """Base class for server classes. Methods for the caller: - __init__(server_address, RequestHandlerClass) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you do not use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - server_close() - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - allow_reuse_address Instance variables: - RequestHandlerClass - socket """ timeout = None def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False def server_activate(self): """Called by constructor to activate the server. May be overridden. """ pass def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: while not self.__shutdown_request: # XXX: Consider using another file descriptor or # connecting to the socket to wake this up instead of # polling. Polling reduces our responsiveness to a # shutdown request and wastes cpu at all other times. r, w, e = _eintr_retry(select.select, [self], [], [], poll_interval) if self in r: self._handle_request_noblock() finally: self.__shutdown_request = False self.__is_shut_down.set() def shutdown(self): """Stops the serve_forever loop. Blocks until the loop has finished. This must be called while serve_forever() is running in another thread, or it will deadlock. """ self.__shutdown_request = True self.__is_shut_down.wait() # The distinction between handling, getting, processing and # finishing a request is fairly arbitrary. Remember: # # - handle_request() is the top-level call. It calls # select, get_request(), verify_request() and process_request() # - get_request() is different for stream or datagram sockets # - process_request() is the place that may fork a new process # or create a new thread to finish the request # - finish_request() instantiates the request handler class; # this constructor will handle the request all by itself def handle_request(self): """Handle one request, possibly blocking. Respects self.timeout. """ # Support people who used socket.settimeout() to escape # handle_request before self.timeout was available. timeout = self.socket.gettimeout() if timeout is None: timeout = self.timeout elif self.timeout is not None: timeout = min(timeout, self.timeout) fd_sets = _eintr_retry(select.select, [self], [], [], timeout) if not fd_sets[0]: self.handle_timeout() return self._handle_request_noblock() def _handle_request_noblock(self): """Handle one request, without blocking. I assume that select.select has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except socket.error: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except: self.handle_error(request, client_address) self.shutdown_request(request) def handle_timeout(self): """Called if no new request arrives within self.timeout. Overridden by ForkingMixIn. """ pass def verify_request(self, request, client_address): """Verify the request. May be overridden. Return True if we should proceed with this request. """ return True def process_request(self, request, client_address): """Call finish_request. Overridden by ForkingMixIn and ThreadingMixIn. """ self.finish_request(request, client_address) self.shutdown_request(request) def server_close(self): """Called to clean-up the server. May be overridden. """ pass def finish_request(self, request, client_address): """Finish one request by instantiating RequestHandlerClass.""" self.RequestHandlerClass(request, client_address, self) def shutdown_request(self, request): """Called to shutdown and close an individual request.""" self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" pass def handle_error(self, request, client_address): """Handle an error gracefully. May be overridden. The default is to print a traceback and continue. """ print '-'*40 print 'Exception happened during processing of request from', print client_address import traceback traceback.print_exc() # XXX But this goes to stderr! print '-'*40
TCPServer
class TCPServer(BaseServer): """Base class for various socket-based server classes. Defaults to synchronous IP stream (i.e., TCP). Methods for the caller: - __init__(server_address, RequestHandlerClass, bind_and_activate=True) - serve_forever(poll_interval=0.5) - shutdown() - handle_request() # if you don't use serve_forever() - fileno() -> int # for select() Methods that may be overridden: - server_bind() - server_activate() - get_request() -> request, client_address - handle_timeout() - verify_request(request, client_address) - process_request(request, client_address) - shutdown_request(request) - close_request(request) - handle_error() Methods for derived classes: - finish_request(request, client_address) Class variables that may be overridden by derived classes or instances: - timeout - address_family - socket_type - request_queue_size (only for stream sockets) - allow_reuse_address Instance variables: - server_address - RequestHandlerClass - socket """ address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close() def fileno(self): """Return socket file number. Interface required by select(). """ return self.socket.fileno() def get_request(self): """Get the request and client address from the socket. May be overridden. """ return self.socket.accept() def shutdown_request(self, request): """Called to shutdown and close an individual request.""" try: #explicitly shutdown. socket.close() merely releases #the socket and waits for GC to perform the actual close. request.shutdown(socket.SHUT_WR) except socket.error: pass #some platforms may raise ENOTCONN here self.close_request(request) def close_request(self, request): """Called to clean up an individual request.""" request.close()
ThreadingMixIn
class ThreadingMixIn: """Mix-in class to handle each request in a new thread.""" # Decides how threads will act upon termination of the # main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) except Exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
RequestHandler
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass class BaseRequestHandler: """Base class for request handler classes. This class is instantiated for each request to be handled. The constructor sets the instance variables request, client_address and server, and then calls the handle() method. To implement a specific service, all you need to do is to derive a class which defines a handle() method. The handle() method can find the request as self.request, the client address as self.client_address, and the server (in case it needs access to per-server information) as self.server. Since a separate instance is created for each request, the handle() method can define arbitrary other instance variariables. """ def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish() def setup(self): pass def handle(self): pass def finish(self): pass
請求處理類有三種方法:
-
setup
() -
Called before the
handle()
method to perform any initialization actions required. The default implementation does nothing.也就是在handle()以前被調用,主要的做用就是執行處理請求以前的初始化相關的各類工做。默認不會作任何事。(若是想要讓其作一些事的話,就要程序員在本身的請求處理器中覆蓋這個方法(由於通常自定義的請求處理器都要繼承python中提供的BaseRequestHandler,ps:下文會提到的),而後往裏面添加東西便可)
-
handle
() -
This function must do all the work required to service a request. The default implementation does nothing. Several instance attributes are available to it; the request is available as
self.request
; the client address asself.client_address
; and the server instance asself.server
, in case it needs access to per-server information.The type of
self.request
is different for datagram or stream services. For stream services,self.request
is a socket object; for datagram services,self.request
is a pair of string and socket.handle()的工做就是作那些全部與處理請求相關的工做。默認也不會作任何事。他有數個實例參數:self.request self.client_address self.server
-
finish
() -
Called after the
handle()
method to perform any clean-up actions required. The default implementation does nothing. Ifsetup()
raises an exception, this function will not be called.在handle()方法以後會被調用,他的做用就是執行當處理完請求後的清理工做,默認不會作任何事
內部調用流程爲:
- 啓動服務端程序
- 執行 TCPServer.__init__ 方法,建立服務端Socket對象並綁定 IP 和 端口
address_family = socket.AF_INET socket_type = socket.SOCK_STREAM request_queue_size = 5 allow_reuse_address = False def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): """Constructor. May be extended, do not override.""" BaseServer.__init__(self, server_address, RequestHandlerClass) self.socket = socket.socket(self.address_family, self.socket_type) if bind_and_activate: try: self.server_bind() self.server_activate() except: self.server_close() raise def server_bind(self): """Called by constructor to bind the socket. May be overridden. """ if self.allow_reuse_address: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.server_address) self.server_address = self.socket.getsockname() def server_activate(self): """Called by constructor to activate the server. May be overridden. """ self.socket.listen(self.request_queue_size) def server_close(self): """Called to clean-up the server. May be overridden. """ self.socket.close()
執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass
def __init__(self, server_address, RequestHandlerClass): """Constructor. May be extended, do not override.""" self.server_address = server_address self.RequestHandlerClass = RequestHandlerClass self.__is_shut_down = threading.Event() self.__shutdown_request = False
執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...
def serve_forever(self, poll_interval=0.5): """Handle one request at a time until shutdown. Polls for shutdown every poll_interval seconds. Ignores self.timeout. If you need to do periodic tasks, do them in another thread. """ self.__is_shut_down.clear() try: # XXX: Consider using another file descriptor or connecting to the # socket to wake this up instead of polling. Polling reduces our # responsiveness to a shutdown request and wastes cpu at all other # times. with _ServerSelector() as selector: selector.register(self, selectors.EVENT_READ) while not self.__shutdown_request: ready = selector.select(poll_interval) if ready: self._handle_request_noblock() self.service_actions() finally: self.__shutdown_request = False self.__is_shut_down.set()
當客戶端鏈接到達服務器
def _handle_request_noblock(self): """Handle one request, without blocking. I assume that selector.select() has returned that the socket is readable before this function was called, so there should be no risk of blocking in get_request(). """ try: request, client_address = self.get_request() except OSError: return if self.verify_request(request, client_address): try: self.process_request(request, client_address) except Exception: self.handle_error(request, client_address) self.shutdown_request(request) except: self.shutdown_request(request) raise else: self.shutdown_request(request)
執行 ThreadingMixIn.process_request 方法,建立一個 「線程」 用來處理請求
執行 ThreadingMixIn.process_request_thread 方法
# main process daemon_threads = False def process_request_thread(self, request, client_address): """Same as in BaseServer but as a thread. In addition, exception handling is done here. """ try: self.finish_request(request, client_address) except Exception: self.handle_error(request, client_address) finally: self.shutdown_request(request) def process_request(self, request, client_address): """Start a new thread to process the request.""" t = threading.Thread(target = self.process_request_thread, args = (request, client_address)) t.daemon = self.daemon_threads t.start()
執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass() 即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)
finish_request方法中執行了self.RequestHandlerClass(request, client_address, self)。self.RequestHandlerClass是什麼呢?
self.RequestHandlerClass = RequestHandlerClass(就在__init__方法中)。因此finish_request方法本質上就是建立了一個服務處理實例RequestHandlerClass
RequestHandlerClass繼承了BaseRequestHandler類,並實現handle方法,在實例化RequestHandlerClass時,會執行handle方法,處理用戶所發送的數據(也就是事務邏輯)。
class BaseRequestHandler: """Base class for request handler classes. This class is instantiated for each request to be handled. The constructor sets the instance variables request, client_address and server, and then calls the handle() method. To implement a specific service, all you need to do is to derive a class which defines a handle() method. The handle() method can find the request as self.request, the client address as self.client_address, and the server (in case it needs access to per-server information) as self.server. Since a separate instance is created for each request, the handle() method can define other arbitrary instance variables. """ def __init__(self, request, client_address, server): self.request = request self.client_address = client_address self.server = server self.setup() try: self.handle() finally: self.finish()
時序圖
附錄
class SocketServer.BaseServer:這是模塊中的全部服務器對象的超類。它定義了接口,以下所述,可是大多數的方法不實現,在子類中進行細化。 BaseServer.fileno():返回服務器監聽套接字的整數文件描述符。一般用來傳遞給select.select(), 以容許一個進程監視多個服務器。 BaseServer.handle_request():處理單個請求。處理順序:get_request(), verify_request(), process_request()。若是用戶提供handle()方法拋出異常,將調用服務器的handle_error()方法。若是self.timeout內沒有請求收到, 將調用handle_timeout()並返回handle_request()。 BaseServer.serve_forever(poll_interval=0.5): 處理請求,直到一個明確的shutdown()請求。每poll_interval秒輪詢一次shutdown。忽略self.timeout。若是你須要作週期性的任務,建議放置在其餘線程。 BaseServer.shutdown():告訴serve_forever()循環中止並等待其中止。python2.6版本。 BaseServer.address_family: 地址家族,好比socket.AF_INET和socket.AF_UNIX。 BaseServer.RequestHandlerClass:用戶提供的請求處理類,這個類爲每一個請求建立實例。 BaseServer.server_address:服務器偵聽的地址。格式根據協議家族地址的各不相同,請參閱socket模塊的文檔。 BaseServer.socketSocket:服務器上偵聽傳入的請求socket對象的服務器。 服務器類支持下面的類變量: BaseServer.allow_reuse_address:服務器是否容許地址的重用。默認爲false ,而且可在子類中更改。 BaseServer.request_queue_size 請求隊列的大小。若是單個請求須要很長的時間來處理,服務器忙時請求被放置到隊列中,最多能夠放request_queue_size個。一旦隊列已滿,來自客戶端的請求將獲得 「Connection denied」錯誤。默認值一般爲5 ,但能夠被子類覆蓋。 BaseServer.socket_type:服務器使用的套接字類型; socket.SOCK_STREAM和socket.SOCK_DGRAM等。 BaseServer.timeout:超時時間,以秒爲單位,或 None表示沒有超時。若是handle_request()在timeout內沒有收到請求,將調用handle_timeout()。 下面方法能夠被子類重載,它們對服務器對象的外部用戶沒有影響。 BaseServer.finish_request():實際處理RequestHandlerClass發起的請求並調用其handle()方法。 經常使用。 BaseServer.get_request():接受socket請求,並返回二元組包含要用於與客戶端通訊的新socket對象,以及客戶端的地址。 BaseServer.handle_error(request, client_address):若是RequestHandlerClass的handle()方法拋出異常時調用。默認操做是打印traceback到標準輸出,並繼續處理其餘請求。 BaseServer.handle_timeout():超時處理。默認對於forking服務器是收集退出的子進程狀態,threading服務器則什麼都不作。 BaseServer.process_request(request, client_address) :調用finish_request()建立RequestHandlerClass的實例。若是須要,此功能能夠建立新的進程或線程來處理請求,ForkingMixIn和ThreadingMixIn類作到這點。經常使用。 BaseServer.server_activate():經過服務器的構造函數來激活服務器。默認的行爲只是監聽服務器套接字。可重載。 BaseServer.server_bind():經過服務器的構造函數中調用綁定socket到所需的地址。可重載。 BaseServer.verify_request(request, client_address):返回一個布爾值,若是該值爲True ,則該請求將被處理,反之請求將被拒絕。此功能能夠重寫來實現對服務器的訪問控制。默認的實現始終返回True。client_address能夠限定客戶端,好比只處理指定ip區間的請求。 經常使用。
這個幾個服務類都是同步處理請求的:一個請求沒處理完不能處理下一個請求。要想支持異步模型,能夠利用多繼承讓server類繼承ForkingMixIn 或 ThreadingMixIn mix-in classes。
ForkingMixIn利用多進程(分叉)實現異步。
ThreadingMixIn利用多線程實現異步。
參考:https://www.cnblogs.com/MnCu8261/p/5546823.html