引子javascript
進程html
線程(優先閱讀)java
協程python
進程
概念:就是一個程序在一個數據集上的一次動態執行過程(本質上來說,就是運行中的程序(代指運行過程),程序不運行就不是進程) 抽象概念windows
組成:安全
一、程序:咱們編寫的程序用來描述進程要完成哪些功能以及如何完成網絡
二、數據集:數據集則是程序在執行過程當中所須要使用的資源數據結構
三、進程控制塊:進程控制塊用來記錄進程的外部特徵,描述進程的執行變化過程,系統能夠利用它來控制和管理進程,它是系統感知進程存在的惟一標誌。多線程
闡釋:進程與進程之間都佔用的是獨立的內存塊,它們彼此之間的數據也是獨立的併發
優勢:同時利用多個CPU,可以同時進行多個操做
缺點:耗費資源(須要從新開闢內存空間)
構造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 線程組,目前尚未實現,庫引用中提示必須是None;
target: 要執行的方法;
name: 進程名;
args/kwargs: 要傳入方法的參數。
實例方法:
is_alive():返回進程是否在運行。
join([timeout]):阻塞當前上下文環境的進程程,直到調用此方法的進程終止或到達指定的timeout(可選參數)。
start():進程準備就緒,等待CPU調度
run():strat()調用run方法,若是實例進程時未制定傳入target,這star執行t默認run()方法。
terminate():無論任務是否完成,當即中止工做進程
屬性:
daemon:和線程的setDeamon功能同樣
name:進程名字。
pid:進程號。
建立進程的方式有倆種
一,經過調用模塊的方式來建立線程
# 進程模塊 import multiprocessing import time def f1(): start = time.time() sum = 0 for n in range(100000000): sum += n print(sum) print("data:{}".format(time.time() - start)) if __name__ == '__main__': # windows在調用進程的時候,必須加這句話,不然會報錯 li = [] p1 = multiprocessing.Process(target=f1) li.append(p1) p2 = multiprocessing.Process(target=f1) li.append(p2) for p in li: p.start() for i in li: i.join() print("ending...")
二,經過繼承類的方式(推薦)
import multiprocessing class Process(multiprocessing.Process): def run(self): sum = 0 for n in range(100000000): sum += n print(sum) li = [] for i in range(2): p = Process() li.append(p) if __name__ == '__main__': for p in li: p.start() for i in li: i.join() print("ending")
進程之間的通訊
建立進程模塊的下隊列(Queue)

# 進程之間的通訊 Queue from multiprocessing import Queue, Process, Pipe import os,time,random def write(q): print("process to write{}".format(os.getpid())) for value in ["A","B","C"]: print("Put {} to queue...".format(value)) q.put(value) time.sleep(random.random()) def read(q): print("process to read{}".format(os.getpid())) while True: value = q.get(True) print("Get {} from queue".format(value)) if __name__ == '__main__': q = Queue() pw = Process(target=write,args=(q,)) # 這裏傳輸的q是copy的 pr = Process(target=read,args=(q,)) pw.start() pr.start() pw.join() pr.terminate() # 強行終止進程(由於這個子進程定義了一個死循環)
管道(Pipe)

# 進程之間的通訊 Pipe(相似於socket) from multiprocessing import Queue, Process, Pipe import os,time,random # 說明Pipe的send是沒有返回值的 pipe = Pipe() # print(pipe) def worker(pipe): time.sleep(random.random()) for i in range(10): print("worker send {}".format(pipe.send(i))) def Boss(pipe): while True: print("Boss recv {}".format(pipe.recv())) p1 = Process(target=worker,args=(pipe[0],)) p2 = Process(target=Boss,args=(pipe[1],)) if __name__ == '__main__': p1.start() p2.start()
上述實現了進程間的數據通訊,那麼進程能夠達到數據共享麼?Sure。
前一節中, Pipe、Queue 都有必定數據共享的功能,可是他們會堵塞進程, 這裏介紹的兩種數據共享方式都不會堵塞進程, 並且都是多進程安全的。
A manager object returned by Manager()
controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager()
will support types list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Barrier
, Queue
, Value
and Array
.
由上述英文咱們瞭解到,經過Manager()能夠實現進程上的數據共享,而且支持的類型也由不少,接下來看代碼

from multiprocessing import Process, Manager def f(d,l,n): d["name"] = "alex" d[n] = "1" l.append(n) if __name__ == '__main__': with Manager() as manager: # 相似於文件操做的with open(...) d = manager.dict() l = manager.list(range(5)) print(d,l) p_list = [] for n in range(10): p = Process(target=f,args=(d, l, n)) p.start() p_list.append(p) for p in p_list: p.join() # 這兒的join必須加 print(d) print(l) # 關於數據共享的進程等待的問題,鄙人做出一些本身的理解 # 多核CPU的狀況下,進程間是能夠實現並行的,固然每一個核處理的速度又有極其細微的差別性,速度處理稍慢些的進程在還在對數據進行處理的候,同時又想要獲得數據了,天然會出現錯誤,因此要等待進程處理完這份數據的時候再進行操做

from multiprocessing import Process, Manager def func(n,a): n.value = 50 for i in range(len(a)): a[i] += 10 if __name__ == '__main__': with Manager() as manager: num = manager.Value("d", 0.0) ints = manager.Array("i", range(10)) p = Process(target=func,args=(num,ints)) p.start() p.join() print(num) print(ints) 輸出 Value('d', 50) array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]) # 共享內存有兩個結構,一個是 Value, 一個是 Array,這兩個結構內部都實現了鎖機制,所以是多進程安全的。 # Value 和 Array 都須要設置其中存放值的類型,d 是 double 類型,i 是 int 類型,具體的對應關係在Python 標準庫的 sharedctypes 模塊中查看。 # 上面的共享內存支持兩種結構 Value 和 Array, 這些值在主進程中管理,很分散。 Python 中還有一統天下,無所不能的Manager,專門用來作數據共享。 其支持的類型很是多。
進程同步
Lock
鎖是爲了確保數據一致性,好比讀寫鎖,每一個進程給一個變量增長 1 ,可是若是在一個進程讀取但尚未寫入的時候,另外的進程也同時讀取了,並寫入該值,則最後寫入的值是錯誤的,這時候就須要鎖。

# 爲何引伸進程同步 # 數據的一致性 import time from multiprocessing import Lock, Process def run(i, lock): with lock: # 自動得到鎖和釋放鎖 time.sleep(1) print(i) if __name__ == '__main__': lock = Lock() for i in range(10): p = Process(target=run,args=(i,lock,)) p.start()
Lock 同時也實現了 ContextManager API, 能夠結合 with 語句使用, 關於 ContextManager, 請移步 Python 學習實踐筆記 裝飾器 與 context 查看。
Semaphore
Semaphore 和 Lock 稍有不一樣,Semaphore 至關於 N 把鎖,獲取其中一把就能夠執行了。 信號量的總數 N 在構造時傳入,s = Semaphore(N)
。 和 Lock 同樣,若是信號量爲0,則進程堵塞,直到信號大於0。
進程池
若是有50個任務要去執行,CPU只有4核,那建立50個進程完成,其實大可沒必要,徒增管理開銷。若是隻想建立4個進程,讓它們輪流替完成任務,不用本身去管理具體的進程的建立銷燬,那 Pool 是很是有用的。
Pool 是進程池,進程池可以管理必定的進程,當有空閒進程時,則利用空閒進程完成任務,直到全部任務完成爲止
1
2
3
4
5
6
7
8
|
def
func(x):
return
x
*
x
if
__name__
=
=
'__main__'
:
p_pool
=
pool.Pool(
4
)
result
=
p_pool.
map
(func,
range
(
8
))
print
(result)
# Pool 進程池建立4個進程,無論有沒有任務,都一直在進程池中等候,等到有數據的時候就開始執行。
|
從上面的例子來看貌似也看不出什麼效果,那麼接下來自定義一個進程池
關於進程池的API用法(並非只有倆個哦)
apply (每一個任務是排隊進行,相似於串行失去意義)
apply_async (任務都是併發進行,而且能夠設置回調函數) 進程的併發其實能夠稱之爲並行了,能夠利用到多核CPU

import os,time from multiprocessing import pool,Process def run(n): # print(os.getpid()) time.sleep(1) print(n) return n # 該函數的返回值,是回調函數的所要傳入的值 def bar(args): pass # print("bar {}".format(args)) # print(os.getpid()) if __name__ == '__main__': p_pool = pool.Pool(5) # 設置進程池中的最大放置 for n in range(100): # 回調函數,就是某個函數執行成功或結束執行的函數 p_pool.apply_async(func=run,args=(n,),callback=bar) p_pool.close() # 進程的關閉和等待是有順序的 p_pool.join() print("ending") # 看看 Pool 的執行流程,有三個階段。第1、一個進程池接收不少任務,而後分開執行任務;第2、再也不接收任務了;第3、等全部任務完成了,回家,不幹了。 # 這就是上面的方法,close 中止接收新的任務,若是還有任務來,就會拋出異常。 join 是等待全部任務完成。 join 必需要在 close 以後調用,不然會拋出異常。terminate 非正常終止,內存不夠用時,垃圾回收器調用的就是這個方法。
線程
概念:線程是應用程序中工做的最小單元,或者又稱之爲微進程。
組成:它被包含在進程之中,是進程中的實際運做單位。一條線程指的是進程中一個單一順序的控制流,一個進程中能夠併發多個線程,每條線程並行執行不一樣的任務。
闡釋:線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。線程能夠共享(調用)進程的數據資源
優勢:共享內存,IO操做時候,創造併發操做
缺點:"......"(中國文化的博大精深的帶引號)
關於多線程
多線程相似於同時執行多個不一樣程序,多線程運行有以下優勢:
- 使用線程能夠把佔據長時間的程序中的任務放到後臺去處理。
- 用戶界面能夠更加吸引人,這樣好比用戶點擊了一個按鈕去觸發某些事件的處理,能夠彈出一個進度條來顯示處理的進度
- 程序的運行速度可能加快
- 在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收發數據等,線程就比較有用了。在這種狀況下咱們能夠釋放一些珍貴的資源如內存佔用等等。
線程在執行過程當中與進程仍是有區別的。每一個獨立的線程有一個程序運行的入口、順序執行序列和程序的出口。可是線程不可以獨立執行,必須依存在應用程序中,由應用程序提供多個線程執行控制。
每一個線程都有他本身的一組CPU寄存器,稱爲線程的上下文,該上下文反映了線程上次運行該線程的CPU寄存器的狀態。
指令指針和堆棧指針寄存器是線程上下文中兩個最重要的寄存器,線程老是在進程獲得上下文中運行的,這些地址都用於標誌擁有線程的進程地址空間中的內存。
- 線程能夠被搶佔(中斷)。
- 在其餘線程正在運行時,線程能夠暫時擱置(也稱爲睡眠) -- 這就是線程的退讓。
線程能夠分爲:
- 內核線程:由操做系統內核建立和撤銷。
- 用戶線程:不須要內核支持而在用戶程序中實現的線程。
Python3 線程中經常使用的兩個模塊爲:
- _thread
- threading(推薦使用)
thread 模塊已被廢棄。用戶可使用 threading 模塊代替。因此,在 Python3 中不能再使用"thread" 模塊。爲了兼容性,Python3 將 thread 重命名爲 "_thread"。
Python中使用線程有兩種方式:函數或者用類來包裝線程對象。
Python3 經過兩個標準庫 _thread 和 threading 提供對線程的支持。
_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能仍是比較有限的。
threading 模塊除了包含 _thread 模塊中的全部方法外,還提供的其餘方法:
- threading.currentThread(): 返回當前的線程變量。
- threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
- threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:
- run(): 用以表示線程活動的方法。
- start():啓動線程活動。
- join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
- setDaemon(True):守護主線程,跟隨主線程退(必需要放在start()上方)
- isAlive(): 返回線程是否活動的。
- getName(): 返回線程名。
- setName(): 設置線程名。
看了那麼多廢話,那麼建立線程的方式有倆種,接下來看代碼
一,經過調用模塊的方式來建立線程(推薦使用)
import threading # 線程模塊 import time # 建立線程 def onepiece1(n): print("路飛正在使用橡膠火箭炮%s,攻擊力%s" %(time.ctime(),n)) time.sleep(3) print("路飛結束該技能%s" %time.ctime()) def onepiece2(n): print("艾尼路正在出雷神萬擊%s你,攻擊力%s" %(time.ctime(),n)) time.sleep(5) print("艾尼路結束該技能%s" %time.ctime()) if __name__ == '__main__': thread_1 = threading.Thread(target=onepiece1,args=(10,)) # 建立子線程 thread_2 = threading.Thread(target=onepiece2,args=(9,)) thread_1.start() # pyhton1.join() thread_2.start() thread_2.join() # 等待線程終止 print("ending Fighting")
二,建立類經過繼承的方式來建立線程
使用Threading模塊建立線程,直接從threading.Thread繼承,而後重寫__init__方法和run方法:
import threading import time class MyThread(threading.Thread): def __init__(self,num): threading.Thread.__init__(self) self.num = num def run(self): # 定義每一個線程要運行的函數 print("running on number:%s" %self.num) time.sleep(3) print("ending......") if __name__ == '__main__': t1 = MyThread(1) # 繼承這個類,把1這個參數,傳給num ,t1就是個線程對象 t2 = MyThread(2) t1.start() t2.start()
GIL
在知道線程的建立方式以及一些方法的使用後,引伸一個cpython解釋器的一個歷史遺留問題,全局GIL鎖
由於Python的線程雖然是真正的線程,但解釋器執行代碼時,有一個GIL鎖:Global Interpreter Lock,任何Python線程執行前,必須先得到GIL鎖,而後,每執行100條字節碼,解釋器就自動釋放GIL鎖,讓別的線程有機會執行。這個GIL全局鎖實際上把全部線程的執行代碼都給上了鎖,因此,多線程在Python中只能交替執行,即便100個線程跑在100核CPU上,也只能用到1個核。
固然了,也有經過別的途徑提升執行效率,技術的道路上終無止境。
同步鎖
多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。
使用 Thread 對象的 Lock 和 Rlock 能夠實現簡單的線程同步。
這兩個對象都有 acquire 方法和 release 方法。
對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到 acquire 和 release 方法之間。

def sub(): global num thread_lock_A.acquire() # 得到鎖,用於線程同步 tmep = num time.sleep(0.001) num = tmep - 1 thread_lock_A.release() # 釋放鎖,開啓下一個線程 # 問題,加鎖以後100個線程就變爲了串行執行,鎖內的代碼 li = [] for i in range(100): t = threading.Thread(target=sub) t.start() li.append(t) for t in li: t.join() print("ending") print(num)
線程的死鎖和遞歸鎖
在線程間共享多個資源的時候,若是兩個線程分別佔有一部分資源而且同時等待對方的資源,就會形成死鎖,由於系統判斷這部分資源都
正在使用,全部這兩個線程在無外力做用下將一直等待下去。
解決死鎖就能夠用遞歸鎖

import threading,time # lock_A = threading.Lock() # lock_B = threading.Lock() r_lock = threading.RLock() class Mythread(threading.Thread): def actionA(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def actionB(self): r_lock.acquire() print(self.name,time.ctime()) time.sleep(2) r_lock.acquire() print(self.name,time.ctime()) time.sleep(1) r_lock.release() r_lock.release() def run(self): self.actionA() self.actionB() li = [] for i in range(5): t = Mythread() t.start() li.append(t) for t in li: t.join() print("ending")
爲了支持在同一線程中屢次請求同一資源,python提供了「可重入鎖」:threading.RLock。RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次acquire。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。
信號量(Semaphore):從意義上來說,也能夠稱之爲一種鎖
信號量:指同時開幾個線程併發
信號量用來控制線程併發數的,BoundedSemaphore或Semaphore管理一個內置的計數 器,每當調用acquire()時-1,調用release()時+1。
計數器不能小於0,當計數器爲 0時,acquire()將阻塞線程至同步鎖定狀態,直到其餘線程調用release()。(相似於停車位的概念)
BoundedSemaphore與Semaphore的惟一區別在於前者將在調用release()時檢查計數 器的值是否超過了計數器的初始值,若是超過了將拋出一個異常。

import threading,time class myThread(threading.Thread): def run(self): #啓動後,執行run方法 if semaphore.acquire(): #加把鎖,能夠放進去多個(至關於5把鎖,5個鑰匙,同時有5個線程) print(self.name) time.sleep(5) semaphore.release() if __name__=="__main__": semaphore=threading.Semaphore(5) #同時能有幾個線程進去(設置爲5就是一次5個線程進去),相似於停車廠一次能停幾輛車 thrs=[] #空列表 for i in range(100): #100個線程 thrs.append(myThread()) #加線程對象 for t in thrs: t.start() #分別啓動
同步條件(Event)
簡單瞭解
Event對象實現了簡單的線程通訊機制,它提供了設置信號,清楚信號,等待等用於實現線程間的通訊。
1 設置信號
使用Event的set()方法能夠設置Event對象內部的信號標誌爲真。Event對象提供了isSet()方法來判斷其內部信號標誌的狀態。當使用event對象的set()方法後,isSet()方法返回真
2 清除信號
使用Event對象的clear()方法能夠清除Event對象內部的信號標誌,即將其設爲假,當使用Event的clear方法後,isSet()方法返回假
3 等待
Event對象wait的方法只有在內部信號爲真的時候纔會很快的執行並完成返回。當Event對象的內部信號標誌位假時,則wait方法一直等待到其爲真時才返回。

import threading, time class Boss(threading.Thread): def run(self): print("BOSS:今晚你們都要加班到22:00。") print(event.isSet()) event.set() time.sleep(5) print("BOSS:<22:00>能夠下班了。") print(event.isSet()) event.set() class Worker(threading.Thread): def run(self): event.wait() print("Worker:哎……命苦啊!") time.sleep(1) event.clear() event.wait() print("Worker:OhYeah!") if __name__ == "__main__": event = threading.Event() threads = [] for i in range(5): threads.append(Worker()) threads.append(Boss()) for t in threads: t.start() for t in threads: t.join()
Event內部包含了一個標誌位,初始的時候爲false。
可使用使用set()來將其設置爲true;
或者使用clear()將其重新設置爲false;
可使用is_set()來檢查標誌位的狀態;
另外一個最重要的函數就是wait(timeout=None),用來阻塞當前線程,直到event的內部標誌位被設置爲true或者timeout超時。若是內部標誌位爲true則wait()函數理解返回。
多線程利器——隊列(queue)
由於列表是不安全的數據結構,因此引伸了新的模塊——隊列

# 列表是不安全的數據結構 舉個簡單的例子 li = [1, 2, 3, 4, 5] def remove(): while True: xx = li[-1] print(xx) time.sleep(1) li.remove(xx) A = threading.Thread(target=remove) B = threading.Thread(target=remove) A.start() B.start()
Python 的 queue 模塊中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。
這些隊列都實現了鎖原語,可以在多線程中直接使用,可使用隊列來實現線程間的同步。
queue 模塊中的經常使用方法:
- queue.qsize() 返回隊列的大小
- queue.empty() 若是隊列爲空,返回True,反之False
- queue.full() 若是隊列滿了,返回True,反之False
- queue.full 與 maxsize 大小對應
- queue.get([block[, timeout]])獲取隊列,timeout等待時間
- queue.get_nowait() 至關queue.get(False)
- queue.put(item) 寫入隊列,timeout等待時間
- queue.put_nowait(item) 至關Queue.put(item, False)
- queue.task_done() 在完成一項工做以後,queue.task_done()函數向任務已經完成的隊列發送一個信號
- queue.join() 實際上意味着等到隊列爲空,再執行別的操做

import queue # 隊列有三種模式 # 先進先出 qu = queue.Queue() qu.put("alex") qu.put(123) qu.put({"age":18}) while True: print(qu.get()) print("————————")

# 先進後出 qu = queue.LifoQueue() qu.put("alex") qu.put(123) qu.put({"age":18}) while True: print(qu.get()) print("————————")

# 優先級 q = queue.PriorityQueue(3) # 設定大小 q.put([1, "alex"]) q.put([3, 123]) q.put([2, {"age":18}]) # q.put([4,456]) # 若是裝的大於設定大小,也會阻塞(等待) # while True: # print(q.get()[1]) # get當取不到值以後會等待 # print("————————") print(q.qsize()) # 查看當前隊列有多少個 print(q.empty()) # 判斷是否爲空 print(q.full()) # 判斷是否爲滿
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# 實例
import
queue
import
threading
import
time
go
=
False
# 設定標識位
class
MyThread(threading.Thread):
def
__init__(
self
, threadID, name, q):
threading.Thread.__init__(
self
)
self
.threadID
=
threadID
self
.name
=
name
self
.q
=
q
def
run(
self
):
print
(
"開啓線程:{}"
.
format
(
self
.name))
process_data(
self
.name,
self
.q)
print
(
"退出線程:{}"
.
format
(
self
.name))
def
process_data(thread_name,q):
while
not
go:
queue_lock.acquire()
# 得到鎖
if
not
work_queue.empty():
# 若是隊列爲空返回True,反之False
data
=
q.get()
# 向隊列取值,先進先出
queue_lock.release()
# 釋放鎖
print
(
"{} processing {}"
.
format
(thread_name,data))
else
:
queue_lock.release()
time.sleep(
1
)
thread_list
=
[
"Thread-1"
,
"Thread-2"
,
"Thread-3"
]
name_list
=
[
"one"
,
"two"
,
"three"
,
"four"
,
"five"
]
queue_lock
=
threading.Lock()
# 同步鎖
work_queue
=
queue.Queue(
10
)
threads
=
[]
threads_ID
=
1
# 建立新線程
for
t
in
thread_list:
thread
=
MyThread(threads_ID,t,work_queue)
# 建立線程
thread.start()
# 啓動線程
threads.append(thread)
# 追加線程對象到列表
threads_ID
+
=
1
# ID自加1
# 填充隊列
queue_lock.acquire()
for
name
in
name_list:
work_queue.put(name)
# 向隊列填充
queue_lock.release()
# 等待隊列清空. 清空返回True,則此循環會跳過
while
not
work_queue.empty():
pass
# 改變狀態,通知線程退出
go
=
True
# 等待全部線程完成
for
t
in
threads:
t.join()
print
(
"退出主線程。"
)
|
生產者與消費者模型
在這個現實社會中,生活中到處充滿了生產和消費.
什麼是生產者消費者模型
在 工做中,可能會碰到這樣一種狀況:某個模塊負責產生數據,這些數據由另外一個模塊來負責處理(此處的模塊是廣義的,能夠是類、函數、線程、進程等)。產生數據的模塊,就形象地稱爲生產者;而處理數據的模塊,就稱爲消費者。在生產者與消費者之間在加個緩衝區,形象的稱之爲倉庫,生產者負責往倉庫了進商 品,而消費者負責從倉庫裏拿商品,這就構成了生產者消費者模型。結構圖以下
生產者消費者模型的優勢
一、解耦
假設生產者和消費者分別是兩個類。若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。未來若是消費者的代碼發生變化, 可能會影響到生產者。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也就相應下降了。
舉個例子,咱們去郵局投遞信件,若是不使用郵筒(也就是緩衝區),你必須得把信直接交給郵遞員。有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他(光憑身上穿的制服,萬一有人假冒,就慘了)。這就產生和你和郵遞員之間的依賴(至關於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼)。而郵筒相對來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合)。
二、支持併發
因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區做爲橋樑鏈接,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區了拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞。
接上面的例子,若是咱們不使用郵筒,咱們就得在郵局等郵遞員,直到他回來,咱們把信件交給他,這期間咱們啥事兒都不能幹(也就是生產者阻塞),或者郵遞員得挨家挨戶問,誰要寄信(至關於消費者輪詢)。
三、支持忙閒不均
緩衝區還有另外一個好處。若是製造數據的速度時快時慢,緩衝區的好處就體現出來了。當數據製造快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中。 等生產者的製造速度慢下來,消費者再慢慢處理掉。
爲了充分複用,再拿寄信的例子來講事。假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節(也多是聖誕節)送賀卡,須要寄出去的信超過1000封,這時 候郵筒這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來 時再拿走。
對生產者與消費者模型的闡釋就進行到這裏,用代碼實現生產者與消費者模型

import time,random import queue,threading q = queue.Queue() def Producer(name): count = 0 while count <10: print("making.....正在製做包子...") time.sleep(5) q.put(count) print('Producer %s has produced %s baozi..' %(name, count)) count +=1 q.join() print("ok......") def Consumer(name): count = 0 while count <10: time.sleep(random.randrange(4)) # 產生一個隨機數(1秒-3秒之間) data = q.get() print("eating.......") time.sleep(4) # 4秒鐘這後 q.task_done() # 給他發一個信號,纔打印ok print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data)) count +=1 p1 = threading.Thread(target=Producer, args=('A君',)) c1 = threading.Thread(target=Consumer, args=('B君',)) c2 = threading.Thread(target=Consumer, args=('C君',)) c3 = threading.Thread(target=Consumer, args=('D君',)) p1.start() c1.start() c2.start() c3.start()
import threading, time, queue q = queue.Queue() def consumer(q): while True: msg = q.get() if isinstance(msg, str) and msg == "quit": break else: print(msg) print("Bye byes") def producer(): start_time = time.time() while time.time() - start_time < 5: q.put('something at %s' % time.time()) time.sleep(1) q.put('quit') factory =threading.Thread(target=producer) worker = threading.Thread(target=consumer, args=(q,)) factory.start() # 開啓生產者線程 worker.start() # 開啓消費者線程
協程
在學習異步IO模型前,先來了解協程。
一大波闡釋即將到臨,非高能請注意閃躲(仔細閱讀)
概念:協程,又稱微線程,纖程。英文名Coroutine。 是非搶佔式的程序 主要也是解決I/O操做的
協程的概念很早就提出來了,但直到最近幾年纔在某些語言(如Lua)中獲得普遍應用。
子程序,或者稱爲函數,在全部語言中都是層級調用,好比A調用B,B在執行過程當中又調用了C,C執行完畢返回,B執行完畢返回,最後是A執行完畢。
因此子程序調用是經過棧實現的,一個線程就是執行一個子程序。
子程序調用老是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不一樣。
協程看上去也是子程序,但執行過程當中,在子程序內部可中斷,而後轉而執行別的子程序,在適當的時候再返回來接着執行。
優勢:
優勢1: 協程極高的執行效率。由於子程序切換不是線程切換,而是由程序自身控制,所以,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優點就越明顯。
優勢2: 不須要多線程的鎖機制,由於只有一個線程,也不存在同時寫變量衝突,在協程中控制共享資源不加鎖,只須要判斷狀態就行了,因此執行效率比多線程高不少。
由於協程是一個線程執行,那怎麼利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可得到極高的性能。
在此引伸了下生成器的內容

# 生成器 def f(): print("ok") s = yield 6 print(s) print("ok2") yield gen=f() # print(gen) # next(gen) # 方法一 # next(gen) RET=gen.__next__() # 方法二 print(RET) gen.send(5) # 方法三

import time import queue def consumer(name): print("--->ready to eat baozi........") while True: new_baozi = yield # yield實現上下文切換,傳包子進來 print("[%s] is eating baozi %s" % (name,new_baozi)) #time.sleep(1) def producer(): r = con.__next__() r = con2.__next__() n = 0 while 1: time.sleep(1) print("\033[32;1m[producer]\033[0m is making baozi %s and %s" %(n,n+1) ) con.send(n) # 發送告訴他有包子了 con2.send(n+1) n +=2 if __name__ == '__main__': con = consumer("c1") con2 = consumer("c2") producer()
greenlet是一個用C實現的協程模塊,相比與python自帶的yield,它可使你在任意函數之間隨意切換,而不需把這個函數先聲明爲generator

from greenlet import greenlet def test1(): print(12) gr2.switch() print(34) gr2.switch() def test2(): print(56) gr1.switch() print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() gr2.switch()
Gevent

import gevent import requests,time start_time = time.time() def get_url(url): print("get: {}".format(url)) resp = requests.get(url) data = resp.text print(len(data),url) # get_url('https://www.python.org/') # get_url('https://www.yahoo.com/') # get_url('https://www.baidu.com/') # get_url('https://www.sina.com.cn/') # get_url('http://www.xiaohuar.com/') gevent.joinall( [ gevent.spawn(get_url, 'https://www.python.org/'), gevent.spawn(get_url, 'https://www.yahoo.com/'), gevent.spawn(get_url, 'https://www.baidu.com/'), gevent.spawn(get_url, 'https://www.sina.com.cn/'), gevent.spawn(get_url,'http://www.xiaohuar.com/') ] ) print(time.time()-start_time)
協程的優點:
一、沒有切換的消耗
二、沒有鎖的概念
有一個問題:能用多核嗎?
答:能夠採用多進程+協程,是一個很好的解決併發的方案