說到網絡通訊,那就不得不說TCP/IP協議簇的OSI七層模型了,這個東西當初在學校都學爛了。。。(PS:畢竟本人是網絡工程專業出身。。。) 簡單介紹下七層模型從底層到上層的順序:物理層(定義物理設備的各項標準),數據鏈路層(mac地址等其餘東西的封裝),網絡層(IP包頭的的封裝),傳輸層(TCP/UDP數據報頭的封裝),會話層(這一層涉及的東西很少,可是我以爲SSL(安全套接字)應該封裝在這一層,對於應用是透明的),表示層(數據的壓縮解壓縮放在這一層),應用層(這一層就是用戶使用的應用了)html
OSI的七層模型從上到下是一層一層封裝的過程,一個數據從一臺計算機到另外一臺計算機是先從上到下封裝,以後傳輸到達另外一臺計算機以後是從下到上一層一層解封裝的過程。python
好了,說了這麼多,那麼咱們開發裏面涉及到的網絡通訊包含一些什麼東西呢?首先咱們程序開發中大部分所使用的協議是TCP協議(傳輸層)UDP協議涉及的比較少。網絡層是IPV4協議,固然IPV6多是將來的主流,還有一些其餘網絡協議這裏並不涉及。。。SSL(安全套接字)可能在web應用中爲了提供數據安全性用得比較多,可是服務端應用程序和程序之間的數據傳輸不會使用到ssl進行數據安全性的保護。因此,咱們日常用的基於網絡編程所使用的網絡協議通常就是使用傳輸層的TCP協議,網絡層的IPV4協議,更底層的協議。。。並不須要咱們考慮,那些是網絡設備之間該關心的事情。。。哈哈哈linux
那麼,爲何要用TCP協議呢?由於TCP協議提供了一個面向鏈接的,穩定的通道,數據是按照順序到達的,TCP協議的機制保證了TCP協議傳輸數據是一個可靠的傳輸。可是TCP協議也有他的缺點,那就是保證可靠傳輸所犧牲的代價就是速度比較慢(固然也不是很離譜,除非在某些極端狀況下:好比傳輸大量的小數據)TCP之因此是面向鏈接的是由於它的三次握手以及四次揮手(太出名了,這裏不用介紹了。。。),保障數據包按照順序到達是經過分片編號實現的,每個數據包都有一個編號,接收端根據這個編號去將接收的數據按原來數據順序組裝起來。固然,TCP協議之因此是一個可靠的傳輸和他的重傳機制也是分不開的,當對方沒有收到你發送的一個數據包的時候,TCP協議會從新傳輸這個數據包到對方,直到對方確認收到了這個數據包爲止。。。。固然TCP協議裏面還有不少優秀的東西。這裏就不一一贅述了。哈哈,否則這裏就成了網絡專場了。。。web
好了,如今肯定了咱們使用TCP/IP來進行網絡通訊,那麼要實現通訊須要知道對方機器的IP地址和端口號(固然,這裏指的端口號通常是指TCP的端口號,從1-65535,其中,1024及以前的端口被定義爲知名端口,最好不要使用)shell
前面的第一節只是前戲,爲了引出咱們今天介紹的socket編程,這前戲作的也是挺累的,哈哈哈。編程
python中的socket是一個模塊,使用這個內置模塊可以讓咱們實現網絡通訊。其實在unix/linux一切皆文件的思想下,socket也能夠看做是一個文件。。。python進行網絡數據通訊也能夠理解爲一個打開文件讀寫數據的操做,只不過這是一個特殊的操做罷了。接下來是一個關於socket通訊的流程圖:windows
流程描述:安全
服務器根據地址類型(ipv4,ipv6)、socket類型、協議建立socket服務器
服務器爲socket綁定ip地址和端口號網絡
服務器socket監聽端口號請求,隨時準備接收客戶端發來的鏈接,這時候服務器的socket並無被打開
客戶端建立socket
客戶端打開socket,根據服務器ip地址和端口號試圖鏈接服務器socket
服務器socket接收到客戶端socket請求,被動打開,開始接收客戶端請求,直到客戶端返回鏈接信息。這時候socket進入阻塞狀態,所謂阻塞即accept()方法一直等到客戶端返回鏈接信息後才返回,開始接收下一個客戶端鏈接請求
客戶端鏈接成功,向服務器發送鏈接狀態信息
服務器accept方法返回,鏈接成功
客戶端向socket寫入信息(或服務端向socket寫入信息)
服務器讀取信息(客戶端讀取信息)
客戶端關閉
服務器端關閉
下面是socket模塊裏面提供的一些方法,讓咱們實現socke通訊。
sk=socket.socket()
建立socket對象,這個時候能夠指定兩個參數,一個是family,另外一個是type
family:AFINET(表明IPV4,默認參數),AFINET6(表明使用IPV6),AF_UNIX(UNIX文件系統通訊)
type:SOCKSTREAM(TCP協議通訊,默認參數),SOCKDGRAM(UDP協議通訊) 默認不寫的話,family=AFINET type=SOCKSTREAM
sk.bind(address)
sk.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。
sk.listen(backlog)
開始監聽傳入鏈接。backlog指定在拒絕鏈接以前,能夠掛起的最大鏈接數量。 backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5 這個值不能無限大,由於要在內核中維護鏈接隊列
sk.setblocking(bool)
是否阻塞(默認True),若是設置False,那麼accept和recv時一旦無數據,則報錯。
sk.accept()
接受鏈接並返回(conn,address),其中conn是新的套接字對象,能夠用來接收和發送數據。address是鏈接客戶端的地址。
接收TCP 客戶的鏈接(阻塞式)等待鏈接的到來
sk.connect(address)
鏈接到address處的套接字。通常,address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。
sk.connect_ex(address)
同上,只不過會有返回值,鏈接成功時返回 0 ,鏈接失敗時候返回編碼,例如:10061
sk.close()
關閉套接字
sk.recv(bufsize[,flag])
接受套接字的數據。數據以字符串形式返回,bufsize指定最多能夠接收的數量。flag提供有關消息的其餘信息,一般能夠忽略。
sk.recvfrom(bufsize[.flag])
與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。
sk.send(string[,flag])
將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容所有發送。
sk.sendall(string[,flag])
將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常。
內部經過遞歸調用send,將全部內容發送出去。
sk.sendto(string[,flag],address)
將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。
sk.settimeout(timeout)
設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如 client 鏈接最多等待5s )
sk.getpeername()
返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port)。
sk.getsockname()
返回套接字本身的地址。一般是一個元組(ipaddr,port)
sk.fileno()
套接字的文件描述符
好了,前戲作了那麼多,到如今都還沒寫一行代碼呢,接下來到了實戰的步驟了。
1 #server端實現: 2 import socket # 導入socket模塊 3 sk=socket.socket() # 實例化socket對象 4 address=("0.0.0.0",8000) # 定義好監聽的IP地址和端口,能夠綁定網卡上的IP地址,可是生產環境中通常綁定0.0.0.0這樣全部網卡上都可以接收並處理這個端口的數據了 5 sk.bind(address) # 綁定IP地址和端口 6 sk.listen(5) # 設定偵聽開始並設定偵聽隊列裏面等待的鏈接數最大爲5 7 8 while True: # 循環,爲了創建客戶端鏈接 9 conn,addr=sk.accept() # conn拿到的是接入的客戶端的對象,以後服務端和客戶端通訊都是基於這個conn對象進行通訊,add獲取的是鏈接的客戶端的IP和端口 10 while True: # 這個循環來實現相互之間聊天的邏輯 11 try: # 爲何要使用try包裹這一塊呢?由於在通訊過程當中客戶端忽然退出了的話,服務端阻塞在recv狀態時將會拋出異常並終止程序,爲了解決這個異常,須要咱們本身捕獲這個異常,這是在windows上,linux上不會拋出異常 12 data=conn.recv(1024) # 定義接收的數據大小爲1024個字節並接受客戶端傳遞過來的數據,注意這裏的數據在python3中是bytes類型的,在python3中網絡通訊發送和接收的數據必須是bytes類型的,不然會報錯 13 if data: # 假如客戶端傳遞數據過來時 14 print("-->",str(data,"utf-8")) # 打印客戶端傳遞過來的數據,須要從bytes類型數據解碼成unicode類型的數據 15 data=bytes(input(">>>"),"utf-8") # 接收輸入的數據並轉換成bytes類型的數據 16 conn.send(data) # 將bytes類型的數據發送給客戶端 17 else: # 不然關閉這個客戶端鏈接的對象,當客戶端正常退出,執行了sk.close()時將不會發送數據到服務端,也就是說recv獲取到的數據是空的 18 conn.close() # 這時關閉這個conn對象並退出當前循環等待下一個客戶端對象來鏈接 19 break 20 except ConnectionResetError as e: # 捕獲到異常以後,打印異常出來並退出循環等待下一個客戶端鏈接 21 print(e) 22 break 23 24 #client端實現 25 import socket # 導入socket模塊 26 sk=socket.socket() # 實例化客戶端對象 27 address=("127.0.0.1",8000) # 設置客戶端須要鏈接的服務端IP地址以及端口 28 sk.connect(address) # 鏈接服務端的IP地址以及端口 29 while True: # 循環實現對話 30 data=input(">>>").strip() # 獲取用戶輸入的數據 31 if data=="exit": # 若是輸入的是exit 關閉該對象並退出程序 32 sk.close() # 關閉對象 33 break # 退出循環 34 sk.send(bytes(data,"utf-8")) # 發送剛輸入的數據,要先轉換成bytes類型的數據 35 data=str(sk.recv(1024),"utf-8") # 接收服務端發送的數據,並將其轉換成unicode數據類型 36 print("-->",data) # 打印服務端傳輸過來的數據
上面是一個簡單聊天服務器的實現。。。下面實現的邏輯比這個更復雜一點。
遠程執行命令須要服務端添加一個subprocess模塊,將執行命令並返回執行結果,下面是執行代碼: #server端實現 import socket # 導入socket模塊 import subprocess # 導入subprocess模塊 sk=socket.socket() # 實例化socket對象 address=("0.0.0.0",8000) # 設定綁定IP地址以及端口 sk.bind(address) # 綁定IP地址及端口 sk.listen(5) # 進入偵聽狀態,並設置等待隊列長度爲5
1 while True: # 循環接收客戶端的鏈接 2 conn,addr=sk.accept() # 接收客戶端的鏈接 3 while True: # 循環處理客戶端發送過來的命令 4 try: # 捕捉客戶端不正常退出異常 5 cmd=str(conn.recv(1024),"utf-8") # 接收客戶端傳遞過來的命令 6 if cmd: # 命令不爲空,則執行下面代碼 7 print("-->",cmd) # 打印客戶端傳過來的命令 8 obj=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) 9 """ 10 調用subprocess的Popen類,執行客戶端傳過來的命令,shell設置爲true,不然可能執行異常,把標準正確輸出和標準錯誤輸出都接收,這樣執行結果就不會顯示在server端了 11 """ 12 if not obj.wait(): # obj對象有一個wait方法,等待子進程執行完成並返回執行退出狀態碼,0爲正常,不然不正常 13 cmd_result=obj.stdout.read() # 正常狀況下,接收執行結果 14 else: 15 print("error") # 不然打印出error 16 cmd_result=obj.stderr.read() # 接收錯誤信息 17 print(str(cmd_result,"gbk")) # 在server中打印出錯誤信息 18 result_len=bytes(str(len(cmd_result)),"utf-8") # 獲取返回結果的長度 19 conn.send(result_len) # 將返回結果的長度發送給客戶端 20 conn.recv(1024) # 爲了防止數據包黏連,用來在兩個send中間插入一個recv,不然,可能兩次send的時候前面發送的數據和後面的數據之間會發生數據黏連,形成客戶端收到的不是一個純數字的字符串而引起報錯 21 conn.sendall(cmd_result) # 將執行結果發送給客戶端 22 else: 23 conn.close() # 若是客戶端退出,則關閉這個連接並退出循環,等待下一個連接 24 break 25 except ConnectionResetError as e: # 若是客戶端異常退出,打印異常並退出當前循環,等待下一個連接 26 print(e) 27 break 28 #客戶端實現 29 import socket # 導入socket模塊 30 sk=socket.socket() # 建立socket對象 31 address=("127.0.0.1",8000) # 設置服務端的IP和端口 32 sk.connect(address) # 和服務端創建連接 33 while True: # 循環用來輸入命令並接受返回結果 34 cmd=input(">>>").strip() # 接受命令 35 if cmd=="exit": # 若是客戶端輸入exit則關閉該連接並退出 36 sk.close() 37 break 38 sk.send(bytes(cmd,"utf-8")) # 發送命令到服務端 39 result_len=int(str(sk.recv(1024),"utf-8")) # 接受服務端發送的執行結果的長度 40 sk.send(bytes("ok","utf8")) # 防止數據接受黏連設置的一個send,沒有實際意義 41 print(result_len) # 打印接受到的數據長度 42 data=bytes() # 定義一個bytes類型的變量用來接受執行結果 43 while len(data) != result_len: # 循環接收執行結果,直到接收完成爲止 44 data+=sk.recv(1024) 45 print(str(data,"gbk")) # 打印執行結果
上面是遠程執行命令的實現,下面要實現遠程上傳文件的代碼。
1 #server端實現 2 import socket # 導入socket模塊 3 import os @ 導入os模塊 4 sk=socket.socket() # 建立socket對象 5 address=("0.0.0.0",8000) # 設置綁定地址 6 sk.bind(address) # 綁定地址 7 sk.listen() # 偵聽地址及端口 8 BASE_DIR=os.path.dirname(os.path.abspath(__file__)) # 獲取當前文件所在目錄 9 while True: # 循環,鏈接客戶端 10 conn,addr=sk.accept() # 獲取鏈接的客戶端對象 11 while True: # 循環實現文件上傳 12 try: # 捕捉客戶端異常退出 13 data=str(conn.recv(1024),"utf8") # 接收客戶端發送過來的數據 14 file_name=data.split("|")[0] # 獲取數據的第一個值爲文件名 15 file_size=int(data.split("|")[1]) # 第二個值爲文件大小 16 path=os.path.join(BASE_DIR,"post_server_dir",file_name) # 路徑拼接,獲取到該文件名的絕對路徑 17 writ_size=0 # 設置接收的數據大小初始值 18 with open(path,"wb") as f: # 以wb的方式打開文件,由於傳輸過來的數據都是bytes類型的 19 while writ_size < file_size: # 當接收的數據小於文件的大小時,循環接收,一直到接受的數據大小等於文件大小結束 20 data=conn.recv(1024) # 每次接收1024字節的數據 21 f.write(data) # 將接受的數據寫入到文件中 22 writ_size+=len(data) # 接收的文件數據大小自加 23 print("reveive %s successful!"%file_name) # 循環結束時證實文件接收完成,打印接收成功信息 24 conn.sendall(bytes("true","utf8")) # 將接受成功信息發送給客戶端 25 except Exception as e: # 捕捉到異常後打印異常並退出當前循環,等待下一次鏈接 26 print(e) 27 break 28 #客戶端實現 29 import socket # 導入socket模塊 30 import os # 導入os模塊 31 sk=socket.socket() # 建立socket對象 32 address=("127.0.0.1",8000) # 設置服務端鏈接的IP及地址 33 sk.connect(address) # 鏈接服務端 34 BASE_DIR=os.path.dirname(os.path.abspath(__file__)) # 獲取當前文件所在的目錄路徑 35 while True: # 用來循環上傳文件 36 cmd=input(">>>").strip() # 獲取用戶輸入命令 37 if cmd.split("|")[0]=="post": # 若是輸入的第一個參數是post 38 path=os.path.join(BASE_DIR,cmd.split("|")[1]) # 獲取第二個參數文件名,並將其添加到路徑中,獲取文件絕對路徑 39 file_name=os.path.basename(path) # 獲取文件名 40 file_size=os.stat(path).st_size # 獲取文件大小 41 sk.sendall(bytes("|".join((file_name,str(file_size))),"utf8")) # 把文件名和文件大小發送給服務端 42 with open(path,"rb") as f: # 打開文件,以rb的方式讀取,可直接發送給服務端 43 while True: # 循環讀取文件直到讀取完成 44 data=f.read(1024) # 讀取文件的1024字節 45 if not data: # 若是文件讀取完成,讀取的內容就爲空 46 break # 這時,退出循環 47 sk.sendall(data) # 把讀取的數據發送給服務端 48 flag=str(sk.recv(1024),"utf8") # 讀取服務端發送的消息 49 if flag=="true": # 若是服務端發送過來接收成功的消息,說明文件傳輸成功,打印傳輸成功消息 50 print("send file %s successful!"%file_name) 51 else: # 不然打印文件傳輸失敗的消息 52 print("upload file %s failed!"%file_name)
上面是實現文件傳輸的socket編程,由於每次都讀取文件的1024字節並傳遞給服務端,這樣不會把整個文件加載進內存,從而能夠實現大文件的上傳。
前面寫的socket都是每次只能實現和一個客戶端通訊的,那麼能不能實現同時和多個客戶端通訊,也就是咱們所說的併發呢?固然是能夠的,python中內置了一個模塊叫作socketserver,這個模塊對python的socket模塊進行更高級的封裝,經過使用多進程或者多線程的方式實現併發通訊,也就是說每一個客戶端來鏈接的話,新開一個進程或者線程去和它通訊,雖然這種方式對資源消耗很大,可是它的源碼其實並不複雜,閱讀他的源碼可以對咱們的提高有比較好的幫助,因此,建議有空的能夠去讀讀socketserver的源碼。加上註釋一共才700行左右的源碼。
咱們先來實現一個支持多客戶端的socketserver示例,再來分析這個示例在python中是如何實現的。
1 #服務端實現 2 import socketserver # 導入socketserver模塊 3 class Myserver(socketserver.BaseRequestHandler): # 定義一個類名繼承自socketserver.BaseRequestHandler 4 def handle(self): # 自定義一個函數名handle(注意這個名字不能命名成別的名字,父類中有這個方法,至關於重寫了這個方法)裏面實現的是socket通訊的邏輯代碼塊 5 while True: 6 conn=self.request 7 while True: 8 try: 9 data=str(conn.recv(1024),"utf8") 10 print("<<<",data) 11 data=bytes(input(">>>"),"utf8") 12 conn.sendall(data) 13 except Exception as e: 14 print(e) 15 conn.close() 16 break 17 18 if __name__=="__main__": 19 server=socketserver.ThreadingTCPServer(("0.0.0.0",8081),Myserver) # 建立socketserver.ThreadingTCPServer實例,將地址及端口以及咱們剛剛建立的類傳遞進去,這個類將會在客戶端請求過來時threading一個線程來處理這個請求 20 server.serve_forever() # 調用實例的serve_forever方法 21 #客戶端實現 22 import socket # 這塊代碼和上面的基本一致,能夠參照上面的註釋解釋 23 sk=socket.socket() 24 address=("127.0.0.1",8081) 25 sk.connect(address) 26 while True: 27 data=bytes(input(">>>"),"utf8") 28 if str(data,"utf8")=="exit": 29 sk.close() 30 break 31 sk.sendall(data) 32 data=str(sk.recv(1024),"utf8") 33 print("<<<",data)
能夠發現使用socketserver模塊,比直接使用socket模塊方便了不少,可以省去不少代碼,而且可以經過多線程的使用來實現真正的多客戶端的通訊。
socketserver源碼分析: 首先,socketserver提供了五個類供咱們使用,他們之間的關係以下所示:
+------------+ | BaseServer | # 基類 +------------+ | v +-----------+ +------------------+ | TCPServer |------->| UnixStreamServer | +-----------+ +------------------+ | v +-----------+ +--------------------+ | UDPServer |------->| UnixDatagramServer | +-----------+ +--------------------+
而咱們使用的類ThreadingTCPServer在源碼中的定義方式以下所示: class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass 是經過繼承ThreadingMixIn, TCPServer兩個類實現的,那麼,這裏面幫咱們幹了些什麼呢?首先,咱們來分析server=socketserver.ThreadingTCPServer(("0.0.0.0",8081),Myserver)這條語句幫咱們作了些什麼,這條語句是建立一個類的對象,建立對象的時候會執行類的__init__方法,首先,找的是父類ThreadingMixIn的__init__方法,可是ThreadingMixIn中並無,因而去找TCPServer中的。在TCPServer中的__init__方法幫咱們作了一些事情,幫咱們建立socket鏈接對象,並調用了父類BaseServer的__init__方法作了一些初始化工做。
而後server.serve_forever()這條語句幫咱們作了什麼事呢?
調用對象的serve_forever方法,首先去父類ThreadingMixIn中沒找到,去TCPServer中也沒找到,到父類BaseServer中找到了,代碼以下:
1 def serve_forever(self, poll_interval=0.5): # poll_interval設置的是超時時間,以秒爲單位,做爲select的第四個可選參數傳遞進入 2 self.__is_shut_down.clear() # 設置event中的flag爲False 3 try: 4 with _ServerSelector() as selector: 5 selector.register(self, selectors.EVENT_READ) # 設置讀事件 6 7 while not self.__shutdown_request: 8 ready = selector.select(poll_interval) # 實例化select對象 9 if ready: 10 self._handle_request_noblock() # 若是ready是真,獲取客戶端的請求 11 self.service_actions() 12 finally: 13 self.__shutdown_request = False 14 self.__is_shut_down.set()
這個函數作的事情是調用了selecter實現非阻塞的I/O多路複用。並調用handlerequest_noblock-->processrequest-->finishrequest-->RequestHandlerClass(這個也就是咱們建立對象時傳進去的Myserver)這裏面在初始化時-->setup-->handle(這個函數在原來的父類中是什麼都沒寫的,咱們給重寫了這個函數,因此,調用到這的時候就是調用咱們本身寫的子類的函數執行咱們的代碼)
整個socketserver源碼以下:
1 #!/usr/bin/env python 2 # encoding:utf-8 3 # __author__: socket_server_resource_code 4 # date: 2016/9/29 15:30 5 # blog: http://huxianglin.cnblogs.com/ http://xianglinhu.blog.51cto.com/ 6 7 __version__ = "0.4" 8 9 import socket # 導入socket模塊 10 import selectors # 導入selectors模塊 11 import os # 導入os模塊 12 import twist 13 try: 14 import threading # 導入threading模塊 15 except ImportError: # 若是沒有這個模塊則導入dummy_threading模塊並重命名爲threading模塊 16 import dummy_threading as threading 17 from time import monotonic as time # 從time模塊導入monotonic模塊並重命名爲time模塊 18 19 __all__ = ["BaseServer", "TCPServer", "UDPServer", "ForkingUDPServer", 20 "ForkingTCPServer", "ThreadingUDPServer", "ThreadingTCPServer", 21 "BaseRequestHandler", "StreamRequestHandler", 22 "DatagramRequestHandler", "ThreadingMixIn", "ForkingMixIn"] 23 """ 24 內置一個全局變量存儲的是這個模塊中的全部類名稱 25 """ 26 if hasattr(socket, "AF_UNIX"): # 若是socket對象中有「AF_UNIX」,那麼在__all__列表中添加下面幾個方法 27 28 __all__.extend(["UnixStreamServer","UnixDatagramServer", 29 "ThreadingUnixStreamServer", 30 "ThreadingUnixDatagramServer"]) 31 32 if hasattr(selectors, 'PollSelector'): # 判斷selectors對象中是否有PollSelector 33 _ServerSelector = selectors.PollSelector 34 else: 35 _ServerSelector = selectors.SelectSelector 36 37 class BaseServer: # 基類 38 39 timeout = None # 超時時間,設置爲空 40 41 def __init__(self, server_address, RequestHandlerClass): 42 """Constructor. May be extended, do not override.""" 43 self.server_address = server_address # 設置server的地址 44 self.RequestHandlerClass = RequestHandlerClass # 設置處理邏輯類 45 self.__is_shut_down = threading.Event() # 線程會阻塞在threading,Event()的地方,直到主線程執行完成後纔會執行子線程 46 self.__shutdown_request = False # 設置一個bool 47 48 def server_activate(self): 49 pass 50 51 def serve_forever(self, poll_interval=0.5): # poll_interval設置的是超時時間,以秒爲單位,做爲select的第四個可選參數傳遞進入 52 self.__is_shut_down.clear() # 設置event中的flag爲False 53 try: 54 with _ServerSelector() as selector: 55 selector.register(self, selectors.EVENT_READ) # 設置讀事件 56 57 while not self.__shutdown_request: 58 ready = selector.select(poll_interval) # 實例化select對象 59 if ready: 60 self._handle_request_noblock() # 若是ready是真,獲取客戶端的請求 61 self.service_actions() 62 finally: 63 self.__shutdown_request = False 64 self.__is_shut_down.set() 65 66 def shutdown(self): 67 self.__shutdown_request = True 68 self.__is_shut_down.wait() 69 70 def service_actions(self): 71 pass 72 73 def handle_request(self): 74 timeout = self.socket.gettimeout() 75 if timeout is None: 76 timeout = self.timeout 77 elif self.timeout is not None: 78 timeout = min(timeout, self.timeout) 79 if timeout is not None: 80 deadline = time() + timeout 81 with _ServerSelector() as selector: 82 selector.register(self, selectors.EVENT_READ) 83 84 while True: 85 ready = selector.select(timeout) 86 if ready: 87 return self._handle_request_noblock() 88 else: 89 if timeout is not None: 90 timeout = deadline - time() 91 if timeout < 0: 92 return self.handle_timeout() 93 94 def _handle_request_noblock(self): 95 try: 96 request, client_address = self.get_request() 97 except OSError: 98 return 99 if self.verify_request(request, client_address): # 永遠返回true 100 try: 101 self.process_request(request, client_address) 102 except: 103 self.handle_error(request, client_address) 104 self.shutdown_request(request) 105 else: 106 self.shutdown_request(request) 107 108 def handle_timeout(self): 109 pass 110 111 def verify_request(self, request, client_address): 112 return True 113 114 def process_request(self, request, client_address): 115 self.finish_request(request, client_address) # 調用處理邏輯類 116 self.shutdown_request(request) # 調用處理關閉邏輯類。。。其實什麼都沒幹 117 118 def server_close(self): 119 pass 120 121 def finish_request(self, request, client_address): 122 self.RequestHandlerClass(request, client_address, self) # 調用邏輯類 123 124 def shutdown_request(self, request): 125 self.close_request(request) 126 127 def close_request(self, request): 128 pass 129 130 def handle_error(self, request, client_address): 131 print('-'*40) 132 print('Exception happened during processing of request from', end=' ') 133 print(client_address) 134 import traceback 135 traceback.print_exc() # XXX But this goes to stderr! 136 print('-'*40) 137 138 139 class TCPServer(BaseServer): 140 141 address_family = socket.AF_INET 142 143 socket_type = socket.SOCK_STREAM 144 145 request_queue_size = 5 146 147 allow_reuse_address = False 148 149 def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True): # 將傳入的參數進行初始化 150 """Constructor. May be extended, do not override.""" 151 BaseServer.__init__(self, server_address, RequestHandlerClass) # 調用父類構造方法,將傳入參數賦值到對象上 152 self.socket = socket.socket(self.address_family, 153 self.socket_type) # 設置socket對象,ip tcp模式 154 if bind_and_activate: 155 try: 156 self.server_bind() # 綁定IP和端口 157 self.server_activate() # 設置偵聽 158 except: 159 self.server_close() # 關閉對象並引起異常 160 raise 161 162 def server_bind(self): 163 if self.allow_reuse_address: 164 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 設置tcp的狀態通常不會當即關閉而經歷TIME_WAIT的過程。後想繼續重用該socket 165 self.socket.bind(self.server_address) # 綁定IP和端口 166 self.server_address = self.socket.getsockname() # getsockname返回本身套接字的地址 167 168 def server_activate(self): # 設置偵聽 169 self.socket.listen(self.request_queue_size) 170 171 def server_close(self): # 關閉socket鏈接 172 self.socket.close() 173 174 def fileno(self): # 返回socket的fileno信息 175 return self.socket.fileno() 176 177 def get_request(self): # socket server設置準備就緒狀態,等待接收客戶端發來的信息 178 return self.socket.accept() 179 180 def shutdown_request(self, request): # 關閉客戶端socket的鏈接 181 try: 182 request.shutdown(socket.SHUT_WR) 183 except OSError: 184 pass 185 self.close_request(request) 186 187 def close_request(self, request): 188 request.close() 189 190 191 class UDPServer(TCPServer): 192 193 allow_reuse_address = False 194 195 socket_type = socket.SOCK_DGRAM 196 197 max_packet_size = 8192 198 199 def get_request(self): 200 data, client_addr = self.socket.recvfrom(self.max_packet_size) 201 return (data, self.socket), client_addr 202 203 def server_activate(self): 204 pass 205 206 def shutdown_request(self, request): 207 self.close_request(request) 208 209 def close_request(self, request): 210 pass 211 212 class ForkingMixIn: 213 214 timeout = 300 215 active_children = None 216 max_children = 40 217 218 def collect_children(self): 219 if self.active_children is None: 220 return 221 while len(self.active_children) >= self.max_children: 222 try: 223 pid, _ = os.waitpid(-1, 0) 224 self.active_children.discard(pid) 225 except ChildProcessError: 226 self.active_children.clear() 227 except OSError: 228 break 229 for pid in self.active_children.copy(): 230 try: 231 pid, _ = os.waitpid(pid, os.WNOHANG) 232 self.active_children.discard(pid) 233 except ChildProcessError: 234 self.active_children.discard(pid) 235 except OSError: 236 pass 237 238 def handle_timeout(self): 239 self.collect_children() 240 241 def service_actions(self): 242 self.collect_children() 243 244 def process_request(self, request, client_address): 245 pid = os.fork() 246 if pid: 247 if self.active_children is None: 248 self.active_children = set() 249 self.active_children.add(pid) 250 self.close_request(request) 251 return 252 else: 253 try: 254 self.finish_request(request, client_address) 255 self.shutdown_request(request) 256 os._exit(0) 257 except: 258 try: 259 self.handle_error(request, client_address) 260 self.shutdown_request(request) 261 finally: 262 os._exit(1) 263 264 265 class ThreadingMixIn: 266 daemon_threads = False 267 268 def process_request_thread(self, request, client_address): 269 try: 270 self.finish_request(request, client_address) 271 self.shutdown_request(request) 272 except: 273 self.handle_error(request, client_address) 274 self.shutdown_request(request) 275 276 def process_request(self, request, client_address): 277 t = threading.Thread(target = self.process_request_thread, 278 args = (request, client_address)) 279 t.daemon = self.daemon_threads 280 t.start() 281 282 283 class ForkingUDPServer(ForkingMixIn, UDPServer): pass 284 class ForkingTCPServer(ForkingMixIn, TCPServer): pass 285 286 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass 287 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass 288 289 if hasattr(socket, 'AF_UNIX'): 290 291 class UnixStreamServer(TCPServer): 292 address_family = socket.AF_UNIX 293 294 class UnixDatagramServer(UDPServer): 295 address_family = socket.AF_UNIX 296 297 class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass 298 299 class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass 300 301 class BaseRequestHandler: 302 303 def __init__(self, request, client_address, server): 304 self.request = request 305 self.client_address = client_address 306 self.server = server 307 self.setup() 308 try: 309 self.handle() 310 finally: 311 self.finish() 312 313 def setup(self): 314 pass 315 316 def handle(self): 317 pass 318 319 def finish(self): 320 pass 321 322 class StreamRequestHandler(BaseRequestHandler): 323 324 rbufsize = -1 325 wbufsize = 0 326 disable_nagle_algorithm = False 327 328 def setup(self): 329 self.connection = self.request 330 if self.timeout is not None: 331 self.connection.settimeout(self.timeout) 332 if self.disable_nagle_algorithm: 333 self.connection.setsockopt(socket.IPPROTO_TCP, 334 socket.TCP_NODELAY, True) 335 self.rfile = self.connection.makefile('rb', self.rbufsize) 336 self.wfile = self.connection.makefile('wb', self.wbufsize) 337 338 def finish(self): 339 if not self.wfile.closed: 340 try: 341 self.wfile.flush() 342 except socket.error: 343 pass 344 self.wfile.close() 345 self.rfile.close() 346 347 348 class DatagramRequestHandler(BaseRequestHandler): 349 350 """Define self.rfile and self.wfile for datagram sockets.""" 351 352 def setup(self): 353 from io import BytesIO 354 self.packet, self.socket = self.request 355 self.rfile = BytesIO(self.packet) 356 self.wfile = BytesIO() 357 358 def finish(self): 359 self.socket.sendto(self.wfile.getvalue(), self.client_address)