Day 7: Advanced Sockets

In this section:

  1. Handling multiple connections with Socket
  2. A simple SSH with Socket
  3. True concurrency with SocketServer

 

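1. Handling multiple connections with Socket

From the basic introduction to sockets in the previous article, we have a rough idea of how to get a client and a server talking to each other. But you will notice that when the client disconnects, the server goes down with it. How can the server keep running and serve the next client after one client disconnects?

conn, addr = server.accept()  # accept a connection from a client; blocks until a client connects

This line is the key to handling multiple connections. As long as the server loops back to accept() after a client disconnects, it can go on to serve the next client.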

import socket

server = socket.socket()            # create a socket instance
server.bind(('localhost', 9999))    # bind ip + port
server.listen()                     # start listening

while True:                         # outer loop: keep accepting new connections
    print('waiting for new conn...')
    conn, addr = server.accept()    # wait for a new connection
    print('new conn:', addr)
    while True:                     # inner loop: keep talking to the connected client
        data = conn.recv(1024)
        if not data:                # empty bytes means the client has disconnected
            print('client is off...')
            break                   # end this conversation, go back to waiting for a new connection
        print('recv data:', data)
        conn.send(data.upper())
    conn.close()                    # release the finished connection
server.close()
Multi-connection: server

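Note that at this point the server can still only serve one client at a time. Other clients that arrive have to queue (their connections are left pending). To handle several connections at the same time, read on.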
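To watch the queueing happen, here is a minimal self-contained sketch. It runs the same kind of two-loop server in a background thread and opens two client connections at once. The names `serve_sequentially` and `demo`, the 0.5 s timeout, and the use of port 0 (so the OS picks a free port) are assumptions made for this illustration, not part of the program above.

```python
import socket
import threading

def serve_sequentially(server, n_clients):
    """Serve n_clients one after another, upper-casing whatever each sends."""
    for _ in range(n_clients):
        conn, addr = server.accept()      # take the next queued connection
        with conn:
            while True:
                data = conn.recv(1024)
                if not data:              # client closed its side
                    break
                conn.sendall(data.upper())

def demo():
    server = socket.socket()
    server.bind(("localhost", 0))         # port 0: let the OS choose a free port
    server.listen()
    address = server.getsockname()
    worker = threading.Thread(target=serve_sequentially, args=(server, 2))
    worker.start()

    first = socket.create_connection(address)
    second = socket.create_connection(address)   # connects, but waits in the queue
    second.settimeout(0.5)

    first.sendall(b"hello")
    reply_first = first.recv(1024)

    second.sendall(b"world")
    try:
        second.recv(1024)
        served_early = True
    except socket.timeout:                # no reply while the first client is connected
        served_early = False

    first.close()                         # the server now moves on to the second client
    second.settimeout(5)
    reply_second = second.recv(1024)
    second.close()
    worker.join()
    server.close()
    return reply_first, served_early, reply_second
```

The second client's connect succeeds right away because the operating system queues it, but its data is only answered once the first client has gone.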

 

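2. A simple SSH with Socket

Besides sending simple messages, we can do something more advanced, such as a simple ssh (look it up if you are not sure what that is). The client connects to the server, asks it to execute a command, and the server sends the result back to the client.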
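The "execute a command and return its output" step can be tried on its own before any sockets are involved. The sketch below uses `subprocess.run` rather than the `os.popen` call used by the server in this article. The function name `run_command`, the timeout value and the fallback message are assumptions for illustration, and it assumes a POSIX shell where `echo` and `cd` are available.

```python
import subprocess

def run_command(cmd, timeout=5.0):
    """Run cmd through the shell and return its combined output as text."""
    try:
        completed = subprocess.run(
            cmd,
            shell=True,                   # like os.popen, hand the string to the shell
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,     # fold error messages into the output
            timeout=timeout,              # give up on commands that never finish
        )
    except subprocess.TimeoutExpired:
        return "command timed out"
    output = completed.stdout.decode("utf-8", errors="replace")
    # An empty string would send nothing over a socket, so substitute a message.
    return output or "cmd exec success, has no output"
```

For example, `run_command("echo hi")` returns the text printed by `echo`, while a silent command such as `cd /` returns the fallback message.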

import os
import socket

server = socket.socket()
server.bind(('localhost', 7777))
server.listen()

while True:
    print("waiting for a connection...")
    conn, addr = server.accept()
    print("client connected:", addr)
    while True:
        cmd = conn.recv(1024).decode()   # receive the command sent by the client
        if not cmd:                      # empty data: the client has disconnected
            break
        data = os.popen(cmd).read()      # run the command with os.popen and read its output
        print(data)
        if not data:                     # no output, e.g. the command does not exist
            data = "no such command"
        conn.send(data.encode("utf-8"))  # send the result back to the client
    conn.close()

server.close()
ssh server
import socket

client = socket.socket()
client.connect(('localhost', 7777))
while True:
    cmd = input(">>>")      # read a command
    if len(cmd) == 0:       # skip empty input
        continue
    elif cmd == 'exit':     # quit on 'exit'
        print("bye!")
        break
    client.send(cmd.encode("utf-8"))  # the command must be encoded to bytes before sending
    data = client.recv(1024).decode()
    print(data)
client.close()
ssh client

Good, we now have a simple ssh. But if you try enough commands, you will find the program still has problems:

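  1. It cannot run commands like top that keep producing output. After receiving a command, the server runs it in one go with os.popen and only replies once it has the complete result. A command like top never finishes under os.popen, so the client never gets a reply. (Real ssh is built on asynchronous mechanisms such as select, which we will cover later.)
  2. It cannot run commands like cd that produce no output. After sending each command, the client calls client.recv(1024) and waits for the server's reply. cd prints nothing, and calling conn.send() with empty data sends nothing to the client, so the client would wait forever and hang. The fix is for the server to check the length of the result and, if it is empty, send back a message of its own such as "cmd exec success, has no output." (the server above already substitutes a message when the output is empty).
  3. If a command returns a lot of data, the result comes back incomplete, and when you run another command the client prints the second half of the previous command's result. Why? Because the client calls client.recv(1024), so it takes at most 1024 bytes per call. If the server returns 2000 bytes, the remaining 976 bytes cannot be taken in that first call. They are not thrown away: they stay in the socket buffers (the server's send buffer or the client's receive buffer) until the client calls recv again. That is why the client sees the result of the first command when it runs the second one. At this point someone will say: can't I just make client.recv(1024) bigger, say 100 MB at a time? That does not work. The argument to recv is only an upper bound. How much a single call returns depends on what has already arrived in the receive buffer, and the size of that buffer differs between systems. I could not find a definitive number, but in my tests a single recv on Linux returned about 10 MB at most. The Python documentation recommends a relatively small power of 2 for the buffer size, for example 4096 (8192 also fits that advice). So if one call takes only a few kilobytes, what happens when the server's reply is larger, for example when the server opens a 5 MB file and returns it? How can the client receive it completely? The only way is to receive in a loop.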
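Problem 3 is easy to reproduce without a real server. The sketch below uses `socket.socketpair()`, which returns two already-connected sockets inside one process. That is an assumption made to keep the example self-contained, as is the function name `demo_partial_recv`. It sends 2000 bytes and reads them back with `recv(1024)` calls.

```python
import socket

def demo_partial_recv():
    """Send 2000 bytes, then read them back with recv(1024) calls."""
    sender, receiver = socket.socketpair()   # two connected sockets in one process
    with sender, receiver:
        sender.sendall(b"x" * 2000)
        sender.close()                       # tells the receiver no more data is coming
        sizes = []
        while True:
            chunk = receiver.recv(1024)      # 1024 is an upper bound per call
            if not chunk:                    # empty bytes: sender closed, nothing left
                break
            sizes.append(len(chunk))
    return sizes
```

No single call returns more than 1024 bytes, so it takes at least two calls to collect all 2000. The bytes left over after the first call are exactly what shows up as "the previous command's output" in the ssh client.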

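  Before solving problem 3, consider this: the client has to keep receiving until the whole result of a command has arrived, but does the client know how much data the server is going to return? No. And if it does not know how much is coming, how does it know how many times to loop? It doesn't. So what now? Guess? Of course not. The server has to tell the client how much data it is about to send before it starts sending. Let's do it.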
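Once the size is known, the receiving loop itself is short. Here is a minimal sketch of that loop in isolation. The helper name `recv_exactly`, the 8192-byte chunk size and the `socketpair`-based demo are assumptions for illustration.

```python
import socket
import threading

def recv_exactly(sock, size):
    """Keep calling recv until exactly `size` bytes have been collected."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(min(8192, remaining))  # never ask for more than is still owed
        if not chunk:
            raise ConnectionError("peer closed before all data arrived")
        chunks.append(chunk)
        remaining -= len(chunk)                  # recv may return fewer bytes than requested
    return b"".join(chunks)

def demo(size=5 * 1024 * 1024):
    a, b = socket.socketpair()
    with a, b:
        payload = b"z" * size
        sender = threading.Thread(target=a.sendall, args=(payload,))
        sender.start()            # send in the background so both ends make progress
        received = recv_exactly(b, size)
        sender.join()
    return received == payload
```

Asking for `min(8192, remaining)` instead of a fixed 8192 means the loop can never read past the end of the current result into whatever the server sends next.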

import os
import socket

server = socket.socket()
server.bind(('localhost', 7777))
server.listen()

while True:
    print("waiting for a connection...")
    conn, addr = server.accept()
    print("client connected:", addr)
    while True:
        cmd = conn.recv(1024).decode()
        if not cmd or cmd == 'exit':    # client disconnected or asked to quit
            print('client is off...')
            break
        data = os.popen(cmd).read()
        print(data)
        if not data:
            data = "cmd exec success,has no output"
        payload = data.encode("utf-8")
        conn.send(str(len(payload)).encode())  # before sending the data, tell the client its size in bytes
        conn.sendall(payload)  # send() may transmit only part of the data; sendall keeps calling send until everything is sent
    conn.close()

server.close()
ssh server: sending the size
import socket

client = socket.socket()
client.connect(('localhost', 7777))
while True:
    cmd = input(">>>")
    if len(cmd) == 0:
        continue
    elif cmd == 'exit':
        client.send(cmd.encode("utf-8"))
        print("bye!")
        break
    client.send(cmd.encode("utf-8"))
    res_return_size = client.recv(1024)  # receive the size of this command's result
    # data = client.recv(1024).decode()
    print(res_return_size)
client.close()
ssh client: receiving the size

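If you are lucky, you will see the program go wrong: the client wanted to receive only the size of the command result, but it actually received part of the result itself along with it. Why?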

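This brings in an important concept: "sticky packets". The server calls send twice, but the data is not necessarily transmitted the moment send is called. It is first placed in the socket's send buffer, and the system may hold small pieces briefly (until more data accumulates or a short wait expires) so that several small sends go out together as one larger chunk. The purpose is efficiency: sending once is cheaper than sending several times in a row. On top of that, TCP is a byte stream that does not preserve the boundaries between send calls, so the receiver cannot tell where one message ends and the next begins. The result is "sticky packets": the data from two or more sends arrives stuck together. That is the error case here.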
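The effect can be reproduced in a few lines. In this sketch the two `send` calls stand in for "the size" and "the result". The function name `demo_sticky` and the use of `socket.socketpair()` are assumptions for illustration.

```python
import socket

def demo_sticky():
    """Two sends on one side, recv on the other: message boundaries are not kept."""
    server_side, client_side = socket.socketpair()
    with server_side, client_side:
        server_side.send(b"5")        # meant as "the size"
        server_side.send(b"hello")    # meant as "the result"
        server_side.close()           # lets the reader see end-of-data
        chunks = []
        while True:
            chunk = client_side.recv(1024)
            if not chunk:
                break
            chunks.append(chunk)
    return chunks
```

On many systems this returns a single chunk containing both pieces, though two separate chunks are also possible. The point is that the receiver has no guarantee either way, so a client that calls `int()` on the first chunk may be handed `b"5hello"`.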

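We have to find a way to pull the stuck data apart, because otherwise we cannot extract the size of the command result. So how? First, we have no reliable way to force the buffer to flush its data to the client at a moment of our choosing. What we can do is let the buffered data time out: once it times out, the system stops waiting for more data and sends what it has, because it cannot keep waiting on later data forever. Waiting too long would delay the data, which is very bad. So how do we make the buffer time out?

The answer:

  1. low-way: time.sleep(0.5). In repeated tests, having the server sleep for at least 0.5 seconds was enough for the buffered data to be sent on its own. You might ask whether playing it this way gets you fired. 0.5 s may not seem like much, but in latency-sensitive businesses such as stock trading, prices can move a lot in 0.5 s. When I was first learning sockets I could not find anything better, so this is what I did. Looking back, it really is low.
  2. high-way: every time the server sends a piece of data, it immediately waits for the client to respond by calling conn.recv(1024). recv blocks while there is nothing to receive, so until the client's response arrives the server does not go on to conn.sendall(command result). By the time the response arrives, the size has already been delivered to the client and read, so the command result that follows can no longer stick to it. Clever, right? See the implementation below.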
import os
import socket

server = socket.socket()
server.bind(('localhost', 7777))
server.listen()

while True:
    print("waiting for a connection...")
    conn, addr = server.accept()
    print("client connected:", addr)
    while True:
        cmd = conn.recv(1024).decode()
        if not cmd or cmd == 'exit':    # client disconnected or asked to quit
            print('client is off...')
            break
        data = os.popen(cmd).read()
        print(data)
        if not data:
            data = "cmd exec success,has no output"
        payload = data.encode("utf-8")
        conn.send(str(len(payload)).encode())  # announce the size in bytes
        conn.recv(1024)  # wait for the client's acknowledgement and ignore its content; this prevents sticky packets
        conn.sendall(payload)
    conn.close()

server.close()
ssh server, no sticky packets
import socket

client = socket.socket()
client.connect(('localhost', 7777))
while True:
    cmd = input(">>>")
    if len(cmd) == 0:
        continue
    elif cmd == 'exit':
        client.send(cmd.encode("utf-8"))
        print("bye!")
        break
    client.send(cmd.encode("utf-8"))

    res_return_size = client.recv(1024)  # receive the size of this command's result
    print("getting cmd result , ", res_return_size)
    total_rece_size = int(res_return_size)
    print("total size:", total_rece_size)
    client.send("ready to receive, go ahead".encode("utf-8"))  # send the acknowledgement
    received_size = 0  # bytes received so far
    cmd_res = b''
    while received_size < total_rece_size:  # not everything has arrived yet
        data = client.recv(1024)
        if not data:  # the server closed the connection early
            break
        received_size += len(data)  # add len(data), not 1024: recv may return fewer than 1024 bytes
        cmd_res += data
    print("all data received", received_size)
    # print(cmd_res.decode())  # command result
    with open("test_copy.html", "wb") as f:  # save what was received so we can check it later
        f.write(cmd_res)

client.close()
ssh client, no sticky packets
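The acknowledgement round trip is one way to keep the size and the result apart. A common alternative, which is not what the code above does, is to send the size as a fixed-length binary header, so the receiver always knows exactly where the size ends. Here is a minimal sketch under that assumption. The names `HEADER`, `send_message`, `recv_exactly` and `recv_message`, and the choice of a 4-byte header, are all hypothetical.

```python
import socket
import struct

HEADER = struct.Struct("!I")     # 4-byte unsigned length, network byte order

def send_message(sock, payload):
    """Send one message: fixed-size length header, then the payload bytes."""
    sock.sendall(HEADER.pack(len(payload)) + payload)

def recv_exactly(sock, size):
    """Loop on recv until exactly `size` bytes have arrived."""
    data = b""
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("peer closed mid-message")
        data += chunk
    return data

def recv_message(sock):
    """Read the 4-byte header, then exactly that many payload bytes."""
    (length,) = HEADER.unpack(recv_exactly(sock, HEADER.size))
    return recv_exactly(sock, length)

def demo():
    a, b = socket.socketpair()
    with a, b:
        # Two messages sent back to back may arrive stuck together...
        send_message(a, b"first result")
        send_message(a, b"second result")
        # ...but the header lets the receiver split them again.
        return recv_message(b), recv_message(b)
```

Because the header is always 4 bytes, it does not matter whether the two messages arrive stuck together or in pieces, and no extra round trip or sleep is needed.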

 

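3. True concurrency with SocketServer

SocketServer (named socketserver in Python 3, as in the code below) uses I/O multiplexing together with "multithreading" and "multiprocessing" internally to build a socket server that handles multiple client requests concurrently. That is, each time a client connects to the server, the socket server creates a "thread" or a "process" on the server dedicated to handling all of that client's requests.

ThreadingTCPServer

A socket server built on ThreadingTCPServer creates a "thread" for each client, and that thread handles the interaction with the client.

1. ThreadingTCPServer basics

To use ThreadingTCPServer:

  • Create a class that inherits from socketserver.BaseRequestHandler
  • The class must define a method named handle
  • Start the ThreadingTCPServer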
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):

    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """
    def handle(self):
        # self.request is the TCP socket connected to the client
        while True:
            try:
                data = self.request.recv(1024)
                if not data:  # empty bytes: the client closed the connection
                    break
                self.data = data.strip()
                print("{} wrote:".format(self.client_address[0]))
                print(self.data)
                # just send back the same data, but upper-cased
                self.request.sendall(self.data.upper())
            except ConnectionResetError as e:
                print("error:", e)
                break

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
SocketServer server
import socket
# nothing special about the client, same as before
client = socket.socket()
client.connect(("localhost", 9999))

while True:
    data = input(">>>").strip()
    if not data:  # sending empty data would leave recv waiting forever
        continue
    client.send(data.encode("utf-8"))
    recv_data = client.recv(1024).decode()
    print(recv_data)
Client
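To check that two clients really are served at the same time, both ends can be run in one script. This is a minimal sketch: the names `UpperHandler` and `demo`, binding to port 0 so the OS picks a free port, and running `serve_forever` in a background thread are assumptions made for this illustration.

```python
import socket
import socketserver
import threading

class UpperHandler(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            data = self.request.recv(1024)
            if not data:                       # client disconnected
                break
            self.request.sendall(data.upper())

def demo():
    server = socketserver.ThreadingTCPServer(("localhost", 0), UpperHandler)
    server.daemon_threads = True               # handler threads will not block exit
    threading.Thread(target=server.serve_forever, daemon=True).start()
    address = server.server_address

    # Both clients stay connected at the same time.
    first = socket.create_connection(address)
    second = socket.create_connection(address)
    second.sendall(b"second")                  # the later client talks first
    reply_second = second.recv(1024)
    first.sendall(b"first")
    reply_first = first.recv(1024)

    first.close()
    second.close()
    server.shutdown()                          # stop the serve_forever loop
    server.server_close()
    return reply_first, reply_second
```

With the single-threaded server from section 1, the second client would get no reply until the first one disconnected. Here it is answered immediately, because each connection has its own thread.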

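2. ThreadingTCPServer source code walkthrough

The class relationships of ThreadingTCPServer are as follows: ThreadingTCPServer inherits from ThreadingMixIn and TCPServer, and TCPServer inherits from BaseServer.

The internal call flow is:

  • Start the server program
  • Run TCPServer.__init__, which creates the server socket object, binds the IP and port, and starts listening
  • Run BaseServer.__init__ (called at the start of TCPServer.__init__), which assigns the custom class that inherits from socketserver.BaseRequestHandler, MyTCPHandler in the example above, to self.RequestHandlerClass
  • Run BaseServer.serve_forever, whose while loop keeps checking whether a client request has arrived ...
  • A client connection reaches the server
  • Run ThreadingMixIn.process_request, which creates a "thread" to handle the request
  • Run ThreadingMixIn.process_request_thread
  • Run BaseServer.finish_request, which calls self.RequestHandlerClass(), that is, the constructor of the custom MyTCPHandler (this automatically runs the constructor of the base class BaseRequestHandler, which in turn calls MyTCPHandler's handle method)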
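The call flow above can be observed directly by overriding two of the methods and recording which thread each step runs in. The class names `TracingServer` and `TracingHandler` and the global `trace` list are hypothetical names introduced for this sketch. The overridden methods `process_request` and `finish_request` are the real ones listed above.

```python
import socket
import socketserver
import threading

trace = []   # (step name, thread name) in the order the steps run

class TracingServer(socketserver.ThreadingTCPServer):
    def process_request(self, request, client_address):
        trace.append(("process_request", threading.current_thread().name))
        super().process_request(request, client_address)   # starts the new thread

    def finish_request(self, request, client_address):
        trace.append(("finish_request", threading.current_thread().name))
        super().finish_request(request, client_address)     # instantiates the handler

class TracingHandler(socketserver.BaseRequestHandler):
    def handle(self):
        trace.append(("handle", threading.current_thread().name))
        self.request.sendall(self.request.recv(1024).upper())

def demo():
    trace.clear()
    with TracingServer(("localhost", 0), TracingHandler) as server:
        loop = threading.Thread(target=server.serve_forever, name="serve_forever")
        loop.start()
        with socket.create_connection(server.server_address) as client:
            client.sendall(b"ping")
            reply = client.recv(1024)
        server.shutdown()
        loop.join()
    return reply, list(trace)
```

The trace shows `process_request` running in the `serve_forever` thread, while `finish_request` and `handle` run together in the newly created per-client thread.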

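Relevant ThreadingTCPServer source (this listing is from the Python 2 SocketServer module, hence the print statements in handle_error):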

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print '-'*40
        print 'Exception happened during processing of request from',
        print client_address
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print '-'*40

BaseServer
class TCPServer(BaseServer):

    """Base class for various socket-based server classes.

    Defaults to synchronous IP stream (i.e., TCP).

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:

    - server_address
    - RequestHandlerClass
    - socket

    """

    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    request_queue_size = 5

    allow_reuse_address = False

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

    def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().

        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        try:
            #explicitly shutdown.  socket.close() merely releases
            #the socket and waits for GC to perform the actual close.
            request.shutdown(socket.SHUT_WR)
        except socket.error:
            pass #some platforms may raise ENOTCONN here
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
TCPServer
class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
            self.shutdown_request(request)
        except:
            self.handle_error(request, client_address)
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        t.start()
ThreadingMixIn
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
ThreadingTCPServer

ForkingTCPServer

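ForkingTCPServer and ThreadingTCPServer are used in the same way and follow essentially the same execution flow. The only difference is that internally one creates a "process" for each client and the other creates a "thread". (ForkingTCPServer relies on fork, so it is only available on platforms that support it, such as Linux.)

Basic usage: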

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):
    """
    The request handler class for our server.

    It is instantiated once per connection to the server, and must
    override the handle() method to implement communication to the
    client.
    """

    def handle(self):
        # self.request is the TCP socket connected to the client
        while True:
            try:
                data = self.request.recv(1024)
                if not data:  # empty bytes: the client closed the connection
                    break
                self.data = data.strip()
                print("{} wrote:".format(self.client_address[0]))
                print(self.data)
                # just send back the same data, but upper-cased
                self.request.sendall(self.data.upper())
            except ConnectionResetError as e:
                print("error:", e)
                break

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    # Create the server, binding to localhost on port 9999
    server = socketserver.ForkingTCPServer((HOST, PORT), MyTCPHandler)

    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
Server
import socket

client = socket.socket()
client.connect(("localhost", 9999))

while True:
    data = input(">>>").strip()
    if not data:  # sending empty data would leave recv waiting forever
        continue
    client.send(data.encode("utf-8"))
    recv_data = client.recv(1024).decode()
    print(recv_data)
Client
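To confirm that each connection really is handled in a separate process, the handler can simply report its own process id. This is a minimal sketch that assumes a platform with fork (for example Linux). The names `PidHandler`, `ask_pid` and `demo`, and the `b"pid?"` request, are hypothetical.

```python
import os
import socket
import socketserver
import threading

class PidHandler(socketserver.BaseRequestHandler):
    def handle(self):
        self.request.recv(1024)                          # wait for the client's request
        self.request.sendall(str(os.getpid()).encode())  # reply with this process's id

def ask_pid(address):
    """Open a fresh connection and return the pid of whoever served it."""
    with socket.create_connection(address) as client:
        client.sendall(b"pid?")
        return int(client.recv(1024))

def demo():
    with socketserver.ForkingTCPServer(("localhost", 0), PidHandler) as server:
        loop = threading.Thread(target=server.serve_forever, daemon=True)
        loop.start()
        pids = [ask_pid(server.server_address) for _ in range(2)]
        server.shutdown()                                # stop the serve_forever loop
    return pids
```

Every pid returned differs from `os.getpid()` in the calling script, which shows the handlers ran in forked child processes rather than in the server process itself.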