深刻淺出談 socket

如今咱們開發每每不斷使用封裝好的web框架, 運行web服務也有至關多的容器, 可是其原理每每都離不開socket. 像是nginx底層就是採用相似python中epoll的異步監聽方式加上socket結合來作. 本文采起從最簡單的socket通訊實現聊天機器人, 到僞併發實現聊天機器人, 最後採用異步監聽方式實現聊天機器人, 逐步推動.python

首先咱們實現一個最簡單版的的socket服務端, server_s1.pynginx

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

HOST='127.0.0.1'
PORT=9999

sockaddr=(HOST,PORT)
sk=socket.socket()
sk.bind(sockaddr)
sk.listen(5)
conn,address=sk.accept()
ret_bytes=conn.recv(1024)
print(str(ret_bytes,encoding='utf-8'))
conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8'))
sk.close()複製代碼
  • sk=socket.socket() 這裏建立socket對象
  • 經過sk.bind(sockaddr) 傳入一個元組對象以此來設置服務端ip和port
  • sk.listen(5) 表示設置最大等待鏈接數爲5個
  • conn,address=sk.accept() 此時阻塞進程, 循環等待被鏈接, 返回鏈接對象和包含鏈接信息的對象
  • ret_bytes=conn.recv(1024) 等待接受1024個字節的信息
  • conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8')) 將接受的信息加上 , 已收到! 從新發送給客戶端. 注意, 在python2中能夠傳遞str類型的數據, 可是在python3中只能傳遞byte類型的數據
  • sk.close() 關閉鏈接

至此簡單的服務端已經寫好了, 咱們看看客戶端, client_c1.pyweb

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

HOST='127.0.0.1'
PORT=9999

sockaddr=(HOST,PORT)
ct=socket.socket()
ct.connect(sockaddr)
ct.sendall(bytes('第一次鏈接',encoding='utf-8'))
ret_bytes=ct.recv(1024)
print(str(ret_bytes,encoding='utf-8'))
ct.close()複製代碼
  • 客戶端中須要鏈接服務端, 經過ct.connect(sockaddr) 來執行

到如今爲止, 已經把簡單聊天機器人已經寫好了, 客戶端向服務端發送第一次鏈接 , 服務端接受輸出到客戶端並回饋給客戶端第一次鏈接, 已收到! 接下來咱們試着讓這個服務端更健壯一些, 嘗試讓它能夠不斷的返回客戶端發送過來的內容編程

這是第二個版本的服務端, server_s2.py服務器

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

HOST='127.0.0.1'
PORT=9999

sockaddr=(HOST,PORT)
sk=socket.socket()
sk.bind(sockaddr)
sk.listen(5)
while True:
    conn,address=sk.accept()
    while True:
        try:
            ret_bytes=conn.recv(1024)
        except Exception as ex:
            print("已從",address,"斷開")
            break
        else:
            conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8'))
sk.close()複製代碼
  • 最內層的循環表示一旦鏈接則一直等待客戶端發送消息併發回去, 直到鏈接斷開
  • 最外層的循環表示即便斷開鏈接可是服務器仍處於等待其餘客戶端鏈接
  • 加入異常處理表示, 客戶端斷開鏈接, 服務端僅僅斷開這次鏈接

接下來看看客戶端文件, client_c2.py網絡

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

HOST='127.0.0.1'
PORT=9999

sockaddr=(HOST,PORT)
ct=socket.socket()
ct.connect(sockaddr)
while True:
    inp=input("請輸入要發送的內容: ")
    ct.sendall(bytes(inp,encoding='utf-8'))
    ret_bytes=ct.recv(1024)
    print(str(ret_bytes,encoding='utf-8'))
ct.close()複製代碼
  • 客戶端僅僅須要將要發送內容的部分放到循環中便可

如今第二個版本已經能夠接二連三的處理同一鏈接的消息, 即便斷開也不會影響服務器的健壯性. 可是, 咱們的服務器功能還很單一, 只能一次處理一個客戶端的鏈接. 接下來將用select模塊實現僞併發處理客戶端鏈接併發

這裏是第三個版本的服務端文件, server_s3.pyapp

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select

HOST = '127.0.0.1'
PORT = 9999

sockaddr = (HOST, PORT)

sk = socket.socket()
sk.bind(sockaddr)
sk.listen(5)

sk_inps = [sk, ]

while True:
    change_list, keep_list, error_list = select.select(sk_inps, [], sk_inps, 1)
    for sk_tmp in change_list:
        if sk_tmp == sk:
            conn, address = sk_tmp.accept()
            sk_inps.append(conn)
        else:
            try:
                ret_bytes = sk_tmp.recv(1024)
            except Exception as ex:
                sk_inps.remove(sk_tmp)
                print("已從", sk_tmp.getpeername(), "斷開")
            else:
                sk_tmp.sendall(ret_bytes + bytes(', 已收到!', encoding='utf-8'))

    for sk_tmp in error_list:
        sk_inps.remove(sk_tmp)

sk.close()複製代碼

咱們首先來看一下循環的過程框架

循環過程

  • change_list, keep_list, error_list = select.select(sk_inps, [], sk_inps, 1) 中, select.select() 會自動監控起參數的內容, 當第一個參數中的對象發生變化時候會將該對象加到change_list中, 該次循環結束時change_list便會自動清空. 第一個參數中的變化對於sk對象, 這裏只有客戶端鏈接sk對象或者與sk對象斷開兩種狀況
  • 接着咱們遍歷change_lis中的內容, 當有客戶端鏈接時候, 如圖所見, chang_list中只有sk對象, 此時咱們將客戶端的鏈接conn加入到sk_inps中, 讓select下次循環時候也監控conn對象的變化
  • 當客戶端發送消息時候意味着conn對象的變化, 此時change_list中加入該鏈接對象, 根據此對象, 咱們能夠處理客戶端發送來的消息
  • 經過以上方式, 讓服務端輪流處理每一個客戶端鏈接, 因爲cpu如今的處理速度極快, 給人的感受就是併發處理多個客戶端請求, 其實是假裝併發處理
  • sk_inps.remove(sk_tmp) 這一句中, 一旦客戶端斷開鏈接, 則服務端就會捕捉到異常並將該客戶端對象從監控列表sk_inps中移除
  • 接着咱們來講是select.select() 中的第二個參數, 該參數中有什麼對象則keep_list 中就會加入什麼對象, 該參數對於讀寫分離的僞併發處理有很大意義, 咱們稍後再作介紹
  • select.select() 的第三個參數是當被監控的對象出現錯誤或者異常時候就將出錯的對象加入到error_list 中, 隨後咱們遍歷error_list並根據裏邊的出錯對象將其從sk_inps中除去

該版本的客戶端延續上一版本便可, 無需更改. 至此, 咱們就創建一個能併發簡單處理多客戶端鏈接的服務器. 可是, 對於change_list 中遍歷時候咱們既有讀又有寫的操做, 這樣當後期的處理複雜的時候, 代碼維護很難再進行下去. 接下來咱們接着開發咱們的僞併發處理的最終版本異步

這裏是服務的文件, server_s4.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select

HOST = '127.0.0.1'
PORT = 9997

sockaddr = (HOST, PORT)

sk = socket.socket()
sk.bind(sockaddr)
sk.listen(5)

sk_inps = [sk, ]
sk_outs=[]
message_dic={}

while True:
    change_list, keep_list, error_list = select.select(sk_inps, sk_outs, sk_inps, 1)
    for sk_tmp in change_list:
        if sk_tmp == sk:
            conn, address = sk_tmp.accept()
            sk_inps.append(conn)
            message_dic[conn]=[]
        else:
            try:
                ret_bytes = sk_tmp.recv(1024)
            except Exception as ex:
                sk_inps.remove(sk_tmp)
                print("已從", sk_tmp.getpeername(), "斷開")
                del message_dic[sk_tmp]
            else:
                sk_outs.append(sk_tmp)
                message_dic[sk_tmp].append(str(ret_bytes,encoding='utf-8'))

    for conn in keep_list:
        message= message_dic[conn][0]
        conn.sendall(bytes(message+", 已收到!",encoding='utf-8'))
        del message_dic[conn][0]
        sk_outs.remove(conn)

    for sk_tmp in error_list:
        sk_inps.remove(sk_tmp)

sk.close()複製代碼
  • sk_outs=[] 中保存發送消息的客戶端鏈接對象
  • message_dic={} 中保存消息內容
  • 當客戶端發送消息時候, 咱們在一個for循環中將其鏈接對象和消息內容分別保存起來, 在第二個循環中我處理消息內容

以上就是僞併發處理客戶端請求全部內容, 究其本質實際上是IO多路複用原理. 同時python中也提供了真正的併發處理模塊socketserver, 下面咱們採用socketserver來實現

首先看咱們的服務端文件, server_s5.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socketserver

HOST = '127.0.0.1'
PORT = 9997

sockaddr = (HOST, PORT)

class MySocket(socketserver.BaseRequestHandler):
    def handle(self):
        conn = self.request
        while True:
            try:
                ret_bytes = conn.recv(1024)
            except Exception as ex:
                print("已從", self.client_address, "斷開")
                break
            else:
                conn.sendall(ret_bytes + bytes(', 已收到!', encoding='utf-8'))


if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(sockaddr, MySocket)
    server.serve_forever()複製代碼
  • 其原理只是將上述的IO多路複用改爲了threading線程處理, 再加上原本的Socket內容造成
  • server = socketserver.ThreadingTCPServer(sockaddr, MySocket) 該句會將Socket服務端設置ip和port等內容封裝到對象中, 執行初始化時候須要加入本身寫的繼承socketserver.BaseRequestHandler的類
  • server.serve_forever() 此句執行時候會使得對象調用handle(self) 方法, 在該方法中咱們對客戶端鏈接進行處理

以上咱們將Socket從基礎原理到複雜自定義已經使用封裝好的模塊使用介紹完畢. 接下來咱們補充一些理論知識和經常使用的Socket參數和方法:
首先咱們來回顧一下OSI模型和TCP/IP協議簇,如圖(圖片引自網絡)

OSI模型與TCP/IP協議簇

每層都有相對應的協議,可是socket API只是操做系統提供的一個用於網絡編程的接口, 如圖( 圖片引自網絡)

socket與各層關係

根據 socket 傳輸數據方式的不一樣(其實就是使用協議的不一樣), 致使其與不一樣層打交道

  • Stream sockets, 是一種面向鏈接的 socket, 使用 TCP 協議.
  • Datagram sockets, 無鏈接的 socket,使用 UDP 協議.
  • Raw sockets, 一般用在路由器或其餘網絡設備中, 這種socket直接由網絡層通向應用層.

如下是注意點:

  • 在咱們建立對象時候sk=socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None) 實際上默認傳入了參數, 第一個參數表示ip協議, ocket.AF_INET表示ipv4協議(默認就是), 第二個參數表示傳輸數據格式, socket.SOCK_STREAM表示tcp協議(默認就是), socket.SOCK_DGRAM表示udp協議
  • ret_bytes=conn.recv(1024) 中表示最多接受1024個字節; 若沒有接受到內容則會阻塞進程, 等待接受內容
  • send() 可能會發送部份內容, sendall()本質就是內部循環調用send()直到將內容發送完畢, 建議使用sendall()
  • 當用socket作ftp文件傳輸時候會產生粘包問題, 此時只需在發送文件大小以後等待接受服務端返回一個確認碼後, 再發送文件便可解決
相關文章
相關標籤/搜索