python3之線程與進程

一、CPU運行原理

咱們都知道CPU的根本任務就是執行指令,對計算機來講最終都是一串由「0」和「1」組成的序列。CPU從邏輯上能夠劃分紅3個模塊,分別是控制單元、運算單元和存儲單元,這三部分由CPU內部總線鏈接起來:html

控制單元:控制單元是整個CPU的指揮控制中心,由指令寄存器IR(Instruction Register)、指令譯碼器ID(Instruction Decoder)和操做控制器OC(Operation Controller)等,對協調整個電腦有序工做極爲重要。它根據用戶預先編好的程序,依次從存儲器中取出各條指令,放在指令寄存器IR中,經過指令譯碼(分析)肯定應該進行什麼操做,而後經過操做控制器OC,按肯定的時序,向相應的部件發出微操做控制信號。操做控制器OC中主要包括節拍脈衝發生器、控制矩陣、時鐘脈衝發生器、復位電路和啓停電路等控制邏輯。
運算單元:是運算器的核心。能夠執行算術運算(包括加減乘數等基本運算及其附加運算)和邏輯運算(包括移位、邏輯測試或兩個值比較)。相對控制單元而言,運算器接受控制單元的命令而進行動做,即運算單元所進行的所有操做都是由控制單元發出的控制信號來指揮的,因此它是執行部件。
存儲單元:包括CPU片內緩存和寄存器組,是CPU中暫時存放數據的地方,裏面保存着那些等待處理的數據,或已經處理過的數據,CPU訪問寄存器所用的時間要比訪問內存的時間短。採用寄存器,能夠減小CPU訪問內存的次數,從而提升了CPU的工做速度。但由於受到芯片面積和集成度所限,寄存器組的容量不可能很大。寄存器組可分爲專用寄存器和通用寄存器。專用寄存器的做用是固定的,分別寄存相應的數據。而通用寄存器用途普遍並可由程序員規定其用途,通用寄存器的數目因微處理器而異。python

cpu的工做原理:linux

總的來講,CPU從內存中一條一條地取出指令和相應的數據,按指令操做碼的規定,對數據進行運算處理,直到程序執行完畢爲止。程序員

控制單元在時序脈衝的做用下,將指令計數器裏所指向的指令地址(這個地址是在內存裏的)送到地址總線上去,而後CPU將這個地址裏的指令讀到指令寄存器進行譯碼。對於執行指令過程當中所須要用到的數據,會將數據地址也送到地址總線,而後CPU把數據讀到CPU的內部存儲單元(就是內部寄存器)暫存起來,最後命令運算單元對數據進行處理加工。算法

cpu的工做效率:編程

基本上,CPU就是這樣去執行讀出數據、處理數據和往內存寫數據3項基本工做。但在一般狀況下,一條指令能夠包含按明確順序執行的許多操做,CPU的工做就是執行這些指令,完成一條指令後,CPU的控制單元又將告訴指令讀取器從內存中讀取下一條指令來執行。這個過程不斷快速地重複,快速地執行一條又一條指令,產生你在顯示器上所看到的結果。咱們很容易想到,在處理這麼多指令和數據的同時,因爲數據轉移時差和CPU處理時差,確定會出現混亂處理的狀況。爲了保證每一個操做準時發生,CPU須要一個時鐘,時鐘控制着CPU所執行的每個動做。時鐘就像一個節拍器,它不停地發出脈衝,決定CPU的步調和處理時間,這就是咱們所熟悉的CPU的標稱速度,也稱爲主頻。主頻數值越高,代表CPU的工做速度越快。windows

而在執行效率方面,一些廠商經過流水線方式或以幾乎並行工做的方式執行指令的方法來提升指令的執行速度。剛纔咱們提到,指令的執行須要許多獨立的操做,諸如取指令和譯碼等。最初CPU在執行下一條指令以前必須所有執行完上一條指令,而如今則由分佈式的電路各自執行操做。也就是說,當這部分的電路完成了一件工做後,第二件工做當即佔據了該電路,這樣就大大增長了執行方面的效率。 數組

另外,爲了讓指令與指令之間的鏈接更加準確,如今的CPU一般會採用多種預測方式來控制指令更高效率地執行。緩存

二、線程與進程的區別

   (1)進程是資源的分配和調度的一個獨立單元,而線程是CPU調度的基本單元
          (2)同一個進程中能夠包括多個線程,而且線程共享整個進程的資源(寄存器、堆棧、上下文),一個進程至少包括一個線程。
          (3)進程的建立調用fork或者vfork,而線程的建立調用pthread_create,進程結束後它擁有的全部線程都將銷燬,而線程的結束不會影響同個進程中的其餘線程的結束
          (4)線程是輕兩級的進程,它的建立和銷燬所須要的時間比進程小不少,全部操做系統中的執行功能都是建立線程去完成的
          (5)線程中執行時通常都要進行同步和互斥,由於他們共享同一進程的全部資源
          (6)線程有本身的私有屬性TCB,線程id,寄存器、硬件上下文,而進程也有本身的私有屬性進程控制塊PCB,這些私有屬性是不被共享的,用來標示一個進程或一個線程的標誌
進程的幾種狀態:
   (1)run(運行狀態):正在運行的進程或在等待隊列中對待的進程,等待的進程只要以獲得cpu就能夠運行
          (2)Sleep(可中斷休眠狀態):至關於阻塞或在等待的狀態
          (3)D(不可中斷休眠狀態):在磁盤上的進程
          (4)T(中止狀態):這中狀態沒法直觀的看見,由於是進程中止後就釋放了資源,因此不會留在linux中
          (5)Z(殭屍狀態):子進程先與父進程結束,但父進程沒有調用wait或waitpid來回收子進程的資源,因此子進程就成了殭屍進程,若是父進程結束後任然沒有回收子進程的資源,那麼1號進程將回收

三、python3調用線程

Python3 經過兩個標準庫 _thread 和 threading 提供對線程的支持。 
_thread 提供了低級別的、原始的線程以及一個簡單的鎖,它相比於 threading 模塊的功能仍是比較有限的。 
threading 模塊除了包含 _thread 模塊中的全部方法外,還提供的其餘方法:安全

  • threading.currentThread(): 返回當前的線程變量。
  • threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
  • threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

除了使用方法外,線程模塊一樣提供了Thread類來處理線程,Thread類提供瞭如下方法:

  • run(): 用以表示線程活動的方法。
  • start():啓動線程活動。
  • join([time]): 等待至線程停止。這阻塞調用線程直至線程的join() 方法被調用停止-正常退出或者拋出未處理的異常-或者是可選的超時發生。
  • isAlive(): 返回線程是否活動的。
  • getName(): 返回線程名。
  • setName(): 設置線程名。
  • setDaemon():設置爲後臺線程或前臺線程(默認)若是是後臺線程,主線程執行過程當中,後臺線程也在執行,主線程執行完畢後,後臺線程不論成功與否,均中止;若是是前臺線程,主線程執行過程當中,前臺線程也在執行,主線程執行完畢後,等待前臺線程也執行完成後,程序中止。

直接調用啓動線程:

#!/usr/bin/env python
#coding:utf8

import threading  #線程模塊
import time

def sayhi(num):  #定義每一個線程要運行的函數
    print('running on number',num)
    time.sleep(3)

if __name__ == "__main__":
    t1 = threading.Thread(target=sayhi,args=(33,)) #生成一個線程實例
    t2 = threading.Thread(target=sayhi,args=(22,)) #生成另外一個線程實例

    t1.start()  #啓動線程
    t2.start()
    print(t1.getName()) #獲取線程名
    print(t2.getName())
    t1.join()  #阻塞主線程,等待t1子線程執行完後再執行後面的代碼
    t2.join()  #阻塞主線程,等待t2子線程執行完後再執行後面的代碼
    print('-----end')

繼承式調用啓動線程:

#!/usr/bin/env python
#coding:utf8

import threading,time

class mythreading(threading.Thread): #寫一個類方法繼承threading模塊
    def __init__(self):
        #threading.Thread.__init__(self)   #金典類重寫父類方法
        super(mythreading,self).__init__() #重寫父類屬性
        self.name = n+self.name.split('d')[1]


    def run(self):  #運行線程的函數,函數名必須是run名稱
        super(mythreading,self).run()
        print('starting on threading',self.name)
        time.sleep(5)
if __name__ == '__main__':
    #t1 = mythreading(1)   #經過類建立線程
    #t2 = mythreading(2)
    #t1.start()   #啓動進程
    #t2.start()
    ttr = []
    for i in range(10):     #啓動十個線程
        t = mythreading()
        ttr.append(t)
        t.start()     
        t.setName('hehe-{}'.format(i))   #修改線程名
        print(t.getName())   #獲取線程名

    for item in ttr:
        item.join()    #阻斷線程等待執行完後再執行後續代碼

    print('-----end')

守護線程:

#!/usr/bin/env python
#coding:utf8

import time
import threading

def run(num):  #子線程運行函數
    print('---starting',num)
    time.sleep(2)
    print('---done')
def main():  #主線程運行函數
    print('開啓主線程')
    for i in range(4): #在主線程中運行4個子線程
        t1 = threading.Thread(target=run,args=(i,))
        t1.start()
        print('啓動線程',t1.getName())
    t1.join()
    print('結束主線程')
m = threading.Thread(target=main,args=())
m.setDaemon(True) #設置主線程爲守護線程
m.start()
m.join(timeout=3) #等待3秒後主線程退出,無論子線程是否運行完
print('------end')


#output:
開啓主線程
---starting 0
啓動線程 Thread-2
---starting 1
啓動線程 Thread-3
---starting 2
啓動線程 Thread-4
---starting 3
啓動線程 Thread-5
---done
---done
---done
---done
結束主線程
------end

進程已結束,退出代碼0

(1)線程同步

若是多個線程共同對某個數據修改,則可能出現不可預料的結果,爲了保證數據的正確性,須要對多個線程進行同步。 
使用 Thread 對象的 Lock 和 Rlock 能夠實現簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對於那些須要每次只容許一個線程操做的數據,能夠將其操做放到 acquire 和 release 方法之間。以下: 
  多線程的優點在於能夠同時運行多個任務(至少感受起來是這樣)。可是當線程須要共享數據時,可能存在數據不一樣步的問題。 
  考慮這樣一種狀況:一個列表裏全部元素都是0,線程」set」從後向前把全部元素改爲1,而線程」print」負責從前日後讀取列表並打印。 
  那麼,可能線程」set」開始改的時候,線程」print」便來打印列表了,輸出就成了一半0一半1,這就是數據的不一樣步。爲了不這種狀況,引入了鎖的概念。 
  鎖有兩種狀態——鎖定和未鎖定。每當一個線程好比」set」要訪問共享數據時,必須先得到鎖定;若是已經有別的線程好比」print」得到鎖定了,那麼就讓線程」set」暫停,也就是同步阻塞;等到線程」print」訪問完畢,釋放鎖之後,再讓線程」set」繼續。 
  通過這樣的處理,打印列表時要麼所有輸出0,要麼所有輸出1,不會再出現一半0一半1的尷尬場面。 

鎖提供以下方法: 
1.Lock.acquire([blocking]) 
2.Lock.release() 
3.threading.Lock() 加載線程的鎖對象,是一個基本的鎖對象,一次只能一個鎖定,其他鎖請求,需等待鎖釋放後才能獲取

4.threading.RLock() 多重鎖,在同一線程中可用被屢次acquire。若是使用RLock,那麼acquire和release必須成對出現, 調用了n次acquire鎖請求,則必須調用n次的release才能在線程中釋放鎖對象。

import threading,time

class mythread(threading.Thread):
    def __init__(self,threadID,threadName):
        super(mythread,self).__init__()
        self.threadID = threadID
        self.threadName = threadName
    def run(self):
        print('開啓線程:',self.threadName)
        threadLock.acquire() #獲取線程鎖
        print_time(time.time(),self.threadName)
        threadLock.release()  #釋放線程鎖
def print_time(suntime,threadName):
    for i in range(3):
        time.sleep(2)
        print('%s,,,%s'%(suntime,threadName))

threadLock = threading.Lock()  #建立線程鎖
t1 = mythread(1,'thread1')  #建立線程1
t2 = mythread(2,'thread2')
t1.start()  #啓動線程
t2.start()
t1.join()  #阻塞主線程,等待線程1完成
t2.join()
print('退出程序')

#鎖住運行線程函數後,會等待線程1執行完成後在執行線程2
#output
開啓線程: thread1
開啓線程: thread2
1516081515.3328698,,,thread1
1516081515.3328698,,,thread1
1516081515.3328698,,,thread1
1516081521.3341322,,,thread2
1516081521.3341322,,,thread2
1516081521.3341322,,,thread2
退出程序
from threading import Thread,Lock,RLock
import time

class mythread(Thread):
    def __init__(self,number1,number2):
        super(mythread,self).__init__()
        self.number1 = number1
        self.number2 = number2

    def run(self):
        print('開啓線程',self.name)
        lock.acquire()
        print('run is:',time.time(),self.number1+self.number2)
        arithmetic(self.number1,self.number2)
        time.sleep(2)
        lock.release()

def arithmetic(avg1,avg2):
    lock.acquire()
    print('arithmetic:',time.time(),avg1+avg2)
    time.sleep(2)
    lock.release()

#建立鎖對象時若是使用Lock則會在運行線程2時一直處於等待狀態
#若是使用RLock則可正常運行,RLock支持多重鎖
lock = RLock()  #建立多重鎖 if __name__ == "__main__":
    t1 = mythread(3,4)
    t2 = mythread(5,6)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('程序結束!')

#OUTPUT
開啓線程 Thread-1
run is: 1516088282.0478237 7
arithmetic: 1516088282.0478237 7
開啓線程 Thread-2
run is: 1516088286.0496652 11
arithmetic: 1516088286.0496652 11
程序結束!

(2)queue同步隊列

queue模塊實現多生產者,多用戶隊列。當在多線程之間必須安全地交換信息時,它在線程編程中特別有用。Queue模塊中類實現了全部必需的鎖定語義

該模塊實現三種類型的隊列,它們的區別僅在於檢索條目的順序:

在FIFO隊列中,第一個添加的任務是第一個被檢索的。

在LIFO隊列中,最後添加的條目是第一個被檢索(像堆棧同樣操做)。

使用優先級隊列,條目保持排序(使用heapq模塊),而且首先檢索最低值的條目。

在內部,模塊使用鎖來暫時阻止競爭的線程; 可是,它不是爲了處理線程內的重入而設計的;該queue模塊定義瞭如下類:

class queue.Queuemaxsize = 0 

FIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。

class queue.LifoQueuemaxsize = 0 

LIFO隊列的構造器。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。

class queue.PriorityQueuemaxsize = 0 

優先隊列的構造函數。 maxsize是一個整數,用於設置可放入隊列的項目數的上限。一旦達到此大小,插入將會阻塞,直到隊列項被消耗。若是 maxsize小於或等於零,隊列大小是無限的。

首先檢索最低值的條目(最低值條目是返回的條目sorted(list(entries))[0])。對於條目的典型圖案的形式是一個元組:(priority_number, data)

exception queue.Empty

空對象上調用非阻塞get()(或 get_nowait()時引起異常Queue

exception queue.Full

若是已滿對象上調用非阻塞put()(或 put_nowait()),則會引起異常Queue

隊列對象(Queue,LifoQueue或PriorityQueue)提供如下的公共方法:

Queue.qsize

返回隊列的大小。請注意,qsize()> 0並不保證後續的get()不會被阻塞,qsize()也不會保證put()不會被阻塞。

Queue.empty

若是隊列爲空返回True不然返回False若是empty()返回True,則不保證對put()的後續調用不會被阻塞。一樣,若是empty()返回False,則不保證後續調用get()不會被阻塞。

Queue.full

若是隊列已滿返回True不然返回Fales若是full()返回True,則不保證後續的get()調用不會被阻塞。一樣,若是full()返回False它並不能保證後續調用put()不會被阻塞。

Queue.putitemblock = Truetimeout = None 

將項目放入隊列中。若是可選參數block爲True,而且timeout爲 None(默認),則根據須要進行阻止,直到空閒插槽可用。若是 timeout是一個正數,則最多會阻塞timeout秒數,Full若是在此時間內沒有空閒插槽,則會引起異常。不然(block爲false),若是一個空閒插槽當即可用,則在隊列中放置一個項目,不然引起Full異常(在這種狀況下timeout被忽略)。

Queue.put_nowaititem 

至關於put(item, False)

Queue.getblock = Truetimeout = None 

從隊列中移除並返回一個項目。若是可選參數爲真,而且 timeoutNone(默認),則在必要時阻塞,直到項目可用。若是timeout是一個正數,則最多會阻塞timeout秒數,Empty若是在該時間內沒有可用項目,則會引起異常。不然(block爲false),返回一個項目,若是一個是當即可用的,不然引起Empty異常(在這種狀況下timeout被忽略)。

Queue.get_nowait

至關於get(False)

提供了兩種方法來支持跟蹤入隊任務是否已徹底由守護進程消費者線程處理。

Queue.task_done

代表之前排隊的任務已經完成。由隊列消費者線程使用。對於每一個get()用於獲取任務的對象,隨後的調用都會task_done()告訴隊列,任務的處理已完成。

若是a join()當前被阻塞,則在處理全部項目時(意味着task_done()已經接收到已經put()進入隊列的每一個項目呼叫),則將恢復

提出了一個ValueError好象叫更多的時間比中放入隊列中的項目。

Queue.join

阻塞,直到隊列中的全部項目都被獲取並處理。

每當將項目添加到隊列中時,未完成任務的數量就會增長。只要消費者線程調用task_done()來指示該項目已被檢索,而且全部工做都已完成,計數就會減小當未完成任務的數量降低到零時,join()取消阻止。

#Queue先進先出隊列
import queue
def show(q,i):
    if q.empty() or q.qsize() >= 1:
        q.put(i)   #存隊列
    elif q.full():
        print('queue not size')
        
que = queue.Queue(5)   #容許5個隊列的隊列對象
for i in range(5):
    show(que,i)
print('queue is number:',que.qsize())  #隊列元素個數
for j in range(5):
    print(que.get())  #取隊列
print('......end')

#output:
queue is number: 5
0
1
2
3
4
......end
#LifoQueue先進後出隊列

import queue

lifoque = queue.LifoQueue()
lifoque.put('hello1')
lifoque.put('hello2')
lifoque.put('hello3')
print(lifoque.qsize())
print(lifoque.get())
print(lifoque.get())
print(lifoque.get())


#output:
3
hello3
hello2
hello1
#PriorityQueue按數據大小取最小值優先

import queue

pque = queue.PriorityQueue()  #優先級的隊列
pque.put(7)   #先存入隊列
pque.put(5)
pque.put(3)
print(pque.qsize())
print(pque.get())  #取出最小值的數據
print(pque.get())
print(pque.get())

#output:
3
22
52
71

(3)信號量(Semaphore)

Lock鎖是同時只容許一個線程更改數據,而Semaphore是同時容許必定數量的線程更改數據,如一個場所內只有3把椅子給人坐,那麼只容許3我的,其它人則排隊,只有等裏面的人出來後才能進去。

import threading,time
class mythreading(threading.Thread): #寫一個類方法繼承hreading模塊
    def run(self):  #運行線程的函數,函數名必須是run名稱
        semaphore.acquire()  #獲取信號量鎖
        print('running the thread:',self.getName())
        time.sleep(2)
        semaphore.release()  #釋放信號量鎖

if __name__ == '__main__':
    semaphore = threading.BoundedSemaphore(3) #建立信號量對象,只運行3個進程同時運行
    for i in range(20):
        t1 = mythreading()
        t1.start()
    t1.join()
    print('---end')

threading模塊的Timer方法,在通過必定時間後才能運行的操做,通常只針對函數運行。

import threading

def run():
    print('is running...')
        
t=threading.Timer(5.0,run) #等待5秒後執行函數
t.start()

(4)事件(event)

python線程的事件用於主線程控制其它線程的執行,事件主要提供了三個方法:

event.wait  線程阻塞

event.set   將全局對象「Flag」設置爲False

event.clear 將全局對象"Flag"設置爲True

事件處理的機制:全局定義了一個「Flag」,默認爲「False」,若是「Flag」值爲 False,那麼當程序執行 event.wait 方法時就會阻塞,若是「Flag」值爲True,那麼event.wait 方法時便再也不阻塞。

#!/usr/bin/env python
#coding:utf8

import threading
def show(event):
    print('start')
    event.wait()  #阻塞線程執行程序
    print('done')
event_obj = threading.Event()  #建立event事件對象
for i in range(10):
    t1 = threading.Thread(target=show,args=(event_obj,))
    t1.start()
    inside = input('>>>:')
    if inside == '1':
        event_obj.set() #當用戶輸入1時,set全局Flag爲True,線程再也不阻塞,打印"done"
    event_obj.clear() #將Flag設置爲False

(5)條件(Condition)

使得線程等待,只有知足某條件時,才釋放N個線程

import threading
def condition_func():
    ret = False
    inp = input('>>>:')
    if inp == '1':
        ret = True
    return ret

def run(n):
    con.acquire() #條件鎖
    con.wait_for(condition_func) #判斷條件
    print('running...',n)
    con.release() #釋放鎖
    print('------------')

if __name__ == '__main__':
    con = threading.Condition() #創建線程條件對象
    for i in range(10):
        t = threading.Thread(target=run,args=(i,))
        t.start()
        t.join()
    print('退出程序')

四、python3進程調用

multiprocessing是一個使用相似於threading模塊的API來支持多進程。該multiprocessing軟件包提供本地和遠程併發,經過使用子進程來充分利用服務器上的多個處理器,它能夠在Unix和Windows上運行。

class multiprocessing.Processgroup = Nonetarget = Nonename = Noneargs =()kwargs = {}*daemon = None 

過程對象表示在單獨的過程當中運行的活動。這個 Process類的全部方法的等價與 threading.Thread

應始終使用關鍵字參數調用構造函數。羣體 應該永遠是None它僅僅是爲了兼容而存在 threading.Thread。 targetrun()方法調用的可調用對象它默認爲None,意味着什麼都不叫。name是進程名稱(name更多細節)。 args是目標調用的參數元組。 kwargs是目標調用的關鍵字參數字典。若是提供,則僅關鍵字守護進程參數將進程daemon標誌設置爲TrueFalse若是None(默認),這個標誌將從建立過程繼承。

默認狀況下,沒有參數傳遞給目標

若是一個子類重寫了構造函數,那麼必須確保它Process.__init__()在執行任何其餘操做以前調用基類的構造函數()。

在版本3.3中進行了更改:添加了守護進程參數。

run

表示進程活動的方法。

您能夠在子類中重寫此方法。標準run() 方法調用傳遞給對象構造函數的可調用對象做爲目標參數,若是有的話,分別argskwargs參數中獲取順序和關鍵字參數

start

開始進程的活動。

每一個進程對象最多隻能調用一次。它安排對象的run()方法在一個單獨的進程中被調用。

join timeout 

若是可選參數timeoutNone(缺省值),則該方法將阻塞,直到其join()方法被調用的進程終止。若是超時是一個正數,它最多會阻塞超時秒數。請注意,None若是方法終止或方法超時,則方法返回檢查流程exitcode,肯定是否終止。

一個過程能夠鏈接屢次。

進程沒法自行加入,由於這會致使死鎖。嘗試在啓動以前加入進程是錯誤的。

name

該進程的名稱。名稱是一個字符串,僅用於識別目的。它沒有語義。多個進程能夠被賦予相同的名稱。

初始名稱由構造函數設置。若是沒有明確的名字被提供給構造函數,就會構造一個名爲「Process-N 1:N 2:...:N k 」 的表單,其中每一個N k是其父代的第N個孩子。

is_alive

返回進程是否存在;start() 方法返回的那一刻起,一個進程對象是活着的,直到子進程終止。

daemon

進程的守護進程標誌,一個布爾值。這必須在start()調用以前設置 

初始值是從建立過程繼承的。當進程退出時,它將嘗試終止其全部守護進程的子進程。

請注意,守護進程不容許建立子進程。不然,若是父進程退出時被終止,則守護進程將使其子進程成爲孤兒。此外,這些不是 Unix守護進程或服務,它們是正常的進程,若是非守護進程已經退出,它將被終止(而不是加入)。

除了 threading.ThreadAPI以外,Process對象還支持如下屬性和方法:

pid

返回進程ID。在這個過程產生以前,這將是 None

exitcode

孩子的退出代碼。這將是None若是過程尚未結束。負值-N表示孩子被信號N終止

authkey

進程的身份驗證密鑰(一個字節字符串)。

multiprocessing初始化主進程正在使用分配一個隨機串os.urandom()

當一個Process對象被建立時,它會繼承父進程的認證密鑰,儘管這能夠經過設置authkey成另外一個字節字符串來改變

請參閱驗證密鑰

sentinel

系統對象的數字句柄,當進程結束時它將變爲「就緒」。

若是您想一次使用多個事件,則可使用此值multiprocessing.connection.wait()不然,調用join()更簡單。

在Windows中,這是與使用的OS手柄WaitForSingleObject 和WaitForMultipleObjects家庭的API調用。在Unix上,這是一個可用於select模塊原語的文件描述符

3.3版本中的新功能

terminate

終止這個進程。在Unix上,這是使用SIGTERM信號完成的在Windows TerminateProcess()上使用。請注意,退出處理程序和最後的子句等將不會被執行。

須要注意的是start()join()is_alive(), terminate()exitcode方法只能由建立進程對象的過程調用。

異常:

exception multiprocessing. ProcessError

全部multiprocessing異常的基類

exception  multiprocessing. BufferTooShort

Connection.recv_bytes_into()當提供的緩衝區對象過小而不能讀取消息時引起異常

若是e是的一個實例BufferTooShort,而後e.args[0]會給出消息做爲字節串。

exception multiprocessing. AuthenticationError

發生認證錯誤時引起。

exception multiprocessing. TimeoutError

當超時到期時由超時方法提出。

建立10個併發進程:

from multiprocessing import Process

def proce(pn):
    print('hello',pn)

if __name__ == '__main__':
    for i in range(10):  #啓動10個進程
        p1 = Process(target=proce,args=(i,)) #建立進程
        p1.start()  #啓動進程
        #print('nn:',p1.name)
    p1.join()  #等待進程結束
    print('exit project')

打印進程ID:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/17 14:15
# @Author  : Py.qi
# @File    : 進程ID.py
# @Software: PyCharm

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name',__name__) #是否主程序
    print('parent process',os.getppid())  #上層進程號
    print('process id:',os.getpid())  #自身進程號

def f(name):
    info('function f')
    print('hello',name)
if __name__ == '__main__':
    info('main line')  #經過pycharm進程運行函數
    p = Process(target=f,args=('bob',))  #調用子進程運行函數
    p.start()
    p.join()

#output:
main line   
module name __main__
parent process 5756  #pycharm進程號
process id: 9864  #自身進程

function f
module name __mp_main__
parent process 9864  
process id: 9024  #文件建立的進程號
hello bob

(1)上下文和啓動方法 

multiprocessing支持三種啓動流程的方式:

spawn:

  父進程啓動一個新鮮的Python解釋器進程。子進程只會繼承運行進程對象run()方法所必需的資源特別是父進程中沒必要要的文件描述符和句柄將不會被繼承。與使用forkforkserver相比,使用此方法啓動進程至關慢

  在Unix和Windows上可用。Windows上是默認的。

fork:

  父進程使用os.fork()fork來解釋Python。子進程在開始時與父進程有效地相同。父進程的全部資源都由子進程繼承。請注意,安全分叉多線程的過程是有問題的。

  僅在Unix上提供。Unix上是默認的。

forkserver:

  當程序啓動並選擇forkserver啓動方法時,啓動一個服務器進程。從那時起,不管什麼時候須要一個新的進程,父進程都鏈接到服務器並請求它分叉一個新的進程。fork服務器進程是單線程的,因此它是安全的使用os.fork()。沒有沒必要要的資源被繼承。

在支持經過Unix管道傳遞文件描述符的Unix平臺上可用。

 

在3.4版中進行了更改:在全部unix平臺上添加了spawn,並爲某些unix平臺添加了forkserver子進程再也不繼承Windows上的全部父代繼承句柄。

在Unix上,使用spawnforkserver啓動方法也將啓動一個信號量跟蹤器進程,該進程跟蹤程序進程建立的未連接的已命名信號量。當全部進程退出信號量跟蹤器時,將取消全部剩餘信號量的連接。一般應該沒有,但若是一個進程被一個信號殺死,那麼可能會有一些「泄漏」的信號量。(取消連接命名的信號量是一個嚴重的問題,由於系統只容許有限的數量,而且在下一次從新啓動以前它們不會自動斷開鏈接。)

 在主模塊中使用set_start_method()的啓動方法:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/17 15:44
# @Author  : Py.qi
# @File    : set_start1.py
# @Software: PyCharm

import multiprocessing as mp

def foo(q):  #傳遞隊列對象給函數
    q.put('hello')
    q.put('123')
    q.put('python')
if __name__ == '__main__':
    mp.set_start_method('spawn') #設置啓動進程方式
    q = mp.Queue()  #建立隊列對象
    p = mp.Process(target=foo, args=(q,)) #啓動一個進程,傳遞運行函數和參數
    p.start()  #啓動進程
    print(q.get())  #獲取隊列數據
    print(q.qsize())
    print(q.get())
    p.join()

set_start_method()不能在程序中使用屢次,可使用get_context()獲取上下文對象,上下文對象與多處理模塊具備相同的API,並容許在同一個程序中使用多個啓動方法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 13:14
# @Author  : Py.qi
# @File    : get_context_1.py
# @Software: PyCharm

import multiprocessing as mp
import time

def foo(q):
    for i in range(10):
        q.put('python%s'%i)
        time.sleep(1)
    #q.put('context')
if __name__ == '__main__':
    ext = mp.get_context('spawn')
    q = ext.Queue()
    p = ext.Process(target=foo,args=(q,))
    p2 = ext.Process(target=foo,args=(q,))
    p.start()
    p2.start()
    for i in range(10):
        print(q.get())
    #print(q.get())
    #print(q.get())
    #print(q.get())
    #print(q.get())
    p.join()
    p2.join()

#output:
python0
python0
python1
python1
python2
python2
python3
python3
python4
python4

請注意,與一個上下文相關的對象可能與另外一個上下文的進程不兼容。特別是,使用fork上下文建立的鎖不能被傳遞給使用spawnforkserver啓動方法啓動的進程 

想要使用特定啓動方法的庫應該能夠get_context()用來避免干擾庫用戶的選擇。

(1)進程之間交換對象

multiprocessing 支持兩種進程之間的通信通道:

class multiprocessing.Queuemaxsize 

返回使用管道和一些鎖/信號量實現的進程共享隊列。當一個進程首先把一個項目放到隊列中時,一個進給線程被啓動,將一個緩衝區中的對象傳送到管道中。

Queue實現queue.Queue除了task_done()以外的 全部方法join()

 

qsize

返回隊列的近似大小。因爲多線程/多處理語義,這個數字是不可靠的。

請注意,NotImplementedError在Mac OS X等Unix平臺上可能會出現這種sem_getvalue()狀況。

empty

True若是隊列爲空False返回不然返回因爲多線程/多處理語義,這是不可靠的。

full

True若是隊列已滿False返回不然返回因爲多線程/多處理語義,這是不可靠的。

put obj [block [timeout 

將obj放入隊列中。若是可選參數True (缺省值)而且超時None(缺省值),則在須要時禁止,直到空閒插槽可用。若是超時是一個正數,則最多會阻塞超時秒數,queue.Full若是在此時間內沒有空閒插槽,則會引起異常。不然(是 False),若是一個空閒插槽當即可用,則在隊列中放置一個項目,不然引起queue.Full異常(在這種狀況下超時被忽略)。

put_nowait obj 

至關於put(obj, False)

get block [timeout 

從隊列中移除並返回一個項目。若是可選ARGS 是 True(默認值),超時None(默認),塊若有必要,直到一個項目是可用的。若是超時是一個正數,則最多會阻塞超時秒數,queue.Empty 若是在該時間內沒有可用項目,則會引起異常。不然(塊是 False),返回一個項目,若是一個是當即可用的,不然引起 queue.Empty異常(在這種狀況下超時被忽略)。

get_nowait

至關於get(False)

multiprocessing.Queue有一些其餘的方法沒有找到 queue.Queue大多數代碼一般不須要這些方法:

close

代表當前進程不會有更多的數據放在這個隊列中。一旦將全部緩衝的數據刷新到管道,後臺線程將退出。當隊列被垃圾收集時,這被自動調用。

join_thread

加入後臺線程。這隻能在close()被調用後才能使用它阻塞,直到後臺線程退出,確保緩衝區中的全部數據已經​​刷新到管道。

默認狀況下,若是一個進程不是隊列的建立者,那麼在退出時它將嘗試加入隊列的後臺線程。該過程能夠調用cancel_join_thread()作出join_thread()什麼也不作。

cancel_join_thread

防止join_thread()阻塞。尤爲是,這能夠防止後臺線程在進程退出時自動加入

class multiprocessing.SimpleQueue

這是一個簡化的Queue類型,很是接近一個鎖定Pipe

empty

True若是隊列爲空False返回不然返回

get

從隊列中移除並返回一個項目。

put item 

將元素放入隊列中。

class multiprocessing.JoinableQueuemaxsize 

JoinableQueue,一個Queue子類,是另外有一個隊列task_done()join()方法。

task_done

代表之前排隊的任務已經完成。由隊列使用者使用。對於每一個get()用於獲取任務的對象,隨後的調用都會task_done()告訴隊列,任務的處理已完成。

若是a join()當前被阻塞,則在處理全部項目時(意味着task_done()已經接收到已經put()進入隊列的每一個項目呼叫),則將恢復

提出了一個ValueError好象叫更多的時間比中放入隊列中的項目。

join

阻塞,直到隊列中的全部項目都被獲取並處理。

每當將項目添加到隊列中時,未完成任務的數量就會增長。每當消費者打電話task_done()代表該物品已被檢索而且全部工做都已完成時,計數就會降低 當未完成任務的數量降低到零時, join()取消阻止。

multiprocessing.active_children

返回當前進程的全部活動的列表。調用這個函數有「加入」已經完成的任何進程的反作用。

multiprocessing.cpu_count

返回系統中的CPU數量。可能會提升 NotImplementedError

multiprocessing.current_process

返回Process當前進程對應對象。一個相似的threading.current_thread()

multiprocessing.freeze_support

添加對什麼時候使用的程序multiprocessing進行凍結以產生Windows可執行文件的支持。(已經用py2exe, PyInstallercx_Freeze進行了測試。)須要在主模塊後面直接調用這個函數:if __name__ == '__main__'

freeze_support()在Windows之外的操做系統上調用時,調用不起做用。另外,若是模塊正在經過Windows上的Python解釋器正常運行(程序尚未被凍結),那麼這個模塊freeze_support()沒有任何做用。

multiprocessing.get_all_start_methods

返回支持的啓動方法的列表,其中第一個是默認的。可能的啓動方法是'fork', 'spawn''forkserver'在Windows上只'spawn'可用。在Unix上'fork'而且'spawn'始終受支持,而且'fork'是默認的。3.4版本中的新功能

multiprocessing.get_contextmethod = None 

返回與multiprocessing模塊具備相同屬性的上下文對象 

若是方法None那麼返回默認的上下文。不然,方法應該是'fork''spawn', 'forkserver'。 ValueError若是指定的啓動方法不可用,則引起。3.4版本中的新功能

multiprocessing.get_start_methodallow_none = False 

返回用於啓動進程的啓動方法的名稱。

若是start方法沒有被修復,而且allow_none爲false,那麼start方法被固定爲默認值,並返回名稱。若是start方法沒有被修復,而且allow_none 爲true,則None返回。

返回值能夠是'fork''spawn''forkserver' 或None。 'fork'在Unix上是默認的,而'spawn'在Windows上是默認的。

3.4版本中的新功能

multiprocessing.set_executable

設置啓動子進程時使用的Python解釋器的路徑。(默認sys.executable使用)。嵌入可能須要作一些事情

set_executable OS 路徑加入SYS exec_prefix 'pythonw.exe' )) 

才能夠建立子進程。在版本3.4中更改:如今在Unix上支持'spawn'啓動方法時使用。

multiprocessing. set_start_method method

設置應該用來啓動子進程的方法。 方法能夠'fork''spawn'或者'forkserver'

請注意,這應該至多被調用一次,而且應該在主模塊子句內受到保護if __name__ == '__main__'

3.4版本中的新功能

 Queue(隊列):這個Queue類爲queue.Queue的克隆

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 13:40
# @Author  : Py.qi
# @File    : queue_pbet.py
# @Software: PyCharm

from multiprocessing import Process,Queue

def f(q):
    q.put([42, None, 'hello'])
    q.put('python')
    q.put('python3')
if __name__ == '__main__':
    q = Queue()  #建立隊列對象
    p = Process(target=f, args=(q,)) #新進程
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    print(q.get())
    print(q.get())
    p.join()

Pipes(管道):該Pipe()函數返回一對由默認爲雙工(雙向)的管道鏈接的鏈接對象。

鏈接對象容許發送和接收可選對象或字符串。他們能夠被認爲是面向消息的鏈接套接字。

鏈接對象一般使用建立Pipe()

class multiprocessing.Connection

send obj 

將對象發送到應該讀取的鏈接的另外一端recv()

該對象必須是可挑選的。很是大的包(大約32 MB +,雖然取決於操做系統)可能會引起ValueError異常。

recv

返回使用鏈接的另外一端發送的對象 send()阻止,直到有東西要接收。提出 EOFError,若是沒有什麼留下來接收,而另外一端被關閉。

fileno

返回鏈接使用的文件描述符或句柄。

close

關閉鏈接。

當鏈接被垃圾收集時,這被自動調用。

poll timeout 

  返回是否有可讀的數據。

  若是沒有指定超時,則會當即返回。若是超時是一個數字,那麼這指定了以秒爲單位的最大時間來阻止。若是timeout,None則使用無限超時。

  請注意,可使用一次輪詢多個鏈接對象multiprocessing.connection.wait()

send_bytes(buffer [,offset [,size ] ] )

從相似字節的對象發送字節數據做爲完整的消息。

若是給出偏移量,則從緩衝區中的該位置讀取數據。若是 給定大小,那麼將從緩衝區中讀取多個字節。很是大的緩衝區(大約32 MB +,但取決於操做系統)可能會引起 ValueError異常

recv_bytes maxlength 

將鏈接另外一端發送的字節數據的完整消息做爲字符串返回。阻止,直到有東西要接收。EOFError若是沒有什麼能夠接收,而另外一端已經關閉,就會引起。

若是最大長度被指定而且所述消息是長於最大長度 而後OSError升至並鏈接將再也不是可讀的。

在版本3.3中改變:這個函數用來提升IOError,如今是一個別名OSError

recv_bytes_into buffer [,offset 

從鏈接的另外一端讀取緩衝區中的字節數據的完整消息,並返回消息中的字節數。阻止,直到有東西要接收。提出EOFError,若是沒有什麼留下來接收,而另外一端被關閉。

緩衝區必須是一個可寫的字節類對象。若是 給出了偏移量,則消息將從該位置寫入緩衝區。偏移量必須是小於緩衝區長度的非負整數(以字節爲單位)。

若是緩衝區過短,則BufferTooShort引起異常,而且完整的消息可用, 異常實例e.args[0]在哪裏e

在版本3.3中更改:如今可使用Connection.send()和在進程之間傳輸鏈接對象自己Connection.recv()

3.3版本中的新功能:鏈接對象如今支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()返回鏈接對象,並__exit__()調用close()

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 13:50
# @Author  : Py.qi
# @File    : queue_pipes.py
# @Software: PyCharm

from multiprocessing import Process,Pipe

def f(parent,child):
    parent.send('parent1') #父連接寫入數據
    parent.send('parent2')
    child.send('python1')  #子連接寫入數據
    child.send('python2')
    parent.close()  #關閉管道
    child.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() #建立父和子管道鏈接
    p = Process(target=f, args=(parent_conn,child_conn))
    p.start()
    print(parent_conn.recv())   #從父連接收的數據是從子連接寫入的數據
    print(parent_conn.recv())
    print(child_conn.recv())    #從子連接收的數據是從父連接寫入的數據
    print(child_conn.recv())
    p.join()

Pipe()返回兩個鏈接對象,表示管道的兩端(parent_conn)爲頭,(child_conn)爲尾。這兩個鏈接對象都有send()和recv()方法等。請注意:若是兩個或多個進程(線程)試圖同時讀取或寫入管道的同一端,則管道中的數據可能會損壞,能夠同時使用管道的頭和尾流程式的讀取和寫入數據就沒有問題了。

(2)進程之間的同步

一般,同步原語在多進程程序中並不像在多線程程序中那樣必要;也可使用管理器對象建立同步基元。

class  multiprocessing. Barrier parties [action [timeout 

屏蔽對象:一個克隆的threading.Barrier3.3版本中的新功能

class  multiprocessing. BoundedSemaphore value 

一個有界的信號量對象 threading.BoundedSemaphore與其相近的模擬存在單一差別:其acquire方法的第一個參數是命名,與之一致Lock.acquire()

注意:在Mac OS X上,這是沒法區分的,Semaphore由於 sem_getvalue()沒有在該平臺上實現。

class  multiprocessing. Condition lock 

一個條件變量:一個別名threading.Condition若是指定了,那麼它應該是一個Lock或一個RLock 對象multiprocessing在版本3.3中更改:該wait_for()方法已添加。

class multiprocessing. Event

克隆的threading.Event

class multiprocessing. Lock

一個非遞歸鎖對象:一個很是相似的threading.Lock一旦一個進程或線程得到一個鎖,隨後的嘗試從任何進程或線程獲取它將被阻塞,直到它被釋放; 任何進程或線程可能會釋放它。threading.Lock應用於線程的概念和行爲在 這裏被複制,multiprocessing.Lock由於它適用於進程或線程,除非指出。

請注意,這Lock其實是一個工廠函數,它返回一個multiprocessing.synchronize.Lock使用默認上下文初始化的實例

Lock支持上下文管理器協議,所以能夠用在with語句中。

acquire block = Truetimeout = None 

獲取一個鎖,阻塞或不阻塞。

block參數設置爲True(默認)的狀況下,方法調用將被阻塞直到鎖處於解鎖狀態,而後將其設置爲鎖定並返回True請注意,這第一個參數的名稱不一樣於threading.Lock.acquire()

參數設置爲False,方法調用不會阻止。若是鎖目前處於鎖定狀態,則返回False不然將鎖設置爲鎖定狀態並返回True

當用正值,浮點值超時調用,只要沒法獲取鎖定,最多隻能阻塞超時指定的秒數具備負值的超時調用 至關於超時零。超時None(默認值)的調用 將超時期限設置爲無限。請注意,處理負值或超時None值 與實施的行爲不一樣 超時參數有,若是沒有實際意義參數被設置爲,並所以忽略。返回threading.Lock.acquire()FalseTrue若是鎖已被獲取或False超時時間已過。

release

釋放一個鎖。這能夠從任何進程或線程調用,不只是最初獲取鎖的進程或線程。

threading.Lock.release()除了在解鎖的鎖上調用a時,行爲與其相同ValueError

class multiprocessing. RLock

遞歸鎖對象:一個緊密的類比threading.RLock遞歸鎖必須由獲取它的進程或線程釋放。一旦一個進程或線程得到遞歸鎖定,相同的進程或線程能夠再次得到它,而不會阻塞; 該進程或線程必須每次釋放一次它被獲取。

請注意,這RLock其實是一個工廠函數,它返回一個multiprocessing.synchronize.RLock使用默認上下文初始化的實例

RLock支持上下文管理器協議,所以能夠用在with語句中。

acquire block = Truetimeout = None 

獲取一個鎖,阻塞或不阻塞。

當調用參數設置爲True,阻塞直到鎖處於解鎖狀態(不屬於任何進程或線程),除非該鎖已被當前進程或線程所擁有。當前進程或線程接着獲取鎖的全部權(若是它尚未全部權),而且鎖內的遞歸級別增長1,致使返回值爲True請注意,第一個參數的行爲與實現相比有幾個不一樣之處threading.RLock.acquire(),從參數自己的名稱開始。

當調用參數設置爲False,不要阻塞。若是鎖已經被另外一個進程或線程獲取(並所以被擁有),則當前進程或線程不獲取全部權,而且鎖內的遞歸級別不改變,致使返回值爲False若是鎖處於未鎖定狀態,則當前進程或線程獲取全部權,遞歸級別遞增,結果返回值爲True

timeout參數的使用和行爲與in中的相同 Lock.acquire()請注意,這些超時行爲中的一些 與實施的行爲有所不一樣threading.RLock.acquire()

release

釋放一個鎖,遞減遞歸級別。若是在遞減以後遞歸級別爲零,則將鎖重置爲解鎖(不禁任何進程或線程擁有),而且若是有其餘進程或線程被阻塞,等待鎖解鎖,則准許其中的一個繼續進行。若是在遞減以後遞歸級別仍然不爲零,那麼鎖保持鎖定並由調用進程或線程擁有。

只有在調用進程或線程擁有鎖定時才調用此方法。一個AssertionError若是該方法是經過一個過程調用或線程之外的僱主或升高若是鎖處於解鎖(無主)的狀態。請注意,在這種狀況下引起的異常的類型不一樣於執行的行爲threading.RLock.release()

class  multiprocessing. Semaphore value 

一個信號量對象:一個接近的類比threading.Semaphore

與其相近的模擬存在單一差別:其acquire方法的第一個參數是命名,與之一致Lock.acquire()

multiprocessing包含來自全部同步方法等價於threading例如,可使用鎖來確保一次只有一個進程打印到標準輸出:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 14:16
# @Author  : Py.qi
# @File    : mult_sync.py
# @Software: PyCharm

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start() #建立進程並啓動

(3)進程之間共享狀態

在進行併發編程時,一般最好儘量避免使用共享狀態。使用多個進程時尤爲如此。

可是,若是你確實須要使用一些共享數據,那麼 multiprocessing提供了一些方法。

可使用能夠由子進程繼承的共享內存來建立共享對象。

multiprocessing.Valuetypecode_or_type* argslock = True 

返回ctypes從共享內存分配對象。默認狀況下,返回值其實是對象的同步包裝器。對象自己能夠經過a的value屬性來訪問Value

typecode_or_type決定了返回對象的類型:它是一個ctypes類型或array 模塊使用的一種字符類型。 *參數傳遞給類型的構造函數。

若是鎖True(默認),則會建立一個新的遞歸鎖對象來同步對該值的訪問。若是鎖是一個Lock一個RLock對象,那麼將用於同步訪問值。若是是鎖False那麼訪問返回的對象將不會被鎖自動保護,所以它不必定是「過程安全的」。

 Shared memory(共享內存):可使用value或將數據存儲在共享內存映射中Array

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 14:24
# @Author  : Py.qi
# @File    : mult_sharememory.py
# @Software: PyCharm

from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = i*i

if __name__ == '__main__':
    num = Value('d', 0.0)  #d表示一個雙精度浮點數
    arr = Array('i', range(10)) #i表示符號整數
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

#output:
3.1415927
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

'd''i'建立時使用的參數numarr被使用的那種的TypeCodes array模塊:'d'表示一個雙精度浮點數和'i'指示符號整數。這些共享對象將是進程和線程安全的。

爲了更靈活地使用共享內存,可使用multiprocessing.sharedctypes支持建立從共享內存分配的任意ctypes對象的 模塊。

 

Server process(服務器進程):經過Manager()控制一個服務器進程來返回管理器對象,該進程持有Python對象,並容許其餘進程使用代理來操做它們。

管理者提供了一種方法來建立能夠在不一樣進程之間共享的數據,包括在不一樣機器上運行的進程之間經過網絡進行共享。管理員對象控制管理共享對象的服務器進程 其餘進程能夠經過使用代理來訪問共享對象。

multiprocessing. Manager

返回SyncManager可用於在進程之間共享對象的已啓動對象。返回的管理對象對應於一個衍生的子進程,並具備建立共享對象並返回相應代理的方法。

Manager進程在垃圾收集或其父進程退出時將當即關閉。管理員類在multiprocessing.managers模塊中定義 

class multiprocessing.managers.BaseManager[ address [,authkey 

建立一個BaseManager對象。

一旦建立,應該調用start()get_server().serve_forever()確保管理器對象引用已啓動的管理器進程。

地址是管理進程偵聽新鏈接的地址。若是地址None一個任意的選擇。

authkey是將用於檢查到服務器進程的傳入鏈接的有效性的身份驗證密鑰。若是使用 authkeyNonecurrent_process().authkey不然使用authkey,它必須是一個字節字符串。

start initializer [initargs 

啓動一個子流程來啓動管理器。若是初始化程序不是,None 那麼子進程將initializer(*initargs)在啓動時調用

get_server

Server在Manager的控制下返回一個表明實際服務器對象。Server對象支持該 serve_forever()方法:

   connect

      將本地管理器對象鏈接到遠程管理器進程

   shutdown

  中止經理使用的過程。這僅start()在用於啓動服務器進程時纔可用 

  這能夠被屢次調用。

   register typeid[callable[proxyType [exposed[method_to_typeid [create_method 

  可用於註冊類型或可與經理類一塊兒調用的類方法。

  typeid是一個「類型標識符」,用於標識特定類型的共享對象。這必須是一個字符串。

  callable是可調用的,用於爲這個類型標識符建立對象。若是一個管理器實例將被鏈接到使用該connect()方法的服務器,或者若是 create_method參數是,False那麼這能夠保留爲 None

  proxytype是其中的一個子類,BaseProxy用於使用此typeid爲共享對象建立代理若是None那麼一個代理類是自動建立的。

  exposed用於指定其中用於此typeid的代理應該被容許使用訪問方法的名稱序列 BaseProxy._callmethod()(若是exposedNone隨後 proxytype._exposed_被用於代替若是它存在)。在其中沒有指定暴露列表中的狀況下,共享對象的全部「公共方法」將是可訪問的。(這裏的「公共方法」是指任何具備__call__()方法且名稱不以其開頭的屬性'_')。

method_to_typeid是一個映射,用於指定那些應該返回代理的公開方法的返回類型。它將方法名稱映射到typeid字符串。(若是method_to_typeid存在,None則 proxytype._method_to_typeid_使用它。)若是一個方法的名稱不是這個映射的關鍵字,或者若是這個映射是None 這個方法返回的對象將被值複製。

create_method肯定是否應該使用名稱typeid建立一個方法, 該方法可用於通知服務器進程建立一個新的共享對象併爲其返回一個代理。默認狀況下是TrueBaseManager 實例也有一個只讀屬性:address manager使用的地址。

版本3.3中更改:管理器對象支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()啓動服務器進程(若是還沒有啓動),而後返回管理器對象。 __exit__()來電shutdown()

在之前的版本__enter__()中,若是還沒有啓動管理器的服務器進程,則不啓動。

經過返回的管理manager()將支持的類型: listdictNamespaceLock, RLockSemaphoreBoundedSemaphore, ConditionEventBarrier, QueueValueArray

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 14:40
# @Author  : Py.qi
# @File    : mult_servprocess.py
# @Software: PyCharm

from multiprocessing import Process, Manager

def f(d,l,i):
    d[i] = i+1  #每一個進程添加字典元素
    l.append(i)  #每一個進程啓動後將列表添加數據i

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list()
    pro_list=[]
    for i in range(5):  #啓動5個進程
        p = Process(target=f, args=(d,l,i))
        p.start()
        pro_list.append(p)  #將進程都添加到列表
    for j in pro_list:
        j.join()  #等待每一個進程執行完成
    print(d)  #打印字典數據爲5個進程操做的後的數據,實現進程間的數據共享
    print(l)

#output:
{0: 1, 1: 2, 2: 3, 3: 4, 4: 5}
[0, 1, 2, 3, 4]

(4)進程池(Pool)

class multiprocessing.pool.Poolprocesses [,initializer [,initargs [,maxtasksperchild [,context 

一個進程池對象,用於控制能夠提交做業的工做進程池。它支持超時和回調的異步結果,並具備並行映射實現。

進程是要使用的工做進程的數量。若是過程是 None而後經過返回的數字os.cpu_count()被使用。

若是初始化程序不是,None那麼每一個工做進程將initializer(*initargs)在啓動時調用 

maxtasksperchild是一個工做進程在退出以前能夠完成的任務數量,並被一個新的工做進程取代,以便釋放未使用的資源。默認的maxtasksperchildNone,這意味着工做進程的生活時間與池同樣長。

上下文可用於指定用於啓動工做進程的上下文。一般使用函數multiprocessing.Pool()Pool()上下文對象方法來建立池在這兩種狀況都適當設置。

請注意,池對象的方法只應由建立池的進程調用。

apply func [args [kwds 

呼叫FUNC帶參數ARGS和關鍵字參數kwds它阻塞,直到結果準備就緒。鑑於這些塊,apply_async()更適合於並行執行工做。此外,func 只能在進程池的一名工做人員中執行。

apply_async func [args [kwds [callback [error_callback 

apply()返回結果對象方法的變體

若是指定了callback,那麼它應該是一個可接受的參數。當結果變爲就緒callback被應用到它,這是除非調用失敗,在這種狀況下,error_callback 被應用。

若是指定了error_callback,那麼它應該是一個可接受的參數。若是目標函數失敗,則使用異常實例調用error_callback

回調應該當即完成,不然處理結果的線程將被阻塞。

map funciterable [chunksize 

map()內置函數的並行等價物儘管它只支持一個可迭代的參數)。它阻塞,直到結果準備就緒。

這種方法把迭代器切成許多塊,它們做爲單獨的任務提交給進程池。能夠經過將chunksize設置爲正整數來指定這些塊的(近似)大小

map_async funciterable [chunksize [callback [error_callback 

map()返回結果對象方法的變體

若是指定了callback,那麼它應該是一個可接受的參數。當結果變爲就緒callback被應用到它,這是除非調用失敗,在這種狀況下,error_callback 被應用。

若是指定了error_callback,那麼它應該是一個可接受的參數。若是目標函數失敗,則使用異常實例調用error_callback

回調應該當即完成,不然處理結果的線程將被阻塞。

imap funciterable [chunksize 

一個懶惰的版本map()

所述CHUNKSIZE參數是與由所使用的一個map() 方法。對於使用了一個較大的值很長iterables CHUNKSIZE可使做業完成的太多比使用的默認值加快 1

此外,若是CHUNKSIZE1next()經過返回的迭代器的方法imap()方法有一個可選的timeout參數: next(timeout)將提升multiprocessing.TimeoutError若是結果不能內退回timeout秒。

imap_unordered funciterable [chunksize 

一樣imap(),除了從返回的迭代結果的排序應該考慮爲所欲爲。(只有當只有一個工做進程時,纔是保證「正確」的順序。)

starmap funciterable [chunksize 

就像map()除了可迭代的元素被指望是被解包爲參數的迭代。

所以,一個可迭代結果[(1,2), (3, 4)][func(1,2), func(3,4)]

3.3版本中的新功能

starmap_async funciterable [chunksize [callback [error_back 

的組合starmap()map_async(),超過迭代 迭代 iterables,並呼籲FUNC與解壓縮iterables。返回一個結果對象。

3.3版本中的新功能

close

防止將更多任務提交到池中。一旦全部任務完成,工做進程將退出。

terminate

當即中止工做進程而不完成傑出的工做。當池對象被垃圾收集時terminate()會當即調用。

join

等待工做進程退出。必須打電話close()或 terminate()在使用以前join()

3.3版新增功能:池對象如今支持上下文管理協議 - 請參閱 上下文管理器類型。 __enter__()返回池對象,並__exit__()調用terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()和 返回的結果的類Pool.map_async()

get timeout 

到達時返回結果。若是timeout不是None,而且結果沒有在超時內到達,那麼 multiprocessing.TimeoutError會引起。若是遠程調用引起異常,那麼該異常將被從新調整get()

wait timeout 

等到結果可用或timeout秒數經過。

ready

返回通話是否完成。

successful

返回調用是否完成而不引起異常。AssertionError若是結果沒有準備好,會提升

Pool類表示一個工做進程池。它具備容許以幾種不一樣方式將任務卸載到工做進程的方法。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/19 14:19
# @Author  : Py.qi
# @File    : mult_Pool_1.py
# @Software: PyCharm

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         #4個開始工做進程
        result = pool.apply_async(f, (10,)) #異步進程處理
        print(result)
        print(result.get(timeout=1))        #1秒超時,返回結果

        print(pool.map(f, range(10)))       #並行阻塞等待結果

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(2))          # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        #引起異常multiprocessing.TimeoutError


#output:
<multiprocessing.pool.ApplyResult object at 0x0000008BC4E76278>
100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
Traceback (most recent call last):
  File "Z:/python_project/day20/mult_Pool_1.py", line 28, in <module>
    print(result.get(timeout=1))        #引起異常multiprocessing.TimeoutError
  File "Z:\Program Files\Python35\lib\multiprocessing\pool.py", line 640, in get
    raise TimeoutError
multiprocessing.context.TimeoutError

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 15:28
# @Author  : Py.qi
# @File    : mult_Pool.py
# @Software: PyCharm

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    #啓動4個工做進程做爲進程池
    with Pool(processes=4) as pool:
        #返回函數參數運行結果列表
        print(pool.map(f, range(10)))
        #在進程池中以任意順序打印相同的數字
        for i in pool.imap_unordered(f, range(10)):
            print(i,end=' ')
        #異步評估
        res = pool.apply_async(f,(20,))      #在進程池中只有一個進程運行
        print('\n',res.get(timeout=1))       #打印結果,超時爲1秒
        #打印該進程的PID
        res = pool.apply_async(os.getpid,()) #在進程池中只有一個進程運行
        print(res.get(timeout=1))            #打印進程PID

        #打印4個進程的PID
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        #進程等待10秒,獲取數據超時爲1秒,將輸出異常
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")


#output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0 1 4 9 16 25 36 49 64 81 
 400
2164
[8152, 2032, 2164, 8152]
We lacked patience and got a multiprocessing.TimeoutError

請注意:池的方法只能右建立它的進程使用,包中的功能要求__main__,在交互解釋器中將不起做用。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/1/18 16:14
# @Author  : Py.qi
# @File    : sync_apply.py
# @Software: PyCharm

from multiprocessing import Pool,freeze_support
import time
def foo(n):
    time.sleep(1)
    print(n*n)

def back(arg):
    print('exec done:',arg)

if __name__ == '__main__':
    freeze_support()  #windows下新啓動進程須要先運行此函數
    pool = Pool(2)   #建立進程池,同時2個進程運行
    for i in range(10):
        #pool.apply(func=foo, args=(i,))  #建立同步進程
        #建立異步進程,傳遞函數和參數,在函數執行完後執行callback,並將函數foo的結構返回給callback
        pool.apply_async(func=foo,args=(i,),callback=back)
    pool.close() #此處必須是先關閉進程再join
    pool.join()
    print('end')
相關文章
相關標籤/搜索