咱們都知道CPU的根本任務就是執行指令,對計算機來講最終都是一串由「0」和「1」組成的序列。CPU從邏輯上能夠劃分紅3個模塊,分別是控制單元、運算單元和存儲單元,這三部分由CPU內部總線鏈接起來:html
控制單元:控制單元是整個CPU的指揮控制中心,由指令寄存器IR(Instruction Register)、指令譯碼器ID(Instruction Decoder)和操做控制器OC(Operation Controller)等,對協調整個電腦有序工做極爲重要。它根據用戶預先編好的程序,依次從存儲器中取出各條指令,放在指令寄存器IR中,經過指令譯碼(分析)肯定應該進行什麼操做,而後經過操做控制器OC,按肯定的時序,向相應的部件發出微操做控制信號。操做控制器OC中主要包括節拍脈衝發生器、控制矩陣、時鐘脈衝發生器、復位電路和啓停電路等控制邏輯。
運算單元:是運算器的核心。能夠執行算術運算(包括加減乘數等基本運算及其附加運算)和邏輯運算(包括移位、邏輯測試或兩個值比較)。相對控制單元而言,運算器接受控制單元的命令而進行動做,即運算單元所進行的所有操做都是由控制單元發出的控制信號來指揮的,因此它是執行部件。
存儲單元:包括CPU片內緩存和寄存器組,是CPU中暫時存放數據的地方,裏面保存着那些等待處理的數據,或已經處理過的數據,CPU訪問寄存器所用的時間要比訪問內存的時間短。採用寄存器,能夠減小CPU訪問內存的次數,從而提升了CPU的工做速度。但由於受到芯片面積和集成度所限,寄存器組的容量不可能很大。寄存器組可分爲專用寄存器和通用寄存器。專用寄存器的做用是固定的,分別寄存相應的數據。而通用寄存器用途普遍並可由程序員規定其用途,通用寄存器的數目因微處理器而異。python
cpu的工做原理:linux
總的來講,CPU從內存中一條一條地取出指令和相應的數據,按指令操做碼的規定,對數據進行運算處理,直到程序執行完畢爲止。程序員
控制單元在時序脈衝的做用下,將指令計數器裏所指向的指令地址(這個地址是在內存裏的)送到地址總線上去,而後CPU將這個地址裏的指令讀到指令寄存器進行譯碼。對於執行指令過程當中所須要用到的數據,會將數據地址也送到地址總線,而後CPU把數據讀到CPU的內部存儲單元(就是內部寄存器)暫存起來,最後命令運算單元對數據進行處理加工。算法
cpu的工做效率:編程
基本上,CPU就是這樣去執行讀出數據、處理數據和往內存寫數據3項基本工做。但在一般狀況下,一條指令能夠包含按明確順序執行的許多操做,CPU的工做就是執行這些指令,完成一條指令後,CPU的控制單元又將告訴指令讀取器從內存中讀取下一條指令來執行。這個過程不斷快速地重複,快速地執行一條又一條指令,產生你在顯示器上所看到的結果。咱們很容易想到,在處理這麼多指令和數據的同時,因爲數據轉移時差和CPU處理時差,確定會出現混亂處理的狀況。爲了保證每一個操做準時發生,CPU須要一個時鐘,時鐘控制着CPU所執行的每個動做。時鐘就像一個節拍器,它不停地發出脈衝,決定CPU的步調和處理時間,這就是咱們所熟悉的CPU的標稱速度,也稱爲主頻。主頻數值越高,代表CPU的工做速度越快。windows
而在執行效率方面,一些廠商經過流水線方式或以幾乎並行工做的方式執行指令的方法來提升指令的執行速度。剛纔咱們提到,指令的執行須要許多獨立的操做,諸如取指令和譯碼等。最初CPU在執行下一條指令以前必須所有執行完上一條指令,而如今則由分佈式的電路各自執行操做。也就是說,當這部分的電路完成了一件工做後,第二件工做當即佔據了該電路,這樣就大大增長了執行方面的效率。 數組
另外,爲了讓指令與指令之間的鏈接更加準確,如今的CPU一般會採用多種預測方式來控制指令更高效率地執行。緩存
Python3 經過兩個標準庫 _thread 和 threading 提供對線程的支持。
_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能仍是比較有限的。
threading 模塊除了包含 _thread 模塊中的全部方法外,還提供的其餘方法:安全
除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:
直接調用啓動線程:
#!/usr/bin/env python #coding:utf8 import threading #線程模塊 import time def sayhi(num): #定義每一個線程要運行的函數 print('running on number',num) time.sleep(3) if __name__ == "__main__": t1 = threading.Thread(target=sayhi,args=(33,)) #生成一個線程實例 t2 = threading.Thread(target=sayhi,args=(22,)) #生成另外一個線程實例 t1.start() #啓動線程 t2.start() print(t1.getName()) #獲取線程名 print(t2.getName()) t1.join() #阻塞主線程,等待t1子線程執行完後再執行後面的代碼 t2.join() #阻塞主線程,等待t2子線程執行完後再執行後面的代碼 print('-----end')
繼承式調用啓動線程:
#!/usr/bin/env python #coding:utf8 import threading,time class mythreading(threading.Thread): #寫一個類方法繼承threading模塊 def __init__(self): #threading.Thread.__init__(self) #金典類重寫父類方法 super(mythreading,self).__init__() #重寫父類屬性 self.name = n+self.name.split('d')[1] def run(self): #運行線程的函數,函數名必須是run名稱 super(mythreading,self).run() print('starting on threading',self.name) time.sleep(5) if __name__ == '__main__': #t1 = mythreading(1) #經過類建立線程 #t2 = mythreading(2) #t1.start() #啓動進程 #t2.start() ttr = [] for i in range(10): #啓動十個線程 t = mythreading() ttr.append(t) t.start() t.setName('hehe-{}'.format(i)) #修改線程名 print(t.getName()) #獲取線程名 for item in ttr: item.join() #阻斷線程等待執行完後再執行後續代碼 print('-----end')
守護線程:
#!/usr/bin/env python #coding:utf8 import time import threading def run(num): #子線程運行函數 print('---starting',num) time.sleep(2) print('---done') def main(): #主線程運行函數 print('開啓主線程') for i in range(4): #在主線程中運行4個子線程 t1 = threading.Thread(target=run,args=(i,)) t1.start() print('啓動線程',t1.getName()) t1.join() print('結束主線程') m = threading.Thread(target=main,args=()) m.setDaemon(True) #設置主線程爲守護線程 m.start() m.join(timeout=3) #等待3秒後主線程退出,無論子線程是否運行完 print('------end') #output: 開啓主線程 ---starting 0 啓動線程 Thread-2 ---starting 1 啓動線程 Thread-3 ---starting 2 啓動線程 Thread-4 ---starting 3 啓動線程 Thread-5 ---done ---done ---done ---done 結束主線程 ------end 進程已結束,退出代碼0
若是多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。
使用 Thread 對象的 Lock 和 Rlock 能夠實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到 acquire 和 release 方法之間。以下:
多線程的優點在於能夠同時運行多個任務(至少感受起來是這樣)。可是當線程須要共享數據時,可能存在數據不一樣步的問題。
考慮這樣一種狀況:一個列表裏全部元素都是0,線程」set」從後向前把全部元素改爲1,而線程」print」負責從前日後讀取列表並打印。
那麼,可能線程」set」開始改的時候,線程」print」便來打印列表了,輸出就成了一半0一半1,這就是數據的不一樣步。爲了不這種狀況,引入了鎖的概念。
鎖有兩種狀態——鎖定和未鎖定。每當一個線程好比」set」要訪問共享數據時,必須先得到鎖定;若是已經有別的線程好比」print」得到鎖定了,那麼就讓線程」set」暫停,也就是同步阻塞;等到線程」print」訪問完畢,釋放鎖之後,再讓線程」set」繼續。
通過這樣的處理,打印列表時要麼所有輸出0,要麼所有輸出1,不會再出現一半0一半1的尷尬場面。
鎖提供以下方法:
1.Lock.acquire([blocking])
2.Lock.release()
3.threading.Lock() 加載線程的鎖對象,是一個基本的鎖對象,一次只能一個鎖定,其他鎖請求,需等待鎖釋放後才能獲取
4.threading.RLock() 多重鎖,在同一線程中可用被屢次acquire。若是使用RLock,那麼acquire和release必須成對出現, 調用了n次acquire鎖請求,則必須調用n次的release才能在線程中釋放鎖對象。
import threading,time class mythread(threading.Thread): def __init__(self,threadID,threadName): super(mythread,self).__init__() self.threadID = threadID self.threadName = threadName def run(self): print('開啓線程:',self.threadName) threadLock.acquire() #獲取線程鎖 print_time(time.time(),self.threadName) threadLock.release() #釋放線程鎖 def print_time(suntime,threadName): for i in range(3): time.sleep(2) print('%s,,,%s'%(suntime,threadName)) threadLock = threading.Lock() #建立線程鎖 t1 = mythread(1,'thread1') #建立線程1 t2 = mythread(2,'thread2') t1.start() #啓動線程 t2.start() t1.join() #阻塞主線程,等待線程1完成 t2.join() print('退出程序') #鎖住運行線程函數後,會等待線程1執行完成後在執行線程2 #output 開啓線程: thread1 開啓線程: thread2 1516081515.3328698,,,thread1 1516081515.3328698,,,thread1 1516081515.3328698,,,thread1 1516081521.3341322,,,thread2 1516081521.3341322,,,thread2 1516081521.3341322,,,thread2 退出程序
from threading import Thread,Lock,RLock import time class mythread(Thread): def __init__(self,number1,number2): super(mythread,self).__init__() self.number1 = number1 self.number2 = number2 def run(self): print('開啓線程',self.name) lock.acquire() print('run is:',time.time(),self.number1+self.number2) arithmetic(self.number1,self.number2) time.sleep(2) lock.release() def arithmetic(avg1,avg2): lock.acquire() print('arithmetic:',time.time(),avg1+avg2) time.sleep(2) lock.release() #建立鎖對象時若是使用Lock則會在運行線程2時一直處於等待狀態 #若是使用RLock則可正常運行,RLock支持多重鎖 lock = RLock() #建立多重鎖 if __name__ == "__main__": t1 = mythread(3,4) t2 = mythread(5,6) t1.start() t2.start() t1.join() t2.join() print('程序結束!') #OUTPUT 開啓線程 Thread-1 run is: 1516088282.0478237 7 arithmetic: 1516088282.0478237 7 開啓線程 Thread-2 run is: 1516088286.0496652 11 arithmetic: 1516088286.0496652 11 程序結束!
該queue
模塊實現多生產者,多用戶隊列。當在多線程之間必須安全地交換信息時,它在線程編程中特別有用。該Queue
模塊中的類實現了全部必需的鎖定語義
該模塊實現三種類型的隊列,它們的區別僅在於檢索條目的順序:
在FIFO隊列中,第一個添加的任務是第一個被檢索的。
在LIFO隊列中,最後添加的條目是第一個被檢索(像堆棧同樣操做)。
使用優先級隊列,條目保持排序(使用heapq
模塊),而且首先檢索最低值的條目。
在內部,模塊使用鎖來暫時阻止競爭的線程; 可是,它不是爲了處理線程內的重入而設計的;該queue模塊定義瞭如下類:
queue.
Queue
(maxsize = 0 )
FIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。
queue.
LifoQueue
(maxsize = 0 )
LIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。
queue.
PriorityQueue
(maxsize = 0 )
優先隊列的構造函數。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。
首先檢索最低值的條目(最低值條目是返回的條目sorted(list(entries))[0]
)。對於條目的典型圖案的形式是一個元組:。(priority_number, data)
exception queue.
Empty
在空對象上調用非阻塞get()
(或 get_nowait()
)時引起異常Queue
。
exception queue.
Full
若是在已滿的對象上調用非阻塞put()
(或 put_nowait()
),則會引起異常Queue
。
隊列對象(Queue,LifoQueue或PriorityQueue)提供如下的公共方法:
Queue.
qsize
()
返回隊列的大小。請注意,qsize()> 0並不保證後續的get()不會被阻塞,qsize()也不會保證put()不會被阻塞。
Queue.
empty
()
若是隊列爲空則返回True,不然返回False。若是empty()返回
True
,則不保證對put()的後續調用不會被阻塞。一樣,若是empty()返回False
,則不保證後續調用get()不會被阻塞。
Queue.
full
()
若是隊列已滿則返回True,不然返回Fales。若是full()返回
True
,則不保證後續的get()調用不會被阻塞。一樣,若是full()返回False
它並不能保證後續調用put()不會被阻塞。
Queue.
put
(item,block = True,timeout = None )
將項目放入隊列中。若是可選參數block爲True,而且timeout爲 None
(默認),則根據須要進行阻止,直到空閒插槽可用。若是 timeout是一個正數,則最多會阻塞timeout秒數,Full
若是在此時間內沒有空閒插槽,則會引起異常。不然(block爲false),若是一個空閒插槽當即可用,則在隊列中放置一個項目,不然引起Full
異常(在這種狀況下timeout被忽略)。
Queue.
put_nowait
(item )
至關於。put(item, False)
Queue.
get
(block = True,timeout = None )
從隊列中移除並返回一個項目。若是可選參數塊爲真,而且 timeout爲None
(默認),則在必要時阻塞,直到項目可用。若是timeout是一個正數,則最多會阻塞timeout秒數,Empty
若是在該時間內沒有可用項目,則會引起異常。不然(block爲false),返回一個項目,若是一個是當即可用的,不然引起Empty
異常(在這種狀況下timeout被忽略)。
Queue.
get_nowait
()
至關於get(False)
。
提供了兩種方法來支持跟蹤入隊任務是否已徹底由守護進程消費者線程處理。
Queue.
task_done
()
代表之前排隊的任務已經完成。由隊列消費者線程使用。對於每一個get()
用於獲取任務的對象,隨後的調用都會task_done()
告訴隊列,任務的處理已完成。
若是a join()
當前被阻塞,則在處理全部項目時(意味着task_done()
已經接收到已經put()
進入隊列的每一個項目的呼叫),則將恢復。
提出了一個ValueError
好象叫更多的時間比中放入隊列中的項目。
Queue.
join
()
阻塞,直到隊列中的全部項目都被獲取並處理。
每當將項目添加到隊列中時,未完成任務的數量就會增長。只要消費者線程調用task_done()
來指示該項目已被檢索,而且全部工做都已完成,計數就會減小。當未完成任務的數量降低到零時,join()
取消阻止。
#Queue先進先出隊列 import queue def show(q,i): if q.empty() or q.qsize() >= 1: q.put(i) #存隊列 elif q.full(): print('queue not size') que = queue.Queue(5) #容許5個隊列的隊列對象 for i in range(5): show(que,i) print('queue is number:',que.qsize()) #隊列元素個數 for j in range(5): print(que.get()) #取隊列 print('......end') #output: queue is number: 5 0 1 2 3 4 ......end
#LifoQueue先進後出隊列 import queue lifoque = queue.LifoQueue() lifoque.put('hello1') lifoque.put('hello2') lifoque.put('hello3') print(lifoque.qsize()) print(lifoque.get()) print(lifoque.get()) print(lifoque.get()) #output: 3 hello3 hello2 hello1
#PriorityQueue按數據大小取最小值優先 import queue pque = queue.PriorityQueue() #優先級的隊列 pque.put(7) #先存入隊列 pque.put(5) pque.put(3) print(pque.qsize()) print(pque.get()) #取出最小值的數據 print(pque.get()) print(pque.get()) #output: 3 22 52 71
Lock鎖是同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據,如一個場所內只有3把椅子給人坐,那麼只容許3我的,其它人則排隊,只有等裏面的人出來後才能進去。
import threading,time class mythreading(threading.Thread): #寫一個類方法繼承hreading模塊 def run(self): #運行線程的函數,函數名必須是run名稱 semaphore.acquire() #獲取信號量鎖 print('running the thread:',self.getName()) time.sleep(2) semaphore.release() #釋放信號量鎖 if __name__ == '__main__': semaphore = threading.BoundedSemaphore(3) #建立信號量對象,只運行3個進程同時運行 for i in range(20): t1 = mythreading() t1.start() t1.join() print('---end')
threading模塊的Timer方法,在通過必定時間後才能運行的操做,通常只針對函數運行。
import threading def run(): print('is running...') t=threading.Timer(5.0,run) #等待5秒後執行函數 t.start()
python線程的事件用於主線程控制其它線程的執行,事件主要提供了三個方法:
event.wait 線程阻塞
event.set 將全局對象「Flag」設置爲False
event.clear 將全局對象"Flag"設置爲True
事件處理的機制:全局定義了一個「Flag」,默認爲「False」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。
#!/usr/bin/env python #coding:utf8 import threading def show(event): print('start') event.wait() #阻塞線程執行程序 print('done') event_obj = threading.Event() #建立event事件對象 for i in range(10): t1 = threading.Thread(target=show,args=(event_obj,)) t1.start() inside = input('>>>:') if inside == '1': event_obj.set() #當用戶輸入1時,set全局Flag爲True,線程再也不阻塞,打印"done" event_obj.clear() #將Flag設置爲False
使得線程等待,只有知足某條件時,才釋放N個線程
import threading def condition_func(): ret = False inp = input('>>>:') if inp == '1': ret = True return ret def run(n): con.acquire() #條件鎖 con.wait_for(condition_func) #判斷條件 print('running...',n) con.release() #釋放鎖 print('------------') if __name__ == '__main__': con = threading.Condition() #創建線程條件對象 for i in range(10): t = threading.Thread(target=run,args=(i,)) t.start() t.join() print('退出程序')
multiprocessing是一個使用相似於threading模塊的API來支持多進程。該multiprocessing軟件包提供本地和遠程併發,經過使用子進程來充分利用服務器上的多個處理器,它能夠在Unix和Windows上運行。
class multiprocessing.
Process
(group = None,target = None,name = None,args =(),kwargs = {},*,daemon = None )
過程對象表示在單獨的過程當中運行的活動。這個 Process
類的全部方法的等價與 threading.Thread
。
應始終使用關鍵字參數調用構造函數。羣體 應該永遠是None
; 它僅僅是爲了兼容而存在 threading.Thread
。 target是run()
方法調用的可調用對象。它默認爲None
,意味着什麼都不叫。name是進程名稱(name
更多細節見)。 args是目標調用的參數元組。 kwargs是目標調用的關鍵字參數字典。若是提供,則僅關鍵字守護進程參數將進程daemon
標誌設置爲True
或False
。若是None
(默認),這個標誌將從建立過程繼承。
默認狀況下,沒有參數傳遞給目標。
若是一個子類重寫了構造函數,那麼必須確保它Process.__init__()
在執行任何其餘操做以前調用基類的構造函數()。
在版本3.3中進行了更改:添加了守護進程參數。
run
()
表示進程活動的方法。
您能夠在子類中重寫此方法。標準run()
方法調用傳遞給對象構造函數的可調用對象做爲目標參數,若是有的話,分別從args和kwargs參數中獲取順序和關鍵字參數。
start
()
開始進程的活動。
每一個進程對象最多隻能調用一次。它安排對象的run()
方法在一個單獨的進程中被調用。
join
([ timeout ] )
若是可選參數timeout是None
(缺省值),則該方法將阻塞,直到其join()
方法被調用的進程終止。若是超時是一個正數,它最多會阻塞超時秒數。請注意,None
若是方法終止或方法超時,則方法返回。檢查流程exitcode
,肯定是否終止。
一個過程能夠鏈接屢次。
進程沒法自行加入,由於這會致使死鎖。嘗試在啓動以前加入進程是錯誤的。
name
該進程的名稱。名稱是一個字符串,僅用於識別目的。它沒有語義。多個進程能夠被賦予相同的名稱。
初始名稱由構造函數設置。若是沒有明確的名字被提供給構造函數,就會構造一個名爲「Process-N 1:N 2:...:N k 」 的表單,其中每一個N k是其父代的第N個孩子。
is_alive
()
返回進程是否存在;從start()
方法返回的那一刻起,一個進程對象是活着的,直到子進程終止。
daemon
進程的守護進程標誌,一個布爾值。這必須在start()
調用以前設置 。
初始值是從建立過程繼承的。當進程退出時,它將嘗試終止其全部守護進程的子進程。
請注意,守護進程不容許建立子進程。不然,若是父進程退出時被終止,則守護進程將使其子進程成爲孤兒。此外,這些不是 Unix守護進程或服務,它們是正常的進程,若是非守護進程已經退出,它將被終止(而不是加入)。
除了 threading.Thread
API以外,Process
對象還支持如下屬性和方法:
pid
返回進程ID。在這個過程產生以前,這將是 None
。
exitcode
孩子的退出代碼。這將是None
若是過程尚未結束。負值-N表示孩子被信號N終止。
authkey
進程的身份驗證密鑰(一個字節字符串)。
當multiprocessing
初始化主進程正在使用分配一個隨機串os.urandom()
。
當一個Process
對象被建立時,它會繼承父進程的認證密鑰,儘管這能夠經過設置authkey
成另外一個字節字符串來改變。
請參閱驗證密鑰。
sentinel
系統對象的數字句柄,當進程結束時它將變爲「就緒」。
若是您想一次使用多個事件,則可使用此值multiprocessing.connection.wait()
。不然,調用join()
更簡單。
在Windows中,這是與使用的OS手柄WaitForSingleObject
和WaitForMultipleObjects
家庭的API調用。在Unix上,這是一個可用於select
模塊原語的文件描述符。
3.3版本中的新功能
terminate
()
終止這個進程。在Unix上,這是使用SIGTERM
信號完成的。在Windows TerminateProcess()
上使用。請注意,退出處理程序和最後的子句等將不會被執行。
須要注意的是start()
,join()
,is_alive()
, terminate()
和exitcode
方法只能由建立進程對象的過程調用。
異常:
exception multiprocessing.
ProcessError
全部multiprocessing
異常的基類。
exception
multiprocessing.
BufferTooShort
Connection.recv_bytes_into()
當提供的緩衝區對象過小而不能讀取消息時引起異常。
若是e
是的一個實例BufferTooShort
,而後e.args[0]
會給出消息做爲字節串。
exception multiprocessing.
AuthenticationError
發生認證錯誤時引起。
exception multiprocessing.
TimeoutError
當超時到期時由超時方法提出。
建立10個併發進程:
from multiprocessing import Process def proce(pn): print('hello',pn) if __name__ == '__main__': for i in range(10): #啓動10個進程 p1 = Process(target=proce,args=(i,)) #建立進程 p1.start() #啓動進程 #print('nn:',p1.name) p1.join() #等待進程結束 print('exit project')
打印進程ID:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/17 14:15 # @Author : Py.qi # @File : 進程ID.py # @Software: PyCharm from multiprocessing import Process import os def info(title): print(title) print('module name',__name__) #是否主程序 print('parent process',os.getppid()) #上層進程號 print('process id:',os.getpid()) #自身進程號 def f(name): info('function f') print('hello',name) if __name__ == '__main__': info('main line') #經過pycharm進程運行函數 p = Process(target=f,args=('bob',)) #調用子進程運行函數 p.start() p.join() #output: main line module name __main__ parent process 5756 #pycharm進程號 process id: 9864 #自身進程 function f module name __mp_main__ parent process 9864 process id: 9024 #文件建立的進程號 hello bob
multiprocessing支持三種啓動流程的方式:
spawn:
父進程啓動一個新鮮的Python解釋器進程。子進程只會繼承運行進程對象run()
方法所必需的資源。特別是父進程中沒必要要的文件描述符和句柄將不會被繼承。與使用fork或forkserver相比,使用此方法啓動進程至關慢。
在Unix和Windows上可用。Windows上是默認的。
fork:
父進程使用os.fork()
fork來解釋Python。子進程在開始時與父進程有效地相同。父進程的全部資源都由子進程繼承。請注意,安全分叉多線程的過程是有問題的。
僅在Unix上提供。Unix上是默認的。
forkserver:
當程序啓動並選擇forkserver啓動方法時,啓動一個服務器進程。從那時起,不管什麼時候須要一個新的進程,父進程都鏈接到服務器並請求它分叉一個新的進程。fork服務器進程是單線程的,因此它是安全的使用os.fork()
。沒有沒必要要的資源被繼承。
在支持經過Unix管道傳遞文件描述符的Unix平臺上可用。
在3.4版中進行了更改:在全部unix平臺上添加了spawn,並爲某些unix平臺添加了forkserver。子進程再也不繼承Windows上的全部父代繼承句柄。
在Unix上,使用spawn或forkserver啓動方法也將啓動一個信號量跟蹤器進程,該進程跟蹤程序進程建立的未連接的已命名信號量。當全部進程退出信號量跟蹤器時,將取消全部剩餘信號量的連接。一般應該沒有,但若是一個進程被一個信號殺死,那麼可能會有一些「泄漏」的信號量。(取消連接命名的信號量是一個嚴重的問題,由於系統只容許有限的數量,而且在下一次從新啓動以前它們不會自動斷開鏈接。)
在主模塊中使用set_start_method()的啓動方法:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/17 15:44 # @Author : Py.qi # @File : set_start1.py # @Software: PyCharm import multiprocessing as mp def foo(q): #傳遞隊列對象給函數 q.put('hello') q.put('123') q.put('python') if __name__ == '__main__': mp.set_start_method('spawn') #設置啓動進程方式 q = mp.Queue() #建立隊列對象 p = mp.Process(target=foo, args=(q,)) #啓動一個進程,傳遞運行函數和參數 p.start() #啓動進程 print(q.get()) #獲取隊列數據 print(q.qsize()) print(q.get()) p.join()
set_start_method()不能在程序中使用屢次,可使用get_context()獲取上下文對象,上下文對象與多處理模塊具備相同的API,並容許在同一個程序中使用多個啓動方法。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:14 # @Author : Py.qi # @File : get_context_1.py # @Software: PyCharm import multiprocessing as mp import time def foo(q): for i in range(10): q.put('python%s'%i) time.sleep(1) #q.put('context') if __name__ == '__main__': ext = mp.get_context('spawn') q = ext.Queue() p = ext.Process(target=foo,args=(q,)) p2 = ext.Process(target=foo,args=(q,)) p.start() p2.start() for i in range(10): print(q.get()) #print(q.get()) #print(q.get()) #print(q.get()) #print(q.get()) p.join() p2.join() #output: python0 python0 python1 python1 python2 python2 python3 python3 python4 python4
請注意,與一個上下文相關的對象可能與另外一個上下文的進程不兼容。特別是,使用fork上下文建立的鎖不能被傳遞給使用spawn或forkserver啓動方法啓動的進程 。
想要使用特定啓動方法的庫應該能夠get_context()
用來避免干擾庫用戶的選擇。
multiprocessing
支持兩種進程之間的通信通道:
class multiprocessing.
Queue
([ maxsize ] )
返回使用管道和一些鎖/信號量實現的進程共享隊列。當一個進程首先把一個項目放到隊列中時,一個進給線程被啓動,將一個緩衝區中的對象傳送到管道中。
Queue
實現queue.Queue
除了task_done()
和以外的 全部方法join()
。
qsize
()
返回隊列的近似大小。因爲多線程/多處理語義,這個數字是不可靠的。
請注意,NotImplementedError
在Mac OS X等Unix平臺上可能會出現這種sem_getvalue()
狀況。
empty
()
True
若是隊列爲空False
則返回,不然返回。因爲多線程/多處理語義,這是不可靠的。
full
()
True
若是隊列已滿False
則返回,不然返回。因爲多線程/多處理語義,這是不可靠的。
put
(obj [,block [,timeout ] ] )
將obj放入隊列中。若是可選參數塊是True
(缺省值)而且超時是None
(缺省值),則在須要時禁止,直到空閒插槽可用。若是超時是一個正數,則最多會阻塞超時秒數,queue.Full
若是在此時間內沒有空閒插槽,則會引起異常。不然(塊是 False
),若是一個空閒插槽當即可用,則在隊列中放置一個項目,不然引起queue.Full
異常(在這種狀況下超時被忽略)。
put_nowait
(obj )
至關於。put(obj, False)
get
([ block [,timeout ] ] )
從隊列中移除並返回一個項目。若是可選ARGS 塊是 True
(默認值),超時是None
(默認),塊若有必要,直到一個項目是可用的。若是超時是一個正數,則最多會阻塞超時秒數,queue.Empty
若是在該時間內沒有可用項目,則會引起異常。不然(塊是 False
),返回一個項目,若是一個是當即可用的,不然引起 queue.Empty
異常(在這種狀況下超時被忽略)。
get_nowait
()
至關於get(False)
。
multiprocessing.Queue
有一些其餘的方法沒有找到 queue.Queue
。大多數代碼一般不須要這些方法:
close
()
代表當前進程不會有更多的數據放在這個隊列中。一旦將全部緩衝的數據刷新到管道,後臺線程將退出。當隊列被垃圾收集時,這被自動調用。
join_thread
()
加入後臺線程。這隻能在close()
被調用後才能使用。它阻塞,直到後臺線程退出,確保緩衝區中的全部數據已經刷新到管道。
默認狀況下,若是一個進程不是隊列的建立者,那麼在退出時它將嘗試加入隊列的後臺線程。該過程能夠調用cancel_join_thread()
作出join_thread()
什麼也不作。
cancel_join_thread
()
防止join_thread()
阻塞。尤爲是,這能夠防止後臺線程在進程退出時自動加入
class multiprocessing.
SimpleQueue
empty
()
True
若是隊列爲空False
則返回,不然返回。
get
()
從隊列中移除並返回一個項目。
put
(item )
將元素放入隊列中。
class multiprocessing.
JoinableQueue
([ maxsize ] )
JoinableQueue
,一個Queue
子類,是另外有一個隊列task_done()
和join()
方法。
task_done
()
代表之前排隊的任務已經完成。由隊列使用者使用。對於每一個get()
用於獲取任務的對象,隨後的調用都會task_done()
告訴隊列,任務的處理已完成。
若是a join()
當前被阻塞,則在處理全部項目時(意味着task_done()
已經接收到已經put()
進入隊列的每一個項目的呼叫),則將恢復。
提出了一個ValueError
好象叫更多的時間比中放入隊列中的項目。
join
()
阻塞,直到隊列中的全部項目都被獲取並處理。
每當將項目添加到隊列中時,未完成任務的數量就會增長。每當消費者打電話task_done()
代表該物品已被檢索而且全部工做都已完成時,計數就會降低 。當未完成任務的數量降低到零時, join()
取消阻止。
multiprocessing.
active_children
()
返回當前進程的全部活動的列表。調用這個函數有「加入」已經完成的任何進程的反作用。
multiprocessing.
cpu_count
()
返回系統中的CPU數量。可能會提升 NotImplementedError
。
multiprocessing.
current_process
()
返回Process
當前進程對應的對象。一個相似的threading.current_thread()
。
multiprocessing.
freeze_support
()
添加對什麼時候使用的程序multiprocessing
進行凍結以產生Windows可執行文件的支持。(已經用py2exe, PyInstaller和cx_Freeze進行了測試。)須要在主模塊後面直接調用這個函數:if __name__ == '__main__'
freeze_support()
在Windows之外的操做系統上調用時,調用不起做用。另外,若是模塊正在經過Windows上的Python解釋器正常運行(程序尚未被凍結),那麼這個模塊freeze_support()
沒有任何做用。
multiprocessing.
get_all_start_methods
()
返回支持的啓動方法的列表,其中第一個是默認的。可能的啓動方法是'fork'
, 'spawn'
和'forkserver'
。在Windows上只'spawn'
可用。在Unix上'fork'
而且'spawn'
始終受支持,而且'fork'
是默認的。3.4版本中的新功能
multiprocessing.
get_context
(method = None )
返回與multiprocessing
模塊具備相同屬性的上下文對象 。
若是方法是None
那麼返回默認的上下文。不然,方法應該是'fork'
,'spawn'
, 'forkserver'
。 ValueError
若是指定的啓動方法不可用,則引起。3.4版本中的新功能
multiprocessing.
get_start_method
(allow_none = False )
返回用於啓動進程的啓動方法的名稱。
若是start方法沒有被修復,而且allow_none爲false,那麼start方法被固定爲默認值,並返回名稱。若是start方法沒有被修復,而且allow_none 爲true,則None
返回。
返回值能夠是'fork'
,'spawn'
,'forkserver'
或None
。 'fork'
在Unix上是默認的,而'spawn'
在Windows上是默認的。
3.4版本中的新功能
multiprocessing.
set_executable
()
設置啓動子進程時使用的Python解釋器的路徑。(默認sys.executable
使用)。嵌入可能須要作一些事情
set_executable (OS ,路徑,加入(SYS 。exec_prefix , 'pythonw.exe' ))
才能夠建立子進程。在版本3.4中更改:如今在Unix上支持'spawn'
啓動方法時使用。
multiprocessing.
set_start_method
(method)
設置應該用來啓動子進程的方法。 方法能夠'fork'
,'spawn'
或者'forkserver'
。
請注意,這應該至多被調用一次,而且應該在主模塊的子句內受到保護。if __name__ == '__main__'
3.4版本中的新功能
Queue(隊列):這個Queue類爲queue.Queue的克隆
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:40 # @Author : Py.qi # @File : queue_pbet.py # @Software: PyCharm from multiprocessing import Process,Queue def f(q): q.put([42, None, 'hello']) q.put('python') q.put('python3') if __name__ == '__main__': q = Queue() #建立隊列對象 p = Process(target=f, args=(q,)) #新進程 p.start() print(q.get()) # prints "[42, None, 'hello']" print(q.get()) print(q.get()) p.join()
Pipes(管道):該Pipe()函數返回一對由默認爲雙工(雙向)的管道鏈接的鏈接對象。
鏈接對象容許發送和接收可選對象或字符串。他們能夠被認爲是面向消息的鏈接套接字。
鏈接對象一般使用建立Pipe()
class multiprocessing.
Connection
send
(obj )
將對象發送到應該讀取的鏈接的另外一端recv()
。
該對象必須是可挑選的。很是大的包(大約32 MB +,雖然取決於操做系統)可能會引起ValueError
異常。
fileno
()
返回鏈接使用的文件描述符或句柄。
close
()
關閉鏈接。
當鏈接被垃圾收集時,這被自動調用。
poll
([ timeout ] )
返回是否有可讀的數據。
若是沒有指定超時,則會當即返回。若是超時是一個數字,那麼這指定了以秒爲單位的最大時間來阻止。若是timeout,None
則使用無限超時。
請注意,可使用一次輪詢多個鏈接對象multiprocessing.connection.wait()
。
send_bytes
(buffer [,offset [,size ] ] )
從相似字節的對象發送字節數據做爲完整的消息。
若是給出偏移量,則從緩衝區中的該位置讀取數據。若是 給定大小,那麼將從緩衝區中讀取多個字節。很是大的緩衝區(大約32 MB +,但取決於操做系統)可能會引起 ValueError
異常
recv_bytes
([ maxlength ] )
將鏈接另外一端發送的字節數據的完整消息做爲字符串返回。阻止,直到有東西要接收。EOFError
若是沒有什麼能夠接收,而另外一端已經關閉,就會引起。
若是最大長度被指定而且所述消息是長於最大長度 而後OSError
升至並鏈接將再也不是可讀的。
recv_bytes_into
(buffer [,offset ] )
從鏈接的另外一端讀取緩衝區中的字節數據的完整消息,並返回消息中的字節數。阻止,直到有東西要接收。提出EOFError
,若是沒有什麼留下來接收,而另外一端被關閉。
緩衝區必須是一個可寫的字節類對象。若是 給出了偏移量,則消息將從該位置寫入緩衝區。偏移量必須是小於緩衝區長度的非負整數(以字節爲單位)。
若是緩衝區過短,則BufferTooShort
引起異常,而且完整的消息可用, 異常實例e.args[0]
在哪裏e
。
在版本3.3中更改:如今可使用Connection.send()
和在進程之間傳輸鏈接對象自己Connection.recv()
。
3.3版本中的新功能:鏈接對象如今支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
返回鏈接對象,並__exit__()
調用close()
。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 13:50 # @Author : Py.qi # @File : queue_pipes.py # @Software: PyCharm from multiprocessing import Process,Pipe def f(parent,child): parent.send('parent1') #父連接寫入數據 parent.send('parent2') child.send('python1') #子連接寫入數據 child.send('python2') parent.close() #關閉管道 child.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() #建立父和子管道鏈接 p = Process(target=f, args=(parent_conn,child_conn)) p.start() print(parent_conn.recv()) #從父連接收的數據是從子連接寫入的數據 print(parent_conn.recv()) print(child_conn.recv()) #從子連接收的數據是從父連接寫入的數據 print(child_conn.recv()) p.join()
Pipe()返回兩個鏈接對象,表示管道的兩端(parent_conn)爲頭,(child_conn)爲尾。這兩個鏈接對象都有send()和recv()方法等。請注意:若是兩個或多個進程(線程)試圖同時讀取或寫入管道的同一端,則管道中的數據可能會損壞,能夠同時使用管道的頭和尾流程式的讀取和寫入數據就沒有問題了。
一般,同步原語在多進程程序中並不像在多線程程序中那樣必要;也可使用管理器對象建立同步基元。
multiprocessing.
Barrier
(parties [,action [,timeout ] ] )
屏蔽對象:一個克隆的threading.Barrier
。3.3版本中的新功能
multiprocessing.
BoundedSemaphore
([ value ] )
一個有界的信號量對象 threading.BoundedSemaphore
。與其相近的模擬存在單一差別:其acquire
方法的第一個參數是命名塊,與之一致Lock.acquire()
。
注意:在Mac OS X上,這是沒法區分的,Semaphore
由於 sem_getvalue()
沒有在該平臺上實現。
multiprocessing.
Condition
([ lock ] )
一個條件變量:一個別名threading.Condition
。若是指定了鎖,那麼它應該是一個Lock
或一個RLock
對象multiprocessing
。在版本3.3中更改:該wait_for()
方法已添加。
class multiprocessing.
Event
克隆的threading.Event
。
class multiprocessing.
Lock
一個非遞歸鎖對象:一個很是相似的threading.Lock
。一旦一個進程或線程得到一個鎖,隨後的嘗試從任何進程或線程獲取它將被阻塞,直到它被釋放; 任何進程或線程可能會釋放它。threading.Lock
應用於線程的概念和行爲在 這裏被複制,multiprocessing.Lock
由於它適用於進程或線程,除非指出。
請注意,這Lock
其實是一個工廠函數,它返回一個multiprocessing.synchronize.Lock
使用默認上下文初始化的實例。
acquire
(block = True,timeout = None )
獲取一個鎖,阻塞或不阻塞。
在block參數設置爲True
(默認)的狀況下,方法調用將被阻塞直到鎖處於解鎖狀態,而後將其設置爲鎖定並返回True
。請注意,這第一個參數的名稱不一樣於threading.Lock.acquire()
。
將塊參數設置爲False
,方法調用不會阻止。若是鎖目前處於鎖定狀態,則返回False
; 不然將鎖設置爲鎖定狀態並返回True
。
當用正值,浮點值超時調用時,只要沒法獲取鎖定,最多隻能阻塞超時指定的秒數。具備負值的超時調用 至關於超時零。超時值None
(默認值)的調用 將超時期限設置爲無限。請注意,處理負值或超時None
值 與實施的行爲不一樣 。該超時參數有,若是沒有實際意義塊參數被設置爲,並所以忽略。返回threading.Lock.acquire()
False
True
若是鎖已被獲取或False
超時時間已過。
release
()
釋放一個鎖。這能夠從任何進程或線程調用,不只是最初獲取鎖的進程或線程。
threading.Lock.release()
除了在解鎖的鎖上調用a時,行爲與其相同ValueError
。
class multiprocessing.
RLock
遞歸鎖對象:一個緊密的類比threading.RLock
。遞歸鎖必須由獲取它的進程或線程釋放。一旦一個進程或線程得到遞歸鎖定,相同的進程或線程能夠再次得到它,而不會阻塞; 該進程或線程必須每次釋放一次它被獲取。
請注意,這RLock
其實是一個工廠函數,它返回一個multiprocessing.synchronize.RLock
使用默認上下文初始化的實例。
RLock
支持上下文管理器協議,所以能夠用在with
語句中。
acquire
(block = True,timeout = None )
獲取一個鎖,阻塞或不阻塞。
當調用塊參數設置爲True
,阻塞直到鎖處於解鎖狀態(不屬於任何進程或線程),除非該鎖已被當前進程或線程所擁有。當前進程或線程接着獲取鎖的全部權(若是它尚未全部權),而且鎖內的遞歸級別增長1,致使返回值爲True
。請注意,第一個參數的行爲與實現相比有幾個不一樣之處threading.RLock.acquire()
,從參數自己的名稱開始。
當調用塊參數設置爲False
,不要阻塞。若是鎖已經被另外一個進程或線程獲取(並所以被擁有),則當前進程或線程不獲取全部權,而且鎖內的遞歸級別不改變,致使返回值爲False
。若是鎖處於未鎖定狀態,則當前進程或線程獲取全部權,遞歸級別遞增,結果返回值爲True
。
timeout參數的使用和行爲與in中的相同 Lock.acquire()
。請注意,這些超時行爲中的一些 與實施的行爲有所不一樣threading.RLock.acquire()
。
release
()
釋放一個鎖,遞減遞歸級別。若是在遞減以後遞歸級別爲零,則將鎖重置爲解鎖(不禁任何進程或線程擁有),而且若是有其餘進程或線程被阻塞,等待鎖解鎖,則准許其中的一個繼續進行。若是在遞減以後遞歸級別仍然不爲零,那麼鎖保持鎖定並由調用進程或線程擁有。
只有在調用進程或線程擁有鎖定時才調用此方法。一個AssertionError
若是該方法是經過一個過程調用或線程之外的僱主或升高若是鎖處於解鎖(無主)的狀態。請注意,在這種狀況下引起的異常的類型不一樣於執行的行爲threading.RLock.release()
。
multiprocessing.
Semaphore
([ value ] )
一個信號量對象:一個接近的類比threading.Semaphore
。
與其相近的模擬存在單一差別:其acquire
方法的第一個參數是命名塊,與之一致Lock.acquire()
。
multiprocessing
包含來自全部同步方法等價於threading
。例如,可使用鎖來確保一次只有一個進程打印到標準輸出:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:16 # @Author : Py.qi # @File : mult_sync.py # @Software: PyCharm from multiprocessing import Process, Lock def f(l, i): l.acquire() try: print('hello world', i) finally: l.release() if __name__ == '__main__': lock = Lock() for num in range(10): Process(target=f, args=(lock, num)).start() #建立進程並啓動
在進行併發編程時,一般最好儘量避免使用共享狀態。使用多個進程時尤爲如此。
可是,若是你確實須要使用一些共享數據,那麼 multiprocessing
提供了一些方法。
可使用能夠由子進程繼承的共享內存來建立共享對象。
multiprocessing.
Value
(typecode_or_type,* args,lock = True )
返回ctypes
從共享內存分配的對象。默認狀況下,返回值其實是對象的同步包裝器。對象自己能夠經過a的value屬性來訪問Value
。
typecode_or_type決定了返回對象的類型:它是一個ctypes類型或array
模塊使用的一種字符類型。 *參數傳遞給類型的構造函數。
若是鎖是True
(默認),則會建立一個新的遞歸鎖對象來同步對該值的訪問。若是鎖是一個Lock
或一個RLock
對象,那麼將用於同步訪問值。若是是鎖,False
那麼訪問返回的對象將不會被鎖自動保護,所以它不必定是「過程安全的」。
Shared memory(共享內存):可使用value或將數據存儲在共享內存映射中Array
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:24 # @Author : Py.qi # @File : mult_sharememory.py # @Software: PyCharm from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = i*i if __name__ == '__main__': num = Value('d', 0.0) #d表示一個雙精度浮點數 arr = Array('i', range(10)) #i表示符號整數 p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) #output: 3.1415927 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
在'd'
與'i'
建立時使用的參數num
和arr
被使用的那種的TypeCodes array
模塊:'d'
表示一個雙精度浮點數和'i'
指示符號整數。這些共享對象將是進程和線程安全的。
爲了更靈活地使用共享內存,可使用multiprocessing.sharedctypes
支持建立從共享內存分配的任意ctypes對象的 模塊。
Server process(服務器進程):經過Manager()
控制一個服務器進程來返回管理器對象,該進程持有Python對象,並容許其餘進程使用代理來操做它們。
管理者提供了一種方法來建立能夠在不一樣進程之間共享的數據,包括在不一樣機器上運行的進程之間經過網絡進行共享。管理員對象控制管理共享對象的服務器進程 。其餘進程能夠經過使用代理來訪問共享對象。
返回SyncManager
可用於在進程之間共享對象的已啓動對象。返回的管理對象對應於一個衍生的子進程,並具備建立共享對象並返回相應代理的方法。
Manager進程在垃圾收集或其父進程退出時將當即關閉。管理員類在multiprocessing.managers
模塊中定義 :
multiprocessing.managers.
BaseManager
([ address [,authkey ] ] )
建立一個BaseManager對象。
一旦建立,應該調用start()
或get_server().serve_forever()
確保管理器對象引用已啓動的管理器進程。
地址是管理進程偵聽新鏈接的地址。若是地址是None
一個任意的選擇。
authkey是將用於檢查到服務器進程的傳入鏈接的有效性的身份驗證密鑰。若是使用 authkey的None
話current_process().authkey
。不然使用authkey,它必須是一個字節字符串。
start
([ initializer [,initargs ] ] )
啓動一個子流程來啓動管理器。若是初始化程序不是,None
那麼子進程將initializer(*initargs)
在啓動時調用。
get_server
()
Server
在Manager的控制下返回一個表明實際服務器的對象。該Server
對象支持該 serve_forever()
方法:
connect
()
將本地管理器對象鏈接到遠程管理器進程
shutdown
()
中止經理使用的過程。這僅start()
在用於啓動服務器進程時纔可用 。
這能夠被屢次調用。
register
(typeid[,callable[,proxyType [,exposed[,method_to_typeid [,create_method ] ] ] ] ] )
可用於註冊類型或可與經理類一塊兒調用的類方法。
typeid是一個「類型標識符」,用於標識特定類型的共享對象。這必須是一個字符串。
callable是可調用的,用於爲這個類型標識符建立對象。若是一個管理器實例將被鏈接到使用該connect()
方法的服務器,或者若是 create_method參數是,False
那麼這能夠保留爲 None
。
proxytype是其中的一個子類,BaseProxy
用於使用此typeid爲共享對象建立代理。若是None
那麼一個代理類是自動建立的。
exposed用於指定其中用於此typeid的代理應該被容許使用訪問方法的名稱序列 BaseProxy._callmethod()
。(若是exposed在None
隨後 proxytype._exposed_
被用於代替若是它存在)。在其中沒有指定暴露列表中的狀況下,共享對象的全部「公共方法」將是可訪問的。(這裏的「公共方法」是指任何具備__call__()
方法且名稱不以其開頭的屬性'_'
)。
method_to_typeid是一個映射,用於指定那些應該返回代理的公開方法的返回類型。它將方法名稱映射到typeid字符串。(若是method_to_typeid存在,None
則 proxytype._method_to_typeid_
使用它。)若是一個方法的名稱不是這個映射的關鍵字,或者若是這個映射是None
這個方法返回的對象將被值複製。
create_method肯定是否應該使用名稱typeid建立一個方法, 該方法可用於通知服務器進程建立一個新的共享對象併爲其返回一個代理。默認狀況下是True
。BaseManager
實例也有一個只讀屬性:address manager
使用的地址。
版本3.3中更改:管理器對象支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
啓動服務器進程(若是還沒有啓動),而後返回管理器對象。 __exit__()
來電shutdown()
。
在之前的版本__enter__()
中,若是還沒有啓動管理器的服務器進程,則不啓動。
經過返回的管理manager()將支持的類型: list
,dict
,Namespace
,Lock
, RLock
,Semaphore
,BoundedSemaphore
, Condition
,Event
,Barrier
, Queue
,Value
和Array
。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 14:40 # @Author : Py.qi # @File : mult_servprocess.py # @Software: PyCharm from multiprocessing import Process, Manager def f(d,l,i): d[i] = i+1 #每一個進程添加字典元素 l.append(i) #每一個進程啓動後將列表添加數據i if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list() pro_list=[] for i in range(5): #啓動5個進程 p = Process(target=f, args=(d,l,i)) p.start() pro_list.append(p) #將進程都添加到列表 for j in pro_list: j.join() #等待每一個進程執行完成 print(d) #打印字典數據爲5個進程操做的後的數據,實現進程間的數據共享 print(l) #output: {0: 1, 1: 2, 2: 3, 3: 4, 4: 5} [0, 1, 2, 3, 4]
class multiprocessing.pool.
Pool
([ processes [,initializer [,initargs [,maxtasksperchild [,context ] ] ] ] ] )
一個進程池對象,用於控制能夠提交做業的工做進程池。它支持超時和回調的異步結果,並具備並行映射實現。
進程是要使用的工做進程的數量。若是過程是 None
而後經過返回的數字os.cpu_count()
被使用。
若是初始化程序不是,None
那麼每一個工做進程將initializer(*initargs)
在啓動時調用 。
maxtasksperchild是一個工做進程在退出以前能夠完成的任務數量,並被一個新的工做進程取代,以便釋放未使用的資源。默認的maxtasksperchild是None
,這意味着工做進程的生活時間與池同樣長。
上下文可用於指定用於啓動工做進程的上下文。一般使用函數multiprocessing.Pool()
或Pool()
上下文對象的方法來建立池。在這兩種狀況下都適當設置。
請注意,池對象的方法只應由建立池的進程調用。
apply
(func [,args [,kwds ] ] )
呼叫FUNC帶參數ARGS和關鍵字參數kwds。它阻塞,直到結果準備就緒。鑑於這些塊,apply_async()
更適合於並行執行工做。此外,func 只能在進程池的一名工做人員中執行。
apply_async
(func [,args [,kwds [,callback [,error_callback ] ] ] ] )
apply()
返回結果對象的方法的變體。
若是指定了callback,那麼它應該是一個可接受的參數。當結果變爲就緒callback被應用到它,這是除非調用失敗,在這種狀況下,error_callback 被應用。
若是指定了error_callback,那麼它應該是一個可接受的參數。若是目標函數失敗,則使用異常實例調用error_callback。
回調應該當即完成,不然處理結果的線程將被阻塞。
map
(func,iterable [,chunksize ] )
map()
內置函數的並行等價物(儘管它只支持一個可迭代的參數)。它阻塞,直到結果準備就緒。
這種方法把迭代器切成許多塊,它們做爲單獨的任務提交給進程池。能夠經過將chunksize設置爲正整數來指定這些塊的(近似)大小。
map_async
(func,iterable [,chunksize [,callback [,error_callback ] ] ] )
map()
返回結果對象的方法的變體。
若是指定了callback,那麼它應該是一個可接受的參數。當結果變爲就緒callback被應用到它,這是除非調用失敗,在這種狀況下,error_callback 被應用。
若是指定了error_callback,那麼它應該是一個可接受的參數。若是目標函數失敗,則使用異常實例調用error_callback。
回調應該當即完成,不然處理結果的線程將被阻塞。
imap
(func,iterable [,chunksize ] )
一個懶惰的版本map()
。
所述CHUNKSIZE參數是與由所使用的一個map()
方法。對於使用了一個較大的值很長iterables CHUNKSIZE可使做業完成的太多比使用的默認值加快 1
。
此外,若是CHUNKSIZE是1
則next()
經過返回的迭代器的方法imap()
方法有一個可選的timeout參數: next(timeout)
將提升multiprocessing.TimeoutError
若是結果不能內退回timeout秒。
imap_unordered
(func,iterable [,chunksize ] )
一樣imap()
,除了從返回的迭代結果的排序應該考慮爲所欲爲。(只有當只有一個工做進程時,纔是保證「正確」的順序。)
starmap
(func,iterable [,chunksize ] )
就像map()
除了可迭代的元素被指望是被解包爲參數的迭代。
所以,一個可迭代的結果。[(1,2), (3, 4)]
[func(1,2), func(3,4)]
3.3版本中的新功能
starmap_async
(func,iterable [,chunksize [,callback [,error_back ] ] ] )
的組合starmap()
和map_async()
,超過迭代 迭代 iterables,並呼籲FUNC與解壓縮iterables。返回一個結果對象。
3.3版本中的新功能
close
()
防止將更多任務提交到池中。一旦全部任務完成,工做進程將退出。
terminate
()
當即中止工做進程而不完成傑出的工做。當池對象被垃圾收集時terminate()
會當即調用。
join
()
等待工做進程退出。必須打電話close()
或 terminate()
在使用以前join()
。
3.3版新增功能:池對象如今支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()
返回池對象,並__exit__()
調用terminate()
。
class multiprocessing.pool.
AsyncResult
Pool.apply_async()
和 返回的結果的類Pool.map_async()
。
get
([ timeout ] )
到達時返回結果。若是timeout不是None
,而且結果沒有在超時秒內到達,那麼 multiprocessing.TimeoutError
會引起。若是遠程調用引起異常,那麼該異常將被從新調整get()
。
wait
([ timeout ] )
等到結果可用或timeout秒數經過。
ready
()
返回通話是否完成。
successful
()
返回調用是否完成而不引起異常。AssertionError
若是結果沒有準備好,會提升。
Pool
類表示一個工做進程池。它具備容許以幾種不一樣方式將任務卸載到工做進程的方法。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/19 14:19 # @Author : Py.qi # @File : mult_Pool_1.py # @Software: PyCharm from multiprocessing import Pool import time def f(x): return x*x if __name__ == '__main__': with Pool(processes=4) as pool: #4個開始工做進程 result = pool.apply_async(f, (10,)) #異步進程處理 print(result) print(result.get(timeout=1)) #1秒超時,返回結果 print(pool.map(f, range(10))) #並行阻塞等待結果 it = pool.imap(f, range(10)) print(next(it)) # prints "0" print(next(it)) # prints "1" print(it.next(2)) # prints "4" unless your computer is *very* slow result = pool.apply_async(time.sleep, (10,)) print(result.get(timeout=1)) #引起異常multiprocessing.TimeoutError #output: <multiprocessing.pool.ApplyResult object at 0x0000008BC4E76278> 100 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 Traceback (most recent call last): File "Z:/python_project/day20/mult_Pool_1.py", line 28, in <module> print(result.get(timeout=1)) #引起異常multiprocessing.TimeoutError File "Z:\Program Files\Python35\lib\multiprocessing\pool.py", line 640, in get raise TimeoutError multiprocessing.context.TimeoutError
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 15:28 # @Author : Py.qi # @File : mult_Pool.py # @Software: PyCharm from multiprocessing import Pool, TimeoutError import time import os def f(x): return x*x if __name__ == '__main__': #啓動4個工做進程做爲進程池 with Pool(processes=4) as pool: #返回函數參數運行結果列表 print(pool.map(f, range(10))) #在進程池中以任意順序打印相同的數字 for i in pool.imap_unordered(f, range(10)): print(i,end=' ') #異步評估 res = pool.apply_async(f,(20,)) #在進程池中只有一個進程運行 print('\n',res.get(timeout=1)) #打印結果,超時爲1秒 #打印該進程的PID res = pool.apply_async(os.getpid,()) #在進程池中只有一個進程運行 print(res.get(timeout=1)) #打印進程PID #打印4個進程的PID multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] print([res.get(timeout=1) for res in multiple_results]) #進程等待10秒,獲取數據超時爲1秒,將輸出異常 res = pool.apply_async(time.sleep, (10,)) try: print(res.get(timeout=1)) except TimeoutError: print("We lacked patience and got a multiprocessing.TimeoutError") #output: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 0 1 4 9 16 25 36 49 64 81 400 2164 [8152, 2032, 2164, 8152] We lacked patience and got a multiprocessing.TimeoutError
請注意:池的方法只能右建立它的進程使用,包中的功能要求__main__,在交互解釋器中將不起做用。
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2018/1/18 16:14 # @Author : Py.qi # @File : sync_apply.py # @Software: PyCharm from multiprocessing import Pool,freeze_support import time def foo(n): time.sleep(1) print(n*n) def back(arg): print('exec done:',arg) if __name__ == '__main__': freeze_support() #windows下新啓動進程須要先運行此函數 pool = Pool(2) #建立進程池,同時2個進程運行 for i in range(10): #pool.apply(func=foo, args=(i,)) #建立同步進程 #建立異步進程,傳遞函數和參數,在函數執行完後執行callback,並將函數foo的結構返回給callback pool.apply_async(func=foo,args=(i,),callback=back) pool.close() #此處必須是先關閉進程再join pool.join() print('end')