每次執行程序(好比說瀏覽器,音樂播放器)的時候都會完成必定的功能,好比說瀏覽器幫咱們打開網頁。 進程就是一個程序在一個數據集上的一次動態執行過程。進程通常由程序、數據集、進程控制塊三部分組成。進程的建立、撤銷和切換的開銷比較大
python
線程也叫輕量級進程,它是一個基本的CPU執行單元,也是程序執行過程當中的最小單元,線程的引入減少了程序併發執行時的開銷。線程沒有本身的系統資源,只擁有在運行時必不可少的資源。但線程能夠與同屬與同一進程的其餘線程共享進程所擁有的其餘資源。線程是屬於進程的,線程運行在進程空間內,同一進程所產生的線程共享同一內存空間,當進程退出時該進程所產生的線程都會被強制退出並清除。linux
協程,又稱微線程,線程是系統級別的它們由操做系統調度,而協程則是程序級別的由程序根據須要本身調度。在一個線程中會有不少函數,咱們把這些函數稱爲子程序,在子程序執行過程當中能夠中斷去執行別的子程序,而別的子程序也能夠中斷回來繼續執行以前的子程序,這個過程就稱爲協程。也就是說在同一線程內一段代碼在執行過程當中會中斷而後跳轉執行別的代碼,接着在以前中斷的地方繼續開始執行,相似與yield操做。協程是一中多任務實現方式,它不須要多個進程或線程就能夠實現多任務。算法
multiprocessing是python的多進程管理包。windows
threading 模塊創建在 _thread 模塊之上。_thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。api
greenlet、gevent(第三方模塊)能夠實現協程瀏覽器
程序是指令和數據的有序集合,其自己沒有任何運行的含義,是一個靜態的概念。安全
進程是一個「執行中的程序」,進程的實質是程序的一次執行過程,進程是動態產生,動態消亡的。進程是一個能獨立運行的基本單位,同時也是系統分配資源和調度的獨立單位;進程由程序、數據和進程控制塊三部分組成。因爲進程間的相互制約,使進程具備執行的間斷性,即進程按各自獨立的、不可預知的速度向前推動服務器
先來先服務(FCFS)調度算法是一種最簡單的調度算法,該算法既能夠做業調度,也能夠做用域進程調度。FCFS算法比較有利於長做業(進程),而不利於短做業(進程)。由此可知,本算法適合於CPU繁忙型做業,而不利於I/O繁忙型做業(進程)。markdown
短做業(進程)優先調度算法(SJ/PF)是指對短做業或者短進程優先調度的算法,該算法既能夠用於做業調度,也可用於進程調度。但其對長做業不利;不能保證緊迫性做業(進程)被及時處理;做業的長短只是被估算出來的。網絡
時間片輪轉(Round Robin,RR)將CPU的處理時間分紅固定大小的時間片,若是一個進程在被調度選中以後用完了系統規定的時間片,但又未完成要求的任務,則它自行釋放本身所佔有的CPU而排到就緒隊列的末尾,等待下一次調度。同時,進程調度程序又去調度當前就緒隊列中的第一個進程。
並行:並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核CPU)
併發:並行是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A,交替使用,目的是提升效率。
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行,好比一個服務器同時處理多個session。
同步:所謂同步就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列
。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。
異步:所謂異步是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了
。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列
。
好比我去銀行辦理業務,可能會有兩種方式: 第一種 :選擇排隊等候; 第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了; 第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況; 第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,
在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的
不管是排隊仍是使用號碼等待通知,若是在這個等待的過程當中,等待者除了等待消息通知以外不能作其它的事情,那麼該機制就是阻塞的,表如今程序中,也就是該程序一直阻塞在
該函數調用處不能繼續往下執行。相反,有的人喜歡在銀行辦理這些業務的時候一邊打打電話發發短信一邊等待,這樣的狀態就是非阻塞的,由於他(等待者)沒有阻塞在這個消息
通知上,而是一邊作本身的事情一邊等待。 注意:同步非阻塞形式其實是效率低下的,想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有。若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個
程序須要在這兩種不一樣的行爲之間來回的切換,效率可想而知是低下的;而異步非阻塞形式卻沒有這樣的問題,由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)
的事情,程序沒有在兩種不一樣的操做中來回切換。
(1)同步阻塞形式
效率最低。拿上面的舉例來講,就是你專心排隊,什麼別的事都不作
(2)異步阻塞形式
若是在銀行等待辦理業務的人採用的是異步的方式去等待消息被觸發(通知),也就是領了一張小紙條,假如在這段時間裏他不能離開銀行去作其它的事情,那麼很顯然,這我的被阻塞在了這個等待的操做上面;異步操做也能夠被阻塞住的,只不過它不是在處理消息時阻塞,而是在等待消息通知書時被阻塞。
(3)同步非阻塞形式
其實是效率低下的。想象一下你一邊打着電話一邊還須要擡頭看到底隊伍排到你了沒有,若是把打電話和觀察排隊的位置當作是程序的兩個操做的話,這個程序須要在這兩種不一樣的行爲之間來回的切換
,效率可想而知是低下的。
(4)異步非阻塞形式
效率更高,由於打電話是你(等待者)的事情,而通知你則是櫃檯(消息觸發機制)的事情,程序沒有在兩種不一樣的操做中來回切換。
好比說,這我的忽然發覺本身煙癮犯了,須要出去抽根菸,因而他告訴大堂經理說,排到我這個號碼的時候麻煩到外面通知我一下,那麼他就沒有被阻塞在這個等待的操做上面,天然這個就是異步+非阻塞的方式了。
不少人會把同步和阻塞混淆,是由於不少時候同步操做會以阻塞的形式表現出來
不少人也會把異步和非阻塞混淆,由於異步操做通常都不會在真正的IO操做處被阻塞
。
一、multiprocessing模塊
python中的多線程沒法利用多核優點,若是想要充分的使用CPU資源(os.cpu_count()查看),在python中大部分狀況須要使用多進程。Python中提供了multiprocess模塊。multiprocess中幾乎包含了和進程有關的全部子模塊。大體分爲四個部分:建立進程部分、進程同步部分、進程池部分、進程之間數據共享。multiprocessing經常使用組件及功能:
1.一、管理進程模塊:
1.二、同步子進程模塊:
二、Array,Value---共享數據
若是你真有須要共享數據, multiprocessing提供了兩種方式。
(1)multiprocessing,Array,Value
數據能夠用Value或Array存儲在一個共享內存地圖裏,以下:
from multiprocessing importArray,Value,Process def func(a,b): a.value = 3.333333333333333 for i in range(len(b)): b[i] = -b[i] if __name__ == "__main__": num = Value('d',0.0) arr = Array('i',range(11)) c = Process(target=func,args=(num,arr)) d= Process(target=func,args=(num,arr)) c.start() d.start() c.join() d.join() print(num.value) for i in arr: print(i)<br> #輸出: #3.1415927 #[0, -1, -2,-3, -4, -5, -6, -7, -8, -9]
建立num和arr時,「d」和「i」參數由Array模塊使用的typecodes建立:「d」表示一個雙精度的浮點數,「i」表示一個有符號的整數,這些共享對象將被線程安全的處理。
Array(‘i’, range(10))中的‘i’參數:
‘c’: ctypes.c_char ‘u’: ctypes.c_wchar ‘b’: ctypes.c_byte ‘B’: ctypes.c_ubyte
‘h’: ctypes.c_short ‘H’: ctypes.c_ushort ‘i’: ctypes.c_int ‘I’: ctypes.c_uint
‘l’: ctypes.c_long, ‘L’: ctypes.c_ulong ‘f’: ctypes.c_float ‘d’: ctypes.c_double
(2)Manager
由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array類型的支持。
from multiprocessing importProcess,Manager def f(d,l): d["name"] = "zhangyanlin" d["age"] = 18 d["Job"] = "pythoner" l.reverse() if __name__ == "__main__": with Manager() as man: d = man.dict() l = man.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l) #輸出: #{0.25: None, 1: '1', '2': 2} #[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Process模塊是一個建立進程的模塊,藉助這個模塊,就能夠完成進程的建立。
Process([group [, target [, name [, args [, kwargs]]]]]) 強調: 1. 須要使用關鍵字的方式來指定參數 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號
方法介紹:
1 p.start():啓動進程,並調用該子進程中的p.run() 2 p.run():進程啓動時運行的方法,正是它去調用target指定的函數,咱們自定義類的類中必定要實現該方法 3 p.terminate():強制終止進程p,不會進行任何清理操做,若是p建立了子進程,該子進程就成了殭屍進程,使用該方法須要特別當心這種狀況。若是p還保存了一個鎖那麼也將不會被釋放,進而致使死鎖 4 p.is_alive():若是p仍然運行,返回True 5 p.join([timeout]):主線程等待p終止(強調:是主線程處於等的狀態,而p是處於運行的狀態)。timeout是可選的超時時間,須要強調的是,p.join只能join住start開啓的進程,而不能join住run開啓的進程
屬性介紹:
1 p.daemon:默認值爲False,若是設爲True,表明p爲後臺運行的守護進程。設定爲True後,p不能建立本身的新進程,必須在p.start()以前設置 2 p.name:進程的名稱 3 p.pid:進程的pid 4 p.exitcode:進程在運行時爲None、若是爲–N,表示被信號N結束(瞭解便可) 5 p.authkey:進程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字符串。這個鍵的用途是爲涉及網絡鏈接的底層進程間通訊提供安全性(瞭解便可)
#方法一; import os from multiprocessing import Process def func1(name): print('hello', name) print("我是子進程: %d;個人父進程id是:%d" % (os.getpid(), os.getppid())) def func2(): print('hello') if __name__ == '__main__': p1 = Process(target=func1, args=('xiaobai',)) # 此處傳參必須是元組數據類型 p1.start() print("我是父進程:%d" % os.getpid()) p2 = Process(target=func2) p2.start() ''' # 執行結果 我是父進程:12612 hello xiaobai 我是子進程: 5760; 個人父進程id是:12612 '''
# 方法二:# 經過繼承Process類的形式開啓進程的方式 import os from multiprocessing import Process class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): #固定名字run !!! print(os.getpid()) print('%s 正在和女神聊天' % self.name) if __name__ == '__main__': p1 = MyProcess('xiaobai') p2 = MyProcess('xiaohei') p1.start() # start會自動調用run方法 p2.start() # 說明:若是須要傳參,必須寫入到__init__方法裏面,且必須加上super().__init__();由於父類Process裏面也有__init__方法。
Process對象的join方法
import time from multiprocessing import Process def func(name): print("hello", name) time.sleep(1) print('我是子進程') if __name__ == '__main__': p = Process(target=func, args=('xiaobai',)) p.start() p.join() # 加上join方法後,父進程就會阻塞等待子進程結束而結束。 print("父進程")
Process開啓多進程
多個進程同事運行(注意,子進程的執行順序不是根據自動順序決定的)
import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(1) if __name__ == '__main__': for i in range(10): p = Process(target=func, args=(i,)) p.start()
import time from multiprocessing import Process def func(name): print("hello 進程 %d" % name ) time.sleep(0.1) if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() p_lst.append(p) p.join() print("父進程執行中")
進程之間的數據隔離問題
from multiprocessing import Process n = 100 #在windows系統中把全局變量定義在if __name__ == '__main__'之上就能夠了 def work(): global n n = 0 print("子進程內:", n) if __name__ == '__main__': p = Process(target=work) p.start() print("主進程內:", n)
主進程建立守護進程,守護進程會隨着主進程的結束而結束。守護進程內沒法再開啓子進程,不然拋出異常:AssertionError: daemonic processes are not allowed to have children
import time from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------") #打印該行則主進程代碼結束,則守護進程p1應該被終止. #可能p1執行的打印信息任務會由於主進程打印(main----)被終止.
socket聊天併發實例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程必定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start() 使用多進程實現socket聊天併發-server
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8')) 使用多進程實現socket聊天併發-client
當多個進程使用同一份數據資源的時候,就會引起數據安全或順序混亂問題。
# 多進程搶佔輸出資源 import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() # 執行結果 """ 0: 14316 is running 1: 9900 is running 2: 10056 is running 1: 9900 is done 2: 10056 is done 0: 14316 is done """
# 使用鎖維護執行順序 import os import time import random from multiprocessing import Process, Lock def work(lock, n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(lock, i)) p.start() # 執行結果 """ 0: 15276 is running 0: 15276 is done 1: 6360 is running 1: 6360 is done 2: 14776 is running 2: 14776 is done """
上面這種狀況雖然使用加鎖的形式實現了順序的執行,可是程序又從新變成串行了,沒錯,加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行修改,速度是慢了,但犧牲了速度卻保證了數據的安全性。所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題,這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道
隊列和管道都是將數據存放於內存中,隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來,咱們應該儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可擴展性。
四、進程池(Using a pool of workers)
Pool類描述了一個工做進程池,他有幾種不一樣的方法讓任務卸載工做進程。
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。咱們能夠用Pool類建立一個進程池, 展開提交的任務給進程池。 例:
#apply from multiprocessing import Pool import time def f1(i): time.sleep(0.5) print(i) return i + 100 if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply(func=f1,args=(i,)) #apply_async def f1(i): time.sleep(0.5) print(i) return i + 100 def f2(arg): print(arg) if __name__ == "__main__": pool = Pool(5) for i in range(1,31): pool.apply_async(func=f1,args=(i,),callback=f2) pool.close() pool.join()
一個進程池對象能夠控制工做進程池的哪些工做能夠被提交,它支持超時和回調的異步結果,有一個相似map的實現。
注意:Pool對象的方法只能夠被建立pool的進程所調用。
apply(func[, args[, kwds]]) :使用arg和kwds參數調用func函數,結果返回前會一直阻塞,因爲這個緣由,apply_async()更適合併發執行,另外,func函數僅被pool中的一個進程運行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : 是apply()的一個變體,會返回一個結果對象。若是callback被指定,那麼callback能夠接收一個參數而後被調用,當結果準備好回調時會調用callback,調用失敗時,則用error_callback替換callback。 Callbacks應被當即完成,不然處理結果的線程會被阻塞。
close() : 阻止更多的任務提交到pool,待任務完成後,工做進程會退出。
terminate() : 無論任務是否完成,當即中止工做進程。在對pool對象進程垃圾回收的時候,會當即調用terminate()。
join() : wait工做線程的退出,在調用join()前,必須調用close() or terminate()。這樣是由於被終止的進程須要被父進程調用wait(join等價與wait),不然進程會成爲殭屍進程。
map(func, iterable[, chunksize])
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
imap(func, iterable[, chunksize])
imap_unordered(func, iterable[, chunksize])
starmap(func, iterable[, chunksize])
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
threading 模塊創建在 _thread 模塊之上。thread 模塊以低級、原始的方式來處理和控制線程,而 threading 模塊經過對 thread 進行二次封裝,提供了更方便的 api 來處理線程。Thread方法:
t.start() : 激活線程,
t.getName() : 獲取線程的名稱
t.setName() : 設置線程的名稱
t.name : 獲取或設置線程的名稱
t.is_alive() : 判斷線程是否爲激活狀態
t.isAlive() :判斷線程是否爲激活狀態
t.setDaemon() 設置爲後臺線程或前臺線程(默認:False);經過一個布爾值設置線程是否爲守護線程,必須在執行start()方法以後纔可使用。若是是後臺線程,主線程執行過程當中,後臺線程也在進行,主線程執行完畢後,後臺線程不論成功與否,均中止;若是是前臺線程,主線程執行過程當中,前臺線程也在進行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止
t.isDaemon() : 判斷是否爲守護線程
t.ident :獲取線程的標識符。線程標識符是一個非零整數,只有在調用了start()方法以後該屬性纔有效,不然它只返回None。
t.join() :逐個執行每一個線程,執行完畢後繼續往下執行,該方法使得多線程變得無心義
t.run() :線程被cpu調度後自動執行線程對象的run方法
from threading import Thread from threading import currentThread # 獲取當前線程對象的 對象 import time def task(): print('%s is runing' %currentThread().getName()) # 獲取線程名 time.sleep(2) print('%s is down' % currentThread().getName()) if __name__ == '__main__': t = Thread(target=task, name='這裏設置子線程初始化名') t.start() t.setName('設置線程名') # !!!! t.join() # 等待子線程運行結束 # currentThread() 等同於 線程對象t 因此獲取線程名也能夠t.getName() print('主線程', currentThread().getName()) # 但在主線程內(並無線程對象)要獲取線程名必須用 currentThread().getName() t.isAlive() # 線程是否存活! 查看線程對象是否存活
#方法1 from threading import Thread # 建立線程的模塊 def task(name): print(name) if __name__ == '__main__': # 開啓線程 參數1:方法名(不要帶括號) 參數2:參數(元祖) 返回對象 p = Thread(target=task, args=('線程1',)) p.start() # 只是給操做系統發送了一個就緒信號,並非執行。操做系統接收信號後安排cpu運行 print('主') #方法2 - 類的方法 from threading import Thread # 建立線程的模塊 class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): # 固定名字run !!!必須用固定名 print(self.name) if __name__ == '__main__': # 必需要這樣啓動 p = MyThread('子線程1') p.start() print('主)
二、線程鎖threading.RLock和threading.Lock
因爲線程之間是進行隨機調度,而且每一個線程可能只執行n條執行以後,CPU接着執行其餘線程。爲了保證數據的準確性,引入了鎖的概念。因此可能出現以下問題:
例:假設列表A的全部元素就爲0,當一個線程從前向後打印列表的全部元素,另一個線程則從後向前修改列表的元素爲1,那麼輸出的時候,列表的元素就會一部分爲0,一部分爲1,這就致使了數據的不一致。鎖的出現解決了這個問題。
import threading import time globals_num = 0 lock = threading.RLock() def Func(): lock.acquire() # 得到鎖 global globals_num globals_num += 1 time.sleep(1) print(globals_num) lock.release() # 釋放鎖 for i in range(10): t =threading.Thread(target=Func) t.start()
RLock容許在同一線程中被屢次acquire。而Lock卻不容許這種狀況。 若是使用RLock,那麼acquire和release必須成對出現,即調用了n次acquire,必須調用n次的release才能真正釋放所佔用的瑣。
三、threading.Event
python線程的事件用於主線程控制其餘線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個「Flag」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
import threading def do(event): print('start') event.wait() print('execute') event_obj = threading.Event() for i in range(10): t = threading.Thread(target=do, args=(event_obj,)) t.start() event_obj.clear() inp = input('input:') if inp == 'true': event_obj.set()
當線程執行的時候,若是flag爲False,則線程會阻塞,當flag爲True的時候,線程不會阻塞。它提供了本地和遠程的併發性。
五、threading.Condition
Condition類實現了一個conditon變量。 這個conditiaon變量容許一個或多個線程等待,直到他們被另外一個線程通知。 若是lock參數,被給定一個非空的值,,那麼他必須是一個lock或者Rlock對象,它用來作底層鎖。不然,會建立一個新的Rlock對象,用來作底層鎖。
wait(timeout=None) : 等待通知,或者等到設定的超時時間。當調用這wait()方法時,若是調用它的線程沒有獲得鎖,那麼會拋出一個RuntimeError 異常。 wati()釋放鎖之後,在被調用相同條件的另外一個進程用notify() or notify_all() 叫醒以前 會一直阻塞。wait() 還能夠指定一個超時時間。
若是有等待的線程,notify()方法會喚醒一個在等待conditon變量的線程。notify_all() 則會喚醒全部在等待conditon變量的線程。 notify()和notify_all()不會釋放鎖,也就是說,線程被喚醒後不會馬上返回他們的wait() 調用。除非線程調用notify()和notify_all()以後放棄了鎖的全部權。
例子: 生產者-消費者模型,
import threading import time def consumer(cond): with cond: print("consumer before wait") cond.wait() print("consumer after wait") def producer(cond): with cond: print("producer before notifyAll") cond.notifyAll() print("producer after notifyAll") condition = threading.Condition() c1 = threading.Thread(name="c1", target=consumer, args=(condition,)) c2 = threading.Thread(name="c2", target=consumer, args=(condition,)) p = threading.Thread(name="p", target=producer, args=(condition,)) c1.start() time.sleep(2) c2.start() time.sleep(2) p.start()
五、queue模塊
Queue 就是對隊列,它是線程安全的,,舉例來講,咱們去麥當勞吃飯。飯店裏面有廚師職位,前臺負責把廚房作好的飯賣給顧客,顧客則去前臺領取作好的飯。這裏的前臺就至關於咱們的隊列。造成管道樣,廚師作好飯經過前臺傳送給顧客,所謂單向隊列
這個模型也叫生產者-消費者模型。
import queue q = queue.Queue(maxsize=0) # 構造一個先進顯出隊列,maxsize指定隊列長度,爲0 時,表示隊列長度無限制。 q.join() # 等到隊列爲kong的時候,在執行別的操做 q.qsize() # 返回隊列的大小 (不可靠) q.empty() # 當隊列爲空的時候,返回True 不然返回False (不可靠) q.full() # 當隊列滿的時候,返回True,不然返回False (不可靠) q.put(item, block=True, timeout=None) # 將item放入Queue尾部,item必須存在,能夠參數block默認爲True,表示當隊列滿時,會等待隊列給出可用位置, 爲False時爲非阻塞,此時若是隊列已滿,會引起queue.Full 異常。 可選參數timeout,表示 會阻塞設置的時間,事後, 若是隊列沒法給出放入item的位置,則引起 queue.Full 異常 q.get(block=True, timeout=None) # 移除並返回隊列頭部的一個值,可選參數block默認爲True,表示獲取值的時候,若是隊列爲空,則阻塞,爲False時,不阻塞, 若此時隊列爲空,則引起 queue.Empty異常。 可選參數timeout,表示會阻塞設置的時候,事後,若是隊列爲空,則引起Empty異常。 q.put_nowait(item) # 等效於 put(item,block=False) q.get_nowait() # 等效於 get(item,block=False)
代碼以下:
#!/usr/bin/env python import Queue import threading message = Queue.Queue(10) def producer(i): while True: message.put(i) def consumer(i): while True: msg = message.get() for i in range(12): t =threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): t =threading.Thread(target=consumer, args=(i,)) t.start()
協程存在的意義:對於多線程應用,CPU經過切片的方式來切換線程間的執行,線程切換時須要耗時(保存狀態,下次繼續)。協程,則只使用一個線程,在一個線程中規定某個代碼塊執行順序。協程的適用場景:當程序中存在大量不須要CPU的操做時(IO),適用於協程;
import time def A(): while 1: print('------A-----') time.sleep(0.1) yield() def B(): while 1: print('-------B-----') time.sleep(0.1) next(a) a = A() B()
執行結果:
-------B----- ------A----- -------B----- ------A----- -------B----- ------A----- -------B----- ------A----- -------B----- ------A----- ···
yield能實現協程,不過實現過程不易於理解,greenlet是在這方面作了改進。
from greenlet import greenlet import time def A(): while 1: print('-------A-------') time.sleep(0.5) g2.switch() def B(): while 1: print('-------B-------') time.sleep(0.5) g1.switch() g1 = greenlet(A) #建立協程g1 g2 = greenlet(B) g1.switch() #跳轉至協程g1
執行結果:
-------A------- -------B------- -------A------- -------B------- -------A------- ···
greenlet能夠實現協程,不過每一次都要人爲的去指向下一個該執行的協程,顯得太過麻煩。gevent每次遇到io操做,須要耗時等待時,會自動跳到下一個協程繼續執行。
gevent 是一個第三方庫,能夠輕鬆經過gevent實現協程程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。gevent會主動識別程序內部的IO操做,當子程序遇到IO後,切換到別的子程序。若是全部的子程序都進入IO,則阻塞。
import gevent def A(): while 1: print('-------A-------') gevent.sleep(1) #用來模擬一個耗時操做,注意不是time模塊中的sleep def B(): while 1: print('-------B-------') gevent.sleep(0.5) #每當碰到耗時操做,會自動跳轉至其餘協程 g1 = gevent.spawn(A) # 建立一個協程 g2 = gevent.spawn(B) g1.join() #等待協程執行結束 g2.join()
執行結果:
-------A------- -------B------- -------B------- -------A------- -------B------- -------B------- -------A------- -------B------- -------B------- ···
import gevent from gevent import monkey,socket monkey.patch_all() #有IO才作時須要這一句 s = socket.socket(2,1) #用的都是gevent模塊中的socket,但用法同樣 s.setsockopt(1,2,1) s.bind(('',8080)) s.listen(1024) def func_accept(): while 1: cs,userinfo = s.accept() print('來了一個客戶'+str(userinfo)) g = gevent.spawn(func_recv,cs) #每當有用戶鏈接,增長一條協程 def func_recv(cs): while 1: recv_data = cs.recv(1024) print(recv_data) #程誰堵塞了,便會跳轉至其餘協程 if len(recv_data) > 0: cs.send(recv_data) else: cs.close() break g1 = gevent.spawn(func_accept) g1.join()