1、socket(單連接)python
一、socket:應用層與TCP/IP協議族通訊的中間軟件抽象層,它是一組接口。在設計模式中,Socket其實就是一個門面模式,它把複雜的TCP/IP協議族隱藏在Socket接口後面;也有人將socket說成ip+port,ip是用來標識互聯網中的一臺主機的位置,而port是用來標識這臺機器上的一個應用程序,ip地址是配置到網卡上的,而port是應用程序開啓的,ip與port的綁定就標識了互聯網中獨一無二的一個應用程序;而程序的pid是同一臺機器上不一樣進程或者線程的標識。linux
二、套接字:用於在同一臺主機上多個應用程序之間的通信。套接字有兩種(或者稱爲有兩個種族),分別是基於文件型(AF_UNIX)和基於網絡型(AF_INET)。git
三、基於TCP的套接字(類型一)程序員
工做原理:先從服務器端提及。服務器端先初始化Socket,而後與端口綁定(bind),對端口進行監聽(listen),調用accept阻塞,等待客戶端鏈接。在這時若是有個客戶端初始化一個Socket,而後鏈接服務器(connect),若是鏈接成功,這時客戶端與服務器端的鏈接就創建了。客戶端發送數據請求,服務器端接收請求並處理請求,而後把迴應數據發送給客戶端,客戶端讀取數據,最後關閉鏈接,一次交互結束github
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket #導入socket模塊 5 ip_port = ("127.0.0.1",9999) #設置服務器ip和端口 6 server = socket.socket() #建立server實例 //聲明socket類型同時生成socket對象 7 server.bind(ip_port) #套接字綁定ip與端口 8 server.listen(5) #監聽鏈接//容許5個客戶端排隊 9 conn,addr = server.accept() #等待客戶端鏈接 // 客戶端鏈接後,返回新的套接字與IP地址 10 client_data = conn.recv(1024) #接收數據//把接收的數據實例化 11 #client_data = b'hello' 12 conn.sendall(client_data.upper()) #把數據發送到客戶端 //upper() 字母變成大寫 13 conn.close() #關閉鏈接
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket #導入socket模塊 5 ip_port = ("127.0.0.1",9999) #設置服務器ip和端口 6 client = socket.socket() #建立client實例 7 client.connect(ip_port) #設置要鏈接的ip和端口 8 info = "hello world" #要發送的數據 9 client.sendall(info.encode("utf-8")) #發送數據// 把str轉換爲bytes類型 10 server_data = client.recv(1024) #接收數據 11 client.close() #關閉鏈接
1 ① server = socket.socket() 2 套接字格式:socket(family,type[,protocal]) 使用給定的地址族、套接字類型、協議編號(默認爲0)來建立套接字。 3 參數一:地址簇 4 socket.AF_INET IPv4(默認) 5 socket.AF_INET6 IPv6 6 socket.AF_UNIX 只可以用於單一的Unix系統進程間通訊 7 參數二:類型 8 socket.SOCK_STREAM 流式socket , for TCP (默認) 9 socket.SOCK_DGRAM 數據報式socket , for UDP 10 socket.SOCK_RAW 原始套接字,普通的套接字沒法處理ICMP、IGMP等網絡報文,而SOCK_RAW能夠;其次,SOCK_RAW也能夠處理特殊的IPv4報文;此外,利用原始套接字,能夠經過IP_HDRINCL套接字選項由用戶構造IP頭。 11 socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付數據報但不保證順序。SOCK_RAM用來提供對原始協議的低級訪問,在須要執行某些特殊操做時使用,如發送ICMP報文。SOCK_RAM一般僅限於高級用戶或管理員運行的程序使用。 12 socket.SOCK_SEQPACKET 可靠的連續數據包服務 13 參數三:協議 14 0 (默認)與特定的地址家族相關的協議,若是是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議 15 #建立TCP Socket:server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 16 #建立UDP Socket:server=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 17 注意點: 18 1)TCP發送數據時,已創建好TCP鏈接,因此不須要指定地址。UDP是面向無鏈接的,每次發送要指定是發給誰。 19 2)服務端與客戶端不能直接發送列表,元組,字典。須要字符串化repr(data) 20 ② server.bind(address) 21 將套接字綁定到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址 22 ③ server.listen(backlog) 23 開始監聽傳入鏈接。backlog指定在拒絕鏈接以前,能夠掛起的最大鏈接數量。該值至少爲1,大部分應用程序設爲5就能夠了。backlog等於5,表示內核已經接到了鏈接請求,但服務器尚未調用accept進行處理的鏈接個數最大爲5,這個值不能無限大,由於要在內核中維護鏈接隊列 24 ④ server.setblocking(bool) 25 是否阻塞(默認True),若是設置False,那麼accept和recv時一旦無數據,則報錯 26 ⑤ conn,addr = server.accept() 27 接受鏈接並返回(conn,address),其中conn是新的套接字對象,能夠用來接收和發送數據。address是鏈接客戶端的地址。接收TCP 客戶的鏈接(阻塞式)等待鏈接的到來 28 ⑥ client.connect(address) 29 鏈接到address處的套接字。通常address的格式爲元組(hostname,port),若是鏈接出錯,返回socket.error錯誤。 30 ⑦ client.connect_ex(address) 31 同上,只不過會有返回值,鏈接成功時返回 0 ,鏈接失敗時候返回編碼,例如:10061 32 ⑧ client.close() 33 關閉套接字 34 ⑨ client.recv(bufsize[,flag]) 35 接受套接字的數據。數據以字符串形式返回,bufsize指定最多能夠接收的數量。flag提供有關消息的其餘信息,一般能夠忽略 36 ⑩ client.recvfrom(bufsize[.flag]) 37 與recv()相似,但返回值是(data,address)。其中data是包含接收數據的字符串,address是發送數據的套接字地址 38 ⑪ server.send(string[,flag]) 39 發送TCP數據;將string中的數據發送到鏈接的套接字。返回值是要發送的字節數量,該數量可能小於string的字節大小。即:可能未將指定內容所有發送 40 ⑫ server.sendall(string[,flag]) 41 完整發送TCP數據;將string中的數據發送到鏈接的套接字,但在返回以前會嘗試發送全部數據。成功返回None,失敗則拋出異常;內部經過遞歸調用send,將全部內容發送出去 42 ⑬ server.sendto(string[,flag],address) 43 將數據發送到套接字,address是形式爲(ipaddr,port)的元組,指定遠程地址。返回值是發送的字節數。該函數主要用於UDP協議 44 ⑭ sk.settimeout(timeout) 45 設置套接字操做的超時期,timeout是一個浮點數,單位是秒。值爲None表示沒有超時期。通常,超時期應該在剛建立套接字時設置,由於它們可能用於鏈接的操做(如 client 鏈接最多等待5s ) 46 ⑮ sk.getpeername() 47 返回鏈接套接字的遠程地址。返回值一般是元組(ipaddr,port) 48 ⑯ sk.getsockname() 49 返回套接字本身的地址。一般是一個元組(ipaddr,port) 50 ⑰ sk.fileno() 51 套接字的文件描述符 52 53 54 ###服務端套接字函數### 55 s.bind() #綁定(主機,端口號)到套接字 56 s.listen() #開始TCP監聽 57 s.accept() #被動接受TCP客戶的鏈接,(阻塞式)等待鏈接的到來 58 59 ###客戶端套接字函數### 60 s.connect() #主動初始化TCP服務器鏈接 61 s.connect_ex() #connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常 62 63 ###公共用途的套接字函數### 64 s.recv() #接收TCP數據 65 s.send() #發送TCP數據(send在待發送數據量大於己端緩存區剩餘空間時,數據丟失,不會發完) 66 s.sendall() #發送完整的TCP數據(本質就是循環調用send,sendall在待發送數據量大於己端緩存區剩餘空間時,數據不丟失,循環調用send直到發完) 67 s.recvfrom() #接收UDP數據 68 s.sendto() #發送UDP數據 69 s.getpeername() #鏈接到當前套接字的遠端的地址 70 s.getsockname() #當前套接字的地址 71 s.getsockopt() #返回指定套接字的參數 72 s.setsockopt() #設置指定套接字的參數 73 s.close() #關閉套接字 74 75 ###面向鎖的套接字方法### 76 s.setblocking() #設置套接字的阻塞與非阻塞模式 77 s.settimeout() #設置阻塞套接字操做的超時時間 78 s.gettimeout() #獲得阻塞套接字操做的超時時間 79 80 ###面向文件的套接字的函數### 81 s.fileno() #套接字的文件描述符 82 s.makefile() #建立一個與該套接字相關的文件
四、基於UDP的套接字(類型二)
udp是無連接的,先啓動哪一端都不會報錯且能夠同時多個客戶端去跟服務端通訊web
1 #UDP server 2 ss = socket() #建立一個服務器的套接字 3 ss.bind() #綁定服務器套接字 4 inf_loop: #服務器無限循環 5 cs = ss.recvfrom()/ss.sendto() # 對話(接收與發送) 6 ss.close() # 關閉服務器套接字 7 8 9 #UDP client 10 cs = socket() # 建立客戶套接字 11 comm_loop: # 通信循環 12 cs.sendto()/cs.recvfrom() # 對話(發送/接收) 13 cs.close() # 關閉客戶套接字
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',9000) 6 BUFSIZE=1024 7 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 8 udp_server_client.bind(ip_port) 9 10 while True: 11 msg,addr=udp_server_client.recvfrom(BUFSIZE) 12 print(msg,addr) 13 udp_server_client.sendto(msg.upper(),addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',9000) 6 BUFSIZE=1024 7 udp_server_client=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 8 9 while True: 10 msg=input('>>: ').strip() 11 if not msg:continue 12 udp_server_client.sendto(msg.encode('utf-8'),ip_port) 13 back_msg,addr=udp_server_client.recvfrom(BUFSIZE) 14 print(back_msg.decode('utf-8'),addr) 15 udp_client_socket.close()
##qq聊天(因爲udp無鏈接,因此能夠同時多個客戶端去跟服務端通訊)##算法
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 ip_port=('127.0.0.1',8081) 6 udp_server_sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) #買手機 7 udp_server_sock.bind(ip_port) 8 9 while True: 10 qq_msg,addr=udp_server_sock.recvfrom(1024) 11 print('來自[%s:%s]的一條消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],qq_msg.decode('utf-8'))) 12 back_msg=input('回覆消息: ').strip() 13 14 udp_server_sock.sendto(back_msg.encode('utf-8'),addr)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 BUFSIZE=1024 6 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 7 8 qq_name_dic={ 9 '狗哥alex':('127.0.0.1',8081), 10 '瞎驢':('127.0.0.1',8081), 11 '一棵樹':('127.0.0.1',8081), 12 '武大郎':('127.0.0.1',8081), 13 } 14 15 16 while True: 17 qq_name=input('請選擇聊天對象: ').strip() 18 while True: 19 msg=input('請輸入消息,回車發送: ').strip() 20 if msg == 'quit':break 21 if not msg or not qq_name or qq_name not in qq_name_dic:continue 22 udp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name]) 23 24 back_msg,addr=udp_client_socket.recvfrom(BUFSIZE) 25 print('來自[%s:%s]的一條消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8'))) 26 27 udp_client_socket.close()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import socket 5 BUFSIZE=1024 6 udp_client_socket=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) 7 8 qq_name_dic={ 9 '狗哥alex':('127.0.0.1',8081), 10 '瞎驢':('127.0.0.1',8081), 11 '一棵樹':('127.0.0.1',8081), 12 '武大郎':('127.0.0.1',8081), 13 } 14 15 16 while True: 17 qq_name=input('請選擇聊天對象: ').strip() 18 while True: 19 msg=input('請輸入消息,回車發送: ').strip() 20 if msg == 'quit':break 21 if not msg or not qq_name or qq_name not in qq_name_dic:continue 22 udp_client_socket.sendto(msg.encode('utf-8'),qq_name_dic[qq_name]) 23 24 back_msg,addr=udp_client_socket.recvfrom(BUFSIZE) 25 print('來自[%s:%s]的一條消息:\033[1;44m%s\033[0m' %(addr[0],addr[1],back_msg.decode('utf-8'))) 26 27 udp_client_socket.close()
五、粘包現象:①發送端須要等緩衝區滿才發送出去,形成粘包(發送數據時間間隔很短,數據了很小,TCP有一個Nagle算法會把數據合到一塊兒,產生粘包)
②接收方不及時接收緩衝區的包,形成多個包接收(客戶端發送了一段數據,服務端只收了一小部分,服務端下次再收的時候仍是從緩衝區拿上次遺留的數據,產生粘包)shell
緣由:接收方不知道消息之間的界限,不知道一次性提取多少字節的數據。編程
TCP有粘包現象,UDP永遠不會粘包:tcp是基於數據流的,收發兩端都要有一一成對的socket,TCP採用Nagle優化算法消息進行消息處理機制,面向流的通訊是無消息保護邊界的。而udp是基於數據報的,支持的是一對多的模式,套接字緩衝區採用了鏈式結構來記錄每個到達的UDP包,在每一個UDP包中添加了消息頭(消息來源地址,端口等信息),面向消息的通訊是有消息保護邊界的。json
tcp是可靠傳輸,udp是不可靠傳輸:tcp在數據傳輸時,發送端先把數據發送到本身的緩存中,而後協議控制將緩存中的數據發往對應端,對應端返回一個ack=1,發送端則清理緩存中的數據,對端返回ack=0,則從新發送數據,因此tcp是可靠的;而udp發送數據,對端是不會返回確認信息的,所以不可靠。
解決粘包:問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,因此解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把本身將要發送的字節流總大小讓接收端知曉,而後接收端來一個死循環接收完全部數據。
第一種解決方案:(low)
1 #server端 2 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 # _author_soloLi 6 from socket import * #因爲 socket 模塊中有太多的屬性。咱們在這裏破例使用了'from module import *'語句。使用 'from socket import *',咱們就把 socket模塊裏的全部屬性都帶到咱們的命名空間裏了,這樣能 大幅減短咱們的代碼。 7 import subprocess 8 ip_port=('127.0.0.1',8080) 9 back_log=5 10 buffer_size=1024 11 12 server=socket(AF_INET,SOCK_STREAM) 13 server.bind(ip_port) 14 server.listen(back_log) 15 16 while True: #連接循環 17 conn,addr=server.accept() 18 print('新的客戶端連接',addr) 19 while True: #通訊循環 20 21 ##//收數據//## 22 try: 23 cmd=conn.recv(buffer_size) 24 if not cmd:break 25 print('收到客戶端的命令',cmd) 26 27 #執行命令,獲得命令的運行結果cmd_res 28 29 #subprocess模塊提供了一種一致的方法來建立和處理附加進程,與標準庫中的其它模塊相比,提供了一個更高級的接口。用於替換以下模塊:os.system() , os.spawnv() , os和popen2模塊中的popen()函數,以及 commands(). 30 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, #解碼(bytes->str) 31 stderr=subprocess.PIPE, 32 stdout=subprocess.PIPE, 33 stdin=subprocess.PIPE) 34 35 err=res.stderr.read() 36 if err: 37 cmd_res=err 38 else: 39 cmd_res=res.stdout.read() ##編碼是以當前所在的系統爲準的,若是是windows,那麼res.stdout.read()讀出的就是GBK編碼的,在接收端須要用GBK解碼且只能從管道里讀一次結果 40 41 ##//發數據//## 42 if not cmd_res: 43 cmd_res='執行成功'.encode('gbk') #轉換編碼(gbk->unicode)(轉換顯示中文的字符串) 44 45 length=len(cmd_res) 46 conn.send(str(length).encode('utf-8')) #編碼(str->bytes) 47 client_ready=conn.recv(buffer_size) 48 if client_ready == b'ready': 49 conn.send(cmd_res) 50 except Exception as e: 51 print(e) 52 break
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * 5 ip_port=('127.0.0.1',8080) 6 back_log=5 7 buffer_size=1024 8 9 client=socket(AF_INET,SOCK_STREAM) 10 client.connect(ip_port) 11 12 while True: 13 cmd=input('>>: ').strip() 14 if not cmd:continue 15 if cmd == 'quit':break 16 17 client.send(cmd.encode('utf-8')) #編碼(str->bytes) 18 19 #解決粘包 20 length=client.recv(buffer_size) 21 client.send(b'ready') 22 23 length=int(length.decode('utf-8')) #解碼(bytes->str) 24 25 recv_size=0 26 recv_msg=b'' 27 while recv_size < length: 28 recv_msg += tcp_client.recv(buffer_size) 29 recv_size=len(recv_msg) #1024 30 31 print('命令的執行結果是 ',recv_msg.decode('gbk')) #轉換編碼(gbk->unicode)(轉換顯示中文的字符串) 32 client.close()
low的緣由:程序的運行速度遠快於網絡傳輸速度,因此在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗
第二種解決方案:(NB)
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * #因爲 socket 模塊中有太多的屬性。咱們在這裏破例使用了'from module import *'語句。使用 'from socket import *',咱們就把 socket模塊裏的全部屬性都帶到咱們的命名空間裏了,這樣能 大幅減短咱們的代碼。 5 import subprocess 6 ip_port=('127.0.0.1',8080) 7 back_log=5 8 buffer_size=1024 9 10 server=socket(AF_INET,SOCK_STREAM) 11 server.bind(ip_port) 12 server.listen(back_log) 13 14 while True: #連接循環 15 conn,addr=server.accept() 16 print('新的客戶端連接',addr) 17 while True: #通訊循環 18 19 ##//收數據//## 20 try: 21 cmd=conn.recv(buffer_size) 22 if not cmd:break 23 print('收到客戶端的命令',cmd) 24 25 #執行命令,獲得命令的運行結果cmd_res 26 27 #subprocess模塊提供了一種一致的方法來建立和處理附加進程,與標準庫中的其它模塊相比,提供了一個更高級的接口。用於替換以下模塊:os.system() , os.spawnv() , os和popen2模塊中的popen()函數,以及 commands(). 28 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, #解碼(bytes->str) 29 stderr=subprocess.PIPE, 30 stdout=subprocess.PIPE, 31 stdin=subprocess.PIPE) 32 33 err=res.stderr.read() 34 if err: 35 cmd_res=err 36 else: 37 cmd_res=res.stdout.read() ##編碼是以當前所在的系統爲準的,若是是windows,那麼res.stdout.read()讀出的就是GBK編碼的,在接收端須要用GBK解碼且只能從管道里讀一次結果 38 39 ##//發數據//## 40 if not cmd_res: 41 cmd_res='執行成功'.encode('gbk') #轉換編碼(gbk->unicode)(轉換顯示中文的字符串) 42 43 length=len(cmd_res) 44 conn.send(str(length).encode('utf-8')) #編碼(str->bytes) 45 client_ready=conn.recv(buffer_size) 46 if client_ready == b'ready': 47 conn.send(cmd_res) 48 except Exception as e: 49 print(e) 50 break
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 from socket import * 5 import struct ##NB##//struct模塊能夠把一個類型,如數字,轉成固定長度的bytes 6 from functools import partial ##NB## 7 ip_port=('127.0.0.1',8080) 8 back_log=5 9 buffer_size=1024 10 11 client=socket(AF_INET,SOCK_STREAM) 12 client.connect(ip_port) 13 14 while True: 15 cmd=input('>>: ').strip() 16 if not cmd:continue 17 if cmd == 'quit':break 18 19 client.send(cmd.encode('utf-8')) #編碼(str->bytes) 20 21 #解決粘包 22 length_data=tcp_client.recv(4)##NB##爲字節流加上自定義固定長度報頭,報頭中包含字節流長度, 23 #而後一次send到對端,對端在接收時,先從緩存中取出定長的報頭,而後再取真實數據 24 length=struct.unpack('i',length_data)[0] ##NB## 25 length=int(length.decode('utf-8')) #解碼(bytes->str) 26 27 recv_size=0 28 recv_msg=b'' 29 while recv_size < length: 30 recv_msg += tcp_client.recv(buffer_size) 31 recv_size=len(recv_msg) #1024 32 33 print('命令的執行結果是 ',recv_msg.decode('gbk')) #轉換編碼(gbk->unicode)(轉換顯示中文的字符串) 34 client.close()
第二種方案的引伸:(NBST)
1 import json,struct 2 #假設經過客戶端上傳1T:1073741824000的文件a.txt 3 4 #爲避免粘包,必須自定製報頭 5 header={'file_size':1073741824000,'file_name':'/a/b/c/d/e/a.txt','md5':'8f6fbf8347faa4924a76856701edb0f3'} #1T數據,文件路徑和md5值 6 7 #爲了該報頭能傳送,須要序列化而且轉爲bytes 8 head_bytes=bytes(json.dumps(header),encoding='utf-8') #序列化並轉成bytes,用於傳輸 9 10 #爲了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節 11 head_len_bytes=struct.pack('i',len(head_bytes)) #這4個字節裏只包含了一個數字,該數字是報頭的長度 12 13 #客戶端開始發送 14 conn.send(head_len_bytes) #先發報頭的長度,4個bytes 15 conn.send(head_bytes) #再發報頭的字節格式 16 conn.sendall(文件內容) #而後發真實內容的字節格式 17 18 #服務端開始接收 19 head_len_bytes=s.recv(4) #先收報頭4個bytes,獲得報頭長度的字節格式 20 x=struct.unpack('i',head_len_bytes)[0] #提取報頭的長度 21 22 head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式 23 header=json.loads(json.dumps(header)) #提取報頭 24 25 #最後根據報頭的內容提取真實的數據,好比 26 real_data_len=s.recv(header['file_size']) 27 s.recv(real_data_len)
1 #咱們能夠把報頭作成字典,字典裏包含將要發送的真實數據的詳細信息,而後json序列化,而後用struck將序列化後的數據長度打包成4個字節(4個本身足夠用了) 2 #發送時: 3 1.先發報頭長度 4 2.再編碼報頭內容而後發送 5 3.最後發真實內容 6 7 #接收時: 8 1.先手報頭長度,用struct取出來 9 2.根據取出的長度收取報頭內容,而後解碼,反序列化 10 3.從反序列化的結果中取出待取數據的詳細信息,而後去取真實的數據內容
1 import socket,struct,json 2 import subprocess 3 phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM) 4 phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加 5 6 phone.bind(('127.0.0.1',8080)) 7 8 phone.listen(5) 9 10 while True: 11 conn,addr=phone.accept() 12 while True: 13 cmd=conn.recv(1024) 14 if not cmd:break 15 print('cmd: %s' %cmd) 16 17 res=subprocess.Popen(cmd.decode('utf-8'), 18 shell=True, 19 stdout=subprocess.PIPE, 20 stderr=subprocess.PIPE) 21 err=res.stderr.read() 22 print(err) 23 if err: 24 back_msg=err 25 else: 26 back_msg=res.stdout.read() 27 28 headers={'data_size':len(back_msg)} 29 head_json=json.dumps(headers) 30 head_json_bytes=bytes(head_json,encoding='utf-8') 31 32 conn.send(struct.pack('i',len(head_json_bytes))) #先發報頭的長度 33 conn.send(head_json_bytes) #再發報頭 34 conn.sendall(back_msg) #在發真實的內容 35 36 conn.close() 37 38 服務端:定製稍微複雜一點的報頭
1 from socket import * 2 import struct,json 3 4 ip_port=('127.0.0.1',8080) 5 client=socket(AF_INET,SOCK_STREAM) 6 client.connect(ip_port) 7 8 while True: 9 cmd=input('>>: ') 10 if not cmd:continue 11 client.send(bytes(cmd,encoding='utf-8')) 12 13 head=client.recv(4) 14 head_json_len=struct.unpack('i',head)[0] 15 head_json=json.loads(client.recv(head_json_len).decode('utf-8')) 16 data_len=head_json['data_size'] 17 18 recv_size=0 19 recv_data=b'' 20 while recv_size < data_len: 21 recv_data+=client.recv(1024) 22 recv_size+=len(recv_data) 23 24 print(recv_data.decode('utf-8')) 25 #print(recv_data.decode('gbk')) #windows默認gbk編碼 26 27 客戶端
2、socketserver(多連接)
基於tcp的套接字,關鍵就是兩個循環,一個連接循環,一個通訊循環
socket:收到一個連接後直接進入通訊循環,其餘連接要等以前的那個連接通訊循環結束後才能進入通訊循環
socketserver:模塊中分兩大類:server類(解決連接問題)和request類(解決通訊問題)
socketserver模塊是標準庫中不少服務器框架的基礎。內部使用 IO多路複用 以及 「多線程」 和 「多進程」 ,從而實現併發處理多個客戶端請求的Socket服務端。即:每一個客戶端請求鏈接到服務器時,Socket服務端都會在服務器是建立一個「線程」或者「進程」 專門負責處理當前客戶端的全部請求。
ThreadingTCPServer(多線程,真併發)實現的Soket服務器內部會爲每一個client建立一個 「線程」,該線程用來和客戶端進行交互。
使用ThreadingTCPServer:
一、建立一個繼承自 SocketServer.BaseRequestHandler 的類
二、類中必須定義一個名稱爲 handle 的方法
三、啓動ThreadingTCPServer
1 from socket import * 2 import subprocess 3 import struct 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_server=socket(AF_INET,SOCK_STREAM) 9 tcp_server.bind(ip_port) 10 tcp_server.listen(back_log) 11 12 while True: 13 conn,addr=tcp_server.accept() 14 print('新的客戶端連接',addr) 15 while True: 16 #收 17 try: 18 cmd=conn.recv(buffer_size) 19 if not cmd:break 20 print('收到客戶端的命令',cmd) 21 22 #執行命令,獲得命令的運行結果cmd_res 23 res=subprocess.Popen(cmd.decode('utf-8'),shell=True, 24 stderr=subprocess.PIPE, 25 stdout=subprocess.PIPE, 26 stdin=subprocess.PIPE) 27 err=res.stderr.read() 28 if err: 29 cmd_res=err 30 else: 31 cmd_res=res.stdout.read() 32 33 #發 34 if not cmd_res: 35 cmd_res='執行成功'.encode('gbk') 36 37 length=len(cmd_res) 38 39 data_length=struct.pack('i',length) 40 conn.send(data_length) 41 conn.send(cmd_res) 42 except Exception as e: 43 print(e) 44 break
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 cmd=input('>>: ').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 16 tcp_client.send(cmd.encode('utf-8')) 17 18 19 #解決粘包 20 length_data=tcp_client.recv(4) 21 length=struct.unpack('i',length_data)[0] 22 23 recv_size=0 24 recv_data=b'' 25 while recv_size < length: 26 recv_data+=tcp_client.recv(buffer_size) 27 recv_size=len(recv_data) 28 print('命令的執行結果是 ',recv_data.decode('gbk')) 29 tcp_client.close()
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('127.0.0.1',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 cmd=input('>>: ').strip() 13 if not cmd:continue 14 if cmd == 'quit':break 15 16 tcp_client.send(cmd.encode('utf-8')) 17 18 19 #解決粘包 20 length_data=tcp_client.recv(4) 21 length=struct.unpack('i',length_data)[0] 22 23 recv_size=0 24 recv_data=b'' 25 while recv_size < length: 26 recv_data+=tcp_client.recv(buffer_size) 27 recv_size=len(recv_data) 28 print('命令的執行結果是 ',recv_data.decode('gbk')) 29 tcp_client.close()
1 import socketserver 2 3 4 ''' 5 def __init__(self, request, client_address, server): 6 self.request = request 7 self.client_address = client_address 8 self.server = server 9 self.setup() 10 try: 11 self.handle() 12 finally: 13 self.finish() 14 15 ''' 16 17 class MyServer(socketserver.BaseRequestHandler): 18 19 def handle(self): 20 print('conn is: ',self.request) #conn 21 print('addr is: ',self.client_address) #addr 22 23 while True: 24 try: 25 #收消息 26 data=self.request.recv(1024) 27 if not data:break 28 print('收到客戶端的消息是',data,self.client_address) 29 30 #發消息 31 self.request.sendall(data.upper()) 32 33 except Exception as e: 34 print(e) 35 break 36 37 if __name__ == '__main__': 38 s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) #多線程 39 # s=socketserver.ForkingTCPServer(('127.0.0.1',8080),MyServer) #多進程 40 41 # self.server_address = server_address 42 # self.RequestHandlerClass = RequestHandlerClass 43 print(s.server_address) 44 print(s.RequestHandlerClass) 45 print(MyServer) 46 print(s.socket) 47 print(s.server_address) 48 s.serve_forever()
1 from socket import * 2 ip_port=('192.168.12.63',8080) 3 back_log=5 4 buffer_size=1024 5 6 tcp_client=socket(AF_INET,SOCK_STREAM) 7 tcp_client.connect(ip_port) 8 9 while True: 10 msg=input('>>: ').strip() 11 if not msg:continue 12 if msg == 'quit':break 13 14 tcp_client.send(msg.encode('utf-8')) 15 16 data=tcp_client.recv(buffer_size) 17 print('收到服務端發來的消息:',data.decode('utf-8')) 18 19 tcp_client.close()
1 from socket import * 2 import struct 3 from functools import partial 4 ip_port=('192.168.12.63',8080) 5 back_log=5 6 buffer_size=1024 7 8 tcp_client=socket(AF_INET,SOCK_STREAM) 9 tcp_client.connect(ip_port) 10 11 while True: 12 msg=input('>>: ').strip() 13 if not msg:continue 14 if msg == 'quit':break 15 16 tcp_client.send(msg.encode('utf-8')) 17 18 data=tcp_client.recv(buffer_size) 19 print('收到服務端發來的消息:',data.decode('utf-8')) 20 21 22 23 tcp_client.close()
源碼剖析:
server類:
request類:
繼承關係:
基於TCP的socketserver源碼:
ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
查找屬性的順序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
一、實例化獲得ftpserver,先找類ThreadingTCPServer的__init__,在TCPServer中找到,進而執行server_bind,server_active
二、找ftpserver下的serve_forever,在BaseServer中找到,進而執行self._handle_request_noblock(),該方法一樣是在BaseServer中
三、執行self._handle_request_noblock()進而執行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),而後執行self.process_request(request, client_address)
四、在ThreadingMixIn中找到process_request,開啓多線程應對併發,進而執行process_request_thread,執行self.finish_request(request, client_address)
五、上述四部分完成了連接循環,本部分開始進入處理通信部分,在BaseServer中找到finish_request,觸發咱們本身定義的類的實例化,去找__init__方法,而咱們本身定義的類沒有該方法,則去它的父類也就是BaseRequestHandler中找....
源碼分析總結:
基於tcp的socketserver咱們本身定義的類中的
一、self.server即套接字對象
二、self.request即一個連接
三、self.client_address即客戶端地址
基於udp的socketserver咱們本身定義的類中的
self.request是一個元組(第一個元素是客戶端發來的數據,第二部分是服務端的udp套接字對象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
self.client_address即客戶端地址
SocketServer的ThreadingTCPServer之因此能夠同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在服務器端爲每個客戶端建立一個線程,當前線程用來處理對應客戶端的請求,因此,能夠支持同時n個客戶端連接(長鏈接)。
3、線程
線程是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務
一、threading模塊:創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。
線程建立有2種方式:
①直接調用(主流寫法)
1 import threading,time 2 def run(n): #定義每一個線程要運行的函數 3 print("test...",n) 4 time.sleep(2) 5 if __name__ == '__main__': 6 t1 = threading.Thread(target=run,args=("t1",)) #生成一個線程實例 //要加「,」號 7 t2 = threading.Thread(target=run,args=("t2",)) #生成另外一個線程實例 8 # 兩個同時執行,而後等待兩秒程序結束 9 t1.start()#啓動線程 10 t2.start()#啓動另外一個線程 11 print(t1.getName()) #獲取線程名 12 print(t2.getName()) 13 # 程序輸出 14 # test... t1 15 # test... t2
②繼承式調用(非主流寫法)(瞭解)
1 import threading,time 2 class MyThread(threading.Thread): 3 def __init__(self,num): 4 # threading.Thread.__init__(self) 5 super(MyThread,self).__init__() 6 self.num =num 7 def run(self):#定義每一個線程要運行的函數 8 print("running on number:%s" %self.num) 9 time.sleep(2) 10 if __name__ == '__main__': 11 # 兩個同時執行,而後等待兩秒程序結束 12 t1 = MyThread(1) 13 t2 = MyThread(2) 14 t1.start() 15 t2.start() 16 # 程序輸出 17 # running on number:1 18 # running on number:2
二、join(等待線程):等待線程執行完後,其餘線程再繼續執行(串行)
1 import threading,time 2 def run(n,sleep_time): 3 print("test...",n) 4 time.sleep(sleep_time) 5 print("test...done", n) 6 if __name__ == '__main__': 7 t1 = threading.Thread(target=run,args=("t1",2)) 8 t2 = threading.Thread(target=run,args=("t2",3) 9 # 兩個同時執行,而後等待t1執行完成後,主線程和子線程再開始執行 10 t1.start() 11 t2.start() 12 t1.join() # 等待t1 13 print("main thread") 14 # 程序輸出 15 # test... t1 16 # test... t2 17 # test...done t1 18 # main thread 19 # test...done t2
三、Daemon(守護線程):守護進程,主程序執行完畢時,守護線程會同時退出,無論是否執行完任務
1 import threading,time 2 def run(n): 3 print('[%s]------running----\n' % n) 4 time.sleep(2) 5 print('--done--') 6 def main(): 7 for i in range(5): 8 t = threading.Thread(target=run, args=[i, ]) 9 t.start() 10 t.join(1) 11 print('starting thread', t.getName()) 12 m = threading.Thread(target=main, args=[]) 13 m.setDaemon(True) # 將main線程設置爲Daemon線程,它作爲程序主線程的守護線程,當主線程退出時, 14 # m線程也會退出,由m啓動的其它子線程會同時退出,無論是否執行完任務 15 m.start() #注意:setDaemon必定在start以前設置 16 m.join(timeout=2) 17 print("---main thread done----") 18 # 程序輸出 19 # [0]------running---- 20 # starting thread Thread-2 21 # [1]------running---- 22 # --done-- 23 # ---main thread done----
1 其它方法 2 # run(): 線程被cpu調度後自動執行線程對象的run方法 3 # start():啓動線程活動。 4 # isAlive(): 返回線程是否活動的。 5 # getName(): 返回線程名。 6 # setName(): 設置線程名。 7 8 threading模塊提供的一些方法: 9 # threading.currentThread(): 返回當前的線程變量。 10 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 11 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
四、Mutex(同步鎖)(互斥鎖):因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,當多個線程同時修改同一條數據時可能會出現髒數據,因此,出現了線程鎖 - 同一時刻容許一個線程執行操做。(*注:不要在3.x上運行,不知爲何,3.x上的結果老是正確的,多是自動加了鎖)
1 import time 2 import threading 3 def addNum(): 4 global num # 在每一個線程中都獲取這個全局變量 5 print('--get num:', num) 6 time.sleep(1) 7 lock.acquire() # //修改數據前加鎖// 8 num -= 1 # 對此公共變量進行-1操做 9 lock.release() # //修改後釋放// 10 num = 100 # 設定一個共享變量 11 thread_list = [] 12 lock = threading.Lock() # //生成全局鎖// 13 for i in range(100): 14 t = threading.Thread(target=addNum) 15 t.start() 16 thread_list.append(t) 17 for t in thread_list: # 等待全部線程執行完畢 18 t.join() 19 print('final num:', num)
注:GIL,不管啓多少個線程,你有多少個cpu, Python在執行的時候在同一時刻只容許一個線程運行
五、死鎖:在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖
RLock(遞歸鎖) :解決死鎖;爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
1 import threading, time 2 3 def run1(): 4 print("grab the first part data") 5 lock.acquire() 6 global num 7 num += 1 8 lock.release() 9 return num 10 11 def run2(): 12 print("grab the second part data") 13 lock.acquire() 14 global num2 15 num2 += 1 16 lock.release() 17 return num2 18 19 def run3(): 20 lock.acquire() 21 res = run1() 22 print('--------between run1 and run2-----') 23 res2 = run2() 24 lock.release() 25 print(res, res2) 26 27 if __name__ == '__main__': 28 num, num2 = 0, 0 29 lock = threading.RLock() 30 for i in range(10): 31 t = threading.Thread(target=run3) 32 t.start() 33 34 while threading.active_count() != 1: 35 print(threading.active_count()) 36 else: 37 print('----all threads done---') 38 print(num, num2)
六、Semaphore (信號量):Mutex 同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據 ,好比廁全部3個坑,那最多隻容許3我的上廁所,後面的人只能等裏面有人出來了才能再進去。
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數器,每當調用acquire()時-1,調用release()時+1;計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release(); BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
1 import threading, time 2 def run(n): 3 semaphore.acquire() ### 4 time.sleep(1) 5 print("run the thread: %s\n" % n) 6 semaphore.release() ### 7 if __name__ == '__main__': 8 num = 0 9 semaphore = threading.BoundedSemaphore(5) # 最多容許5個線程同時運行 10 for i in range(20): 11 t = threading.Thread(target=run, args=(i,)) 12 t.start() 13 while threading.active_count() != 1: 14 pass # print threading.active_count() 15 else: 16 print('----all threads done---') 17 print(num)
七、Event(事件):實現兩個或多個線程間的交互
下面是一個紅綠燈的例子,即起動一個線程作交通指揮燈,生成幾個線程作車輛,車輛行駛按紅燈停,綠燈行的規則。
1 import threading,time 2 3 def light(): 4 count = 0 5 while True: 6 if count < 10: #紅燈 7 print("\033[41;1m紅燈\033[0m",10-count) 8 elif count >= 10 and count < 30: #綠燈 9 event.set() # 設置標誌位 10 print("\033[42;1m綠燈\033[0m",30-count) 11 else: 12 event.clear() #把標誌位清空 13 count = 0 14 time.sleep(1) 15 count +=1 16 17 def car(n): 18 while True: 19 if event.is_set(): #檢測是否有標誌位 20 print("\033[32;0m[%s]在路上飛奔.....\033[0m"%n) 21 else: 22 print("\033[31;0m[%s]等紅燈等的花都謝了.....\033[0m" % n) 23 time.sleep(1) 24 25 if __name__ == "__main__": 26 event = threading.Event() 27 28 light = threading.Thread(target=light) 29 light.start() 30 31 car = threading.Thread(target=car,args=("tesla",)) 32 car.start()
八、queue(隊列)-- 多線程利器:實現解耦、隊列;先進先出,後進後出(當get不到數據時,會一直卡着等待數據)
列表是不安全的數據結構
1 import threading,time 2 3 li=[1,2,3,4,5] 4 5 def pri(): 6 while li: 7 a=li[-1] 8 print(a) 9 time.sleep(1) 10 try: 11 li.remove(a) 12 except Exception as e: 13 print('----',a,e) 14 15 t1=threading.Thread(target=pri,args=()) 16 t1.start() 17 t2=threading.Thread(target=pri,args=()) 18 t2.start()
######queue列隊類的方法####### #建立一個「隊列」對象 ①import Queue ②q = Queue.Queue(maxsize = 10) ③Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。 #將一個值放入隊列中 ①q.put(10) 調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item爲必需的,爲插入項目的值;第二個block爲可選參數,默認爲 1。若是隊列當前爲空且block爲1,put()方法就使調用線程暫停,直到空出一個數據單元。若是block爲0,put方法將引起Full異常。 #將一個值從隊列中取出 ①q.get()調用隊列對象的get()方法從隊頭刪除並返回一個項目。可選參數爲block,默認爲True。若是隊列爲空且block爲True, ②get()就使調用線程暫停,直至有項目可用。若是隊列爲空且block爲False,隊列將引起Empty異常。 #Python Queue模塊有三種隊列及構造函數: ①Python Queue模塊的FIFO隊列先進先出。 class queue.Queue(maxsize) ②LIFO相似於堆,即先進後出。 class queue.LifoQueue(maxsize) ③還有一種是優先級隊列級別越低越先出來。 class queue.PriorityQueue(maxsize) #此包中的經常使用方法(q = Queue.Queue()): ①q.qsize() 返回隊列的大小 ②q.empty() 若是隊列爲空,返回True,反之False ③q.full() 若是隊列滿了,返回True,反之False ④q.full 與 maxsize 大小對應 ⑤q.get([block[, timeout]]) 獲取隊列,timeout等待時間 ⑥q.get_nowait() 至關q.get(False) 非阻塞 ⑦q.put(item) 寫入隊列,timeout等待時間 ⑧q.put_nowait(item) 至關q.put(item, False) ⑨q.task_done() 在完成一項工做以後,q.task_done() 函數向任務已經完成的隊列發送一個信號 ⑩q.join() 實際上意味着等到隊列爲空,再執行別的操做
1 import queue 2 3 4 q = queue.Queue() 5 for i in range(10): 6 q.put(i) 7 8 for t in range(10): 9 print(q.get()) 10 11 # 0 12 # 1 13 # 2 14 # 3 15 # 4 16 # 5 17 # 6 18 # 7 19 # 8 20 # 9
1 ##爲何要使用生產者和消費者模式? 2 在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。 3 4 ##什麼是生產者消費者模式? 5 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。例如:在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。
1 import time,random 2 import queue,threading 3 4 q = queue.Queue() 5 6 def Producer(name): 7 count = 0 8 while count <10: 9 print("making........") 10 time.sleep(random.randrange(3)) 11 q.put(count) 12 print('Producer %s has produced %s baozi..' %(name, count)) 13 count +=1 14 #q.task_done() 15 #q.join() 16 print("ok......") 17 def Consumer(name): 18 count = 0 19 while count <10: 20 time.sleep(random.randrange(4)) 21 if not q.empty(): 22 data = q.get() 23 #q.task_done() 24 #q.join() 25 print(data) 26 print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) 27 else: 28 print("-----no baozi anymore----") 29 count +=1 30 31 p1 = threading.Thread(target=Producer, args=('A',)) 32 c1 = threading.Thread(target=Consumer, args=('B',)) 33 # c2 = threading.Thread(target=Consumer, args=('C',)) 34 # c3 = threading.Thread(target=Consumer, args=('D',)) 35 p1.start() 36 c1.start() 37 # c2.start() 38 # c3.start()
python多線程,不適合cpu密集操做型的任務,適合io操做密集型的任務。
因爲GIL的存在,python中的多線程其實並非真正的多線程(上下文的切換),若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。
4、進程
進程:一個總體的形式暴露給操做系統管理,裏面包含了對各類資源的調用,內存的管理,網絡接口的調用等;對各類資源的管理集合
一、multiprocessing(多進程模塊)
multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
Process類:
#構造方法: Process([group [, target [, name [, args [, kwargs]]]]]) group: 線程組,目前尚未實現,庫引用中提示必須是None; target: 要執行的方法; name: 進程名; args/kwargs: 要傳入方法的參數。 #實例方法: is_alive():返回進程是否在運行。 join([timeout]):阻塞當前上下文環境的進程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。 start():進程準備就緒,等待CPU調度 run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。 terminate():無論任務是否完成,當即中止工做進程 #屬性: daemon:和線程的setDeamon功能同樣 name:進程名字。 pid:進程號。
1 import multiprocessing,time 2 import threading 3 4 def thread_run(): 5 print("thread id ",threading.get_ident()) #獲取當前線程號 6 7 def run(name): 8 time.sleep(1) 9 print("process----",name) 10 t = threading.Thread(target=thread_run,) #進程裏面能夠建立線程 11 t.start() 12 13 if __name__ == "__main__": 14 15 for i in range(10): 16 p = multiprocessing.Process(target=run,args=("solo",)) 17 p.start()
1 #多進程id 2 3 from multiprocessing import Process 4 import os 5 6 def info(title): 7 print(title) 8 print('module name:', __name__) 9 print('parent process:', os.getppid()) # 父進程id 10 print('process id:', os.getpid()) # 子進程id 11 12 def f(name): 13 info('\033[31;1mfunction f\033[0m') 14 print('hello', name) 15 16 if __name__ == '__main__': 17 info('\033[32;1mmain process line\033[0m') 18 p = Process(target=f, args=('bob',)) 19 p.start() 20 p.join() 21 22 # 輸出 23 # main process line 24 # module name: __main__ 25 # parent process: 7668 26 # process id: 7496 27 # function f 28 # module name: __mp_main__ 29 # parent process: 7496 30 # process id: 7188 31 # hello bob
二、進程間通訊
不一樣進程間內存是不共享的,要想實現兩個進程間的數據交換要用一些中間鍵。
① Queue
Queue使用方法跟threading裏的queue差很少。
注:不是修改同一份數據,而是修改傳遞的數據。
1 from multiprocessing import Process, Queue 2 3 def f(q): 4 q.put([42, None, 'hello']) 5 6 if __name__ == '__main__': 7 q = Queue() 8 p = Process(target=f, args=(q,)) 9 p.start() 10 print(q.get()) # 進程p從進程q中拿數據 11 p.join() 12 13 #輸出 [42, None, 'hello']
② Pipe
1 import multiprocessing 2 3 def f(conn): 4 conn.send("hello from child") 5 conn.close() 6 7 pass 8 9 if __name__ == "__main__": 10 parent_conn,child_conn = multiprocessing.Pipe() 11 p = multiprocessing.Process(target=f,args=(child_conn,)) 12 p.start() 13 print(parent_conn.recv()) 14 p.join() 15 ##不是單向,是雙向的 16 17 #輸出 hello from child
③ Manager
Queue和pipe只是實現了數據交互,並沒實現數據共享,即一個進程去更改另外一個進程的數據。注:manager自己已經有鎖了,不須要再加鎖。
1 from multiprocessing import Process, Manager 2 import os 3 def f(d, l): 4 d[os.getpid()] =os.getpid() 5 l.append(os.getpid()) #給列表添加進程號 6 print(l) 7 8 if __name__ == '__main__': 9 with Manager() as manager: 10 d = manager.dict() #{} #生成一個字典,可在多個進程間共享和傳遞 11 12 l = manager.list(range(5))#生成一個列表,可在多個進程間共享和傳遞 13 p_list = [] #進程p的空列表 14 for i in range(10): 15 p = Process(target=f, args=(d, l)) 16 p.start() 17 p_list.append(p) 18 for res in p_list: #等待結果 19 res.join() 20 21 print(d) 22 print(l) 23 24 #輸出 25 # [0, 1, 2, 3, 4, 7420] 26 # [0, 1, 2, 3, 4, 7420, 8756] 27 # [0, 1, 2, 3, 4, 7420, 8756, 9404] 28 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844] 29 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852] 30 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288] 31 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288, 6360] 32 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288, 6360, 10076] 33 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288, 6360, 10076, 1728] 34 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288, 6360, 10076, 1728, 4536] 35 # {1728: 1728, 9852: 9852, 10076: 10076, 6360: 6360, 8756: 8756, 7420: 7420, 9288: 9288, 9404: 9404, 4536: 4536, 8844: 8844} 36 # [0, 1, 2, 3, 4, 7420, 8756, 9404, 8844, 9852, 9288, 6360, 10076, 1728, 4536]
④ 進程同步
進程獨立運行,不涉及多個進程同事修改同一份數據。加鎖是爲了防止屏幕共享打印的時候出現混亂現象。
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 l.acquire() 5 print('hello world', i) 6 l.release() 7 8 if __name__ == '__main__': 9 lock = Lock() 10 for num in range(10): 11 Process(target=f, args=(lock, num)).start()
三、進程池
進程建立子進程的過程,子進程克隆了一遍父進程裏的數據,若是父進程佔用空間特別大,子進程啓動過多就會致使系統空間不夠用,因此引出了進程池的概念;進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
進程池中有兩個方法:
①apply 同步執行(串行)
②apply_async 異步執行(並行)
1 from multiprocessing import Process, Pool 2 import time,os 3 4 def Foo(i): 5 time.sleep(2) 6 print("in process",os.getpid()) 7 return i + 100 8 9 def Bar(arg): 10 print('-->exec done:',arg,os.getpid()) 11 12 if __name__ == "__main__": #執行此本程序(腳本)如下代碼被自動執行本程序(腳本)被當作模塊被導入,則如下代碼不執行 13 #freeze_support() 14 pool = Pool(5) #容許進程池同時放入5個進程 15 print("主進程:",os.getpid()) 16 for i in range(10): 17 #pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback回調 執行完func後再執行callback,由程序調用行 18 pool.apply(func=Foo, args=(i,)) #串行 19 #pool.apply_async(func=Foo, args=(i,)) #並行 20 pool.close() 21 pool.join() # 進程池中進程執行完畢後再關閉,若是註釋,那麼程序直接關閉。 22 23 #輸出 24 #主進程: 733 25 # in process 736 26 # in process 735 27 # in process 737 28 # in process 738 29 # in process 734 30 # -->exec done: 102 733 31 # -->exec done: 101 733 32 # -->exec done: 103 733 33 # -->exec done: 104 733 34 # -->exec done: 100 733 35 # in process 734 36 # in process 737 37 # in process 735 38 # in process 736 39 # in process 738 40 # -->exec done: 109 733 41 # -->exec done: 107 733 42 # -->exec done: 106 733 43 # -->exec done: 108 733 44 # -->exec done: 105 733
3、協程
協程是一種用戶態的輕量級線程。
協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
好處:
①無需線程上下文切換的開銷
②無需原子操做鎖定及同步的開銷
③方便切換控制流,簡化編程模型
④高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。因此很適合用於高併發處理。
缺點:
①沒法利用多核資源:協程的本質是個單線程,它不能同時將 單個CPU 的多個核用上,協程須要和進程配合才能運行在多CPU上.②固然咱們平常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
③進行阻塞(Blocking)操做(如IO時)會阻塞掉整個程序
一、生成器(函數+yield)實現協程操做 ,單線程下實現多併發的效果
1 def consumer(name): 2 print("------>starting eating baozi..") 3 while True: 4 new_baozi = yield #等待下一次喚醒賦值 5 print("[%s] is eating baozi %s"%(name,new_baozi)) 6 7 def producer(): 8 n = 0 9 while n < 5 : 10 n +=1 11 con.send(n) #喚醒yield而且傳值 12 con2.send(n) 13 print("\033[32;1m[producer]\033[0m is making baozi %s" % n) 14 15 if __name__ == "__main__": 16 con = consumer("c1") #生成生成器 17 con2 = consumer("c2") 18 con.__next__() #喚醒yield 19 con2.__next__() 20 producer() 21 22 # 輸出 23 # ------>starting eating baozi.. 24 # ------>starting eating baozi.. 25 # [c1] is eating baozi 1 26 # [c2] is eating baozi 1 27 # [producer] is making baozi 1 28 # [c1] is eating baozi 2 29 # [c2] is eating baozi 2 30 # [producer] is making baozi 2 31 # [c1] is eating baozi 3 32 # [c2] is eating baozi 3 33 # [producer] is making baozi 3 34 # [c1] is eating baozi 4 35 # [c2] is eating baozi 4 36 # [producer] is making baozi 4 37 # [c1] is eating baozi 5 38 # [c2] is eating baozi 5 39 # [producer] is making baozi 5
注:協程之因此能夠出來高併發,原理遇到I/O操做就切換,只剩下CPU操做(CPU操做很是快)。用yield顯然沒有實現此效果。
二、greenlet :封裝好的協程,利用.swith對協程操做進行手動切換。(竟然是手動。。。)
1 from greenlet import greenlet 2 3 def test1(): 4 print("in test1 12") 5 gr2.switch() 6 print("in test1 34") 7 gr2.switch() 8 9 def test2(): 10 print("in test2 56") 11 gr1.switch() 12 print("in test2 78") 13 gr1 = greenlet(test1) #啓動一個協程 14 gr2 = greenlet(test2) 15 gr1.switch() #切換操做 相似於yeild裏的next() 16 17 # 輸出 18 # in test1 12 19 # in test2 56 20 # in test1 34 21 # in test2 78
三、gevent:是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。(自動的)
1 import gevent 2 3 def foo(): 4 print("runing in foo") 5 gevent.sleep(2) 6 print("context swith to foo again") 7 8 def bar(): 9 print("context to bar") 10 gevent.sleep(1) 11 print("context to swith bar to bar") 12 13 gevent.joinall([ #啓動協程 14 gevent.spawn(foo), 15 gevent.spawn(bar), 16 ]) 17 18 #輸出 19 # runing in foo 20 # context to bar 21 # context to swith bar to bar 22 # context swith to foo again
①生產環境下,利用gevent作同步與異步的性能對比
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import urllib.request 5 import gevent,time 6 from gevent import monkey 7 monkey.patch_all() #monkey.patch_all()執行後能夠識別urllib裏面的I/0操做 8 9 def f(url): 10 print("GET: %s"%url) 11 resp = urllib.request.urlopen(url) 12 data = resp.read() 13 print("%d bytes received from %s"%(len(data),url)) 14 15 # 同步開銷 16 urls = [ 17 'https://www.python.org/', 18 'https://www.yahoo.com/', 19 'https://github.com/', 20 ] 21 time_start = time.time() 22 for url in urls: 23 f(url) 24 print("同步cost time",time.time()-time_start) 25 26 # 異步開銷 27 async_time_start = time.time() 28 gevent.joinall([ 29 gevent.spawn(f,'https://www.python.org/'), 30 gevent.spawn(f,'https://www.yahoo.com/'), 31 gevent.spawn(f,'https://github.com/') 32 ]) 33 print("異步cost time",time.time()-async_time_start) 34 35 # 輸出 36 # GET: https://www.python.org/ 37 # 48853 bytes received from https://www.python.org/ 38 # GET: https://www.yahoo.com/ 39 # 515012 bytes received from https://www.yahoo.com/ 40 # GET: https://github.com/ 41 # 51379 bytes received from https://github.com/ 42 # 同步cost time 4.631264925003052 43 # GET: https://www.python.org/ 44 # GET: https://www.yahoo.com/ 45 # GET: https://github.com/ 46 # 515013 bytes received from https://www.yahoo.com/ 47 # 51381 bytes received from https://github.com/ 48 # 48853 bytes received from https://www.python.org/ 49 # 異步cost time 1.8811075687408447
由上面程序可知,同步開銷時間爲4秒,異步開銷爲2.5秒,大大節省了開銷,這就是協程的魅力;monkey.patch_all()使gevent能識別到urllib中的I/O操做(原始的gevent不能識別socket、urllib的I/O操做)
②經過gevent實現單線程下的多socket併發
1 import sys 2 import socket 3 import time 4 import gevent 5 6 from gevent import socket,monkey 7 monkey.patch_all() 8 9 10 def server(port): 11 s = socket.socket() 12 s.bind(('0.0.0.0', port)) 13 s.listen(5) 14 while True: 15 conn, addr = s.accept() 16 gevent.spawn(handle_request, conn) 17 18 19 20 def handle_request(conn): 21 try: 22 while True: 23 data = conn.recv(1024) 24 print("recv:", data) 25 conn.send(data) 26 if not data: 27 conn.shutdown(socket.SHUT_WR) 28 29 except Exception as ex: 30 print(ex) 31 finally: 32 conn.close() 33 if __name__ == '__main__': 34 server(8001)
1 import socket 2 3 HOST = 'localhost' # The remote host 4 PORT = 8001 # The same port as used by the server 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 6 s.connect((HOST, PORT)) 7 while True: 8 msg = bytes(input(">>:"),encoding="utf8") 9 s.sendall(msg) 10 data = s.recv(1024) 11 #print(data) 12 13 print('Received', repr(data)) 14 s.close()
知道了異步的優勢,當遇到I/0操做時會進行切換操做,那麼程序是如何知道以前的I/O執行完畢再切換回來的呢?。。。follow me
4、事件驅動與異步IO
一般,咱們寫服務器處理模型的程序時,有如下幾種模型:
(1)每收到一個請求,建立一個新的進程,來處理該請求;
(2)每收到一個請求,建立一個新的線程,來處理該請求;
(3)每收到一個請求,放入一個事件列表,讓主進程經過非阻塞I/O方式來處理請求
分析:
第(1)種模型,因爲建立新的進程的開銷比較大,因此,會致使服務器性能比較差,但實現比較簡單。
第(2)種模型,因爲要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種模型,在寫應用程序代碼時,邏輯比前面兩種都複雜。
綜述:通常廣泛認爲第(3)種方式是大多數網絡服務器採用的方式
看圖說話講事件驅動模型
在UI編程中,經常要對鼠標點擊進行響應,首先如何得到鼠標點擊呢?
方式一:建立一個線程,該線程一直循環檢測是否有鼠標點擊,那麼這個方式有如下幾個缺點:
①. CPU資源浪費,可能鼠標點擊的頻率很是小,可是掃描線程仍是會一直循環檢測,這會形成不少的CPU資源浪費;若是掃描鼠標點擊的接口是阻塞的呢?
②. 若是是堵塞的,又會出現下面這樣的問題,若是咱們不但要掃描鼠標點擊,還要掃描鍵盤是否按下,因爲掃描鼠標時被堵塞了,那麼可能永遠不會去掃描鍵盤;
③. 若是一個循環須要掃描的設備很是多,這又會引來響應時間的問題;
因此,該方式是很是很差的
方式二:就是事件驅動模型
目前大部分的UI編程都是事件驅動模型,如不少UI平臺都會提供onClick()事件,這個事件就表明鼠標按下事件。事件驅動模型大致思路以下:
①. 有一個事件(消息)隊列;
②. 鼠標按下時,往這個隊列中增長一個點擊事件(消息);
③. 有個循環,不斷從隊列取出事件,根據不一樣的事件,調用不一樣的函數,如onClick()、onKeyDown()等;
④. 事件(消息)通常都各自保存各自的處理函數指針,這樣,每一個消息都有獨立的處理函數;
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特色是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
讓咱們用例子來比較和對比一下單線程、多線程以及事件驅動編程模型。下圖展現了隨着時間的推移,這三種模式下程序所作的工做。這個程序有3個任務須要完成,每一個任務都在等待I/O操做時阻塞自身。阻塞在I/O操做上所花費的時間已經用灰色框標示出來了
(1)單線程同步模型中,任務按照順序執行。若是某個任務由於I/O而阻塞,其餘全部的任務都必須等待,直到它完成以後它們才能依次執行。這種明確的執行順序和串行化處理的行爲是很容易推斷得出的。若是任務之間並無互相依賴的關係,但仍然須要互相等待的話這就使得程序沒必要要的下降了運行速度。
(2)多線程版本中,這3個任務分別在獨立的線程中執行。這些線程由操做系統來管理,在多處理器系統上能夠並行處理,或者在單處理器系統上交錯執行。這使得當某個線程阻塞在某個資源的同時其餘線程得以繼續執行。與完成相似功能的同步程序相比,這種方式更有效率,但程序員必須寫代碼來保護共享資源,防止其被多個線程同時訪問。多線程程序更加難以推斷,由於這類程序不得不經過線程同步機制如鎖、可重入函數、線程局部存儲或者其餘機制來處理線程安全問題,若是實現不當就會致使出現微妙且使人痛不欲生的bug。
(3)事件驅動版本的程序中,3個任務交錯執行,但仍然在一個單獨的線程控制中。當處理I/O或者其餘昂貴的操做時,註冊一個回調到事件循環中,而後當I/O操做完成時繼續執行。回調描述了該如何處理某個事件。事件循環輪詢全部的事件,當事件到來時將它們分配給等待處理事件的回調函數。這種方式讓程序儘量的得以執行而不須要用到額外的線程。事件驅動型程序比多線程程序更容易推斷出行爲,由於程序員不須要關心線程安全問題。
當咱們面對以下的環境時,事件驅動模型一般是一個好的選擇:
①程序中有許多任務,並且…
②任務之間高度獨立(所以它們不須要互相通訊,或者等待彼此)並且…
③在等待事件到來時,某些任務會阻塞。
④當應用程序須要在任務間共享可變的數據時,這也是一個不錯的選擇,由於這裏不須要採用同步處理。
網絡應用程序一般都有上述這些特色,這使得它們可以很好的契合事件驅動編程模型。
總結:異步IO涉及到了事件驅動模型,進程中維護一個消息隊列,當客戶端又請求時,就會把請求添加到消息隊列中,線程從消息隊列中輪詢取要處理的請求,遇到I/O阻塞時(操做系統處理調用I/O接口處理,與程序無關),則進行上下文切換,處理其餘請求,當I/O操做完成時,調用回調函數,告訴線程處理完成,而後再切換回來,處理完成後返回給客戶端 Nginx能處理高併發就是用的這個原理
5、I/O五種網絡模式
1、概念說明
同步IO和異步IO,阻塞IO和非阻塞IO分別是什麼,到底有什麼區別?不一樣的人在不一樣的環境給出的答案是不一樣的。因此先限定一下本文的環境。本文討論的背景是Linux環境下的network IO
在進行解釋以前,首先要說明幾個概念:
- 用戶空間和內核空間
- 進程切換
- 進程的阻塞
- 文件描述符
- 緩存 I/O
①用戶空間與內核空間
如今操做系統都是採用虛擬存儲器,那麼對32位操做系統而言,它的尋址空間(虛擬存儲空間)爲4G(2的32次方)。操做系統的核心是內核,獨立於普通的應用程序,能夠訪問受保護的內存空間,也有訪問底層硬件設備的全部權限。爲了保證用戶進程不能直接操做內核(kernel),保證內核的安全,操心繫統將虛擬空間劃分爲兩部分,一部分爲內核空間,一部分爲用戶空間。針對linux操做系統而言,將最高的1G字節(從虛擬地址0xC0000000到0xFFFFFFFF),供內核使用,稱爲內核空間,而將較低的3G字節(從虛擬地址0x00000000到0xBFFFFFFF),供各個進程使用,稱爲用戶空間。
②進程切換
爲了控制進程的執行,內核必須有能力掛起正在CPU上運行的進程,並恢復之前掛起的某個進程的執行。這種行爲被稱爲進程切換。所以能夠說,任何進程都是在操做系統內核的支持下運行的,是與內核緊密相關的。
從一個進程的運行轉到另外一個進程上運行,這個過程當中通過下面這些變化:
1. 保存處理機上下文,包括程序計數器和其餘寄存器。
2. 更新PCB信息。
3. 把進程的PCB移入相應的隊列,如就緒、在某事件阻塞等隊列。
4. 選擇另外一個進程執行,並更新其PCB。
5. 更新內存管理的數據結構。
6. 恢復處理機上下文。
注:總而言之就是很耗資源,具體的能夠參考這篇文章:進程切換
③進程的阻塞
正在執行的進程,因爲期待的某些事件未發生,如請求系統資源失敗、等待某種操做的完成、新數據還沒有到達或無新工做作等,則由系統自動執行阻塞原語(Block),使本身由運行狀態變爲阻塞狀態。可見,進程的阻塞是進程自身的一種主動行爲,也所以只有處於運行態的進程(得到CPU),纔可能將其轉爲阻塞狀態。當進程進入阻塞狀態,是不佔用CPU資源的。
④文件描述符fd
文件描述符(File descriptor)是計算機科學中的一個術語,是一個用於表述指向文件的引用的抽象化概念。
文件描述符在形式上是一個非負整數。實際上,它是一個索引值,指向內核爲每個進程所維護的該進程打開文件的記錄表。當程序打開一個現有文件或者建立一個新文件時,內核向進程返回一個文件描述符。在程序設計中,一些涉及底層的程序編寫每每會圍繞着文件描述符展開。可是文件描述符這一律念每每只適用於UNIX、Linux這樣的操做系統。
⑤緩存 I/O
緩存 I/O 又被稱做標準 I/O,大多數文件系統的默認 I/O 操做都是緩存 I/O。在 Linux 的緩存 I/O 機制中,操做系統會將 I/O 的數據緩存在文件系統的頁緩存( page cache )中,也就是說,數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。
緩存 I/O 的缺點:
數據在傳輸過程當中須要在應用程序地址空間和內核進行屢次數據拷貝操做,這些數據拷貝操做所帶來的 CPU 以及內存開銷是很是大的。
2、I/O模式
剛纔說了,對於一次IO訪問(以read舉例),數據會先被拷貝到操做系統內核的緩衝區中,而後纔會從操做系統內核的緩衝區拷貝到應用程序的地址空間。因此說,當一個read操做發生時,它會經歷兩個階段:
1. 等待數據準備 (Waiting for the data to be ready)
2. 將數據從內核拷貝到進程中 (Copying the data from the kernel to the process)
正式由於這兩個階段,linux系統產生了下面五種網絡模式的方案。
- ①阻塞 I/O(blocking IO)
- ②非阻塞 I/O(nonblocking IO)
- ③I/O 多路複用( IO multiplexing)
- ④信號驅動 I/O( signal driven IO)
- ⑤異步 I/O(asynchronous IO)
注:因爲signal driven IO在實際中並不經常使用,因此我這隻說起剩下的四種IO Model
①阻塞 I/O(blocking IO)
在linux中,默認狀況下全部的socket都是blocking,一個典型的讀操做流程大概是這樣:
當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據(對於網絡IO來講,不少時候數據在一開始尚未到達。好比,尚未收到一個完整的UDP包。這個時候kernel就要等待足夠的數據到來)。這個過程須要等待,也就是說數據被拷貝到操做系統內核的緩衝區中是須要一個過程的。而在用戶進程這邊,整個進程會被阻塞(固然,是進程本身選擇的阻塞)。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,而後kernel返回結果,用戶進程才解除block的狀態,從新運行起來
因此,blocking IO的特色就是在IO執行的兩個階段都被block了,大大消耗了程序執行的時間
②非阻塞 I/O(nonblocking IO)
linux下,能夠經過設置socket使其變爲non-blocking。當對一個non-blocking socket執行讀操做時,流程是這個樣子:
當用戶進程發出read操做時,若是kernel中的數據尚未準備好,那麼它並不會block用戶進程,而是馬上返回一個error。從用戶進程角度講 ,它發起一個read操做後,並不須要等待,而是立刻就獲得了一個結果。用戶進程判斷結果是一個error時,它就知道數據尚未準備好,因而它能夠再次發送read操做。一旦kernel中的數據準備好了,而且又再次收到了用戶進程的system call,那麼它立刻就將數據拷貝到了用戶內存,而後返回
因此,nonblocking IO的特色是用戶進程須要不斷的主動詢問kernel數據好了沒有,wait for data階段進程沒有等待,可是copydata從內核拷貝到用戶進程時,程序是阻塞狀態
③I/O 多路複用( IO multiplexing)
IO multiplexing就是咱們說的select,poll,epoll,有些地方也稱這種IO方式爲event driven IO。select/epoll的好處就在於單個process就能夠同時處理多個網絡鏈接的IO。它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的全部socket,當某個socket有數據到達了,就通知用戶進程
當用戶進程調用了select,那麼整個進程會被block,而同時,kernel會「監視」全部select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操做,將數據從kernel拷貝到用戶進程
因此,I/O 多路複用的特色是經過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()函數就能夠返回
這個圖和blocking IO的圖其實並無太大的不一樣,事實上,還更差一些。由於這裏須要使用兩個system call (select 和 recvfrom),而blocking IO只調用了一個system call (recvfrom)。可是,用select的優點在於它能夠同時處理多個connection。
因此,若是處理的鏈接數不是很高的話,使用select/epoll的web server不必定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優點並非對於單個鏈接能處理得更快,而是在於能處理更多的鏈接。)
在IO multiplexing Model中,實際中,對於每個socket,通常都設置成爲non-blocking,可是,如上圖所示,整個用戶的process實際上是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block
④異步 I/O(asynchronous IO)
linux下的asynchronous IO其實用得不多。先看一下它的流程
用戶進程發起read操做以後,馬上就能夠開始去作其它的事。而另外一方面,從kernel的角度,當它受到一個asynchronous read以後,首先它會馬上返回,因此不會對用戶進程產生任何block。而後,kernel會等待數據準備完成,而後將數據拷貝到用戶內存,當這一切都完成以後,kernel會給用戶進程發送一個signal,告訴它read操做完成了
3、總結
①blocking和non-blocking的區別
調用blocking IO會一直block住對應的進程直到操做完成,而non-blocking IO在kernel還準備數據的狀況下會馬上返回。
②synchronous IO和asynchronous IO的區別
在說明synchronous IO和asynchronous IO的區別以前,須要先給出二者的定義。POSIX的定義是這樣子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;
二者的區別就在於synchronous IO作」IO operation」的時候會將process阻塞。按照這個定義,以前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO。
有人會說,non-blocking IO並無被block啊。這裏有個很是「狡猾」的地方,定義中所指的」IO operation」是指真實的IO操做,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,若是kernel的數據沒有準備好,這時候不會block進程。可是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。
而asynchronous IO則不同,當進程發起IO 操做以後,就直接返回不再理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程當中,進程徹底沒有被block。
各個IO Model的比較如圖所示:
經過上面的圖片,能夠發現non-blocking IO和asynchronous IO的區別仍是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,可是它仍然要求進程去主動的check,而且當數據準備完成之後,也須要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則徹底不一樣。它就像是用戶進程將整個IO操做交給了他人(kernel)完成,而後他人作完後發信號通知。在此期間,用戶進程不須要去檢查IO操做的狀態,也不須要主動的去拷貝數據。
6、剖析IO多路複用(多鏈接):協程和IO多路複用都是單線程
IO多路複用是指內核一旦發現進程指定的一個或者多個IO條件準備讀取,它就通知該進程。IO多路複用適用以下場合:
-當客戶處理多個描述字時(通常是交互式輸入和網絡套接口),必須使用I/O複用。
-當一個客戶同時處理多個套接口時,而這種狀況是可能的,但不多出現。
-若是一個TCP服務器既要處理監聽套接口,又要處理已鏈接套接口,通常也要用到I/O複用。
-若是一個服務器即要處理TCP,又要處理UDP,通常要使用I/O複用。
-若是一個服務器要處理多個服務或多個協議,通常要使用I/O複用。
與多進程和多線程技術相比,I/O多路複用技術的最大優點是系統開銷小,系統沒必要建立進程/線程,也沒必要維護這些進程/線程,從而大大減少了系統的開銷。
select模塊(實現僞併發)
Python中有一個select模塊,其中提供了:select、poll、epoll三個方法,分別調用系統的 select,poll,epoll 從而實現IO多路複用
1 Windows Python: 2 提供: select 3 Mac Python: 4 提供: select 5 Linux Python: 6 提供: select、poll、epoll
select
select最先於1983年出如今4.2BSD中,它經過一個select()系統調用來監視多個文件描述符的數組,當select()返回後,該數組中就緒的文件描述符便會被內核修改標誌位,使得進程能夠得到這些文件描述符從而進行後續的讀寫操做。select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢,事實上從如今看來,這也是它所剩很少的優勢之一。select的一個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,不過能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制。另外,select()所維護的存儲大量文件描述符的數據結構,隨着文件描述符數量的增大,其複製的開銷也線性增加。同時,因爲網絡響應時間的延遲使得大量TCP鏈接處於非活躍狀態,但調用select()會對全部socket進行一次線性掃描,因此這也浪費了必定的開銷。
總結:select目前幾乎在全部的平臺上支持,其良好跨平臺支持也是它的一個優勢。select的一 個缺點在於單個進程可以監視的文件描述符的數量存在最大限制,在Linux上通常爲1024,能夠經過修改宏定義甚至從新編譯內核的方式提高這一限制,可是這樣也會形成效率的下降。開銷隨着文件描述符數量的增長而線性增大
select直接經過操做系統提供的C的網絡接口進行操做,而不是經過Python的解釋器。
poll
poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差異,可是poll沒有最大文件描述符數量的限制。poll和select一樣存在一個缺點就是,包含大量文件描述符的數組被總體複製於用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨着文件描述符數量的增長而線性增大。另外,select()和poll()將就緒的文件描述符告訴進程後,若是進程沒有對其進行IO操做,那麼下次調用select()和poll()的時候將再次報告這些文件描述符,因此它們通常不會丟失就緒的消息,這種方式稱爲水平觸發(Level Triggered)。
總結:poll沒有最大文件描述符數量的限制;但開銷隨着文件描述符數量的增長而線性增大;採用水平觸發(告訴進程哪些文件描述符剛剛變爲就緒狀態,若是咱們沒有采起行動,下次調用select()和poll()的時候將再次報告這些文件描述符,通常不會丟失就緒的消息)
epoll
直到Linux2.6纔出現了由內核直接支持的實現方法,那就是epoll,它幾乎具有了以前所說的一切優勢,被公認爲Linux2.6下性能最好的多路I/O就緒通知方法。epoll能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知,這種方式稱爲邊緣觸發),理論上邊緣觸發的性能要更高一些,可是代碼實現至關複雜。epoll一樣只告知那些就緒的文件描述符,並且當咱們調用epoll_wait()得到就緒文件描述符時,返回的不是實際的描述符,而是一個表明就緒描述符數量的值,你只須要去epoll指定的一個數組中依次取得相應數量的文件描述符便可,這裏也使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。另外一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,進程只有在調用必定的方法後,內核纔對全部監視的文件描述符進行掃描,而epoll事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()
總結:epoll沒有最大文件描述符數量的限制;能夠同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變爲就緒狀態,它只說一遍,若是咱們沒有采起行動,那麼它將不會再次告知);使用了內存映射(mmap)技術,這樣便完全省掉了這些文件描述符在系統調用時複製的開銷。採用基於事件的就緒通知方式;事先經過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會採用相似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便獲得通知。
① select.select方法:
select函數須要3個序列做爲它的必選參數,此外還有一個可選的以秒單位的超時時間做爲第4個參數。3個序列用於輸入、輸出以及異常狀況(錯誤);若是沒有給定超時時間,select會阻塞(也就是處於等待狀態),知道其中的一個文件描述符以及爲行動作好了準備,若是給定了超時時間,select最多阻塞給定的超時時間,若是超時時間爲0,那麼就給出一個連續的poll(即不阻塞);select的返回值是3個序列,每一個表明相應參數的一個活動子集。第一個序列用於監聽socket對象內部是否發生變化,若是有變化表示有新的鏈接
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # _author_soloLi 4 import select 5 import socket 6 import sys 7 import queue 8 9 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 10 server.setblocking(0) 11 server_address = ('localhost', 8002) 12 print(sys.stderr, 'starting up on %s port %s' % server_address) 13 server.bind(server_address) 14 server.listen(5) 15 16 inputs = [ server ] 17 outputs = [ ] 18 message_queues = {} 19 20 while inputs: 21 print( '\nwaiting for the next event') 22 readable, writable, exceptional = select.select(inputs, outputs, inputs) 23 24 for s in readable: 25 if s is server: 26 connection, client_address = s.accept() 27 print('new connection from', client_address) 28 connection.setblocking(0) 29 inputs.append(connection) 30 message_queues[connection] = queue.Queue() 31 else: 32 data = s.recv(1024) 33 if data: 34 print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) ) 35 message_queues[s].put(data) 36 if s not in outputs: 37 outputs.append(s) 38 else: 39 print('closing', client_address, 'after reading no data') 40 if s in outputs: 41 outputs.remove(s) 42 inputs.remove(s) 43 s.close() 44 del message_queues[s] 45 46 for s in writable: 47 try: 48 next_msg = message_queues[s].get_nowait() 49 except queue.Empty: 50 print('output queue for', s.getpeername(), 'is empty') 51 outputs.remove(s) 52 else: 53 print( 'sending "%s" to %s' % (next_msg, s.getpeername())) 54 s.send(next_msg) 55 56 for s in exceptional: 57 print('handling exceptional condition for', s.getpeername() ) 58 inputs.remove(s) 59 if s in outputs: 60 outputs.remove(s) 61 s.close() 62 del message_queues[s] 63 64 server.close()
#!/usr/bin/env python # -*- coding:utf-8 -*- # _author_soloLi import select import socket import sys import queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 建立一個TCP socket server.setblocking(0) # 設置爲不阻塞 server_address = ('localhost', 8002) print(sys.stderr, 'starting up on %s port %s' % server_address) server.bind(server_address) # 綁定IP地址和端口 server.listen(5) # 監聽連接 inputs = [ server ] # 建立inputs[]通訊列表,被select()方法監控和接收全部外部發過來的data(outgoing data)//#本身也要監測呀,由於server自己也是個fd #inputs = [server,conn]//[conn,] #inputs = [server,conn,conn2]//[conn2,] outputs = [ ] # 建立outputs[]通訊列表,被select()方法監控和接收全部要發出去的data(outgoing data) #outputs = [r1,] # message_queues = {} # 建立message_queues = {}字典,每一個鏈接要把輸入或輸出的數據先緩存到queue裏 #全部客戶端的進來的鏈接和數據將會被server的主循環程序放在上面的list中處理,咱們如今的server端須要等待鏈接可寫(writable)以後才能過來,而後接收數據並返回(所以不是在接收到數據以後就馬上返回),由於每一個鏈接要把輸入或輸出的數據先緩存到queue裏,而後再由select取出來再發出去。 while inputs: #此程序的主循環,調用select()時會阻塞和等待直到新的鏈接和數據進來//#若是沒有任何fd就緒,那程序就會一直阻塞在這裏 print( '\nwaiting for the next event') readable, writable, exceptional = select.select(inputs, outputs, inputs) #把inputs,outputs,exceptional(這裏跟inputs共用)傳給select()後,它返回3個新的列表,readable,writable,exceptional #(1)readable列表:監聽服務端對象(內部是否發生變化,若是有變化表示有新的鏈接),當inputs列表有變化時,變化的值會賦值給readable_list中, # ①若是有新的鏈接進來,sk會發生變化,此時readable_list—的值爲sk # ②若是conn對象發生變化,表示客戶端發送了新的消息過來,此時readable_list的值爲客戶端鏈接 #(2)writeable列表:實現讀寫分離,存放能夠進行發送信息的conn對象 #(3)exceptional列表:存放鏈接通訊出現的error ##(一)操做inputs列表:客戶端發進來的數據列表 for s in readable: # 一、當客戶端第一次鏈接服務端時,未在inputs裏(新鏈接進來了,接受這個鏈接) if s is server: connection, client_address = s.accept() print('new connection from', client_address) connection.setblocking(0) inputs.append(connection) #添加到inputs #由於這個新創建的鏈接還沒發數據過來,如今就接收的話程序就報錯了, #因此要想實現這個客戶端發數據來時server端能知道,就須要讓select再監測這個conn message_queues[connection] = queue.Queue() #初始化一個隊列,接收到客戶端的數據後,不馬上返回,暫存在隊列裏,之後發送 # 二、當客戶端鏈接上服務端以後,再次發送數據時,已經存在inputs(s不是server的話,那就只能是一個與客戶端創建的鏈接的fd了) else: data = s.recv(1024) # ①當客戶端正常打開程序時(正常接收客戶端發送的數據) if data: print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) ) message_queues[s].put(data) #收到的數據先放到queue裏,一會返回給客戶端 if s not in outputs: outputs.append(s) #放入返回的鏈接隊列裏 # ②當客戶端關閉程序時 else: # Interpret empty result as closed connection print('closing', client_address, 'after reading no data') # Stop listening for input on the connection if s in outputs: outputs.remove(s) #既然客戶端都斷開了,我就不用再給它返回數據了,因此這時候若是這個客戶端的鏈接對象還在outputs列表中,就把它刪掉 inputs.remove(s) #inputs中也刪除掉 s.close() #把這個鏈接關閉掉 del message_queues[s] #隊列中也要刪掉 ##(二)操做outputs列表:要返回給客戶端的鏈接列表 for s in writable: try: next_msg = message_queues[s].get_nowait() except queue.Empty: #①客戶端鏈接在跟它對應的queue裏沒有數據 print('output queue for', s.getpeername(), 'is empty') outputs.remove(s) #把這個鏈接從outputs列表中移除(確保下次循環的時候writeable,不返回這個已經處理完的鏈接了) #②客戶端鏈接在跟它對應的queue裏有數據 else: print( 'sending "%s" to %s' % (next_msg, s.getpeername())) s.send(next_msg) #把這個數據發給客戶端 ##(三)操做exceptional列表:鏈接通訊過程當中出現的錯誤 for s in exceptional: print('handling exceptional condition for', s.getpeername() ) # 把這個鏈接對象在inputs\outputs\message_queue中都刪除 inputs.remove(s) if s in outputs: outputs.remove(s) s.close() # 鏈接關閉掉 del message_queues[s] # 從隊列中移除此連接信息 server.close()
#!/usr/bin/env python # -*- coding:utf-8 -*- # _author_soloLi import socket import sys messages = [ 'This is the message. ', 'It will be sent ', 'in parts.', ] server_address = ('localhost', 10000) # Create a TCP/IP socket socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ] # Connect the socket to the port where the server is listening print >>sys.stderr, 'connecting to %s port %s' % server_address for s in socks: s.connect(server_address) for message in messages: # Send messages on both sockets for s in socks: print >>sys.stderr, '%s: sending "%s"' % (s.getsockname(), message) s.send(message) # Read responses on both sockets for s in socks: data = s.recv(1024) print >>sys.stderr, '%s: received "%s"' % (s.getsockname(), data) if not data: print >>sys.stderr, 'closing socket', s.getsockname() s.close()
②select.poll方法:
poll方法使用起來比select簡單。在調用poll時,會獲得一個poll對象。而後就可使用poll的對象的register方法註冊一個文件描述符(或者是帶有fileno方法的對象)。註冊後可使用unregister方法移出註冊的對象。註冊了一些對象(好比套接字)之後,就能夠調用poll方法(帶有一個可選的超時時間參數)並獲得一個(fd,event)格式列表(可能爲空),其中fd是文件描述符,event則告訴你發生了什麼。這是一個位掩碼(bitmask),意思是它是一個整數,這個整數的每一個位對應不一樣的事件。那些不一樣的事件是select模塊的常量,爲了驗證是否設置了一個定位(也就是說,一個給定的事件是否發生了),可使用按位與操做符(&):if event & select.POLLIN
select模塊中的polling事件常量:
事件名 描述 POLLIN #讀取來自文件描述符的數據 POLLPRT #讀取來自文件描述符的緊急數據 POLLOUT #文件描述符已經準備好數據,寫入時不會發生阻塞 POLLERR #與文件描述符有關的錯誤狀況 POLLHUP #掛起,鏈接丟失 POLLNVAL #無效請求,鏈接沒有打開
#poll 異步I/O import socket,select s = socket.socket() host = "127.0.0.1" port = 8002 s.bind((host,port)) fdmap = {s.fileno():s} #文件描述符到套接字對象的映射 s.listen(5) p = select.poll() #poll對象 p.register(s) #註冊一個文件描述符(帶有fileno方法的對象) while True: events = p.poll() for fd,event in events: if fd == s.fileno(): #新的鏈接進來 c,addr = s.accept() print("Got connectins from",addr) p.register(c) #註冊一個文件描述符(帶有fileno方法的對象) fdmap[c.fileno()] = c #添加到fdmap elif event & select.POLLIN: #讀取來自文件描述符的數據 data = fdmap[fd].recv(1024) if not data: #表示客戶端斷開 print(fdmap[fd].getpeername(),"disconnected") p.unregister(fd) #清除文件描述符 del fdmap[fd] #刪除fdmap對應的key值 else: print(data.decode())
#poll 異步I/O import socket sk = socket.socket() sk.connect(("127.0.0.1",8002)) while True: command = input("--->>>") sk.sendall(command.encode()) sk.close()
③select.epoll方法:
epoll是在2.6內核中提出的,是以前的select和poll的加強版本。相對於select和poll來講,epoll更加靈活,沒有描述符限制。epoll使用一個文件描述符管理多個描述符,將用戶關係的文件描述符的事件存放到內核的一個事件表中,這樣在用戶空間和內核空間的copy只需一次。
epoll操做過程:
epoll操做過程須要三個接口,分別以下:
int epoll_create(int size);#建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
1. int epoll_create(int size);
建立一個epoll的句柄,size用來告訴內核這個監聽的數目一共有多大,這個參數不一樣於select()中的第一個參數,給出最大監聽的fd+1的值,參數size並非限制了epoll所能監聽的描述符最大個數,只是對內核初始分配內部數據結構的一個建議。
當建立好epoll句柄後,它就會佔用一個fd值,在linux下若是查看/proc/進程id/fd/,是可以看到這個fd的,因此在使用完epoll後,必須調用close()關閉,不然可能致使fd被耗盡。
2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函數是對指定描述符fd執行op操做。
- epfd:是epoll_create()的返回值。
- op:表示op操做,用三個宏來表示:添加EPOLL_CTL_ADD,刪除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分別添加、刪除和修改對fd的監聽事件。
- fd:是須要監聽的fd(文件描述符)
- epoll_event:是告訴內核須要監聽什麼事
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents個事件。
參數events用來從內核獲得事件的集合,maxevents告以內核這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,參數timeout是超時時間(毫秒,0會當即返回,-1將不肯定,也有說法說是永久阻塞)。該函數返回須要處理的事件數目,如返回0表示已超時。
#!/usr/bin/env python # -*- coding:utf-8 -*- # _author_soloLi import socket, logging import select, errno logger = logging.getLogger("network-server") def InitLog(): logger.setLevel(logging.DEBUG) fh = logging.FileHandler("network-server.log") fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") ch.setFormatter(formatter) fh.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) if __name__ == "__main__": InitLog() try: # 建立 TCP socket 做爲監聽 socket listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) except socket.error as msg: logger.error("create socket failed") try: # 設置 SO_REUSEADDR 選項 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except socket.error as msg: logger.error("setsocketopt SO_REUSEADDR failed") try: # 進行 bind -- 此處未指定 ip 地址,即 bind 了所有網卡 ip 上 listen_fd.bind(('', 2003)) except socket.error as msg: logger.error("bind failed") try: # 設置 listen 的 backlog 數 listen_fd.listen(10) except socket.error as msg: logger.error(msg) try: # 建立 epoll 句柄 epoll_fd = select.epoll() # 向 epoll 句柄中註冊 監聽 socket 的 可讀 事件 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) except select.error as msg: logger.error(msg) connections = {} addresses = {} datalist = {} while True: # epoll 進行 fd 掃描的地方 -- 未指定超時時間則爲阻塞等待 epoll_list = epoll_fd.poll() for fd, events in epoll_list: # 若爲監聽 fd 被激活 if fd == listen_fd.fileno(): # 進行 accept -- 得到鏈接上來 client 的 ip 和 port,以及 socket 句柄 conn, addr = listen_fd.accept() logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) # 將鏈接 socket 設置爲 非阻塞 conn.setblocking(0) # 向 epoll 句柄中註冊 鏈接 socket 的 可讀 事件 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) # 將 conn 和 addr 信息分別保存起來 connections[conn.fileno()] = conn addresses[conn.fileno()] = addr elif select.EPOLLIN & events: # 有 可讀 事件激活 datas = '' while True: try: # 從激活 fd 上 recv 10 字節數據 data = connections[fd].recv(10) # 若當前沒有接收到數據,而且以前的累計數據也沒有 if not data and not datas: # 從 epoll 句柄中移除該 鏈接 fd epoll_fd.unregister(fd) # server 側主動關閉該 鏈接 fd connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) break else: # 將接收到的數據拼接保存在 datas 中 datas += data except socket.error as msg: # 在 非阻塞 socket 上進行 recv 須要處理 讀穿 的狀況 # 這裏其實是利用 讀穿 出 異常 的方式跳到這裏進行後續處理 if msg.errno == errno.EAGAIN: logger.debug("%s receive %s" % (fd, datas)) # 將已接收數據保存起來 datalist[fd] = datas # 更新 epoll 句柄中鏈接d 註冊事件爲 可寫 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) break else: # 出錯處理 epoll_fd.unregister(fd) connections[fd].close() logger.error(msg) break elif select.EPOLLHUP & events: # 有 HUP 事件激活 epoll_fd.unregister(fd) connections[fd].close() logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) elif select.EPOLLOUT & events: # 有 可寫 事件激活 sendLen = 0 # 經過 while 循環確保將 buf 中的數據所有發送出去 while True: # 將以前收到的數據發回 client -- 經過 sendLen 來控制發送位置 sendLen += connections[fd].send(datalist[fd][sendLen:]) # 在所有發送完畢後退出 while 循環 if sendLen == len(datalist[fd]): break # 更新 epoll 句柄中鏈接 fd 註冊事件爲 可讀 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) else: # 其餘 epoll 事件不進行處理 continue epoll socket echo server
selectors模塊
selectors模塊已經封裝了epoll,select方法;epoll優先級大於select
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 10000)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept)