38 - 網絡編程-socketserver

1 socket編程弊端

socket編程過於底層,編程雖然有套路,可是要寫出健壯的代碼仍是比較困難的,因此不少語言都會socket底層API進行封裝,Python的封裝就是SocketServer模塊。它是網絡服務編程框架,便於企業級快速開發。編程

2 SocketServer模塊

SocketServer,網絡通訊服務器,是Python標準庫中的一個高級模塊,其做用是建立網絡服務器。該模塊簡化了編寫網絡服務器的任務。bash

2.1 服務器類

SocketServer模塊中定義了五種服務器類。服務器

  • BaseServer(server_address, RequestHandlerClass):服務器的基類,定義了API。
  • TCPServer(server_address, RequestHandlerClass, bind_and_activate=True):使用TCP/IP套接字。
  • UDPServer:使用數據報套接字
  • UnixStreamServer:使用UNIX套接字,只適用UNIX平臺
  • UnixDatagramServer:使用UNIX套接字,只適用UNIX平臺

它們的繼承關係:網絡

+------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+

除了基類爲抽象基類意外,其餘四個類都是同步阻塞的。多線程

2.2 Mixin類

SocketServer模塊中還提供了一些Mixin類,用於和同步類組成多線程或者多進程的異步方式。框架

  • class ForkingUDPServer(ForkingMixIn, UDPServer)
  • class ForkingTCPServer(ForkingMixIn, TCPServer)
  • class ThreadingUDPServer(ThreadingMixIn, UDPServer)
  • class ThreadingTCPServer(ThreadingMixIn, TCPServer)
  • class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer)
  • class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer)

fork是建立多進程,thread是建立多線程。fork須要操做系統支持,Windows不支持。異步

2.3 RequestHandlerClass是啥

要想使用服務器類,須要傳入一個RequestHandlerClass類,爲何要傳入這個RequestHandlerClass類,它是幹什麼的?下面一塊兒來看看源碼:socket

  1. TCPServer入口
# 446 行 
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
    """Constructor.  May be extended, do not override."""
    BaseServer.__init__(self, server_address, RequestHandlerClass)    # 把RequestHandlerClass交給了BaseServer
    self.socket = socket.socket(self.address_family,
                                self.socket_type)
  1. BaseServer
# 204 行
def __init__(self, server_address, RequestHandlerClass):
    """Constructor.  May be extended, do not override."""
    self.server_address = server_address
    self.RequestHandlerClass = RequestHandlerClass  # 將RequestHandlerClass看成類屬性付給了實例自己
    self.__is_shut_down = threading.Event()
    self.__shutdown_request = False
  1. 開啓的服務serve_forever方法
# 219 行
def serve_forever(self, poll_interval=0.5):
    self.__is_shut_down.clear()
    try:
        ... ...
                if ready:
                    self._handle_request_noblock()   # 這裏執行了_handle_request_noblock()方法

                self.service_actions()
    finally:
        self.__shutdown_request = False
        self.__is_shut_down.set()
  1. _handle_request_noblock()方法
# 304行
    def _handle_request_noblock(self):
        ... ... 
            try:
                self.process_request(request, client_address)    # 這裏又調用了process_request方法
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)
  1. process_request方法
# 342 行
def process_request(self, request, client_address):
    self.finish_request(request, client_address)   # 這裏又調用了finish_request方法
    self.shutdown_request(request)
  1. finish_request方法
# 359 行
def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)   # 實例化對象

經過觀察源碼,咱們知道RequestHandlerClass接受了兩個參數,根據上下文代碼咱們可知:tcp

  1. request:客戶端的socket鏈接
  2. client_address: 客戶端的地址二元組信息

那麼這個RequestHandlerClass究竟是啥?

2.4 編程接口

        經過源碼,咱們能夠知道,最後傳入RequestHandlerClass的是socket和client的ip地址,是否是和咱們前面寫的TCP多線程版本的recv函數很想?沒錯,RequestHandlerClass在這裏被稱爲編程接口,因爲處理業務邏輯。
        可是RequestHandlerClass咱們沒有參照,該如何寫呢? socketserver提供了抽象基類BaseRequestHandler,共咱們繼承來實現。

BaseRequestHandler 是和用戶鏈接的用戶請求處理類的基類,因此 RequestHandlerClass 必須是 BaseRequestHandler 的子類,

class BaseRequestHandler:

    # 在初始化時就會調用這些方法
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):    # 每個鏈接初始化
        pass

    def handle(self):   # 每一次請求處理
        pass 

    def finish(self):   # 每個鏈接清理
        pass

觀察源碼咱們知道,setup、handler、finish在類初始化時就會被執行:

  1. setup():執行處理請求以前的初始化相關的各類工做。默認不會作任何事。
  2. handler():執行那些全部與處理請求相關的工做。默認也不會作任何事。
  3. finish():執行當處理完請求後的清理工做,默認不會作任何事。

這些參數究竟是什麼?

import socketserver

class MyRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print(self.server)  # <socketserver.TCPServer object at 0x00000153270DA320>
        print(self.client_address)  # ('127.0.0.1', 62124)
        print(self.request)  # <socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 999), raddr=('127.0.0.1', 62124)>

s = socketserver.TCPServer(('127.0.0.1', 999), MyRequestHandler)
s.serve_forever()

根據上面的輸出信息咱們知道:

  • self.server:指代當前server自己
  • self.client_address:客戶端地址
  • self.request: 和客戶端鏈接的socket對象

咱們總結一下,使用socketserver建立一個服務器須要如下幾個步驟:

  1. 從BaseRequestHandler類派生出子類,並覆蓋其handler()方法來建立請求處理程序類,此方法將處理傳入請求。
  2. 實例化一個服務器類,傳參服務器的地址和請求處理類
  3. 調用服務器實例的handler_request()或serve_forever()方法(啓動服務)
  4. 調用server_close()方法(關閉服務)

3 實現EchoServer

顧名思義:Echo,即來什麼消息就回顯什麼消息,即客戶端發來什麼消息,就返回什麼消息。

import socketserver
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

class MyRequestHandler(socketserver.BaseRequestHandler):

    def setup(self):
        
        # 優先執行父類同名方法是一個很好的習慣,即使是父類不做任何處理
        super(MyRequestHandler, self).setup() 
        self.event = threading.Event()

    def handle(self):
        super(MyRequestHandler, self).handle()
        while not self.event.is_set():
            data = self.request.recv(1024)
            if data == b'quit' or data == b'':
                self.event.set()
                break

            msg = '{}:{} {}'.format(*self.client_address, data.decode())
            logging.info(msg)
            self.request.send(msg.encode())

    def finish(self):
        super(MyRequestHandler, self).finish()
        self.event.set()

if __name__ == '__main__':
    with socketserver.ThreadingTCPServer(('127.0.0.1', 9999), MyRequestHandler) as server:

        # 讓全部啓動的線程都爲daemon進程
        server.daemon_threads = True

        # serve_foreve會阻塞,因此交給子線程運行
        threading.Thread(target=server.serve_forever, daemon=True).start()
        while True:
            cmd = input('>>>').strip()
            if not cmd: continue
            if cmd == 'quit':
                server.server_close()
                break
            print(threading.enumerate())

ThreadingTCPServer是TCPServer的子類,它混合了ThreadingMixIn類使用多線程來接受客戶端的鏈接。

4 聊天室

代碼以下:

import socketserver
import logging
import threading

FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


class ChatRequestHandler(socketserver.BaseRequestHandler):
    clients = set()

    # 每一個鏈接進來時的操做
    def setup(self):
        super(ChatRequestHandler, self).setup()
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.clients.add(self.request)

    # 每一個鏈接的業務處理
    def handle(self):
        super(ChatRequestHandler, self).handle()
        while not self.event.is_set():
            
            # use Client code except
            try:
                data = self.request.recv(1024)
            except ConnectionResetError:
                self.clients.remove(self.request)
                self.event.set()
                break
                
            # use TCP tools except
            if data.rstrip() == b'quit' or data == b'':
                with self.lock:
                    self.clients.remove(self.request)
                logging.info("{}:{} is down ~~~~~~".format(*self.client_address))
                self.event.set()
                break

            msg = "{}:{} {}".format(*self.client_address, data.decode()).encode()
            logging.info('{}:{} {}'.format(*self.client_address, msg))
            with self.lock:
                for client in self.clients:
                    client.send(msg)

    # 每一個鏈接關閉的時候,會執行
    def finish(self):

        super(ChatRequestHandler, self).finish()
        self.event.set()
        self.request.close()


class ChatTCPServer:
    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        self.sock = socketserver.ThreadingTCPServer((self.ip, self.port), ChatRequestHandler)

    def start(self):
        self.sock.daemon_threads = True
        threading.Thread(target=self.sock.serve_forever, name='server', daemon=True).start()

    def stop(self):
        self.sock.server_close()


if __name__ == '__main__':
    cts = ChatTCPServer('127.0.0.1', 9999)
    cts.start()

    while True:
        cmd = input('>>>>:').strip()
        if cmd == 'quit':
            cts.stop()
        if not cmd: continue
        print(threading.enumerate())

使用TCP工具強制退出時會發送空串,而使用咱們本身寫的tcp client,則不會發送,因此這裏所了兩種異常的處理。socketserver只是用在服務端,客戶端使用上一篇TCP client便可。

相關文章
相關標籤/搜索