python socketserver模塊

socketserver

socketserver內部使用IO多路複用以及「多線程」和「多進程」,從而實現併發處理多個客戶端請求的scoket服務端。即,每一個客戶端請求鏈接到服務器時,socket服務端都會在服務器是建立一個「線程」或「進程」專門負責處理當前客戶端的全部請求。html

ThradingTCPServer

ThradingTCPServer實現的socket服務器內部會爲每一個client建立一個「線程」,該線程用來客戶端進行交互python

一、ThreadingTCPServer基礎程序員

使用ThreadingTCPServer:服務器

  • 建立一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱爲 handle 的方法
  • 啓動ThreadingTCPServer

使用多線程

server端併發

import socketserver


class MyServer(socketserver.BaseRequestHandler):  # $ 必須繼承BaseRequestHandler
    print('Myserver start'.center(50, "-"))

    def handle(self):  # $ 必須有handle方法
        print('New connection:', self.client_address)
        while True:
            data = self.request.recv(1024)
            if not data:
                break
            print(
                'recv Client {} data {}'.format(
                    self.client_address,
                    data.decode()))
            self.request.send(data)


if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(
        ('127.0.0.1', 8009), MyServer)  # $ 實現多線程的socket
    server.serve_forever()  # $ 當前鏈接斷開不會出現關閉或報錯,能夠與其餘客戶端繼續鏈接

  

client端:app

import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)

while True:
    raw = input('client input >> ').strip()
    sk.send(raw.encode("utf-8"))
    msg = sk.recv(1024)
    print(msg.decode("utf-8"))
sk.close()

 

輸出異步

------------------Myserver start------------------
New connection: ('127.0.0.1', 13560)
recv Client ('127.0.0.1', 13560) data hello
New connection: ('127.0.0.1', 13561)
recv Client ('127.0.0.1', 13561) data world

---------------client1:
client input >> hello
hello
client input >> 

---------------client2:
client input >> world
world
client input >>

  

socketserver分析

socketserver中包含了兩種類,一種爲服務類(server class),一種爲請求處理類(request handle class)。前者提供了許多方法:像綁定,監聽,運行…… (也就是創建鏈接的過程) 後者則專一於如何處理用戶所發送的數據(也就是事務邏輯)。socket

服務類繼承層次關係ide

 

BaseServer不直接對外服務。

TCPServer針對TCP套接字流

UDPServer針對UDP數據報套接字

UnixStreamServer和UnixDatagramServer針對UNIX域套接字

 

ThreadingTCPServer 源碼分析

BaseServer

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40

 

TCPServer

class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()

 

ThreadingMixIn
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

  

RequestHandler

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define arbitrary other instance variariables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

  

請求處理類有三種方法:

setup()

Called before the handle() method to perform any initialization actions required. The default implementation does nothing.

也就是在handle()以前被調用,主要的做用就是執行處理請求以前的初始化相關的各類工做。默認不會作任何事。(若是想要讓其作一些事的話,就要程序員在本身的請求處理器中覆蓋這個方法(由於通常自定義的請求處理器都要繼承python中提供的BaseRequestHandler,ps:下文會提到的),而後往裏面添加東西便可)

handle()

This function must do all the work required to service a request. The default implementation does nothing. Several instance attributes are available to it; the request is available as self.request; the client address as self.client_address; and the server instance as self.server, in case it needs access to per-server information.

The type of self.request is different for datagram or stream services. For stream services,self.request is a socket object; for datagram services, self.request is a pair of string and socket.

handle()的工做就是作那些全部與處理請求相關的工做。默認也不會作任何事。他有數個實例參數:self.request    self.client_address   self.server

finish()

Called after the handle() method to perform any clean-up actions required. The default implementation does nothing. If setup() raises an exception, this function will not be called.

在handle()方法以後會被調用,他的做用就是執行當處理完請求後的清理工做,默認不會作任何事

 

內部調用流程爲:

  1. 啓動服務端程序
  2. 執行 TCPServer.__init__ 方法,建立服務端Socket對象並綁定 IP 和 端口
address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

 

執行 BaseServer.__init__ 方法,將自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle賦值給 self.RequestHandlerClass

 

 

 

def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

  

執行 BaseServer.server_forever 方法,While 循環一直監聽是否有客戶端請求到達 ...

def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

  

當客戶端鏈接到達服務器

def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

 

執行 ThreadingMixIn.process_request 方法,建立一個 「線程」 用來處理請求

執行 ThreadingMixIn.process_request_thread 方法

 

# main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()

  

執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass()  即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

finish_request方法中執行了self.RequestHandlerClass(request, client_address, self)。self.RequestHandlerClass是什麼呢?

self.RequestHandlerClass = RequestHandlerClass(就在__init__方法中)。因此finish_request方法本質上就是建立了一個服務處理實例RequestHandlerClass

 RequestHandlerClass繼承了BaseRequestHandler類,並實現handle方法,在實例化RequestHandlerClass時,會執行handle方法,處理用戶所發送的數據(也就是事務邏輯)。

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

 時序圖

 

  

附錄

class SocketServer.BaseServer:這是模塊中的全部服務器對象的超類。它定義了接口,以下所述,可是大多數的方法不實現,在子類中進行細化。

    BaseServer.fileno():返回服務器監聽套接字的整數文件描述符。一般用來傳遞給select.select(), 以容許一個進程監視多個服務器。

    BaseServer.handle_request():處理單個請求。處理順序:get_request(), verify_request(), process_request()。若是用戶提供handle()方法拋出異常,將調用服務器的handle_error()方法。若是self.timeout內沒有請求收到, 將調用handle_timeout()並返回handle_request()。

    BaseServer.serve_forever(poll_interval=0.5): 處理請求,直到一個明確的shutdown()請求。每poll_interval秒輪詢一次shutdown。忽略self.timeout。若是你須要作週期性的任務,建議放置在其餘線程。

    BaseServer.shutdown():告訴serve_forever()循環中止並等待其中止。python2.6版本。

    BaseServer.address_family: 地址家族,好比socket.AF_INET和socket.AF_UNIX。

    BaseServer.RequestHandlerClass:用戶提供的請求處理類,這個類爲每一個請求建立實例。

    BaseServer.server_address:服務器偵聽的地址。格式根據協議家族地址的各不相同,請參閱socket模塊的文檔。

    BaseServer.socketSocket:服務器上偵聽傳入的請求socket對象的服務器。

服務器類支持下面的類變量:

    BaseServer.allow_reuse_address:服務器是否容許地址的重用。默認爲false ,而且可在子類中更改。

    BaseServer.request_queue_size

請求隊列的大小。若是單個請求須要很長的時間來處理,服務器忙時請求被放置到隊列中,最多能夠放request_queue_size個。一旦隊列已滿,來自客戶端的請求將獲得 「Connection denied」錯誤。默認值一般爲5 ,但能夠被子類覆蓋。

    BaseServer.socket_type:服務器使用的套接字類型; socket.SOCK_STREAM和socket.SOCK_DGRAM等。

    BaseServer.timeout:超時時間,以秒爲單位,或 None表示沒有超時。若是handle_request()在timeout內沒有收到請求,將調用handle_timeout()。

下面方法能夠被子類重載,它們對服務器對象的外部用戶沒有影響。

    BaseServer.finish_request():實際處理RequestHandlerClass發起的請求並調用其handle()方法。 經常使用。

    BaseServer.get_request():接受socket請求,並返回二元組包含要用於與客戶端通訊的新socket對象,以及客戶端的地址。

    BaseServer.handle_error(request, client_address):若是RequestHandlerClass的handle()方法拋出異常時調用。默認操做是打印traceback到標準輸出,並繼續處理其餘請求。

    BaseServer.handle_timeout():超時處理。默認對於forking服務器是收集退出的子進程狀態,threading服務器則什麼都不作。

    BaseServer.process_request(request, client_address) :調用finish_request()建立RequestHandlerClass的實例。若是須要,此功能能夠建立新的進程或線程來處理請求,ForkingMixIn和ThreadingMixIn類作到這點。經常使用。

    BaseServer.server_activate():經過服務器的構造函數來激活服務器。默認的行爲只是監聽服務器套接字。可重載。

    BaseServer.server_bind():經過服務器的構造函數中調用綁定socket到所需的地址。可重載。

    BaseServer.verify_request(request, client_address):返回一個布爾值,若是該值爲True ,則該請求將被處理,反之請求將被拒絕。此功能能夠重寫來實現對服務器的訪問控制。默認的實現始終返回True。client_address能夠限定客戶端,好比只處理指定ip區間的請求。 經常使用。

  

這個幾個服務類都是同步處理請求的:一個請求沒處理完不能處理下一個請求。要想支持異步模型,能夠利用多繼承讓server類繼承ForkingMixIn 或 ThreadingMixIn mix-in classes。

ForkingMixIn利用多進程(分叉)實現異步。

ThreadingMixIn利用多線程實現異步。

 

參考:https://www.cnblogs.com/MnCu8261/p/5546823.html

相關文章
相關標籤/搜索