開始說併發編程以前,最好有必定的底層知識積累,這裏我把須要的知識總結了一下,若是看下面的有不理解的能夠看一下:http://www.javashuo.com/article/p-rquxjwfh-dk.htmlhtml
不難看出,互斥鎖是信號量的一種特殊狀況(n=1時)。也就是說,徹底能夠用後者替代前者。可是,由於互斥鎖較爲簡單,且效率高,因此在必須保證資源獨佔的狀況下,仍是採用這種設計。python
上面的內容轉載自:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html編程
看了上面簡單的,再說一下複雜的api
進程(英語:process),是計算機中已運行程序的實體。進程爲曾經是分時系統的基本運做單位。在面向進程設計的系統(如早期的UNIX,Linux 2.4及更早的版本)中,進程是程序的基本執行實體;在面向線程設計的系統(如當代多數操做系統、Linux 2.6及更新的版本)中,進程自己不是基本運行單位,而是線程的容器。程序自己只是指令、數據及其組織形式的描述,進程纔是程序(那些指令和數據)的真正運行實例。若干進程有可能與同一個程序相關係,且每一個進程皆能夠同步(循序)或異步(平行)的方式獨立運行。現代計算機系統可在同一段時間內以進程的形式將多個程序加載到存儲器中,並藉由時間共享(或稱時分複用),以在一個處理器上表現出同時(平行性)運行的感受。一樣的,使用多線程技術(多線程即每個線程都表明一個進程內的一個獨立執行上下文)的操做系統或計算機體系結構,一樣程序的平行線程,可在多CPU主機或網絡上真正同時運行(在不一樣的CPU上)。安全
關於進程在操做系統中的狀態及調度流程以下圖性能優化
簡單圖:網絡
複雜圖:數據結構
線程(英語:thread)是操做系統可以進行運算調度的最小單位。它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。在Unix System V及SunOS中也被稱爲輕量進程(lightweight processes),但輕量進程更多指內核線程(kernel thread),而把用戶線程(user thread)稱爲線程。
線程是獨立調度和分派的基本單位。線程能夠爲操做系統內核調度的內核線程,如Win32線程;由用戶進程自行調度的用戶線程,如Linux平臺的POSIX Thread;或者由內核與用戶進程,如Windows 7的線程,進行混合調度。
同一進程中的多條線程將共享該進程中的所有系統資源,如虛擬地址空間,文件描述符和信號處理等等。但同一進程中的多個線程有各自的調用棧(call stack),本身的寄存器環境(register context),本身的線程本地存儲(thread-local storage)。
一個進程能夠有不少線程,每條線程並行執行不一樣的任務。
在多核或多CPU,或支持Hyper-threading的CPU上使用多線程程序設計的好處是顯而易見,即提升了程序的執行吞吐率。在單CPU單核的計算機上,使用多線程技術,也能夠把進程中負責I/O處理、人機交互而常被阻塞的部分與密集計算的部分分開來執行,編寫專門的workhorse線程執行密集計算,從而提升了程序的執行效率。多線程
總結:併發
線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
進程是具備必定獨立功能的程序關於某個數據集合上的一次運行活動,進程是系統進行資源分配和調度的一個獨立單位. 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程本身基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器,一組寄存器和棧)可是它可與同屬一個進程的其餘的線程共享進程所擁有的所有資源. 一個線程能夠建立和撤銷另外一個線程;同一個進程中的多個線程之間能夠併發執行。
關於python的GIL鎖簡單說就是:不管你啓多少個線程,你有多少個cpu, Python在執行的時候會淡定的在同一時刻只容許一個線程運行。
因爲GIL的存在,python中多線程其實並非真正的多線程,若是想要充分發揮CPU的資源,在python中大部分狀況須要使用多進程。然後面又通過優化,出現了多線程加協程。
在開發中,有兩種比較常見的處理狀況,一個是IO密集型,一個是計算密集型,因爲GIL鎖的存在,因此python仍是比較適用於IO操做,由於這樣能發揮多線程的能力,而要實現計算密集型就須要開多進程,纔可以真正的發揮計算機多核的能力。
這裏轉一下別人的文章,感受寫的很全面,很少說明:http://python.jobbole.com/87743/
在python中使用的是threading模塊,它創建在thread 模塊之上。thread模塊以低級、原始的方式來處理和控制線程,而threading 模塊經過對thread進行二次封裝,提供了更方便的api來處理線程。
直接調用
import threading import time def sayhi(num): #定義每一個線程要運行的函數 print("running on number:%s" %num) time.sleep(3) if __name__ == '__main__': t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(2,)) #生成另外一個線程實例 t1.start() #啓動線程,能夠認爲是讓進程進入上面進程圖中的就緒狀態 t2.start() #啓動另外一個線程 print(t1.getName()) #獲取線程名 print(t2.getName())
注意,函數傳入的時候要傳入函數對象,也就是不加括號的形式。
繼承式調用
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): #繼承式調用必需要重寫這個函數 print("running on number:%s" %self.num) # self.name是這個線程的名字 time.sleep(3) if __name__ == '__main__': t1 = MyThread(1) t2 = MyThread(2) t1.start() t2.start()
將線程聲明爲守護線程,必須在start() 方法調用以前設置, 若是不設置爲守護線程程序會被無限掛起。這個方法基本和join是相反的。
當咱們 在程序運行中,執行一個主線程,若是主線程又建立一個子線程,主線程和子線程 就分兵兩路,分別運行,那麼當主線程完成
想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。可是有時候咱們須要的是 只要主線程
完成了,無論子線程是否完成,都要和主線程一塊兒退出,這時就能夠 用setDaemon方法啦
首先看看下面的代碼
import time import threading NUM = 100 # 設定一個共享變量 thread_list = [] def lessNum(): global NUM # 在每一個線程中都獲取這個全局變量 temp = NUM time.sleep(0.1) NUM = temp - 1 # 對此公共變量進行-1操做 if __name__ == '__main__': for i in range(100): t = threading.Thread(target=lessNum) t.start() thread_list.append(t) for t in thread_list: # 等待全部線程執行完畢 t.join() print('final num:', NUM)
觀察:time.sleep(0.1) /0.001/0.0000001 結果分別是多少?
按照咱們的預想,應該是100個線程每次減1,而後結果是0,可是實際上不是這樣,
當讓函數睡0.1秒後,個人電腦的NUM的結果是99,
當讓函數睡0.001秒後個人電腦的NUM的結果是90左右,
當讓函數睡0.001秒後個人電腦的NUM的結果是也是90左右,
固然了,不一樣電腦由於運算速度不同,可能會產生不一樣的結果。可是不管怎樣也不是咱們預想的結果0,是由於,當線程開啓後,這100個線程操做的是同一個資源,當讓函數睡0.1秒後,按照如今大部分電腦的運行速度,足夠全部的線程得到那個NUM,而那個NUM的值都是100,減一以後,至關於給這個NUM賦值了100個100-1的結果,天然就是99,後面的0.001,和0.0000001結果也大體是這樣的流程,只不過隨着時間的減小,線程被cpu調用時須要必定的時間,因此一些線程處理的是其餘線程處理以後的結果。
由此能夠看出多個線程都在同時操做同一個共享資源,可能形成了資源破壞,怎麼辦呢?(join會形成串行,失去所線程的意義)
咱們能夠經過同步鎖來解決這種問題,咱們將核心的處理共享數據的地方用鎖鎖住,就像引子中寫的那樣,同一時間內只讓一個線程訪問這個資源。
互斥鎖的格式爲:
lock = threading.Lock()
lock.acquire()
要鎖的內容
lock.release()
上面例子中加鎖以後的樣子
import time import threading NUM = 100 # 設定一個共享變量 thread_list = [] def lessNum(): global NUM # 在每一個線程中都獲取這個全局變量 lock.acquire() temp = NUM time.sleep(0.0000001) NUM = temp - 1 # 對此公共變量進行-1操做 lock.release() if __name__ == '__main__': lock = threading.Lock() for i in range(100): t = threading.Thread(target=lessNum) t.start() thread_list.append(t) for t in thread_list: # 等待全部線程執行完畢 t.join() print('final num:', NUM)
能夠看出來,這樣修改了以後就像是串行的了,而這個和python沒有關係,其餘的編程語言也是這樣解決的。那麼這樣,多線程還有意義嗎?答案是確定的,由於只有這一部分須要加鎖,其餘部分則不須要。
在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都正在使用,全部這兩個線程在無外力做用下將一直等待下去。下面是一個死鎖的例子:
import threading, time class myThread(threading.Thread): def doA(self): lockA.acquire() print(self.name, "gotlockA", time.ctime()) time.sleep(3) lockB.acquire() print(self.name, "gotlockB", time.ctime()) lockB.release() lockA.release() def doB(self): lockB.acquire() print(self.name, "gotlockB", time.ctime()) time.sleep(2) lockA.acquire() print(self.name, "gotlockA", time.ctime()) lockA.release() lockB.release() def run(self): self.doA() self.doB() if __name__ == "__main__": lockA = threading.Lock() lockB = threading.Lock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
結果爲
能夠看到程序陷入了阻塞,實際上是發生了死鎖。而有一種經常使用的解決方式,就是使用遞歸鎖。
遞歸鎖的格式:
r_lock = threading.RLock() r_lock = r_lock.acquire()
要鎖的內容 r_lock = r_lock.release()
爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。也就是說每調用一次acquire方法就加1,每調用一次release方法就減1,沒人用的時候是默認的0。
上面例子中加鎖以後的樣子
import threading, time class myThread(threading.Thread): def doA(self): r_lock.acquire() print(self.name, "gotlockA", time.ctime()) time.sleep(3) r_lock.acquire() print(self.name, "gotlockB", time.ctime()) r_lock.release() r_lock.release() def doB(self): r_lock.acquire() print(self.name, "gotlockB", time.ctime()) time.sleep(2) r_lock.acquire() print(self.name, "gotlockA", time.ctime()) r_lock.release() r_lock.release() def run(self): self.doA() self.doB() if __name__ == "__main__": r_lock = threading.RLock() threads = [] for i in range(5): threads.append(myThread()) for t in threads: t.start() for t in threads: t.join()
同步和異步在開始給的網址中已經說過,這裏爲了方便起見,仍是在進行說明
同步和異步一般用來形容一次方法調用。
這裏實現同步就是對線程進行阻塞,等待另外一個線程將數據處理結束而後解除這個阻塞狀態。
建立同步對象:
event = threading.Event()
同步對象的經常使用方法:
event.wait():等待flag被設定,一旦event被設定,等同於pass
event.set():設定flag
event.clear():清除flag
event.isSet(): 查看當前flag狀態
注:一個event能夠用在多個線程中。
例子
import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") print(event.isSet()) # False event.set() time.sleep(5) print("BOSS:<22:00>能夠下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() # 一旦event被設定,等同於pass print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join() print("ending.....")
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。
例子:
import threading, time class myThread(threading.Thread): def run(self): if semaphore.acquire(): print(self.name) time.sleep(2) semaphore.release() if __name__ == "__main__": semaphore = threading.Semaphore(5) thrs = [] for i in range(100): thrs.append(myThread()) for t in thrs: t.start()
這個信號量默認是1。
不難看出,互斥鎖是信號量的一種特殊狀況(n=1時)。也就是說,徹底能夠用後者替代前者。可是,由於互斥鎖較爲簡單,且效率高,因此在必須保證資源獨佔的狀況下,仍是採用這種設計。
Queue是python標準庫中的線程安全的隊列(FIFO)實現,提供了一個適用於多線程編程的先進先出的數據結構,即隊列,用來在生產者和消費者線程之間的信息傳遞
建立一個隊列對象
import Queue
q = Queue.Queue(maxsize = 10) Queue.Queue類便是一個隊列的同步實現。隊列長度可爲無限或者有限。可經過Queue的構造函數的可選參數maxsize來設定隊列長度。若是maxsize小於1就表示隊列長度無限。
python Queue模塊有三種隊列及構造函數
經常使用的方法
q.get_nowait()方法
至關Queue.get(False),這種方法在向一個空隊列取值的時候會拋一個Empty異常,因此更經常使用的方法是先判斷一個隊列是否爲空,若是不爲空則取值
除了按照先進先出,還有一個按照優先級的處理順序
q=queue.PriorityQueue() q.put([5,100]) # 優先級爲5,放入參數爲100 q.put([7,200]) # 優先級爲7,放入參數爲200 q.put([3,"hello"]) q.put([4,{"name":"alex"}])
關於隊列,經常跟這個生產者和消費者的模型關聯起來,由於二者的契合度很高。因此這裏也提一下。
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。
什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
這就像,在餐廳,廚師作好菜,不須要直接和客戶交流,而是交給前臺,而客戶去飯菜也不須要不找廚師,直接去前臺領取便可,這也是一個結耦的過程。
下面用隊列的內容寫一個生產者和消費者的例子
import time, random import queue, threading q = queue.Queue() def Producer(name): count = 0 while count < 10: print("making........") time.sleep(random.randrange(3)) q.put(count) print('Producer %s has produced %s baozi..' % (name, count)) count += 1 # q.task_done() # q.join() print("ok......") def Consumer(name): count = 0 while count < 10: time.sleep(random.randrange(4)) if not q.empty(): data = q.get() # q.task_done() # q.join() print(data) print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' % (name, data)) else: print("-----no baozi anymore----") count += 1 p1 = threading.Thread(target=Producer, args=('A',)) c1 = threading.Thread(target=Consumer, args=('B',)) # c2 = threading.Thread(target=Consumer, args=('C',)) # c3 = threading.Thread(target=Consumer, args=('D',)) p1.start() c1.start() # c2.start() # c3.start()
multiprocessing包是Python中的多進程管理包。與threading.Thread相似,它能夠利用multiprocessing.Process對象來建立一個進程。該進程能夠運行在Python程序內部編寫的函數。該Process對象與Thread對象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類 (這些對象能夠像多線程那樣,經過參數傳遞給各個進程),用以同步進程,其用法與threading包中的同名類一致。因此,multiprocessing的很大一部份與threading使用同一套API,只不過換到了多進程的情境。
直接調用
from multiprocessing import Process import time def f(name): time.sleep(1) print('hello', name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = Process(target=f, args=('老王',)) p_list.append(p) p.start() for i in p_list: p.join() print('end')
繼承式調用
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():無論任務是否完成,當即中止工做進程
屬性:
daemon:和線程的setDeamon功能同樣
name:進程名字。
pid:進程號。
from multiprocessing import Process import os import time def info(title): print("title:", title) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main process line') time.sleep(1) print("------------------") p = Process(target=info, args=('老王',)) p.start() p.join()
結果以下:
能夠從進程號看到父子進程之間的關係,而主進程的父進程不是解釋器,我用的工具是pycharm,因此主進程的父進程是pycharm的端口號。
from multiprocessing import Process, Queue def f(q, n): q.put([123, 456, 'hello']) q.put(n * n + 1) print("son process", id(q)) if __name__ == '__main__': q = Queue() print("main process", id(q)) for i in range(3): p = Process(target=f, args=(q, i)) p.start() print(q.get()) print(q.get()) print(q.get())
pipe()返回兩個鏈接對象表明pipe的兩端。每一個鏈接對象都有send()方法和recv()方法。
可是若是兩個進程或線程對象同時讀取或寫入管道兩端的數據時,管道中的數據有可能會損壞。
當進程使用的是管道兩端的不一樣的數據則不會有數據損壞的風險。
該 Pipe()
函數返回一個由管道鏈接的鏈接對象,默認狀況下爲雙工(雙向)。
from multiprocessing import Process, Pipe def f(conn): conn.send([12, {"name": "laowang"}, 'hello']) response = conn.recv() print("response", response) conn.close() print("q_ID2:", id(conn)) if __name__ == '__main__': parent_conn, child_conn = Pipe() # 雙向管道 print("q_ID1:", id(child_conn)) p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) parent_conn.send("hello") p.join()
上面的Queue和Pipe只實現了數據交互,沒有實現數據共享(即一個進程去改變另外一個進程的數據)。
Manager()返回的manager對象控制了一個server進程,此進程包含的python對象能夠被其餘的進程經過proxies來訪問。從而達到多進程間數據通訊且安全。
Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
from multiprocessing import Process, Manager def f(d, l, n): d[n] = '1' # {0:"1"} d['2'] = 2 # {0:"1","2":2} l.append(n) # [0,1,2,3,4, 0,1,2,3,4,5,6,7,8,9] if __name__ == '__main__': with Manager() as manager: d = manager.dict() # {} l = manager.list(range(5)) # [0,1,2,3,4] p_list = [] for i in range(10): p = Process(target=f, args=(d, l, i)) p.start() p_list.append(p) for res in p_list: res.join() print(d) # {0: '1', 1: '1', 2: '1', 3: '1', 4: '1', '2': 2, 6: '1', 7: '1', 8: '1', 9: '1', 5: '1'} print(l) # [0, 1, 2, 3, 4, 0, 2, 3, 1, 4, 5, 6, 7, 9, 8]
from multiprocessing import Process, Lock import time def f(l, i): l.acquire() time.sleep(1) print('hello world %s' % i) l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start()
進程池內部維護一個進程序列,當使用時,則去進程池中獲取一個進程,若是進程池序列中沒有可供使用的進進程,那麼程序就會等待,直到進程池中有可用進程爲止。
進程池中有兩個方法:
pool.apply_async(func=Foo, args(i,), callback=demo)
pool.apply(func=Foo, args(i,), callback=demo)
這個是同步的接口,不管線程池設爲多少,都是一個進程一個進程的執行。
注:以前感受回調函數徹底沒有做用,讓子進程去調用那個函數不就能夠了嗎,爲何要加一個回調函數呢,回調函數的區別就是他是由主進程調用的。這樣有什麼好處呢。好比開十個進程,咱們須要把這個行爲記錄下來,也就是咱們常說的日誌就能夠用回調函數來弄。咱們能夠把邏輯之外,並且公用的操做放到這個回調函數中,不用每次都要進程去作與邏輯無關的事情。並且這個回調函數必需要加一個值,就是用來接收進程函數中的return返回的值。
from multiprocessing import Process, Pool import time, os def processPool(i): time.sleep(1) print(i) print("son", os.getpid()) return "HELLO %s" % i def Back(arg): print(arg) if __name__ == '__main__': pool = Pool(5) # 參數爲進程池中維護的進程數量,不寫默認爲當前計算機的核心數。 print("main pid", os.getpid()) for i in range(100): pool.apply_async(func=processPool, args=(i,), callback=Back) pool.close() pool.join() # join與close調用順序是固定的 print('end')
注:pool.close()必須放在pool.join()的前面。
協程,又稱微線程,纖程。英文名Coroutine。
協程是一種用戶級的輕量級線程。協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以:
協程能保留上一次調用時的狀態(即全部局部狀態的一個特定組合),每次過程重入時,就至關於進入上一次調用的狀態,換種說法:進入上一次離開時所處邏輯流的位置。
在併發編程中,協程與線程相似,每一個協程表示一個執行單元,有本身的本地數據,與其它協程共享全局數據和其它資源。
目前主流語言基本上都選擇了多線程做爲併發設施,與線程相關的概念是搶佔式多任務(Preemptive multitasking),而與協程相關的是協做式多任務。
無論是進程仍是線程,每次阻塞、切換都須要陷入系統調用(system call),先讓CPU跑操做系統的調度程序,而後再由調度程序決定該跑哪個進程(線程)。
並且因爲搶佔式調度執行順序沒法肯定的特色,使用線程時須要很是當心地處理同步問題,而協程徹底不存在這個問題(事件驅動和異步程序也有一樣的優勢)。
由於協程是用戶本身來編寫調度邏輯的,對CPU來講,協程實際上是單線程,因此CPU不用去考慮怎麼調度、切換上下文,這就省去了CPU的切換開銷,因此協程在必定程度上又好於多線程。
優勢1: 協程有極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2: 不須要多線程的鎖機制,協程是非搶佔式的,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。
協程的原理就是python中的生成器
首先用yield完成簡單的生產者和消費者模型
import time import queue def consumer(name): print("--->ready to eat baozi...") while True: new_baozi = yield print("[%s] is eating baozi %s" % (name, new_baozi)) # time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" % (n, n + 1)) con.send(n) con2.send(n + 1) n += 2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") p = producer()
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator
from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() # 結果 # 12 # 56 # 34 # 78
咱們能夠看到能夠用greenlet進行了切換,可是還不夠,有個更進一步的封裝gevent
greenlet咱們還有手動切換,而gevent能夠自動的切換,
import gevent import time def f(num): for i in num: time.sleep(0.1) print(i) if __name__ == '__main__': gevent.joinall([ gevent.spawn(f, [1, 2, 3, 4, 5]), gevent.spawn(f, "helloworld") ])
注:進程池multiprocessing.Pool和gevent有衝突不能同時使用,有興趣的能夠研究gevent.pool協程池。
使用Gevent的性能確實要比用傳統的線程高,甚至高不少。但這裏不得不說它的一個坑那就是猴子補丁
使用方法:
from gevent import monkey monkey.patch_all()
(1)猴子補丁的由來
猴子補丁的這個叫法起源於Zope框架,你們在修正Zope的Bug的時候常常在程序後面追加更新部分,這些被稱做是「雜牌軍補丁(guerillapatch)」,後來guerilla就漸漸的寫成了gorllia(猩猩),再後來就寫了monkey(猴子),因此猴子補丁的叫法是這麼莫名其妙的得來的。
後來在動態語言中,不改變源代碼而對功能進行追加和變動,統稱爲「猴子補丁」。因此猴子補丁並非Python中專有的。猴子補丁這種東西充分利用了動態語言的靈活性,能夠對現有的語言Api進行追加,替換,修改Bug,甚至性能優化等等。
使用猴子補丁的方式,gevent可以修改標準庫裏面大部分的阻塞式系統調用,包括socket、ssl、threading和 select等模塊,而變爲協做式運行。也就是經過猴子補丁的monkey.patch_xxx()來將python標準庫中模塊或函數改爲gevent中的響應的具備協程的協做式對象(通常寫爲gevent.monkey.patch_all())。這樣在不改變原有代碼的狀況下,將應用的阻塞式方法,變成協程式的。
(2)猴子補丁使用時的注意事項
猴子補丁的功能很強大,可是也帶來了不少的風險,尤爲是像gevent這種直接進行API替換的補丁,整個Python進程所使用的模塊都會被替換,可能本身的代碼能hold住,可是其它第三方庫,有時候問題並很差排查,即便排查出來也是很棘手,因此,就像松本建議的那樣,若是要使用猴子補丁,那麼只是作功能追加,儘可能避免大規模的API覆蓋。或者得確保項目中用到其餘用到的網絡庫也必須使用純Python或者明確說明支持Gevent
在前面說了爲了減小GIL鎖對高併發的程序產生的影響,不少人想了不少辦法。
舉個例子:給你200W條url,須要你把每一個url對應的頁面抓取保存起來,這種時候,單單使用多進程,效果確定是不好的。爲何呢?
例如每次請求的等待時間是2秒,那麼以下(忽略cpu計算時間):
一、單進程+單線程:須要2秒*200W=400W秒==1111.11個小時==46.3天,這個速度明顯是不能接受的
二、單進程+多線程:例如咱們在這個進程中開了10個多線程,比1中可以提高10倍速度,也就是大約4.63天可以完成200W條抓取,請注意,這裏的實際執行是:線程1碰見了阻塞,CPU切換到線程2去執行,碰見阻塞又切換到線程3等等,10個線程都阻塞後,這個進程就阻塞了,而直到某個線程阻塞完成後,這個進程才能繼續執行,因此速度上提高大約能到10倍(這裏忽略了線程切換帶來的開銷,實際上的提高應該是不能達到10倍的),可是須要考慮的是線程的切換也是有開銷的,因此不能無限的啓動多線程(開200W個線程確定是不靠譜的)
三、多進程+多線程:這裏就厲害了,通常來講也有不少人用這個方法,多進程下,每一個進程都能佔一個cpu,而多線程從必定程度上繞過了阻塞的等待,因此比單進程下的多線程又更好使了,例如咱們開10個進程,每一個進程裏開20W個線程,執行的速度理論上是比單進程開200W個線程快10倍以上的(爲何是10倍以上而不是10倍,主要是cpu切換200W個線程的消耗確定比切換20W個進程大得多,考慮到這部分開銷,因此是10倍以上)。
而根據前面對協程的解釋,它是不須要沒有切換線程的開銷的。這個時候使用多進程+協程(能夠看做是每一個進程裏都是單線程,而這個單線程是協程化的)
多進程+協程下,避開了CPU切換的開銷,又能把多個CPU充分利用起來,這種方式對於數據量較大的爬蟲還有文件讀寫之類的效率提高是巨大的。
可是上面的內容也只是針對這種IO密集型,計算密集型仍是多進程+單線程跑吧,沒轍,這樣還要快一些。
#-*- coding=utf-8 -*- import requests from multiprocessing import Process import gevent from gevent import monkey; monkey.patch_all() import sys reload(sys) sys.setdefaultencoding('utf8') def fetch(url): try: s = requests.Session() r = s.get(url,timeout=1)#在這裏抓取頁面 except Exception,e: print e return '' def process_start(url_list): tasks = [] for url in url_list: tasks.append(gevent.spawn(fetch,url)) gevent.joinall(tasks)#使用協程來執行 def task_start(filepath,flag = 100000):#每10W條url啓動一個進程 with open(filepath,'r') as reader:#從給定的文件中讀取url url = reader.readline().strip() url_list = []#這個list用於存放協程任務 i = 0 #計數器,記錄添加了多少個url到協程隊列 while url!='': i += 1 url_list.append(url)#每次讀取出url,將url添加到隊列 if i == flag:#必定數量的url就啓動一個進程並執行 p = Process(target=process_start,args=(url_list,)) p.start() url_list = [] #重置url隊列 i = 0 #重置計數器 url = reader.readline().strip() if url_list not []:#若退出循環後任務隊列裏還有url剩餘 p = Process(target=process_start,args=(url_list,))#把剩餘的url全都放到最後這個進程來執行 p.start() if __name__ == '__main__': task_start('./testData.txt')#讀取指定文件