python多進程那點事兒【multiprocessing庫】

      前言:項目中有個需求須要對產品的日誌處理,按照產品中日誌的某些字段,對日誌進行再次劃分。好比產品的日誌中含有字段id,tag=1,如今須要把tag是基數的放到一個文件中,tag是偶數的放入一個文件中。這就涉及到多個文件的讀寫操做,一個文件一個文件讀取寫入那時間過久了,公司配備的單機,跑了半個多小時,光標仍是一直在閃閃閃【你懂得】。沒辦法了,仍是用多進程跑吧。這就得對python中的多進程重新回顧一遍了。python

Q1:爲何不用多線程呢?多線程

A1:這個就須要瞭解python多線程的實現原理了,經過在其解釋器層面施加一個全局鎖來保證同一時刻只有一個線程能夠擁有鎖,執行相應的python字節碼。因此雖然冠名是是多線程,可是實質上仍是隻有一個線程在運行。有時候多線程可能會讓程序得不到提升反而下降,由於線程之間須要競爭資源。因此不少人也說,若是想真正的同一時刻執行多個任務的話,就須要使用多進程。併發

1.使用multiprocessing.Processapp

      multiprocessing.Process最多見的使用就是:async

p = multiprocessing.Process(target = 多線程執行函數名, args = 函數參數元組形式)
p.start()
p.join()

      注意使用多進程時候必定要使用join對子進程的狀態進行收集,不然在程序運行過程當中會出現殭屍進程,對系統性能形成影響。ide

      固然,上面這隻有一個進程,你在寫的時候可能很順手就寫了函數

for x in range(10):
p = multiprocessing.Process(target = 多線程執行函數名, args = 函數參數元組形式)
p.start()
p.join()

      而後就發現,這個進程貌似是順序執行的。。。好像沒有併發,緣由就出如今join的位置上,仔細查看手冊,你會發如今join函數下方有一行說明:性能

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

      意思就是主線程會在join的地方一直等子進程結束。。那麼咱們多個進程併發執行就要這樣寫了:spa

1 p_list = []
2 for x in range(10):
3     p = multiprocessing.Process(target = 多線程執行函數名, args = 函數參數元組形式)
4     p.start()
5     p_list.append(p)
6 
7 for p in p_list:
8     p.join()

      感受這樣寫的代碼一點都不優雅,並且之後拓展也很不方便,子進程的數目會隨着任務數目的增長而增長,進程得不到重複的利用。操作系統

2.使用multiprocessing.Pool

 

      進程池就是上述不方便的完美解決,其通常用法以下:

pool = multiprocessing.Pool(processes=進程數目)
for x in xrange(任務數目):
    pool.apply_async(函數名, 函數參數元組形式)
pool.close() # close函數代表不會再往進程池中加入新任務,必定要在join方法調用以前調用。
pool.join()

      上述代碼開啓了含有必定數目的進程池,只須要往進程池中加入新任務便可,當進程池中已滿,其餘的任務就等待,直到有任務結束。

      注意:除了pool.apply_async方法,還有一個pool.apply方法,只不過pool.apply方法是阻塞的。

      還可使用進程池方法來關注進程執行的結果,pool.apply_asyn函數即返回函數的執行結果,使用get()方法便可獲得。

3.多進程共享數據

      多個進程之間共享數據也有不少種方法:

      1)共享變量

         只能使用Value和Array方法:

multiprocessing.Value(typecode_or_type, *args[, lock]) 
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True) 

         關於lock的一段說明:

Return a process-safe wrapper object for a ctypes object which uses lock to synchronize access. If lock is None (the default) then a multiprocessing.RLock object is created automatically.

         共享變量只能是一個變量,或者是線性的一組變量,類型也是從typecode_or_type衍生而來的,具體的用法和說明手冊上已經講解的很清楚了。附上手冊上的一段代碼以下:

 1 from multiprocessing import Process, Lock
 2 from multiprocessing.sharedctypes import Value, Array
 3 from ctypes import Structure, c_double
 4 
 5 class Point(Structure):
 6     _fields_ = [('x', c_double), ('y', c_double)]
 7 
 8 def modify(n, x, s, A):
 9     n.value **= 2
10     x.value **= 2
11     s.value = s.value.upper()
12     for a in A:
13         a.x **= 2
14         a.y **= 2
15 
16 if __name__ == '__main__':
17     lock = Lock()
18 
19     n = Value('i', 7)
20     x = Value(c_double, 1.0/3.0, lock=False)
21     s = Array('c', 'hello world', lock=lock)
22     A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
23 
24     p = Process(target=modify, args=(n, x, s, A))
25     p.start()
26     p.join()
27 
28     print n.value
29     print x.value
30     print s.value
31     print [(a.x, a.y) for a in A]
View Code

         結果:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

      2)Manager

         使用Manager方法時,共享變量的類型會多一些,例如list,dict,Event,Lock,Array,Value...使用Manager方法時須要注意,在操做共享對象時候,除了賦值操做,其餘的方法都做用在共享對象的拷貝上,並不會對共享對象生效。例如:

d = Manager().dict()
d[0] = []
d[0].append(0) # append方法做用在代理對象上,並不對原始對象生效
print d

         輸出:{0: []}

         而一樣意思的一段代碼:

d = Manager().dict()
l = []
l.append(0)
d[0] = l # 直接賦值操做,影響原始共享對象
print d

         輸出:{0: [0]}

      3)Queue

         隊列,顧名思義,就是一組數據,使用put來往隊列中存入數據,使用get方法獲取數據,當隊列滿了繼續put和隊列空了繼續get時候會拋出相對應的異常。能夠多個進程之間傳遞數據。

      4)Pipe

         Pipe方法返回(conn1, conn2)表明一個管道的兩個端,對應兩個進程。還能夠經過duplex來設定管道是全雙工(duplex=True)仍是半雙工(duplex=False)工做。send和recv方法分別是發送和接受消息的方法,若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError。

4.多進程同步互斥

      1)Lock和Semaphore

         這兩個就很少講了,學過操做系統的都知道。Lock限定同一時間僅一個進程訪問共享變量,Semaphore則能夠限定多個進程同時訪問共享變量。

      2)Event

         使用set和is_set判斷事件是否已經發生,決定下一步要執行的動做。

相關文章
相關標籤/搜索