Python多進程編程

序. multiprocessing
python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing,只須要定義一個函數,Python會完成其餘全部事情。藉助這個包,能夠輕鬆完成從單進程到併發執行的轉換。multiprocessing支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。python

 

1. Process

建立進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name爲別名。group實質上不使用。
方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啓動某個進程。編程

屬性:authkey、daemon(要經過start()設置)、exitcode(進程在運行時爲None、若是爲–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且本身不能產生新進程,必須在start()以前設置。安全

 

例1.1:建立函數並將其做爲單個進程多線程

複製代碼
import multiprocessing
import time

def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()
複製代碼

結果併發

1
2
3
4
5
6
7
8
p.pid:  8736
p.name: Process -1
p.is_alive: True
The time is Tue Apr  21  20: 55: 12  2015
The time is Tue Apr  21  20: 55: 15  2015
The time is Tue Apr  21  20: 55: 18  2015
The time is Tue Apr  21  20: 55: 21  2015
The time is Tue Apr  21  20: 55: 24  2015

 

例1.2:建立函數並將其做爲多個進程app

複製代碼
import multiprocessing
import time

def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"

def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"

def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))

    p1.start()
    p2.start()
    p3.start()

    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"
複製代碼

結果dom

1
2
3
4
5
6
7
8
9
10
11
The number of CPU is: 4
child   p.name:Process -3     p.id 7992
child   p.name:Process -2     p.id 4204
child   p.name:Process -1     p.id 6380
END!!!!!!!!!!!!!!!!!
worker_ 1
worker_ 3
worker_ 2
end worker_ 1
end worker_ 2
end worker_ 3

 

例1.3:將進程定義爲類async

複製代碼
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      
複製代碼

:進程p調用start()時,自動調用run()

結果

1
2
3
4
5
the time is Tue Apr  21  20: 31: 30  2015
the time is Tue Apr  21  20: 31: 33  2015
the time is Tue Apr  21  20: 31: 36  2015
the time is Tue Apr  21  20: 31: 39  2015
the time is Tue Apr  21  20: 31: 42  2015

 

例1.4:daemon程序對比結果

#1.4-1 不加daemon屬性

複製代碼
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "end!"
複製代碼

結果

1
2
3
end!
work start:Tue Apr  21  21: 29: 10  2015
work end:Tue Apr  21  21: 29: 13  2015

#1.4-2 加上daemon屬性

複製代碼
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print "end!"
複製代碼

結果

1
end!

:因子進程設置了daemon屬性,主進程結束,它們就隨着結束了。

#1.4-3 設置daemon執行完結束的方法

複製代碼
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));

if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print "end!"
複製代碼

結果

1
2
3
work start:Tue Apr  21  22: 16: 32  2015
work end:Tue Apr  21  22: 16: 35  2015
end!

 

2. Lock

當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突。

複製代碼
import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()
        
def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()
    
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"
複製代碼

結果(輸出文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

 

3. Semaphore

Semaphore用來控制對共享資源的訪問數量,例如池的最大鏈接數。

複製代碼
import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()
複製代碼

結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Process -1 acquire
Process -1 release
 
Process -2 acquire
Process -3 acquire
Process -2 release
 
Process -5 acquire
Process -3 release
 
Process -4 acquire
Process -5 release
 
Process -4 release

 

4. Event

Event用來實現進程間同步通訊。

複製代碼
import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")
複製代碼

結果

1
2
3
4
5
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

 

5. Queue

Queue是多進程安全的隊列,能夠使用Queue實現多進程之間的數據傳遞。put方法用以插入數據到隊列中,put方法還有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。若是超時,會拋出Queue.Full異常。若是blocked爲False,但該Queue已滿,會當即拋出Queue.Full異常。
 
get方法能夠從隊列讀取而且刪除一個元素。一樣,get方法有兩個可選參數:blocked和timeout。若是blocked爲True(默認值),而且timeout爲正值,那麼在等待時間內沒有取到任何元素,會拋出Queue.Empty異常。若是blocked爲False,有兩種狀況存在,若是Queue有一個值可用,則當即返回該值,不然,若是隊列爲空,則當即拋出Queue.Empty異常。Queue的一段示例代碼:
複製代碼
import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print q.get(block = False) 
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()
複製代碼

結果

1
1

 

6. Pipe

Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發。duplex爲False,conn1只負責接受消息,conn2只負責發送消息。
 
send和recv方法分別是發送和接受消息的方法。例如,在全雙工模式下,能夠調用conn1.send發送消息,conn1.recv接收消息。若是沒有消息可接收,recv方法會一直阻塞。若是管道已經被關閉,那麼recv方法會拋出EOFError。
複製代碼
import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print "send: %s" %(i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print "proc2 rev:", pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print "PROC3 rev:", pipe.recv()
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()
複製代碼

結果

 

7. Pool

在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來它。

 

例7.1:使用進程池(非阻塞)

複製代碼
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
    print "Sub-process(es) done."
複製代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello  0
 
msg: hello  1
msg: hello  2
end
msg: hello  3
end
end
end
Sub-process(es) done.

函數解釋:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解區別,看例1例2結果區別)
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工做進程,不在處理未完成的任務。
  • join()    主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。

執行說明:建立一個進程池pool,並設定進程的數量爲3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數爲3,因此0、一、2會直接送到進程中執行,當其中一個執行完過後才空出一個進程處理對象3,因此會出現輸出「msg: hello 3」出如今"end"後。由於爲非阻塞,主函數會本身執行自個的,不搭理進程的執行,因此運行完for循環後直接輸出「mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~」,主程序在pool.join()處等待各個進程的結束。

 

例7.2:使用進程池(阻塞)

複製代碼
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束
    print "Sub-process(es) done."
複製代碼

一次執行的結果

1
2
3
4
5
6
7
8
9
10
msg: hello  0
end
msg: hello  1
end
msg: hello  2
end
msg: hello  3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.

  

例7.3:使用進程池,並關注結果

複製代碼
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."
複製代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
msg: hello  0
msg: hello  1
msg: hello  2
end
end
end
::: donehello  0
::: donehello  1
::: donehello  2
Sub-process(es) done.

 

例7.4:使用多個進程池

複製代碼
#coding: utf-8
import multiprocessing
import os, time, random

def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的進程的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)

def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)

def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)

def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())

    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中

    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束
    print 'All subprocesses done.'
複製代碼

一次執行結果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
parent process  7704
 
Waiting for  all  subprocesses done...
Run task Lee -6948
 
Run task Marlon -2896
 
Run task Allen -7304
 
Run task Frank -3052
Task Lee, runs  1.59  seconds.
Task Marlon runs  8.48  seconds.
Task Frank runs  15.68  seconds.
Task Allen runs  18.08  seconds.
All subprocesses done.
相關文章
相關標籤/搜索