目錄:1.C/S架構 2.TCP/IP協議 3.socket套接字 4.粘包與解決辦法 5.操做系統 6.進程理論 7.開啓進程 8.join方法 9.守護進程 10.互斥鎖 11.隊列Queue 12.生產者消費者模型 13.線程理論 14.開啓線程 15.守護線程 16.GIL 17.進程池與線程池 18.死鎖與遞歸鎖 19.信號量 20.Event 21.定時器 22.線程queue 23.協程 24.greenlet 25.gevent 26.IO模型 27.IO多路複用與selectors模塊 28.socketserverpython
C/S架構linux
C指的是client(客戶端軟件),S指的是server(服務端軟件)web
客戶端軟件想要基於網絡發送一條消息給服務端軟件,流程以下:算法
1.客戶端產生數據,存放於客戶端軟件的內存中,調用接口將內存中的數據發送/拷貝到操做系統內存shell
2.客戶端操做系統收到數據後,按照客戶端軟件指定的規則(即協議),調用網卡發送數據編程
3.網絡傳輸數據json
4.服務端調用系統接口,想要將數據從操做系統內存拷貝到本身的內存中windows
5.服務端操做系統收到4的指令後,使用與客戶端相同的規則(即協議)從網卡接收到數據,拷貝給服務端軟件緩存
TCP/IP協議安全
Transmission Control Protocol/Internet Protocol
傳輸控制協議/因特網互聯協議
五層模型:
1.物理層,光纜、電纜、雙絞線、無線電波等,功能:主要是基於電器特性
發送高(1)低(0)電壓電信號
2.數據鏈路層,功能:規定了電信號的分組方式
Ethernet以太網協議,規定了一組電信號構成一個數據包,叫作‘幀’
每個數據幀分爲報頭(head)和數據(data)
報頭 固定由6個字節的發送者/原地址+6個字節的接受者/目標地址+6個字節的數據類型組成,固定18個字節
數據包含最短46個字節,最長1500個字節
因此一個幀最小64字節,最大1518字節,超過限度就分片發送
但凡接入互聯網的機器就必須有一塊網卡,每塊網卡都有獨一無二的一個mac地址
mac地址:每塊網卡在出廠時都會燒製上世界上惟一的一個mac地址
3.網絡層,功能:引入一套新的地址用來區分不一樣的廣播域/子網,這套地址即網絡地址
IP協議,是規定網絡地址的協議,ipv4規定網絡地址由32位2進製表示,範圍是0.0.0.0到255.255.255.255
這一層發出的數據就是:IP頭+data,這樣一種結構
ip地址+mac地址就標示了全世界獨一無二的一臺機器
子網掩碼,就是表示子網絡特徵的一個參數,形式上與IP地址同樣,由32位2進制組成
其網絡部分所有爲1,主機部分所有爲0,子網掩碼是用來標識一個ip地址的哪些位表明網絡位,
哪些位表明主機位,區分網絡爲和主機位是爲了劃分子網,避免廣播風暴和地址浪費
A類ip地址:1個網絡位3個主機位1.0.0.0-126.0.0.0
B類ip地址:2個網絡位2個主機位128.0.0.0-191.255.255.255
C類ip地址:3個網絡位1個主機位192.0.0.0-223.255.255.255
0.0.0.0對應當前主機,255.255.255.255是當前子網的廣播地址
4.傳輸層,功能:創建端口到端口的通訊
網絡層的ip幫咱們區分子網,數據鏈路層的mac地址幫咱們找到主機
端口即應用程序與網卡關聯的編號,端口範圍0-65535,其中0-1023爲系統佔用端口
找到端口就找到軟件了,IP+端口就能找到全世界惟一一個軟件
tcp協議,可靠傳輸,理論上tcp數據包沒有長度限制,但爲了保證網絡效率,一般不超過ip數據包長度
流式協議,它的數據是數據流,該協議沒有封包,因此極可能出現粘包,由於消息邊界不明確
以太網頭-ip頭-tcp頭-數據
tcp協議是可靠協議,由於在數據傳輸過程當中,只要不獲得確認,就從新發送數據,直到獲得確認
tcp要先創建雙向通路,tcp是流式協議,雙向通路就像管道,建好雙向通路才能夠發數據
tcp創建雙向通路與斷開雙向通路須要三次握手四次揮手
爲何是四次揮手呢,二、3可能不是同時的,2是確認客戶端斷鏈接的請求的,這個能夠直接回復
可是3是服務端發起斷鏈接的請求,須要服務端到客戶端的數據發完才能斷鏈接,因此二、3可能不是同時的,
那麼若是這兩步合併就會出現數據沒傳完的狀況
udp協議,不可靠傳輸
udp不須要創建通路,直接就發送數據
udp協議自帶封包,就算內容是空的,也會在外面包一層,這樣的話udp的數據包就是有邊界的,不會粘包
以太網頭-ip頭-udp頭-數據
tcp協議雖然安全性高,可是網絡開銷大,而udp雖然沒有提供安全機制,但網絡開銷小
在如今這個網絡安全已經相對較高的狀況下,爲了保證傳輸速率,優先考慮udp協議
5.應用層
通過層層包裝後的數據結構是:Ethernet頭 + ip頭 + tcp/udp頭 + data
socket:就是套接字
什麼是socket?socket是應用層與tcp/ip協議族通訊的中間軟件抽象層,它是一組接口
基於tcp的套接字實例:
server端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)#買手機
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
phone.bind(('127.0.0.1',8080)) #綁定手機卡 注意這裏,傳進去的是一個由ip和端口組成的元組
這個地方127.0.0.1就限制了客戶端服務端必須都在本機上才能用
phone.listen(5) #開機 5是掛起數量,ip、端口和掛起數量都應該寫在配置文件中,不該該寫死在這裏
while True: #連接循環,是爲了能夠循環服務,一個客戶端終止了就等着再服務新的客戶端
conn,addr=phone.accept() #等待電話鏈接 這裏拿到的是一個由兩個元素組成的元組,分別賦值給conn,addr
conn就是客戶端與服務端之間創建的通路,addr就是客戶端的地址
print('電話線路是',conn)
print('客戶端的手機號是',addr)
while True: #通訊循環,對應客戶端的通訊循環,發數據收數據
try: #應對windows系統
data=conn.recv(1024) #收消息 從本身的緩存中收最大1024個字節的數據
if not data:break #若是一直收到空的內容,linux系統會進入死循環,這樣應對
具體講是這樣的,正常狀況下不會發空過來,若是客戶端單方面給終止了纔會一直髮空過來,
那麼windows系統上會直接報錯,同try...except處理
而linux系統上不會報錯會進入死循環,死循環對cpu的佔用率極高,因此用if判斷這句來處理
print('客戶端發來的消息是',data)
conn.send(data.upper())
except Exception:
break
conn.close()
phone.close()
client端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080)) 注意這裏,傳進去的是一個由ip和端口組成的元組
while True:#通訊循環
msg=input('>>:').strip()
if not msg:continue #判斷msg是否爲空,爲空的話就continue
phone.send(msg.encode('utf-8')) #要指定編碼方式,轉成bytes格式才能進行網絡傳輸
data=phone.recv(1024)
print(data.decode('utf-8'))
phone.close()
socket套接字方法
socket實例類:socket.socket(family=AF_INET,type=SOCK_STREAM,proto=0,fileno=None)
family(socket家族) 這兩個內容能夠忽略
socket.AF_INET 用於網絡編程,大部分時候都用這個
socket.AF_UNIX 用於本機進程間通信
socket type類型
socket.SOCK_STREAM 用於tcp
socket.SOCK_DGRAM 用於udp
socket.SOCK_RAW 原始套接字
socket.SOCK_RDM 是一種可靠的udp形式,即保證交付數據報但不保證順序
服務端套接字函數
s.bind() 綁定(主機,端口號)到套接字
s.listen() 開始tcp監聽
s.accept() 被動接受tcp客戶端的鏈接,(阻塞式)等待鏈接的到來
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
客戶端套接字函數
s.connect() 主動初始化tcp服務器鏈接
s.connect_ex() connect()函數的擴展版本,出錯時返回出錯碼,而不是拋出異常
公共用途的套接字函數
s.recv() 接收數據
s.send() 發送數據,若是待發送數據量大於己端緩存區剩餘空間,數據會丟失
s.sendall() 發送完整的tcp數據,本質上就是循環調用send(),直到發完爲止
s.recvfrom() 從套接字接收數據,返回值是(bytes,address)
s.close() 關閉套接字
socket.setblocking(flag),設置socket爲非阻塞模式
基於udp的套接字實例: 瞭解
udp服務端
import socket
udpserver=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
udpserver.bind(('127.0.0.1',8080))
while True:#通信循環 udp協議沒有連接,因此沒有連接循環
data,client_addr=udpserver.recvfrom(1024)
print(data.decode('utf-8'))
print(client_addr)
msg=input('>>:')
udpserver.sendto(msg.encode('utf-8'),client_addr)
udp客戶端
import socket
udpclient=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
server_ip_port=('127.0.0.1',8080)
while True:#通信循環 udp協議沒有連接,因此沒有連接循環
inp=input('>>:')
udpclient.sendto(inp.encode('utf-8'),server_ip_port)
data,server_addr=udpclient.recvfrom(1024)
print(data.decode('utf-8'))
send與recv
1.這二者都不是直接接收對方的數據,而是操做本身的操做系統內存,
就是send把內容發給本身的操做系統的內存,由操做系統來發,數據到了後先存在操做系統的內存中,而recv是去操做系統的內存中拿
這二者沒有一一對應關係,不須要一個send對應一個recv
2.recv有兩個階段,wait for data階段和copy data階段,時間都耗費在第一階段
而send只有一個copy data階段
粘包現象與解決辦法
tcp協議是面向流的協議,容易出現粘包,而udp是面向消息的協議,每一個udp字段都是一條消息
應用程序必須以消息爲單位提取數據,不能一次提取任意字節的數據,因此udp不會粘包
粘包問題的發生主要仍是接收方不知道消息之間的界限,不知道一次性提取多少數據
在數據量比較小而且時間間隔很是短的狀況下,或數據量很是大超出了接受範圍時纔可能發生粘包問題
總結:
1.tcp(transport control protocol 傳輸控制協議)
是面向鏈接的,面向流的,提供高可靠性服務,收發兩端一一成對,爲了更有效地收發,
使用了優化算法(Ngale算法),將屢次間隔較短且數據量小的數據合併成一個大的數據塊,
而後封包,這樣一來,接收方難以分辨數據邊界,必須提供科學的拆包機制,
即面向流的通訊沒有消息保護邊界
2.udp(user datagram protocol 用戶數據包協議)
是無鏈接的,面向消息的,提供高效率的服務,不會使用塊的合併優化算法
因爲udp支持一對多的模式,因此接收端的skbuff(套接字緩衝區)採用鏈式結構來記錄每一個到達的udp包
每一個udp包有消息頭,對接收端來講,就容易區分處理了
即面向消息的通訊是有消息保護邊界的
3.tcp基於數據流,因而收發的消息不能爲空,這就須要在客戶端服務端都添加空消息處理機制
udp基於數據報,便是輸入空內容,那也不是空消息,udp協議會封裝上消息頭
用json和struct解決粘包實例:
server端
import socket,subprocess,struct,json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)#重用端口,加在bind前面
ip_port=('127.0.0.1',8080)
phone.bind(ip_port)
phone.listen(5)
while True:
conn,client_addr=phone.accept()
while True:
try:
# 1.收命令
cmd=conn.recv(1024)
if not cmd:break
# 2. 執行命令,拿到結果,使用os.system是拿不到結果的,須要用subprocess模塊,使用管道
res=subprocess.Popen(cmd.decode('utf-8'),
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
# err=res.stderr.read()
# if err:
# cmd_res=err
# else:
# cmd_res=res.stdout.read()
# conn.send(cmd_res)
out_res=res.stdout.read()
err_res=res.stderr.read()
data_size=len(out_res)+len(err_res) #數據的長度
head_dic={'data_size':data_size} #作一個字典,數據長度爲value
head_json=json.dumps(head_dic) #把包含數據長度的字典轉成字符串格式
head_bytes=head_json.encode('utf-8') #把字符串格式的字典轉成二進制格式用於網絡傳輸,這個就是報頭
#先發送報頭長度
head_len=len(head_bytes) #提取報頭長度
conn.send(struct.pack('i',head_len)) 使用struct將報頭長度轉成固定的4個字節發送給客戶端,這個i是整型的意思,後面必須跟數字
#發送報頭內容
conn.send(head_bytes) #把報頭髮過去
#最後發送數據部分
#3.把結果返回給客戶端
conn.send(out_res)
conn.send(err_res)
except Exception:
break
conn.close()
phone.close()
client端
import socket,struct,json
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
ip_port=('127.0.0.1',8080)
phone.connect(ip_port)
while True:
cmd=input('>>:').strip()
if not cmd:continue
phone.send(bytes(cmd,encoding='utf-8'))
#收報頭長度
head_struct=phone.recv(4)
head_len=struct.unpack('i',head_struct)[0] 提取報頭長度,解包出來是一個元組的格式,索引值爲0的位置就是要的報頭長度
#收報頭內容
head_bytes=phone.recv(head_len) #按照報頭長度接收,拿到報頭部分,是二進制格式的
head_json=head_bytes.decode('utf-8') #把二進制格式的報頭解碼成json字符串格式的
head_dic=json.loads(head_json)提取報頭內容,把字符串格式的報頭內容反序列化爲字典格式
data_size=head_dic['data_size'] 獲取數據長度
#收數據
recv_size=0
recv_data=b''
while recv_size < data_size:
data=phone.recv(1024)
recv_size+=len(data)
recv_data+=data
print(recv_data.decode('gbk')) #這個地方是個坑,雖然輸入cmd的時候用的utf-8,服務端那邊解碼也是utf-8
可是由於pycharm是運行在windows系統上,系統的編碼是gbk,因此要看到顯示
須要用gbk解碼
phone.close()
是否是能夠這樣理解,先拿到數據長度,再把數據長度作成字典,再把字典先序列化後編碼成二進制格式,這個通過加工處理的‘字典’就是報頭,包含數據的長度信息
而後,len拿到報頭的長度,把這個長度struck.pack轉成固定4個字節,而後分三次發送,第一次發4個字節,再發報頭內容,也就是那個特殊的‘字典’,
最後發送數據部分
接收的時候,先接收固定4個字節,把4個字節的內容unpack解包,拿到的就是報頭長度的信息,而後根據這個報頭長度的信息區接收,
接收到的內容是二進制格式,先解碼成字符串格式,再反序列化成本來的字典格式,再用key取值拿到字典裏面關於數據長度的信息,
最後根據這個信息去循環收取數據
struck能夠有兩種模式,i與l,i是整型,l是長整形,l的範圍大於i
操做系統
操做系統的做用
1.隱藏醜陋複雜的硬件接口,提供良好的抽象接口
2.管理、調度進程,而且將多個進程對硬件的競爭變得有序
多道技術
1.產生背景:針對單核,實現併發。如今的主機都是多核,每一個核都會利用多道技術
2.空間上的多路複用:如內存中同時有多道程序
3.時間上的多路複用:複用一個cpu的時間片
遇到io切,佔用cpu時間過長也切,核心在於切以前將進程的狀態保存下來
這樣才能保證下次切回來時,能基於上次切走的位置繼續運行
4.空間上的複用最大的問題是:
程序之間的內存必須分割,這種分割須要在硬件層面實現,由操做系統控制。若是內存彼此不分割,則一個程序能夠訪問另一個程序的內存,
首先喪失的是安全性,好比你的qq程序能夠訪問操做系統的內存,這意味着你的qq能夠拿到操做系統的全部權限。
其次喪失的是穩定性,某個程序崩潰時有可能把別的程序的內存也給回收了,比方說把操做系統的內存給回收了,則操做系統崩潰。
第一代計算機:真空管和穿孔卡片
第二代計算機:晶體管和批處理系統,現代操做系統的前身在這裏出現
第三代計算機:集成電路芯片和多道程序設計,出現了多道技術,可是其操做系統仍是批處理系統,並無使用多道技術,由於尚未解決上面的4中的問題
第四代計算機:我的計算機
進程理論
進程是資源單位,線程是執行單位
進程:
進程就是正在進行的一個過程或者說一個任務。負責執行任務的是cpu
程序僅僅只是一堆代碼而已,進程指的是程序的運行過程
同一個程序執行兩次那也是兩個進程
併發與並行
不管是併發仍是並行,在用戶看來都是‘同時’運行的,無論是進程仍是線程,
都只是一個任務而已,真正幹活的是cpu,而一個cpu同一時刻只能執行一個任務
併發:僞並行,即看起來是同時運行,單個cpu+多道技術就能夠實現
即在一個時間段內有不少任務要作,但cpu同一時刻只能作一個任務,那就先作一會1,
再作一會2,再作一會3....這就保證了每一個人任務都在進行中
並行:同時運行,只有具有多個cpu才能實現並行
開啓進程
python中的多線程沒法利用多核優點,若是想要充分使用cpu的多核資源python中大部分狀況下須要使用多進程
python提供multiprocessing模塊,用來開啓子進程
與線程不一樣,進程間沒有任何共享狀態,進程修改的數據,改動僅限於該進程內,內存空間是隔離的
Process類
Process([group[,target[,name[,args[,kwargs]]]]])
由該類實例化獲得的對象,可用來開啓一個子進程
須要使用關鍵字的方式來指定參數
args指定的爲傳給target函數的位置參數,是一個元祖形式,必須有逗號
注意:在windows中,Process()必須放在 if __name__ == '__main__'下
參數介紹:
group 參數未使用,值始終未None
target 表示調用對象,即子進程要執行的任務
name 爲子進程的名稱
args 表示調用對象的位置參數元組,args=(1,2,'egon')
kwargs 表示調用對象的字典,kwargs={'name':'egon','egon':18}
方法介紹:
p.start() 開啓進程,並調用該子進程中的p.run()
p.run() 進程啓動時運行的方法,正是它去調用target指定的函數,自定義類時必須實現run()
p.terminate() 強制終止進程p,不會進行任何清理操做,也是給操做系統發信號,由操做系統進行清理操做
若是p還建立了子進程,那這樣操做後其子進程就變成了殭屍進程
若是p還保存了一個鎖,那麼將不會被釋放,進而致使死鎖
p.is_alive() 若是p仍然運行,返回True
p.join([timeout]) 主線程等待p終止,是主線程處於等待狀態,p處於運行狀態
timeout是可選的超時時間
屬性介紹:
p.daemon 默認值是False,若是設爲True,表明p爲後臺運行的守護進程,當p的父進程終止時,
p也隨之終止,而且設爲True後不能建立本身的新進程,必須在p.start()以前設置
p.name 進程的名稱
p.pid 進程的pid
建立並開啓子進程
方式一:調用Process類
import time,random
from multiprocessing import Process # 這個Process必定要是大寫開頭的纔對
def piao(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
#實例化獲得四個對象
p1 = Process(target=piao,args=('egon',)) #注意這裏args須要傳的值是元組,因此,很重要
p2 = Process(target=piao,args=('alex',)) # 也能夠用kwargs傳字典進去
p3 = Process(target=piao,args=('wupeiqi',))
p4 = Process(target=piao,args=('yuanhao',))
#調用對象下的方法,開啓四個進程
p1.start() # 僅僅只是給操做系統發送了一個信號
p2.start() # 並非先運行p1再運行p2再運行p3最後運行p4這樣的順序
p3.start() # 只是發一個信號,開進程由操做系統來執行
p4.start()
print('主進程')
方式二:繼承Process類,必須本身寫run()方法,本身寫一個類繼承Process類
import time,random
from multiprocessing import Process 這個Process必定要是大寫開頭的纔對
class Piao(Process):
def __init__(self,name):
super().__init__() 或 Process.__init__(self) super方法不用寫self
self.name=name
def run(self): # 必定要是run()
print('%s is piaoing'%self.name)
time.sleep(random.randrange(1,5))
print('%s piao end'%self.name)
if __name__ == '__main__':
p1=Piao('egon')
p2=Piao('alex')
p3=Piao('wupeiqi')
p4=Piao('yuanhao')
p1.start()
p2.start()
p3.start()
p4.start()
print('主進程')
查看pid:
1.用os模塊,os.getpid()查看本身的pid,os.getppid()查看父進程的pid
2.p.pid
全部的子進程都要經歷殭屍進程這個狀態,就是子進程執行完畢了後要清理掉的時候,成爲殭屍進程,
保留一點點子進程的消息,以供父進程查看,父進程終結掉時會清理掉殭屍進程,有害
孤兒進程:子進程沒終結掉,而父進程終結了,由init進程接收,無害
進程間內存空間是隔離的
基於多進程實現併發的套接字通訊,實例:
server:
import socket
from multiprocessing import Process
def talk(conn):
while True:
try:
data = conn.recv(1024)
if not data:
continue
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
p = Process(target=talk, args=(conn,))
p.start()
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8800)
client:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
join方法
在主進程運行過程當中,若是想併發的執行其餘的任務,咱們能夠開啓子進程,
此時主進程任務與子進程任務分兩種狀況:
1.在主進程任務與子進程任務彼此獨立的狀況下,主進程的任務先執行完畢後,
主進程還須要等待子進程執行完畢,而後統一回收資源
2.若是主進程任務執行到某一個階段時,須要等待子進程執行完畢後才能繼續執行,
此時就須要一種機制可以讓主進程檢測子進程是否執行完畢,執行完畢就繼續執行主進程,
未執行完畢,主進程一直阻塞,這種機制就是join方法的做用
join方法實例:
import time,random
from multiprocessing import Process
def task(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
p1 = Process(target=task,args=('egon',))
p2 = Process(target=task,args=('alex',))
p3 = Process(target=task,args=('wupeiqi',))
p4 = Process(target=task,args=('yuanhao',))
p1.start() 進程只要start就會開始運行,這裏start4個,那麼系統中就有4個併發的進程
p2.start()
p3.start()
p4.start()
p1.join() join方法是讓主進程阻塞等待,p1-p4仍然併發執行
p2.join() 因此4個join花費的總時間就是耗費時間最長的那個進程運行的時間
p3.join()
p4.join()
#上面start與join能夠簡寫以下
p_l = [p1,p2,p3,p4]
for i in p_l:
i.start()
for v in p_l:
v.join()
print('主進程')
守護進程
主進程建立子進程,而後將該子進程設置成守護進程
守護進程會在 主進程代碼 執行結束後就終止
守護進程內沒法再開啓子進程,不然拋異常
若是咱們有兩個任務須要併發執行,那麼開一個主進程和一個子進程就能夠了
若是子進程的任務在主進程任務結束後就沒有存在的必要了,那麼該子進程應該在開啓前就被設置成
守護進程,主進程任務執行完,守護進程隨即結束
代碼實例:
import time,random
from multiprocessing import Process
def task(name):
print('%s is piaoing'%name)
time.sleep(random.randrange(1,5))
print('%s piao end'%name)
if __name__ == '__main__':
p = Process(target=task,args=('egon',))
p.daemon = True 設置守護進程,必定要放在p.start()以前,否則會報錯
p.start()
print('主進程') 這是主進程,只要終端中打印了這一行,守護進程也隨之終止
這個函數的執行結果是隻打印’主進程‘這三個字
互斥鎖
針對的是對共享數據的修改,只要對修改共享數據的代碼加鎖
原理上就是把併發改爲串行,下降了效率,可是保證了數據安全不錯亂
實例:
from multiprocessing import Process,Lock
import time,os
def work(lock):
lock.acquire() #加鎖
print('%s is running'%os.getpid())
time.sleep(2)
print('%s is done'%os.getpid())
lock.release() #釋放鎖
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=work,args=(lock,))
p.start()
模擬搶票,實例:
db.txt >>: {"count": 1} 這個count必定要用雙引號,否則用json讀不出來
from multiprocessing import Process, Lock
import json
import time
def search(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
print('<%s> check number:[%s]' % (name, dic.get('count')))
def get(name):
time.sleep(1)
dic = json.load(open('db.txt', 'r', encoding='utf-8'))
if dic.get('count') > 0:
dic['count'] -= 1
time.sleep(1)
json.dump(dic, open('db.txt', 'w', encoding='utf-8'))
print('<%s> successful' % name)
else:
print('sorry<%s>,no more' % name)
def task(name, lock):
search(name)
lock.acquire()
get(name)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(1, 10):
t = Process(target=task, args=('egon %s' % i, lock))
t.start()
join是將一個任務總體串行,多個子進程仍是併發的,子進程與主進程改爲了串行,而互斥鎖則是將一個任務中的某一段代碼串行
隊列Queue
進程彼此之間隔離,要實現進程間通訊(IPC),multiprocessing提供兩種方式進行消息傳遞
隊列和管道,都是使用內存空間
建立隊列的類(底層就是以管道和鎖定的方式實現),隊列就是管道+鎖
Queue([maxsize]) 建立共享的進程隊列
maxsize是隊列中容許最大項數,省略則無大小限制
隊列裏面存放的是消息,而非數據,因此不要放大的,要放小的
隊列佔用的是內存空間,因此maxsize即便沒有大小限制也受到內存大小的限制
主要方法:
q.put() 在隊列中插入數據
q.get() 能夠從隊列中讀取並刪除一個元素
q.full() 判斷隊列滿了沒有
q.empty() 判斷隊列空了沒有
實例:
from multiprocessing import Process, Queue
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())
生產者消費者模型
生產者指的是生產數據的任務,消費者指的是處理數據的任務
在併發編程中,可能會出現生產者跟消費者效率不協調的狀況,這時候就須要生產者消費者模型
生產者消費者模型:
是經過一個容器來解決生產者和消費者的強耦合問題。
生產者與消費者之間不直接通信,而是經過一個阻塞隊列進行通信
生產者把數據交給阻塞隊列,消費者去阻塞隊列那裏拿數據
阻塞隊列至關於一個緩衝區,來平衡生產者與消費者的處理能力,解耦合
用隊列queue來實現
一種用於生產者消費者模型的隊列的機制
JoinableQueue([maxsize])
方法與queue相似,q.get() q.put(),其特有的方法:
q.task_done() 消費者使用此方法發出信號,表示q.get()的返回項目已經被處理
q.join() 生產者調用此方法阻塞,直到隊列中全部項目均被處理
實例:
from multiprocessing import Process,JoinableQueue
import time
def producer(q,name,food):
for i in range(1,10):
time.sleep(2)
res = '%s%s'%(food,i)
q.put(res)
print('----->%s produce %s'%(name,res))
q.join() 這裏是爲了阻塞住子進程,等待q.task_done()發消息
def consumer(q,name):
while True:
res = q.get()
time.sleep(1)
print('--->%s eat %s'%(name,res))
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer,args=(q,'egon1','baozi'))
p2 = Process(target=producer,args=(q,'egon2','gutou'))
c1 = Process(target=consumer,args=(q,'alex1',))
c2 = Process(target=consumer,args=(q,'alex2',))
c1.daemon = True 設置守護進程
c2.daemon = True 設置守護進程
p1.start()
p2.start()
c1.start()
c2.start()
p1.join() 這裏是爲了阻塞主進程,主進程等待p1 p2結束纔會執行 而p1 p2會被q.join阻塞,要等待c1 c2裏面的
p2.join() q.task_done()執行,把所有取完的信號發回後纔會把p1 p2結束掉,而p1 p2結束了,
print('主程序') c1 c2做爲消費者也不必存在了,設置成守護進程
這樣的話,p1,p2執行完了執行主程序,主程序執行完了c1 c2一塊跟着結束
線程理論
進程是資源單位,線程是執行單位,線程纔是cpu上的執行單位
進程比如是上海地鐵,線程就是2號線,3號線,4號線
多線程就是一個進程中存在多個線程,多個線程共享該進程的地址空間和全部資源
建立線程的開銷要遠小於建立進程的開銷
開啓線程
方式1:Thread類直接建立
import threading
def countNum(n):
print('running on number:%s'%n)
if __name__=='__main__':
t1=threading.Thread(target=countNum,args=(23,))
t2=threading.Thread(target=countNum,args=(34,))
t1.start()
t2.start()
print('ending')
或者from threading import Thread,這樣的話在建立對象的時候就能夠直接Thread(),我的感受用這個好一點,佔內存少,調用也方便
方式2:Thread類繼承式建立
import threading
class MyThread(threading.Thread):
def __init__(self,num):
threading.Thread.__init__(self)
self.num=num
def run(self):
print('running on number:%s'%self.num)
t1=MyThread(56)
t2=MyThread(78)
t1.start() 調用執行run()方法
t2.start()
print('ending')
進程與線程的區別:
1.建立進程的開銷大於建立線程的開銷
2.在主進程下開啓多個線程,每一個線程的pid都跟主進程同樣,而開多個進程,每一個進程的pid各不相同
3.進程之間的地址空間是隔離的,同一個進程內的多個線程地址空間是共享的,一個線程內的修改會反映到全部的線程中
Thread對象的其餘屬性或方法,這個是Thread下的方法
Thread.isAlive() 返回線程是否活動
Thread.getName() 返回線程名
Thread.setName() 設置線程名
threading模塊提供的一些方法,這是threading下的方法,與Thread一個級別
threading.currentThread() 返回當前的線程變量,能夠用這個拿到當前線程變量
threading.enumerate() 返回一個包含正在運行的線程的list
threading.activeCount() 返回正在運行的線程數量
與len(threading.enumerate())結果同樣
代碼示例:
join方法阻塞主線程
import threading
import time
def tingge():
print('tingge')
time.sleep(3)
def xieboke():
print('xieboke')
time.sleep(5)
t1=threading.Thread(target=tingge)
t2=threading.Thread(target=xieboke)
t1.start()
t2.start()
t1.join() #join在子線程完成運行以前,這個子線程的父線程將一直被阻塞
t2.join()
print('ending')
守護線程
不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後就被銷燬
須要強調的是,運行完畢並不是終止運行
1.對主進程而言,運行完畢指的是主進程代碼運行完畢
2.對主線程而言,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢
主線程纔算運行完畢
代碼實例:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hi'%name)
if __name__ == '__main__':
t = Thread(target=sayhi,args=('egon',))
t.setDaemon(True) 設置守護線程,放在start以前 或:t.daemon = True
t.start()
print('主線程')
由於sayhi中有sleep(2),因此在主線程print以後,被設置成守護線程的t就終止了
基於多線程實現併發的套接字通訊,實例:
server端:
import socket
from threading import Thread
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
t = Thread(target=communicate, args=(conn,))
t.start()
server.close()
if __name__ == '__main__':
server('127.0.0.1', 8800)
client端:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
GIL 全局解釋器鎖
在Cpython解釋器中,同一個進程下開啓的多線程,同一時刻只能有一個線程執行,沒法利用多核
GIL是加在cpython解釋器上的一把互斥鎖,只是咱們經常使用的是cpython解釋器而已,可是GIL並非python的鎖
GIL並非python的特性,而是在實現Cpython時所引進的一個概念,python徹底能夠不依賴於GIL
在cpython中,由於GIL的存在沒法實現多線程的並行,沒法利用多核,可是能夠實現併發
GIL本質就是一把互斥鎖(mutex),因此就是在把併發改串行
GIL與Lock:
首先,鎖的目的是爲了保護共享數據,同一時間只能有一個線程來修改共享的數據
而後,保護不一樣的數據就應該加不一樣的鎖
最後,GIL與Lock保護的數據不同,GIL保護的是解釋器級別的,好比垃圾回收的數據
Lock保護用戶本身開發的應用程序的數據
互斥鎖代碼示例:
import time,threading
# from threading import Thread,Lock 這樣寫比較好
def subNum():
global num #100個線程會同時走到這一步
r.acquire() #加上互斥鎖以後,acquire與release之間的內容被上鎖,
temp=num #待資源訪問結束後纔會釋放,而後讓下一個線程進來
time.sleep(0.0001)
num=temp-1
r.release()
num=100
thread_list=[]
r=threading.Lock()
for i in range(100):
t=threading.Thread(target=subNum) #產生100個線程
t.start()
thread_list.append(t)
for t in thread_list:
t.join()
print('Result:',num)
對於計算來講,cpu越多越好,但對與I/O來講,再多的cpu也沒用
對於運行一個程序來講,cpu越多執行效率越高,由於一個程序不會是純計算或純I/O
對於I/O密集型,不管是多核仍是單核,多線程合適
對於計算密集型,單核時使用多線程,多核時使用多進程
即,多進程適合多核時進行計算密集型任務,其他狀況都是多線程更合適
多線程適合於I/O密集型,好比socket,爬蟲,web
多進程適合於計算密集型,好比金融分析
如今都是多核了,因此在多核狀況下:計算密集型用多進程,這樣才能利用多核優點
IO密集型用多線程,由於咱們如今作的大都是IO密集型的操做,因此大部分狀況下用多線程
代碼實例:
對於計算密集型,多核時的多進程:效率高,8秒多
from multiprocessing import Process
import time
def counter():
i = 0
for i in range(40000000):
i += 1
return True
def main():
l=[]
start_time=time.time()
for i in range(2):
t=Process(target=counter)
t.start()
l.append(t)
for t in l:
t.join()
# counter()
# counter()
end_time=time.time()
print('Total time:{}'.format(end_time-start_time))
if __name__=='__main__':
main()
對於計算密集型,多核時的多線程:12秒多,效率低於多進程
from threading import Thread
import time
def counter():
i = 0
for i in range(40000000):
i += 1
return True
def main():
l=[]
start_time=time.time()
for i in range(2):
t=Thread(target=counter)
t.start()
l.append(t)
t.join()
end_time=time.time()
print('Total time:{}'.format(end_time-start_time))
if __name__=='__main__':
main()
進程池與線程池
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor 線程池,提供異步調用
ProcessPoolExecutor 進程池,提供異步調用
進程池與線程池的用法徹底同樣
提交任務的兩種方式:同步調用與異步調用
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到結果再執行下一行代碼
異步調用:提交完任務後,不在原地等待任務執行完畢,直接執行下面的代碼
實例:
1.同步調用,不等於阻塞,阻塞是IO阻塞,同步調用是在等待任務執行結果,跟IO沒有必然聯繫
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
return {'name': name, 'res': res}
def weight(shit):
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
shit1 = pool.submit(la, 'alex').result() # 等待執行結果,拿到結果後再往下走
weight(shit1)
shit2 = pool.submit(la, 'wusir').result()
weight(shit2)
2.異步調用
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
weight({'name': name, 'res': res})
def weight(shit):
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
pool.submit(la, 'alex') # 不拿到結果直接往下走
pool.submit(la, 'wusir')
pool.submit(la, 'yuan')
異步調用+回調函數:
import time
import random
from concurrent.futures import ThreadPoolExecutor
def la(name):
print('%s is laing' % name)
time.sleep(random.randint(1, 3))
res = random.randint(7, 13) * '#'
return {'name': name, 'res': res}
def weight(shit):
shit = shit.result() 使用回調函數後傳進來的shit是一個obj對象,要用obj.result()拿到結果
name = shit.get('name')
size = len(shit.get('res'))
print('%s lale <%s>kg' % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13)
pool.submit(la, 'alex').add_done_callback(weight) weight拿到的是一個對象obj,因此須要用obj.result()拿到結果
pool.submit(la, 'wusir').add_done_callback(weight)
pool.submit(la, 'yuan').add_done_callback(weight)
基本方法:
submit(func, *args, **kwargs) 異步提交任務
map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做
shutdown(wait=True) 至關於進程池的pool.close()+pool.join()的操做
wait=True 等待池內全部任務執行完畢回收完資源後才繼續
wait=False 當即返回,並不會等待池內的任務執行完畢
可是無論wait的參數是什麼,整個程序都會等到全部任務執行完畢
submit與map必須在shutdown以前
result(timeout=None) 取得結果
add_done_callback(func) 回調函數
爲進程池或線程池內的每個進程或線程綁定一個函數func,該函數func在進程或線程的任務執行完畢後自動觸發
並接受任務的返回值做爲參數傳給func
實例:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os
import time
import random
def task(name):
print('name:%s pid:%s run' % (name, os.getpid()))
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
pool = ProcessPoolExecutor(4) # 進程池 指定進程池最大數目就是4個,這樣的話最多就是開4個進程,多餘的任務等着有進程結束才能執行
pool = ThreadPoolExecutor(4) # 線程池,10個線程的pid都是同樣的,而上面進程池的時候10個進程的pid是固定的4個pid
for i in range(10):
pool.submit(task, 'egon%s' % i)
pool.shutdown(wait=True) # 會等待全部10個進程都走完才走下面的print
print('zhu')
練習:
from concurrent.futures import ThreadPoolExecutor
import time
def get(url):
print('GET %s' % url)
response = requests.get(url)
time.sleep(2)
return {'url': url, 'content': response.text}
def parse(res):
res = res.result()
print('%s res is %s' % (res.get('url'), len(res.get('content'))))
if __name__ == '__main__':
urls = [
'http://www.baidu.com',
'http://www.bilibili.com',
'http://www.zhihu.com',
'http://www.acfun.cn',
]
pool = ThreadPoolExecutor(3)
for url in urls:
pool.submit(get, url).add_done_callback(parse)
基於線程池實現併發的套接字通訊
client:
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
msg = input('>>').strip()
if not msg:
continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
server:
import socket
from concurrent.futures import ThreadPoolExecutor
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
pool.submit(communicate, conn)
server.close()
if __name__ == '__main__':
pool = ThreadPoolExecutor(2)
server('127.0.0.1', 8800)
死鎖與遞歸鎖
死鎖:兩個及以上的進程或線程在執行過程當中,因爭奪資源而形成的相互等待的現象
解決辦法就是遞歸鎖,爲了支持在同一線程中屢次請求同一資源,可重入鎖RLock
遞歸鎖能夠acquire屢次,而互斥鎖只能acquire一次
遞歸鎖內部有一個count變量,記錄acquire的計數,只要count的計數不是0,其餘線程就搶不到,直到全部的acquire都被release,count爲0時才能被其餘線程搶到
代碼實例:能夠多用RLock
import time,threading
Rlock=threading.RLock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.fun1()
self.fun2()
def fun1(self):
Rlock.acquire()
print('i am %s,get res:%s---%s'%(self.name,'ResA',time.time()))
Rlock.acquire()
print('i am %s,get res:%s---%s'%(self.name,'ResB',time.time()))
Rlock.release()
Rlock.release()
def fun2(self):
Rlock.acquire()
print('i am %s,get res:%s---%s' % (self.name, 'ResB', time.time()))
time.sleep(0.2)
Rlock.acquire()
print('i am %s,get res:%s---%s' % (self.name, 'ResA', time.time()))
Rlock.release()
Rlock.release()
if __name__=='__main__':
print('start------------------------%s'%time.time())
for i in range(0,10):
my_thread=MyThread()
my_thread.start()
信號量
也是一把鎖,能夠指定信號量爲5,互斥鎖使得同一時間只能有一個任務搶到鎖去執行,
那麼信號量同一時間能夠有5個任務拿到鎖去執行,同一時間能夠有多個任務搶到鎖去執行
代碼實例:
from threading import Thread,Semaphore
import threading,time
def func():
sm.acquire()
print('%s get sm'%threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm = Semaphore(5)
for i in range(23):
t = Thread(target=func)
t.start()
Event
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測的
若是程序中的線程須要判斷某個線程的狀態來肯定本身下一步的操做,那就用到Event方法
from threading import Event
event.isSet() 返回event的狀態值
event.wait() 若是狀態值爲False將阻塞線程
event.set() 設置狀態值爲Ture,因此阻塞池的線程激活進入就緒狀態,等待系統調用
event.clear() 恢復狀態值爲False
實例:
1.
from threading import Thread, Event
import time
event = Event()
def student(name):
print('%s 正在聽課' % name)
event.wait(2) # 設置超時時間,超過這個時間沒有拿到set的True就不等了繼續往下執行
print('%s 課間活動' % name)
def teacher(name):
print('%s 正在授課' % name)
time.sleep(7)
event.set()
if __name__ == '__main__':
stu1 = Thread(target=student, args=('alex',))
stu2 = Thread(target=student, args=('wusir',))
stu3 = Thread(target=student, args=('yuan',))
t1 = Thread(target=teacher, args=('egon',))
stu1.start()
stu2.start()
stu3.start()
t1.start()
2.
from threading import Thread, Event, currentThread
import time
event = Event()
def conn():
n = 0
while not event.is_set():
if n == 3:
print('%s try too many times' % currentThread().getName())
return
print('%s try %s' % (currentThread().getName(), n ))
event.wait(0.5)
n += 1
print('%s is connected' % currentThread().getName())
def check():
print('%s is checking' % currentThread().getName())
time.sleep(5)
event.set()
if __name__ == '__main__':
for i in range(3):
t = Thread(target=conn)
t.start()
t = Thread(target=check)
t.start()
定時器
指定n秒後執行某操做
代碼實例:
1.
from threading import Timer
def hello():
print('hello world')
t = Timer(1,hello) 一秒後,執行hello函數
t.start()
2. 作一個4位數的驗證碼,有效期是5秒,超過5秒不輸入就刷新出新的驗證碼
from threading import Timer
import random
class Code:
def __init__(self):
self.make_cache()
def make_cache(self, interval=5):
self.cache = self.make_code()
print(self.cache)
self.t = Timer(interval, self.make_cache)
self.t.start()
def make_code(self, n=4):
res = ''
for i in range(n):
s1 = str(random.randint(0, 9))
s2 = chr(random.randint(65, 90))
res += random.choice([s1, s2])
return res
def check(self):
while True:
code = input('>>:').strip()
if code.upper() == self.cache:
print('right')
self.t.cancel()
break
obj = Code()
obj.check()
線程queue
實例:
import queue
#最經常使用
q=queue.Queue() #默認先進先出(FIFO)括號裏面能夠設定最大值
q.put(111)
q.put('hello')
q.put(222)
print(q.get())
print(q.get())
print(q.get())
print(q.get()) 這裏由於只有三個數據,第四次get沒有數據,因此會一直阻塞住
#堆棧,後進先出
q1=queue.LifoQueue() last in first out
q1.put(111)
q1.put(222)
q1.put(333)
print(q1.get()) 取到的是333
#優先級隊列,按照優先級
q2=queue.PriorityQueue()
q2.put([4,'hello4']) put進去的元素,我目前知道列表或元組,第一個元素規定優先級
q2.put([1,'hello']) 一般是數字,數字越小,優先級越高
q2.put([2,'hello2'])
print(q2.get()) 取到的是[1,'hello']
print(q2.get()) 取到的是[2,'hello2']
協程
併發的本質是切換+保存狀態
協程:是單線程下的併發,又稱爲微線程、纖程Coroutine
協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的
python的線程屬於內核級別的,即由操做系統控制調度
單線程內開啓協程,一旦遇到I/O就會從應用程序級別控制切換
優勢:
1.協程的切換開銷小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
2.單線程內就能夠實現併發的效果,最大限度利用cpu
缺點:
1.協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
2.協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
協程必須在只有一個單線程裏實現併發
修改共享數據不需加鎖
用戶程序裏本身保存多個控制流的上下文棧
一個協程遇到IO操做會自動切換到其餘協程
實例:
import time
def consumer():
r=''
while True:
n=yield r
if not n:
return
print('[CONSUMER]<<Consuming%s...'%n)
time.sleep(1)
r='200 OK'
def produce(c):
next(c)
n=0
while n<5:
n+=1
print('[PRODUCER]>>Producing %s...'%n)
cr=c.send(n)
print('[PRODUCER] Consumer return:%s'%cr)
c.close()
if __name__=='__main__':
c=consumer()
produce(c)
greenlet
在單個線程內有多個任務,須要在多個任務之間切換,yield太麻煩,就用greenlet模塊
代碼實例:
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1=greenlet(test1)
gr2=greenlet(test2)
gr1.switch() # 這裏能夠傳參數,只有在第一次switch時能夠傳參數
可是greenlet不能處理I/O阻塞操做,能夠用gevent模塊解決
gevent
異步提交任務,就由於是異步提交的,因此才須要用jion方法阻塞住
協程只有一個單線程,異步提交任務,不join的話,spawn提交了任務就走完了,線程就結束了,可能提交的任務還沒開始,就會直接死掉
因此必定要用join阻塞住主線程,保證線程不死
用法:
g1 = gevent.spawn(func1,1,2,3,4,x=5,y=6) 建立一個協程對象g1,第一個參數放函數名
後面的所有都是給該函數名傳值的
g2 = gevent.spawn(func2)
g1.join() 等待g1的結果
g2.join()
或者上面兩個合併:gevent.joinall([gevent.spawn(func1),gevent.spawn(func2)])
g1.value 拿到g1的返回值
代碼實例:
import gevent,time
def foo():
print('running in foo')
gevent.sleep(2) 用來模擬gevent能夠識別的I/O阻塞,time.sleep()不能識別
print('switch to foo again')
def bar():
print('switch to bar')
gevent.sleep(5)
print('switch to bar again')
start=time.time()
print(start)
#g1 = gevent.spawn(foo)
#g2 = gevent.spawn(bar)
#g1.join()
#g2.join() # 或用下面的方法
gevent.joinall(
[gevent.spawn(foo),
gevent.spawn(bar)] 裏面放列表的形式
)
print(time.time()-start)
切記:須要加上下面這句代碼,gevent才能識別正常的I/O阻塞,否則不能識別time.sleep(1)
from gevent import monkey;monkey.patch_all()
乾脆就直接記住,要用gevent,就在文件的開頭加上這句話!開頭!import socket的前面
#爬蟲
import time
from gevent import monkey;monkey.patch_all()
import gevent
from urllib import request
def f(url):
print('GET:%s'%url)
resp=request.urlopen(url)
data=resp.read()
print('%d bytes received from %s.'%(len(data),url))
start=time.time()
gevent.joinall([
gevent.spawn(f,'https://nba.hupu.com/'),
gevent.spawn(f,'https://www.zhihu.com/'),
gevent.spawn(f,'https://www.bilibili.com/'),
])
print(time.time()-start)
基於gevent模塊實現併發的套接字通訊
client:
import socket
from threading import Thread, currentThread
def client():
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8800))
while True:
client.send(('%s say hello' % currentThread().getName()).encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
if __name__ == '__main__':
for i in range(500): # 開500個線程,這樣的話,就是服務端單線程處理500個併發
t = Thread(target=client)
t.start()
server:
from gevent import monkey; monkey.patch_all()
import socket
import gevent
def communicate(conn):
while True:
try:
data = conn.recv(1024)
if not data:
break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip, port):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ip, port))
server.listen(5)
while True:
conn, addr = server.accept()
gevent.spawn(communicate, conn)
server.close()
if __name__ == '__main__':
g1 = gevent.spawn(server, '127.0.0.1', 8800)
g1.join()
其實使用yield、greenl和gevent都是在實現協程,單線程下的併發,yield最麻煩,greenlet稍好一點,這二者都不能處理IO操做
gevent最好,其底層封裝的仍是greenlet,能夠處理IO操做
IO模型
同步:在發出一個功能調用時,在沒有獲得結果以前,該調用就不會返回,好比打電話
異步:在發出一個功能調用時,調用者不能馬上獲得結果,好比發短信,一般跟回調機制結合到一塊兒
阻塞:調用結果返回前,當前線程會被掛起,函數只有獲得結果以後纔會將阻塞的線程激活
非阻塞:在不能馬上獲得結果以前也會馬上返回,同時該函數不會阻塞當前線程
當一個read操做發生時,該操做會經歷兩個階段:
等待數據準備 waiting for the data to be ready
將數據從內核拷貝到進程中 copying the data from the kernel to the process
阻塞IO(blocking IO)
在linux中,默認狀況下,全部的socket都是blocking
特色是在IO執行的兩個階段都被block
能夠用多線程模型來解決小規模的服務請求,大規模的要用的非阻塞IO
非阻塞IO(non-blocking IO)
屢次發送系統調用,wait for data階段,數據沒準備好會返回error,而後不停的發
循環往復的進行recvfrom系統調用,這稱之爲輪詢,輪詢檢查內核數據,直到數據準備好
拷貝數據的過程,進程處於阻塞狀態
優勢:wait for data無阻塞 copy data阻塞
缺點:系統調用發送太多,數據不能實時接收到
可是,非阻塞IO毫不被推薦
異步IO
全程無阻塞
IO多路複用(IO multiplexing)
監聽多個連接 select/epoll,優點在於處理多個鏈接
特色:全程阻塞,比阻塞IO多一次系統調用
可以監聽多個文件描述符(套接字對象),進而實現併發
代碼實例:
這是server端
import socket,time,select
sock=socket.socket()
sock.bind(('127.0.0.1',8800))
sock.listen(5)
sock.setblocking(False)
inputs=[sock,] #這個sock是在有一個clent鏈接的時候纔會有變化
while 1:
r,w,e=select.select(inputs,[],[]) #監聽的是有變化的套接字
for obj in r:
if obj==sock:
conn,addr=obj.accept() #這個conn是在clent發送數據的時候發生變化
print('conn',conn) #conn是連接的客戶端的套接字對象
inputs.append(conn)
else:
try:
data = obj.recv(1024)
print(data.decode('utf-8'))
send_data = input('>>:')
obj.send(send_data.encode('utf-8'))
except Exception:
inputs.remove(obj)
#在linux上這樣寫,上面的寫法是應用於windows上
# if not data:
# inputs.remove(obj)
# continue
在全部的IO模型裏面,阻塞IO、非阻塞IO、IO多路複用以及沒學的驅動信號,都是同步IO
只有異步IO是異步的。
關於同步異步的判斷:不管是wait for data 仍是 copy data 只要出現阻塞,那就是同步的
IO多路複用
對於windows而言,只有select,對於linux有select poll epoll三種方式
epoll>poll>select
select缺點:
1.每次調用select都須要把全部的文件描述符fd拷貝到內核空間,致使效率降低;
2.每次調用,都遍歷全部的fd是否有數據訪問,這個過程效率過低,這個問題最重要;
3.最大鏈接數有上限(1024)
poll:跟select基本一致,可是最大鏈接數沒有限制
epoll:
第一個函數:建立epoll句柄,將全部的文件描述符fd拷貝到內存空間,可是隻須要拷一次
第二個函數:回調函數,是某一個函數或動做成功完成後會觸發的函數,
爲全部的fd綁定一個回調函數,一旦有數據訪問,就會觸發該回調函數,
回調函數將fd放到一個鏈表中
第三個函數:判斷鏈表是否爲空
epoll的最大鏈接數沒有上限(相對的)
selectors模塊
三種IO多路複用模型在不一樣平臺有不一樣支持,
使用seletors幫咱們默認選擇當前平臺下合適的IO多路複用模型
代碼實例:
server端
import selectors #基於select模塊實現的IO多路複用,建議使用
import socket
sock=socket.socket()
sock.bind(('127.0.0.1',8800))
sock.listen(5)
sock.setblocking(False)
=selectors.DefaultSelector() #根據具體平臺選擇最佳IO多路複用機制,好比linux上選epoll
def read(conn,mask):
try:
data=conn.recv(1024)
print(data.decode('utf-8'))
send_data=input('>>:')
conn.send(send_data.encode('utf-8'))
except Exception:
sel.unregister(conn) #解除綁定,避免出現一個客戶端認爲關閉服務端崩掉的狀況
def accept(sock,mask):
conn,addr=sock.accept()
#print(conn)
sel.register(conn,selectors.EVENT_READ,read)
sel.register(sock,selectors.EVENT_READ,accept) #註冊事件
while 1:
#print('waiting...')
events=sel.select() #監聽
for key,mask in events:
#print(key.data) #當前綁定的方法,好比accept,read
#print(key.fileobj) #當前活動的文件描述符,好比sock,conn
func=key.data
obj=key.fileobj
func(obj,mask)
#官方的列子以下:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
socketserver
服務端的特色:
1.一直運行提供服務,即連接循環,通訊循環是基於一個連接的
2.綁定一個惟一的地址
socketserver模塊分爲兩大類:server類解決連接問題,request解決通訊問題
代碼實例:
server端:
import socketserver
class FTPserver(socketserver.BaseRequestHandler): #通信
def handle(self): 這個是固定死的,必須定義handle方法,上面的繼承也是固定的
print(self)
print(self.request) 獨有的request方法,其實就是conn,一個套接字對象
while True:
data=self.request.recv(1024)
print(data)
self.request.send(data.upper())
if __name__=='__main__':
obj=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FTPserver) 基於多線程實現併發
obj.serve_forever() #連接循環
client端:
import socket
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.connect(('127.0.0.1',8080))
while True:#通訊循環
msg=input('>>:').strip()
if not msg:continue #判斷msg是否爲空,爲空的話就continue
phone.send(msg.encode('utf-8'))
data=phone.recv(1024)
print(data)
phone.close()