第六篇:python高級之網絡編程 python高級之網絡編程

python高級之網絡編程

 

python高級之網絡編程

本節內容

  1. 網絡通訊概念
  2. socket編程
  3. socket模塊一些方法
  4. 聊天socket實現
  5. 遠程執行命令及上傳文件
  6. socketserver及其源碼分析

1.網絡通訊概念

說到網絡通訊,那就不得不說TCP/IP協議簇的OSI七層模型了,這個東西當初在學校都學爛了。。。(PS:畢竟本人是網絡工程專業出身。。。) 簡單介紹下七層模型從底層到上層的順序:物理層(定義物理設備的各項標準),數據鏈路層(mac地址等其餘東西的封裝),網絡層(IP包頭的的封裝),傳輸層(TCP/UDP數據報頭的封裝),會話層(這一層涉及的東西很少,可是我以爲SSL(安全套接字)應該封裝在這一層,對於應用是透明的),表示層(數據的壓縮解壓縮放在這一層),應用層(這一層就是用戶使用的應用了)html

OSI的七層模型從上到下是一層一層封裝的過程,一個數據從一臺計算機到另外一臺計算機是先從上到下封裝,以後傳輸到達另外一臺計算機以後是從下到上一層一層解封裝的過程。python

好了,說了這麼多,那麼咱們開發裏面涉及到的網絡通訊包含一些什麼東西呢?首先咱們程序開發中大部分所使用的協議是TCP協議(傳輸層)UDP協議涉及的比較少。網絡層是IPV4協議,固然IPV6多是將來的主流,還有一些其餘網絡協議這裏並不涉及。。。SSL(安全套接字)可能在web應用中爲了提供數據安全性用得比較多,可是服務端應用程序和程序之間的數據傳輸不會使用到ssl進行數據安全性的保護。因此,咱們日常用的基於網絡編程所使用的網絡協議通常就是使用傳輸層的TCP協議,網絡層的IPV4協議,更底層的協議。。。並不須要咱們考慮,那些是網絡設備之間該關心的事情。。。哈哈哈linux

那麼,爲何要用TCP協議呢?由於TCP協議提供了一個面向鏈接的,穩定的通道,數據是按照順序到達的,TCP協議的機制保證了TCP協議傳輸數據是一個可靠的傳輸。可是TCP協議也有他的缺點,那就是保證可靠傳輸所犧牲的代價就是速度比較慢(固然也不是很離譜,除非在某些極端狀況下:好比傳輸大量的小數據)TCP之因此是面向鏈接的是由於它的三次握手以及四次揮手(太出名了,這裏不用介紹了。。。),保障數據包按照順序到達是經過分片編號實現的,每個數據包都有一個編號,接收端根據這個編號去將接收的數據按原來數據順序組裝起來。固然,TCP協議之因此是一個可靠的傳輸和他的重傳機制也是分不開的,當對方沒有收到你發送的一個數據包的時候,TCP協議會從新傳輸這個數據包到對方,直到對方確認收到了這個數據包爲止。。。。固然TCP協議裏面還有不少優秀的東西。這裏就不一一贅述了。哈哈,否則這裏就成了網絡專場了。。。web

好了,如今肯定了咱們使用TCP/IP來進行網絡通訊,那麼要實現通訊須要知道對方機器的IP地址和端口號(固然,這裏指的端口號通常是指TCP的端口號,從1-65535,其中,1024及以前的端口被定義爲知名端口,最好不要使用)shell

2.socket編程

前面的第一節只是前戲,爲了引出咱們今天介紹的socket編程,這前戲作的也是挺累的,哈哈哈。編程

python中的socket是一個模塊,使用這個內置模塊可以讓咱們實現網絡通訊。其實在unix/linux一切皆文件的思想下,socket也能夠看做是一個文件。。。python進行網絡數據通訊也能夠理解爲一個打開文件讀寫數據的操做,只不過這是一個特殊的操做罷了。接下來是一個關於socket通訊的流程圖:windows

流程描述:安全

  1. 服務器根據地址類型(ipv4,ipv6)、socket類型、協議建立socket服務器

  2. 服務器爲socket綁定ip地址和端口號網絡

  3. 服務器socket監聽端口號請求,隨時準備接收客戶端發來的鏈接,這時候服務器的socket並無被打開

  4. 客戶端建立socket

  5. 客戶端打開socket,根據服務器ip地址和端口號試圖鏈接服務器socket

  6. 服務器socket接收到客戶端socket請求,被動打開,開始接收客戶端請求,直到客戶端返回鏈接信息。這時候socket進入阻塞狀態,所謂阻塞即accept()方法一直等到客戶端返回鏈接信息後才返回,開始接收下一個客戶端鏈接請求

  7. 客戶端鏈接成功,向服務器發送鏈接狀態信息

  8. 服務器accept方法返回,鏈接成功

  9. 客戶端向socket寫入信息(或服務端向socket寫入信息)

  10. 服務器讀取信息(客戶端讀取信息)

  11. 客戶端關閉

  12. 服務器端關閉

3.socket模塊一些方法

下面是socket模塊裏面提供的一些方法,讓咱們實現socke通訊。

sk=socket.socket()

建立socket對象,這個時候能夠指定兩個參數,一個是family,另外一個是type

family:AFINET(表明IPV4,默認參數),AFINET6(表明使用IPV6),AF_UNIX(UNIX文件系統通訊)

type:SOCKSTREAM(TCP協議通訊,默認參數),SOCKDGRAM(UDP協議通訊) 默認不寫的話,family=AFINET type=SOCKSTREAM

sk.bind(address)

sk.bind(address) 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。

sk.listen(backlog)

開始監聽傳入鏈接。backlog指定在拒絕鏈接以前,能夠掛起的最大鏈接數量。 backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5 這個值不能無限大,由於要在內核中維護鏈接隊列

sk.setblocking(bool)

是否阻塞(默認True),若是設置False,那麼accept和recv時一旦無數據,則報錯。

sk.accept()

接受鏈接並返回(conn,address),其中conn是新的套接字對象,能夠用來接收和發送數據。address是鏈接客戶端的地址。

接收TCP 客戶的鏈接(阻塞式)等待鏈接的到來

sk.connect(address)

鏈接到address處的套接字。通常,address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。

sk.connect_ex(address)

同上,只不過會有返回值,鏈接成功時返回 0 ,鏈接失敗時候返回編碼,例如:10061

sk.close()

關閉套接字

sk.recv(bufsize[,flag])

接受套接字的數據。數據以字符串形式返回,bufsize指定最多能夠接收的數量。flag提供有關消息的其餘信息,一般能夠忽略。

sk.recvfrom(bufsize[.flag])

與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址。

sk.send(string[,flag])

將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容所有發送。

sk.sendall(string[,flag])

將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常。

內部經過遞歸調用send,將全部內容發送出去。

sk.sendto(string[,flag],address)

將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議。

sk.settimeout(timeout)

設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如 client 鏈接最多等待5s )

sk.getpeername()

返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port)。

sk.getsockname()

返回套接字本身的地址。一般是一個元組(ipaddr,port)

sk.fileno()

套接字的文件描述符

4.聊天socket實現

好了,前戲作了那麼多,到如今都還沒寫一行代碼呢,接下來到了實戰的步驟了。

複製代碼
 1 #server端實現:
 2 import socket  # 導入socket模塊
 3 sk=socket.socket()  # 實例化socket對象
 4 address=("0.0.0.0",8000)  # 定義好監聽的IP地址和端口,能夠綁定網卡上的IP地址,可是生產環境中通常綁定0.0.0.0這樣全部網卡上都可以接收並處理這個端口的數據了
 5 sk.bind(address)  # 綁定IP地址和端口
 6 sk.listen(5)  # 設定偵聽開始並設定偵聽隊列裏面等待的鏈接數最大爲5
 7 
 8 while True:  # 循環,爲了創建客戶端鏈接
 9     conn,addr=sk.accept()  # conn拿到的是接入的客戶端的對象,以後服務端和客戶端通訊都是基於這個conn對象進行通訊,add獲取的是鏈接的客戶端的IP和端口
10     while True:  # 這個循環來實現相互之間聊天的邏輯
11         try:  # 爲何要使用try包裹這一塊呢?由於在通訊過程當中客戶端忽然退出了的話,服務端阻塞在recv狀態時將會拋出異常並終止程序,爲了解決這個異常,須要咱們本身捕獲這個異常,這是在windows上,linux上不會拋出異常
12             data=conn.recv(1024)  # 定義接收的數據大小爲1024個字節並接受客戶端傳遞過來的數據,注意這裏的數據在python3中是bytes類型的,在python3中網絡通訊發送和接收的數據必須是bytes類型的,不然會報錯
13             if data:  # 假如客戶端傳遞數據過來時
14                 print("-->",str(data,"utf-8"))  # 打印客戶端傳遞過來的數據,須要從bytes類型數據解碼成unicode類型的數據
15                 data=bytes(input(">>>"),"utf-8")  # 接收輸入的數據並轉換成bytes類型的數據
16                 conn.send(data)  # 將bytes類型的數據發送給客戶端
17             else:  # 不然關閉這個客戶端鏈接的對象,當客戶端正常退出,執行了sk.close()時將不會發送數據到服務端,也就是說recv獲取到的數據是空的
18                 conn.close()  # 這時關閉這個conn對象並退出當前循環等待下一個客戶端對象來鏈接
19                 break
20         except ConnectionResetError as e:  # 捕獲到異常以後,打印異常出來並退出循環等待下一個客戶端鏈接
21             print(e)
22             break
23 
24 #client端實現
25 import socket  # 導入socket模塊
26 sk=socket.socket()  # 實例化客戶端對象
27 address=("127.0.0.1",8000)  # 設置客戶端須要鏈接的服務端IP地址以及端口
28 sk.connect(address)  # 鏈接服務端的IP地址以及端口
29 while True:  # 循環實現對話
30     data=input(">>>").strip()  # 獲取用戶輸入的數據
31     if data=="exit":  # 若是輸入的是exit 關閉該對象並退出程序
32         sk.close()  # 關閉對象
33         break  # 退出循環
34     sk.send(bytes(data,"utf-8"))  # 發送剛輸入的數據,要先轉換成bytes類型的數據
35     data=str(sk.recv(1024),"utf-8")  # 接收服務端發送的數據,並將其轉換成unicode數據類型
36     print("-->",data)  # 打印服務端傳輸過來的數據
複製代碼

 

上面是一個簡單聊天服務器的實現。。。下面實現的邏輯比這個更復雜一點。

5.遠程執行命令及上傳文件

遠程執行命令須要服務端添加一個subprocess模塊,將執行命令並返回執行結果,下面是執行代碼: #server端實現 import socket # 導入socket模塊 import subprocess # 導入subprocess模塊 sk=socket.socket() # 實例化socket對象 address=("0.0.0.0",8000) # 設定綁定IP地址以及端口 sk.bind(address) # 綁定IP地址及端口 sk.listen(5) # 進入偵聽狀態,並設置等待隊列長度爲5

複製代碼
 1 while True:  # 循環接收客戶端的鏈接
 2     conn,addr=sk.accept()  # 接收客戶端的鏈接
 3     while True:  # 循環處理客戶端發送過來的命令
 4         try:  # 捕捉客戶端不正常退出異常
 5             cmd=str(conn.recv(1024),"utf-8")  # 接收客戶端傳遞過來的命令
 6             if cmd:  # 命令不爲空,則執行下面代碼
 7                 print("-->",cmd)  # 打印客戶端傳過來的命令
 8                 obj=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
 9                 """
10                 調用subprocess的Popen類,執行客戶端傳過來的命令,shell設置爲true,不然可能執行異常,把標準正確輸出和標準錯誤輸出都接收,這樣執行結果就不會顯示在server端了
11                 """
12                 if not obj.wait():  # obj對象有一個wait方法,等待子進程執行完成並返回執行退出狀態碼,0爲正常,不然不正常
13                     cmd_result=obj.stdout.read()  # 正常狀況下,接收執行結果
14                 else:
15                    print("error")  # 不然打印出error
16                    cmd_result=obj.stderr.read()  # 接收錯誤信息
17                    print(str(cmd_result,"gbk"))  # 在server中打印出錯誤信息
18                 result_len=bytes(str(len(cmd_result)),"utf-8")  # 獲取返回結果的長度
19                 conn.send(result_len)  # 將返回結果的長度發送給客戶端
20                 conn.recv(1024)  # 爲了防止數據包黏連,用來在兩個send中間插入一個recv,不然,可能兩次send的時候前面發送的數據和後面的數據之間會發生數據黏連,形成客戶端收到的不是一個純數字的字符串而引起報錯
21                 conn.sendall(cmd_result)  # 將執行結果發送給客戶端
22             else:
23                 conn.close()  # 若是客戶端退出,則關閉這個連接並退出循環,等待下一個連接
24                 break
25         except ConnectionResetError as e:  # 若是客戶端異常退出,打印異常並退出當前循環,等待下一個連接
26             print(e)
27             break
28 #客戶端實現
29 import socket  # 導入socket模塊
30 sk=socket.socket()  # 建立socket對象
31 address=("127.0.0.1",8000)  # 設置服務端的IP和端口
32 sk.connect(address)  # 和服務端創建連接
33 while True:  # 循環用來輸入命令並接受返回結果
34     cmd=input(">>>").strip()  # 接受命令
35     if cmd=="exit":  # 若是客戶端輸入exit則關閉該連接並退出
36         sk.close()
37         break
38     sk.send(bytes(cmd,"utf-8"))  # 發送命令到服務端
39     result_len=int(str(sk.recv(1024),"utf-8"))  # 接受服務端發送的執行結果的長度
40     sk.send(bytes("ok","utf8"))  # 防止數據接受黏連設置的一個send,沒有實際意義
41     print(result_len)  # 打印接受到的數據長度
42     data=bytes()  # 定義一個bytes類型的變量用來接受執行結果
43     while len(data) != result_len:  # 循環接收執行結果,直到接收完成爲止
44         data+=sk.recv(1024)
45     print(str(data,"gbk"))  # 打印執行結果
複製代碼

 

上面是遠程執行命令的實現,下面要實現遠程上傳文件的代碼。

複製代碼
 1 #server端實現
 2 import socket  # 導入socket模塊
 3 import os  @ 導入os模塊
 4 sk=socket.socket()  # 建立socket對象
 5 address=("0.0.0.0",8000)  # 設置綁定地址
 6 sk.bind(address)  # 綁定地址
 7 sk.listen()  # 偵聽地址及端口
 8 BASE_DIR=os.path.dirname(os.path.abspath(__file__))  # 獲取當前文件所在目錄
 9 while True:  # 循環,鏈接客戶端
10     conn,addr=sk.accept()  # 獲取鏈接的客戶端對象
11     while True:  # 循環實現文件上傳
12         try:  # 捕捉客戶端異常退出
13             data=str(conn.recv(1024),"utf8")  # 接收客戶端發送過來的數據
14             file_name=data.split("|")[0]  # 獲取數據的第一個值爲文件名
15             file_size=int(data.split("|")[1])  # 第二個值爲文件大小
16             path=os.path.join(BASE_DIR,"post_server_dir",file_name)  # 路徑拼接,獲取到該文件名的絕對路徑
17             writ_size=0  # 設置接收的數據大小初始值
18             with open(path,"wb") as f:  # 以wb的方式打開文件,由於傳輸過來的數據都是bytes類型的
19                 while writ_size < file_size:  # 當接收的數據小於文件的大小時,循環接收,一直到接受的數據大小等於文件大小結束
20                     data=conn.recv(1024)  # 每次接收1024字節的數據
21                     f.write(data)  # 將接受的數據寫入到文件中
22                     writ_size+=len(data)  # 接收的文件數據大小自加
23             print("reveive %s successful!"%file_name)  # 循環結束時證實文件接收完成,打印接收成功信息
24             conn.sendall(bytes("true","utf8"))  # 將接受成功信息發送給客戶端
25         except Exception as e:  # 捕捉到異常後打印異常並退出當前循環,等待下一次鏈接
26             print(e)
27             break
28 #客戶端實現
29 import socket  # 導入socket模塊
30 import os  # 導入os模塊
31 sk=socket.socket()  # 建立socket對象
32 address=("127.0.0.1",8000)  # 設置服務端鏈接的IP及地址
33 sk.connect(address)  # 鏈接服務端
34 BASE_DIR=os.path.dirname(os.path.abspath(__file__))  # 獲取當前文件所在的目錄路徑
35 while True:  # 用來循環上傳文件
36     cmd=input(">>>").strip()  # 獲取用戶輸入命令
37     if cmd.split("|")[0]=="post":  # 若是輸入的第一個參數是post
38         path=os.path.join(BASE_DIR,cmd.split("|")[1])  # 獲取第二個參數文件名,並將其添加到路徑中,獲取文件絕對路徑
39         file_name=os.path.basename(path)  # 獲取文件名
40         file_size=os.stat(path).st_size  # 獲取文件大小
41         sk.sendall(bytes("|".join((file_name,str(file_size))),"utf8"))  # 把文件名和文件大小發送給服務端
42         with open(path,"rb") as f:  # 打開文件,以rb的方式讀取,可直接發送給服務端
43             while True:  # 循環讀取文件直到讀取完成
44                 data=f.read(1024)  # 讀取文件的1024字節
45                 if not data:  # 若是文件讀取完成,讀取的內容就爲空
46                     break  # 這時,退出循環
47                 sk.sendall(data)  # 把讀取的數據發送給服務端
48         flag=str(sk.recv(1024),"utf8")  # 讀取服務端發送的消息
49         if flag=="true":  # 若是服務端發送過來接收成功的消息,說明文件傳輸成功,打印傳輸成功消息
50             print("send file %s successful!"%file_name)
51         else:  # 不然打印文件傳輸失敗的消息
52             print("upload file %s failed!"%file_name)
複製代碼

 

上面是實現文件傳輸的socket編程,由於每次都讀取文件的1024字節並傳遞給服務端,這樣不會把整個文件加載進內存,從而能夠實現大文件的上傳。

6.socketserver及其源碼分析

前面寫的socket都是每次只能實現和一個客戶端通訊的,那麼能不能實現同時和多個客戶端通訊,也就是咱們所說的併發呢?固然是能夠的,python中內置了一個模塊叫作socketserver,這個模塊對python的socket模塊進行更高級的封裝,經過使用多進程或者多線程的方式實現併發通訊,也就是說每一個客戶端來鏈接的話,新開一個進程或者線程去和它通訊,雖然這種方式對資源消耗很大,可是它的源碼其實並不複雜,閱讀他的源碼可以對咱們的提高有比較好的幫助,因此,建議有空的能夠去讀讀socketserver的源碼。加上註釋一共才700行左右的源碼。

咱們先來實現一個支持多客戶端的socketserver示例,再來分析這個示例在python中是如何實現的。

複製代碼
 1 #服務端實現
 2 import socketserver  # 導入socketserver模塊
 3 class Myserver(socketserver.BaseRequestHandler):  # 定義一個類名繼承自socketserver.BaseRequestHandler
 4     def handle(self):  # 自定義一個函數名handle(注意這個名字不能命名成別的名字,父類中有這個方法,至關於重寫了這個方法)裏面實現的是socket通訊的邏輯代碼塊
 5         while True:
 6             conn=self.request
 7             while True:
 8                 try:
 9                     data=str(conn.recv(1024),"utf8")
10                     print("<<<",data)
11                     data=bytes(input(">>>"),"utf8")
12                     conn.sendall(data)
13                 except Exception as e:
14                     print(e)
15                     conn.close()
16                     break
17 
18 if __name__=="__main__":
19     server=socketserver.ThreadingTCPServer(("0.0.0.0",8081),Myserver)  # 建立socketserver.ThreadingTCPServer實例,將地址及端口以及咱們剛剛建立的類傳遞進去,這個類將會在客戶端請求過來時threading一個線程來處理這個請求
20     server.serve_forever()  # 調用實例的serve_forever方法
21 #客戶端實現
22 import socket  # 這塊代碼和上面的基本一致,能夠參照上面的註釋解釋
23 sk=socket.socket()
24 address=("127.0.0.1",8081)
25 sk.connect(address)
26 while True:
27     data=bytes(input(">>>"),"utf8")
28     if str(data,"utf8")=="exit":
29         sk.close()
30         break
31     sk.sendall(data)
32     data=str(sk.recv(1024),"utf8")
33     print("<<<",data)
複製代碼

 

能夠發現使用socketserver模塊,比直接使用socket模塊方便了不少,可以省去不少代碼,而且可以經過多線程的使用來實現真正的多客戶端的通訊。

socketserver源碼分析: 首先,socketserver提供了五個類供咱們使用,他們之間的關係以下所示:

複製代碼
+------------+
| BaseServer |  # 基類
+------------+
      |
      v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
      |
      v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+
複製代碼

 

而咱們使用的類ThreadingTCPServer在源碼中的定義方式以下所示: class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass 是經過繼承ThreadingMixIn, TCPServer兩個類實現的,那麼,這裏面幫咱們幹了些什麼呢?首先,咱們來分析server=socketserver.ThreadingTCPServer(("0.0.0.0",8081),Myserver)這條語句幫咱們作了些什麼,這條語句是建立一個類的對象,建立對象的時候會執行類的__init__方法,首先,找的是父類ThreadingMixIn的__init__方法,可是ThreadingMixIn中並無,因而去找TCPServer中的。在TCPServer中的__init__方法幫咱們作了一些事情,幫咱們建立socket鏈接對象,並調用了父類BaseServer的__init__方法作了一些初始化工做。

而後server.serve_forever()這條語句幫咱們作了什麼事呢?

調用對象的serve_forever方法,首先去父類ThreadingMixIn中沒找到,去TCPServer中也沒找到,到父類BaseServer中找到了,代碼以下:

複製代碼
 1 def serve_forever(self, poll_interval=0.5):  # poll_interval設置的是超時時間,以秒爲單位,做爲select的第四個可選參數傳遞進入
 2         self.__is_shut_down.clear()  # 設置event中的flag爲False
 3         try:
 4             with _ServerSelector() as selector:
 5                 selector.register(self, selectors.EVENT_READ)  # 設置讀事件
 6 
 7                 while not self.__shutdown_request:
 8                     ready = selector.select(poll_interval)  # 實例化select對象
 9                     if ready:
10                         self._handle_request_noblock()  #  若是ready是真,獲取客戶端的請求
11                     self.service_actions()
12         finally:
13             self.__shutdown_request = False
14             self.__is_shut_down.set()
複製代碼

 

這個函數作的事情是調用了selecter實現非阻塞的I/O多路複用。並調用handlerequest_noblock-->processrequest-->finishrequest-->RequestHandlerClass(這個也就是咱們建立對象時傳進去的Myserver)這裏面在初始化時-->setup-->handle(這個函數在原來的父類中是什麼都沒寫的,咱們給重寫了這個函數,因此,調用到這的時候就是調用咱們本身寫的子類的函數執行咱們的代碼)

整個socketserver源碼以下:

複製代碼
  1 #!/usr/bin/env python
  2 # encoding:utf-8
  3 # __author__: socket_server_resource_code
  4 # date: 2016/9/29 15:30
  5 # blog: http://huxianglin.cnblogs.com/ http://xianglinhu.blog.51cto.com/
  6 
  7 __version__ = "0.4"
  8 
  9 import socket  # 導入socket模塊
 10 import selectors  # 導入selectors模塊
 11 import os  # 導入os模塊
 12 import twist
 13 try:
 14     import threading  # 導入threading模塊
 15 except ImportError:  # 若是沒有這個模塊則導入dummy_threading模塊並重命名爲threading模塊
 16     import dummy_threading as threading
 17 from time import monotonic as time  # 從time模塊導入monotonic模塊並重命名爲time模塊
 18 
 19 __all__ = ["BaseServer", "TCPServer", "UDPServer", "ForkingUDPServer",
 20            "ForkingTCPServer", "ThreadingUDPServer", "ThreadingTCPServer",
 21            "BaseRequestHandler", "StreamRequestHandler",
 22            "DatagramRequestHandler", "ThreadingMixIn", "ForkingMixIn"]
 23 """
 24 內置一個全局變量存儲的是這個模塊中的全部類名稱
 25 """
 26 if hasattr(socket, "AF_UNIX"):  # 若是socket對象中有「AF_UNIX」,那麼在__all__列表中添加下面幾個方法
 27 
 28     __all__.extend(["UnixStreamServer","UnixDatagramServer",
 29                     "ThreadingUnixStreamServer",
 30                     "ThreadingUnixDatagramServer"])
 31 
 32 if hasattr(selectors, 'PollSelector'):  # 判斷selectors對象中是否有PollSelector
 33     _ServerSelector = selectors.PollSelector
 34 else:
 35     _ServerSelector = selectors.SelectSelector
 36 
 37 class BaseServer:  # 基類
 38 
 39     timeout = None  # 超時時間,設置爲空
 40 
 41     def __init__(self, server_address, RequestHandlerClass):
 42         """Constructor.  May be extended, do not override."""
 43         self.server_address = server_address  # 設置server的地址
 44         self.RequestHandlerClass = RequestHandlerClass  # 設置處理邏輯類
 45         self.__is_shut_down = threading.Event()  # 線程會阻塞在threading,Event()的地方,直到主線程執行完成後纔會執行子線程
 46         self.__shutdown_request = False  # 設置一個bool
 47 
 48     def server_activate(self):
 49         pass
 50 
 51     def serve_forever(self, poll_interval=0.5):  # poll_interval設置的是超時時間,以秒爲單位,做爲select的第四個可選參數傳遞進入
 52         self.__is_shut_down.clear()  # 設置event中的flag爲False
 53         try:
 54             with _ServerSelector() as selector:
 55                 selector.register(self, selectors.EVENT_READ)  # 設置讀事件
 56 
 57                 while not self.__shutdown_request:
 58                     ready = selector.select(poll_interval)  # 實例化select對象
 59                     if ready:
 60                         self._handle_request_noblock()  #  若是ready是真,獲取客戶端的請求
 61                     self.service_actions()
 62         finally:
 63             self.__shutdown_request = False
 64             self.__is_shut_down.set()
 65 
 66     def shutdown(self):
 67         self.__shutdown_request = True
 68         self.__is_shut_down.wait()
 69 
 70     def service_actions(self):
 71         pass
 72 
 73     def handle_request(self):
 74         timeout = self.socket.gettimeout()
 75         if timeout is None:
 76             timeout = self.timeout
 77         elif self.timeout is not None:
 78             timeout = min(timeout, self.timeout)
 79         if timeout is not None:
 80             deadline = time() + timeout
 81         with _ServerSelector() as selector:
 82             selector.register(self, selectors.EVENT_READ)
 83 
 84             while True:
 85                 ready = selector.select(timeout)
 86                 if ready:
 87                     return self._handle_request_noblock()
 88                 else:
 89                     if timeout is not None:
 90                         timeout = deadline - time()
 91                         if timeout < 0:
 92                             return self.handle_timeout()
 93 
 94     def _handle_request_noblock(self):
 95         try:
 96             request, client_address = self.get_request()
 97         except OSError:
 98             return
 99         if self.verify_request(request, client_address):  # 永遠返回true
100             try:
101                 self.process_request(request, client_address)
102             except:
103                 self.handle_error(request, client_address)
104                 self.shutdown_request(request)
105         else:
106             self.shutdown_request(request)
107 
108     def handle_timeout(self):
109         pass
110 
111     def verify_request(self, request, client_address):
112         return True
113 
114     def process_request(self, request, client_address):
115         self.finish_request(request, client_address)  # 調用處理邏輯類
116         self.shutdown_request(request)  # 調用處理關閉邏輯類。。。其實什麼都沒幹
117 
118     def server_close(self):
119         pass
120 
121     def finish_request(self, request, client_address):
122         self.RequestHandlerClass(request, client_address, self)  # 調用邏輯類
123 
124     def shutdown_request(self, request):
125         self.close_request(request)
126 
127     def close_request(self, request):
128         pass
129 
130     def handle_error(self, request, client_address):
131         print('-'*40)
132         print('Exception happened during processing of request from', end=' ')
133         print(client_address)
134         import traceback
135         traceback.print_exc() # XXX But this goes to stderr!
136         print('-'*40)
137 
138 
139 class TCPServer(BaseServer):
140 
141     address_family = socket.AF_INET
142 
143     socket_type = socket.SOCK_STREAM
144 
145     request_queue_size = 5
146 
147     allow_reuse_address = False
148 
149     def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):  # 將傳入的參數進行初始化
150         """Constructor.  May be extended, do not override."""
151         BaseServer.__init__(self, server_address, RequestHandlerClass)  # 調用父類構造方法,將傳入參數賦值到對象上
152         self.socket = socket.socket(self.address_family,
153                                     self.socket_type)  # 設置socket對象,ip tcp模式
154         if bind_and_activate:
155             try:
156                 self.server_bind()  # 綁定IP和端口
157                 self.server_activate()  # 設置偵聽
158             except:
159                 self.server_close()  # 關閉對象並引起異常
160                 raise
161 
162     def server_bind(self):
163         if self.allow_reuse_address:
164             self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 設置tcp的狀態通常不會當即關閉而經歷TIME_WAIT的過程。後想繼續重用該socket
165         self.socket.bind(self.server_address)  # 綁定IP和端口
166         self.server_address = self.socket.getsockname()  # getsockname返回本身套接字的地址
167 
168     def server_activate(self):  # 設置偵聽
169         self.socket.listen(self.request_queue_size)
170 
171     def server_close(self):  # 關閉socket鏈接
172         self.socket.close()
173 
174     def fileno(self):  # 返回socket的fileno信息
175         return self.socket.fileno()
176 
177     def get_request(self):  # socket server設置準備就緒狀態,等待接收客戶端發來的信息
178         return self.socket.accept()
179 
180     def shutdown_request(self, request):  # 關閉客戶端socket的鏈接
181         try:
182             request.shutdown(socket.SHUT_WR)
183         except OSError:
184             pass
185         self.close_request(request)
186 
187     def close_request(self, request):
188         request.close()
189 
190 
191 class UDPServer(TCPServer):
192 
193     allow_reuse_address = False
194 
195     socket_type = socket.SOCK_DGRAM
196 
197     max_packet_size = 8192
198 
199     def get_request(self):
200         data, client_addr = self.socket.recvfrom(self.max_packet_size)
201         return (data, self.socket), client_addr
202 
203     def server_activate(self):
204         pass
205 
206     def shutdown_request(self, request):
207         self.close_request(request)
208 
209     def close_request(self, request):
210         pass
211 
212 class ForkingMixIn:
213 
214     timeout = 300
215     active_children = None
216     max_children = 40
217 
218     def collect_children(self):
219         if self.active_children is None:
220             return
221         while len(self.active_children) >= self.max_children:
222             try:
223                 pid, _ = os.waitpid(-1, 0)
224                 self.active_children.discard(pid)
225             except ChildProcessError:
226                 self.active_children.clear()
227             except OSError:
228                 break
229         for pid in self.active_children.copy():
230             try:
231                 pid, _ = os.waitpid(pid, os.WNOHANG)
232                 self.active_children.discard(pid)
233             except ChildProcessError:
234                 self.active_children.discard(pid)
235             except OSError:
236                 pass
237 
238     def handle_timeout(self):
239         self.collect_children()
240 
241     def service_actions(self):
242         self.collect_children()
243 
244     def process_request(self, request, client_address):
245         pid = os.fork()
246         if pid:
247             if self.active_children is None:
248                 self.active_children = set()
249             self.active_children.add(pid)
250             self.close_request(request)
251             return
252         else:
253             try:
254                 self.finish_request(request, client_address)
255                 self.shutdown_request(request)
256                 os._exit(0)
257             except:
258                 try:
259                     self.handle_error(request, client_address)
260                     self.shutdown_request(request)
261                 finally:
262                     os._exit(1)
263 
264 
265 class ThreadingMixIn:
266     daemon_threads = False
267 
268     def process_request_thread(self, request, client_address):
269         try:
270             self.finish_request(request, client_address)
271             self.shutdown_request(request)
272         except:
273             self.handle_error(request, client_address)
274             self.shutdown_request(request)
275 
276     def process_request(self, request, client_address):
277         t = threading.Thread(target = self.process_request_thread,
278                              args = (request, client_address))
279         t.daemon = self.daemon_threads
280         t.start()
281 
282 
283 class ForkingUDPServer(ForkingMixIn, UDPServer): pass
284 class ForkingTCPServer(ForkingMixIn, TCPServer): pass
285 
286 class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
287 class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
288 
289 if hasattr(socket, 'AF_UNIX'):
290 
291     class UnixStreamServer(TCPServer):
292         address_family = socket.AF_UNIX
293 
294     class UnixDatagramServer(UDPServer):
295         address_family = socket.AF_UNIX
296 
297     class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
298 
299     class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
300 
301 class BaseRequestHandler:
302 
303     def __init__(self, request, client_address, server):
304         self.request = request
305         self.client_address = client_address
306         self.server = server
307         self.setup()
308         try:
309             self.handle()
310         finally:
311             self.finish()
312 
313     def setup(self):
314         pass
315 
316     def handle(self):
317         pass
318 
319     def finish(self):
320         pass
321 
322 class StreamRequestHandler(BaseRequestHandler):
323 
324     rbufsize = -1
325     wbufsize = 0
326     disable_nagle_algorithm = False
327 
328     def setup(self):
329         self.connection = self.request
330         if self.timeout is not None:
331             self.connection.settimeout(self.timeout)
332         if self.disable_nagle_algorithm:
333             self.connection.setsockopt(socket.IPPROTO_TCP,
334                                        socket.TCP_NODELAY, True)
335         self.rfile = self.connection.makefile('rb', self.rbufsize)
336         self.wfile = self.connection.makefile('wb', self.wbufsize)
337 
338     def finish(self):
339         if not self.wfile.closed:
340             try:
341                 self.wfile.flush()
342             except socket.error:
343                 pass
344         self.wfile.close()
345         self.rfile.close()
346 
347 
348 class DatagramRequestHandler(BaseRequestHandler):
349 
350     """Define self.rfile and self.wfile for datagram sockets."""
351 
352     def setup(self):
353         from io import BytesIO
354         self.packet, self.socket = self.request
355         self.rfile = BytesIO(self.packet)
356         self.wfile = BytesIO()
357 
358     def finish(self):
359         self.socket.sendto(self.wfile.getvalue(), self.client_address)
複製代碼

 

 
分類:  python高級
相關文章
相關標籤/搜索