序. multiprocessing
python中的多線程其實並非真正的多線程,若是想要充分地使用多核CPU的資源,在python中大部分狀況須要使用多進程。Python提供了很是好用的多進程包multiprocessing,只須要定義一個函數,Python會完成其餘全部事情。藉助這個包,能夠輕鬆完成從單進程到併發執行的轉換。multiprocessing支持子進程、通訊和共享數據、執行不一樣形式的同步,提供了Process、Queue、Pipe、Lock等組件。html
建立進程的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name爲別名。group實質上不使用。
方法:is_alive() 、join([timeout])、run()、start()、terminate()。其中,Process以start()啓動某個進程。python
is_alive():判斷該進程是否還活着安全
join([timeout]):主進程阻塞,等待子進程的退出, join方法要在close或terminate以後使用。多線程
run():進程p調用start()時,自動調用run()併發
屬性:authkey、daemon(要經過start()設置)、exitcode(進程在運行時爲None、若是爲–N,表示被信號N結束)、name、pid。其中daemon是父進程終止後自動終止,且本身不能產生新進程,必須在start()以前設置。app
例1.1:建立函數並將其做爲單個進程dom
import multiprocessing import time def worker(interval): n = 5 while n > 0: print("The time is {0}".format(time.ctime())) #輸出時間的格式 time.sleep(interval) n -= 1 if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print "p.pid:", p.pid print "p.name:", p.name print "p.is_alive:", p.is_alive()
結果async
1
2
3
4
5
6
7
8
|
p.pid:
8736
p.name: Process
-1
p.is_alive: True
The time is Tue Apr
21
20:
55:
12
2015
The time is Tue Apr
21
20:
55:
15
2015
The time is Tue Apr
21
20:
55:
18
2015
The time is Tue Apr
21
20:
55:
21
2015
The time is Tue Apr
21
20:
55:
24
2015
|
例1.2:建立函數並將其做爲多個進程ide
import multiprocessing import time def worker_1(interval): print "worker_1" time.sleep(interval) print "end worker_1" def worker_2(interval): print "worker_2" time.sleep(interval) print "end worker_2" def worker_3(interval): print "worker_3" time.sleep(interval) print "end worker_3" if __name__ == "__main__":
p1 = Process(target=worker_1, args=(6,))
p2 = Process(target=worker_2, args=(4,))
p3 = Process(target=worker_3, args=(2,))
p1.start() p2.start() p3.start()
print("The number of CPU is:" + str(cpu_count()))
for p in active_children():
print("child p.name:=%s" % p.name + "\tp.id=%s" % str(p.pid))
print(p1.pid)
print("END-----")
結果
1
2
3
4
5
6
7
8
9
10
11
|
The number of CPU is:4 |
例1.3:將進程定義爲類
import multiprocessing import time class ClockProcess(multiprocessing.Process): def __init__(self, interval): multiprocessing.Process.__init__(self) self.interval = interval def run(self): n = 5 while n > 0: print("the time is {0}".format(time.ctime())) time.sleep(self.interval) n -= 1 if __name__ == '__main__': p = ClockProcess(3) p.start()
注:進程p調用start()時,自動調用run()
結果
1
2
3
4
5
|
the time is Tue Apr
21
20:
31:
30
2015
the time is Tue Apr
21
20:
31:
33
2015
the time is Tue Apr
21
20:
31:
36
2015
the time is Tue Apr
21
20:
31:
39
2015
the time is Tue Apr
21
20:
31:
42
2015
|
例1.4:daemon程序對比結果
#1.4-1 不加daemon屬性
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.start() print "end!"
結果
1
2
3
|
end!
work start:Tue Apr
21
21:
29:
10
2015
work end:Tue Apr
21
21:
29:
13
2015
|
#1.4-2 加上daemon屬性
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() print "end!"
結果
1
|
end!
|
注:因子進程設置了daemon屬性,主進程結束,它們就隨着結束了。
在多線程模型中,默認狀況下(sub-Thread.daemon=False)主線程會等待子線程退出後再退出,而若是sub- Thread.setDaemon(True)時,主線程不會等待子線程,直接退出,而此時子線程會隨着主線程的對出而退出,避免這種狀況,主線程中須要 對子線程進行join,等待子線程執行完畢後再退出。對應的,在多進程模型中,Process類也有daemon屬性,而它表示的含義與 Thread.daemon相似,當設置sub-Process.daemon=True時,主進程中須要對子進程進行等待,不然子進程會隨着主進程的退 出而退出
更詳細:http://www.tuicool.com/articles/zii6bm
#1.4-3 設置daemon執行完結束的方法
import multiprocessing import time def worker(interval): print("work start:{0}".format(time.ctime())); time.sleep(interval) print("work end:{0}".format(time.ctime())); if __name__ == "__main__": p = multiprocessing.Process(target = worker, args = (3,)) p.daemon = True p.start() p.join() print "end!"
結果
1
2
3
|
work start:Tue Apr
21
22:
16:
32
2015
work end:Tue Apr
21
22:
16:
35
2015
end!
|
當多個進程須要訪問共享資源的時候,Lock能夠用來避免訪問的衝突。
import multiprocessing import sys def worker_with(lock, f): with lock: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lockd acquired via with\n") n -= 1 fs.close() def worker_no_with(lock, f): lock.acquire() try: fs = open(f, 'a+') n = 10 while n > 1: fs.write("Lock acquired directly\n") n -= 1 fs.close() finally: lock.release() if __name__ == "__main__": lock = multiprocessing.Lock() f = "file.txt" w = multiprocessing.Process(target = worker_with, args=(lock, f)) nw = multiprocessing.Process(target = worker_no_with, args=(lock, f)) w.start() nw.start() print "end"
結果(輸出文件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
|
Semaphore用來控制對共享資源的訪問數量,例如池的最大鏈接數。
import multiprocessing import time def worker(s, i): s.acquire() print(multiprocessing.current_process().name + "acquire"); time.sleep(i) print(multiprocessing.current_process().name + "release\n"); s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(5): p = multiprocessing.Process(target = worker, args=(s, i*2)) p.start()
結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
Process
-1
acquire
Process
-1
release
Process
-2
acquire
Process
-3
acquire
Process
-2
release
Process
-5
acquire
Process
-3
release
Process
-4
acquire
Process
-5
release
Process
-4
release
|
例子2:
import multiprocessing import time def worker(s, ): s.acquire() print(multiprocessing.current_process().name + "acquire") time.sleep(1) # print(multiprocessing.current_process().name + "release\n") s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(5): p = multiprocessing.Process(target = worker, args=(s, )) # time.sleep(0.01) p.start()
#####結果######
Process-4acquire
Process-3acquire
Process-1acquire
Process-2acquire
Process-5acquire
Event用來實現進程間同步通訊。
import multiprocessing
import time
def wait_for_event(e):
print("wait_for_event: starting")
e.wait() #一直阻塞的去等待set值
print('*****')
print("wairt_for_event: e.is_set()->" + str(e.is_set()))
def wait_for_event_timeout(e, t):
print("wait_for_event_timeout:starting")
e.wait(2) #等2s去取set值
print('------')
print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
if __name__ == "__main__":
e = multiprocessing.Event()
# e.set()
w1 = multiprocessing.Process(name="block", target=wait_for_event, args=(e,))
w2 = multiprocessing.Process(name="non-block", target=wait_for_event_timeout, args=(e, 2))
w1.start()
w2.start()
time.sleep(10)
e.set() # 設置set的值
print("main: event is set")
結果
1
2
3
4
5
|
wait_for_event: starting |
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print q.get(block = False) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() #reader.join() 這樣會一直阻塞 #writer.join()
結果
1
|
1
|
import multiprocessing
import time
def proc1(pipe):
# while True:
for i in range(3):
print("send: %s" %(i))
pipe.send(i)
time.sleep(1)
def proc2(pipe):
while True:
print ("proc2 rev:", pipe.recv())
time.sleep(1)
def proc3(pipe):
while True:
print("PROC3 rev:", pipe.recv())
time.sleep(1)
if __name__ == "__main__":
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
#p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
p1.start()
p2.start()
# p3.start()
# p1.join()
# p2.join()
# p3.join()
#######結果########
send: 0
proc2 rev: 0
send: 1
proc2 rev: 1
send: 2
proc2 rev: 2
結果
在利用Python進行系統管理的時候,特別是同時操做多個文件目錄,或者遠程控制多臺主機,並行操做能夠節約大量的時間。當被操做對象數目不大時,能夠直接利用multiprocessing中的Process動態成生多個進程,十幾個還好,但若是是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,此時能夠發揮進程池的功效。
Pool能夠提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,若是池尚未滿,那麼就會建立一個新的進程用來執行該請求;但若是池中的進程數已經達到規定最大值,那麼該請求就會等待,直到池中有進程結束,纔會建立新的進程來它。
例子:
import multiprocessing import time def func(msg, a): # if a == 1: # time.sleep(8) # print(1) print("msg:", msg) print("++++") time.sleep(3) # print("end") if __name__ == "__main__": pool = multiprocessing.Pool(processes=3) for i in range(7): msg = "hello %d" % (i) a = i # 維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 pool.apply_async(func, (msg, a, )) pool.close() # 調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 pool.join() print("Sub-process(es) done.")
例7.1:使用進程池(非阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply_async(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 print "Sub-process(es) done."
一次執行結果
1
2
3
4
5
6
7
8
9
10
|
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello
0
msg: hello
1
msg: hello
2
end
msg: hello
3
end
end
end
Sub-process(es) done.
|
函數解釋:
執行說明:建立一個進程池pool,並設定進程的數量爲3,xrange(4)會相繼產生四個對象[0, 1, 2, 4],四個對象被提交到pool中,因pool指定進程數爲3,因此0、一、2會直接送到進程中執行,當其中一個執行完過後才空出一個進程處理對象3,因此會出現輸出「msg: hello 3」出如今"end"後。由於爲非阻塞,主函數會本身執行自個的,不搭理進程的執行,因此運行完for循環後直接輸出「mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~」,主程序在pool.join()處等待各個進程的結束。
例7.2:使用進程池(阻塞)
#coding: utf-8 import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) for i in xrange(4): msg = "hello %d" %(i) pool.apply(func, (msg, )) #維持執行的進程總數爲processes,當一個進程執行完畢後會添加新的進程進去 print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close() pool.join() #調用join以前,先調用close函數,不然會出錯。執行完close後不會有新的進程加入到pool,join函數等待全部子進程結束 print "Sub-process(es) done."
一次執行的結果
1
2
3
4
5
6
7
8
9
10
|
msg: hello
0
end
msg: hello
1
end
msg: hello
2
end
msg: hello
3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.
|
例7.3:使用進程池,並關注結果
import multiprocessing import time def func(msg): print "msg:", msg time.sleep(3) print "end" return "done" + msg if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) result = [] for i in xrange(3): msg = "hello %d" %(i) result.append(pool.apply_async(func, (msg, ))) pool.close() pool.join() for res in result: print ":::", res.get() print "Sub-process(es) done."
一次執行結果
1
2
3
4
5
6
7
8
9
10
|
msg: hello
0
msg: hello
1
msg: hello
2
end
end
end
::: donehello
0
::: donehello
1
::: donehello
2
Sub-process(es) done.
|
例7.4:使用多個進程池
import multiprocessing import os, time, random def Lee(i): print('1', i) time.sleep(3) print('-----') # print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()獲取當前的進程的ID # start = time.time() # time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數 # end = time.time() # print('Task Lee, runs %0.2f seconds.' % (end - start)) def Marlon(i): print('2', i) time.sleep(3) print('-----') # print("\nRun task Marlon-%s" % (os.getpid())) # start = time.time() # time.sleep(random.random() * 40) # end = time.time() # print('Task Marlon runs %0.2f seconds.' %(end - start)) def Allen(i): print('3', i) time.sleep(3) print('-----') # print("\nRun task Allen-%s" %(os.getpid())) # start = time.time() # time.sleep(random.random() * 30) # end = time.time() # print('Task Allen runs %0.2f seconds.' %(end - start)) def Frank(i): print('4', i) time.sleep(3) print('-----') # print("\nRun task Frank-%s" %(os.getpid())) # start = time.time() # time.sleep(random.random() * 20) # end = time.time() # print('Task Frank runs %0.2f seconds.' %(end - start)) if __name__ == '__main__': function_list = [Lee, Marlon, Allen, Frank] # print("parent process %s" % (os.getpid())) pool = multiprocessing.Pool(4) for func in function_list: # Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中 for i in ['a', 'b', 'c','d', 'e', 'f', 'g']: pool.apply_async(func, args=(i,)) print('Waiting for all subprocesses done...') pool.close() # 調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束 pool.join() print('All subprocesses done.')
#coding: utf-8 import multiprocessing import os, time, random def Lee(): print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()獲取當前的進程的ID start = time.time() time.sleep(random.random() * 10) #random.random()隨機生成0-1之間的小數 end = time.time() print 'Task Lee, runs %0.2f seconds.' %(end - start) def Marlon(): print "\nRun task Marlon-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 40) end=time.time() print 'Task Marlon runs %0.2f seconds.' %(end - start) def Allen(): print "\nRun task Allen-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 30) end = time.time() print 'Task Allen runs %0.2f seconds.' %(end - start) def Frank(): print "\nRun task Frank-%s" %(os.getpid()) start = time.time() time.sleep(random.random() * 20) end = time.time() print 'Task Frank runs %0.2f seconds.' %(end - start) if __name__=='__main__': function_list= [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid()) pool=multiprocessing.Pool(4) for func in function_list: pool.apply_async(func) #Pool執行函數,apply執行函數,當有一個進程執行完畢後,會添加一個新的進程到pool中 print 'Waiting for all subprocesses done...' pool.close() pool.join() #調用join以前,必定要先調用close() 函數,不然會出錯, close()執行後不會有新的進程加入到pool,join函數等待素有子進程結束 print 'All subprocesses done.'
一次執行結果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
parent process
7704
Waiting for
all
subprocesses done...
Run task Lee
-6948
Run task Marlon
-2896
Run task Allen
-7304
Run task Frank
-3052
Task Lee, runs
1.59
seconds.
Task Marlon runs
8.48
seconds.
Task Frank runs
15.68
seconds.
Task Allen runs
18.08
seconds.
All subprocesses done.
|