Because of the GIL, a Python process can only execute one thread at a time. So in Python development, CPU-bound programs usually use multiple processes, while I/O-bound programs use multiple threads.
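A minimal sketch of this trade-off, using Python 3 and the standard concurrent.futures module (the helper name count_down is illustrative, not from the text): on a CPU-bound task, a thread pool is serialized by the GIL, while a process pool can use multiple cores.

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def count_down(n):
    # pure-CPU busy loop: never releases the GIL while counting
    while n > 0:
        n -= 1
    return n

if __name__ == '__main__':
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with pool_cls(max_workers=2) as pool:
            results = list(pool.map(count_down, [5_000_000] * 2))
        # timings vary by machine; the process pool is typically faster here
        print(pool_cls.__name__, round(time.time() - start, 2), 's')
```

The exact timings depend on the machine; the point is only that adding threads does not speed up a loop like this, while adding processes can.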
#Creation method 1: pass the function to run as the target of Process
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':   # note: processes must be created under the main guard
    p = Process(target=f, args=('pingy',))  # target=f is the function to run; args=('pingy',) passes its arguments as a tuple
    p.start()   # start the child process
    p.join()    # the parent blocks until the child finishes
#Creation method 2: subclass Process and override run()
from multiprocessing import Process

class MyProcess(Process):
    def run(self):
        print("MyProcess extended from Process")

if __name__ == '__main__':   # note: processes must be created under the main guard
    p2 = MyProcess()
    p2.start()
Instance methods:
run()            # by default run() calls the target function; you can override it in a subclass
start()          # start the process
daemon           # attribute, set before start(); a daemonic child is terminated automatically when the parent exits
join([timeout])  # the parent blocks until the child finishes; with timeout=None there is no time limit, otherwise it waits at most timeout seconds
is_alive()       # whether the process is running, i.e. started but not yet terminated
terminate()      # kill the process
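A short sketch (Python 3 syntax; the sleeper helper is illustrative) exercising the methods listed above:

```python
import time
from multiprocessing import Process

def sleeper():
    time.sleep(10)   # long enough that we can observe the process while alive

if __name__ == '__main__':
    p = Process(target=sleeper)
    p.start()
    print(p.is_alive())   # True: started and not yet terminated
    p.terminate()         # force the child to stop
    p.join()              # wait for the child to be cleaned up
    print(p.is_alive())   # False
```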
Example:
from multiprocessing import Process
import time
import os

def foo(n):
    time.sleep(2)
    print 'Number:', n
    print 'child PID:', os.getpid(), 'parent PID:', os.getppid()

def main1():
    for i in range(2):
        foo(i)

def main2():
    for i in range(2):
        p = Process(target=foo, args=(i,))
        print p.name, 'ready to run...'   # p.name is the process name
        p.start()
        print p.pid, 'started...'         # before start(), p.pid is None
        p.join(1)   # join([timeout]): the parent waits at most 1 second for the child

if __name__ == '__main__':
    print 'main PID:', os.getpid()
    print '++++++++++++++++++++++++++++++++++++++++++'
    main1()
    print '------------------------------------------'
    main2()
Output:
main PID: 84792
++++++++++++++++++++++++++++++++++++++++++
Number: 0
child PID: 84792 parent PID: (PID of the shell that launched the script)
Number: 1
child PID: 84792 parent PID: (PID of the shell that launched the script)
------------------------------------------
Process-1 ready to run...
123316 started...
Process-2 ready to run...
85716 started...
Number: 0
child PID: 123316 parent PID: 84792
Number: 1
child PID: 85716 parent PID: 84792
Setting the daemon attribute:
#Without daemon:
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.start()
    print "end!"
Output:
end!
work start:Thu Oct 20 16:46:12 2016
work end:Thu Oct 20 16:46:15 2016
#With daemon set:
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True    # the child is killed as soon as the parent exits
    p.start()
    print "end!"
Output:
end!
Letting a daemonic child finish before the parent exits:
import multiprocessing
import time

def worker(interval):
    print("work start:{0}".format(time.ctime()))
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()))

if __name__ == "__main__":
    p = multiprocessing.Process(target=worker, args=(3,))
    p.daemon = True
    p.start()
    p.join()   # wait for the daemonic child before the parent exits
    print "end!"
Output:
work start:Thu Oct 20 16:49:34 2016
work end:Thu Oct 20 16:49:37 2016
end!
Defining a process as a class:
import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()
Output:
the time is Thu Oct 20 16:42:21 2016
the time is Thu Oct 20 16:42:24 2016
the time is Thu Oct 20 16:42:27 2016
the time is Thu Oct 20 16:42:30 2016
the time is Thu Oct 20 16:42:33 2016
Differences between multiprocessing and multithreading:
from multiprocessing import Process
import threading
import time

li = []

def run(li1, n):
    li1.append(n)
    print li1

if __name__ == '__main__':
    for i in range(10):   # create processes; each process gets its own copy of the list
        p = Process(target=run, args=[li, i])
        p.start()
    time.sleep(1)
    print 'I am the separator'.center(50, '*')
    for i in range(10):   # create threads; all threads share the same list
        t = threading.Thread(target=run, args=[li, i])
        t.start()
Output:
[0] [1] [2] [3] [4] [5] [6] [7] [8] [9]
****************I am the separator****************
[0] [0, 1] [0, 1, 2] [0, 1, 2, 3] [0, 1, 2[, 03, , 14, , 25, ]3 , 4, 5] [0, 1, 2[, 03, , 14, , 25, , 36, , 47, ][5 0, , 61, , 72, , 83], 4, 5, 6, [70, , 81, , 92], 3, 4, 5, 6, 7, 8, 9]
(Each process printed only its own single-element copy of the list; the threads kept appending to one shared list, and the mangled brackets come from their prints interleaving on shared stdout.)
(1) Locks: when multiple processes need to access a shared resource, a lock avoids conflicting accesses. There are two kinds, Lock and RLock. Besides the locked/unlocked states, an RLock also records the current owner and the recursion level, which lets the same owner call acquire() on it multiple times. With an RLock, acquire and release must come in pairs: after n acquire calls, the lock is only truly released after n release calls.
#How to use the locks:
import multiprocessing

lock = multiprocessing.Lock()     # Lock object
lock.acquire([timeout])           # lock; if a timeout is set, the return value tells whether the lock was obtained
lock.release()                    # unlock

rLock = multiprocessing.RLock()   # RLock object
rLock.acquire([timeout])          # lock (re-entrant); if a timeout is set, the return value tells whether the lock was obtained
rLock.release()                   # unlock
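A minimal sketch of why RLock is re-entrant while a plain Lock is not (Python 3 syntax; the nested outer/inner structure is illustrative, not from the text):

```python
import multiprocessing

rlock = multiprocessing.RLock()

def outer(lock):
    with lock:              # first acquire
        return inner(lock)

def inner(lock):
    with lock:              # second acquire by the same owner: fine for RLock,
        return 'done'       # but this line would deadlock with a plain Lock

if __name__ == '__main__':
    print(outer(rlock))     # -> done
```

Swapping rlock for multiprocessing.Lock() would hang at the second acquire, which is exactly the pairing rule described above.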
Example:
import multiprocessing

def worker_with(lock, f):
    with lock:              # the with statement acquires and releases the lock
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target=worker_with, args=(lock, f))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"
Output (contents of file.txt):
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
(2) multiprocessing.Queue: synchronization between processes/threads
Note: Queue.Queue is an in-process queue for the threads of a single process, while multiprocessing.Queue is a cross-process communication queue. With multiple processes, each process gets its own private copy of the former; the latter is shared by the child processes.
Instance methods:
Queue.qsize()  returns the size of the queue
Queue.empty()  returns True if the queue is empty, else False
Queue.full()  returns True if the queue is full, else False; "full" corresponds to maxsize
Queue.get([block[, timeout]])  removes and returns an item; timeout is how long to wait
Queue.get_nowait()  equivalent to Queue.get(False)
Queue.put(item[, block[, timeout]])  puts an item into the queue; timeout is how long to wait
Queue.put_nowait(item)  equivalent to Queue.put(item, False)
Queue.task_done()  after finishing a piece of work, signals the queue that the task is complete
Queue.join()  blocks until the queue is drained, i.e. every item has been fetched and marked done (task_done() and join() exist on Queue.Queue and multiprocessing.JoinableQueue, not on plain multiprocessing.Queue)
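A hedged sketch of how task_done() and join() cooperate (Python 3, where the module is named queue; in Python 2 it is Queue; the worker/results names are illustrative):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    while True:
        item = q.get()
        results.append(item * 2)
        q.task_done()        # signal that this item is fully processed

threading.Thread(target=worker, daemon=True).start()
for i in range(3):
    q.put(i)
q.join()                     # blocks until every put() has a matching task_done()
print(results)               # -> [0, 2, 4]
```

If the worker forgot to call task_done(), q.join() would block forever even after the queue was empty.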
FIFO (first in, first out) queue:
import Queue

q = Queue.Queue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()
Output:
0
1
2
3
4
LIFO (last in, first out) queue:
import Queue

q = Queue.LifoQueue()
for i in range(5):
    q.put(i)
while not q.empty():
    print q.get()
Output:
4
3
2
1
0
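The standard library also provides a true priority queue; this short sketch (Python 3 module name queue; Queue.PriorityQueue in Python 2) shows that entries come out lowest-first regardless of insertion order:

```python
import queue

q = queue.PriorityQueue()
for item in [3, 1, 2]:
    q.put(item)
while not q.empty():
    print(q.get())   # -> 1, then 2, then 3
```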
Worker threads consuming a shared queue:
import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
    def run(self):
        print "Starting " + self.name
        process_data(self.name, self.q)
        print "Exiting " + self.name

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print "%s processing %s" % (threadName, data)
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# create the new threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1

# fill the queue
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# wait until the queue is empty
while not workQueue.empty():
    pass

# tell the threads it is time to exit
exitFlag = 1

# wait for all threads to finish
for t in threads:
    t.join()
print "Exiting Main Thread"
Output:
Starting Thread-1
Starting Thread-2
Starting Thread-3
Thread-1 processing One
Thread-2 processing Two
Thread-3 processing Three
Thread-1 processing Four
Thread-2 processing Five
Exiting Thread-3
Exiting Thread-1
Exiting Thread-2
Exiting Main Thread
multiprocessing.Queue: synchronization between processes:
Example 1:
from multiprocessing import Process, Queue

def foo(q, n):
    q.put(n)

if __name__ == '__main__':
    que = Queue()
    for i in range(5):
        p = Process(target=foo, args=(que, i))
        p.start()
        p.join()
    print(que.qsize())
Output:
5
Example 2:
import multiprocessing

def writer_proc(q):
    try:
        q.put(1, block=False)
    except:
        pass

def reader_proc(q):
    try:
        print q.get(block=False)
    except:
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))
    writer.start()
    reader = multiprocessing.Process(target=reader_proc, args=(q,))
    reader.start()
Output:
1
(3) multiprocessing.Value and multiprocessing.Array: sharing data
from multiprocessing import Process, Value, Array

def foo1(n, a):
    n.value = 3
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd' means double; create a shared 0.0
    arr = Array('i', range(10))  # 'i' means int; create a shared array of the integers 0-9
    p = Process(target=foo1, args=(num, arr))
    p.start()
    p.join()
    print num.value
    print arr[:]
Output:
3.0
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
(4) multiprocessing.Manager: sharing data
from multiprocessing import Manager, Process

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manage = Manager()
    d = manage.dict()           # create a dict that can be shared between processes
    l = manage.list(range(10))  # create a list that can be shared between processes
    p = Process(target=f, args=(d, l))
    p.start()
    p.join()
    print d
    print l
Output:
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Pool maintains a given number of worker processes for the caller. When a new request is submitted to the pool, a new process is created to run it if the pool is not yet full; if the pool has already reached its maximum size, the request waits until a process in the pool finishes, and a freed worker then runs it.
(1) Using a process pool (non-blocking)
#Using a process pool (non-blocking)
import time
import multiprocessing

def fun(msg):
    print 'MSG:', msg
    time.sleep(3)
    print 'end'

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in xrange(4):
        msg = 'hello,%d' % (i)
        pool.apply_async(fun, (msg,))  # keeps at most `processes` workers busy; a new task starts when one finishes
    print '-----------------------'
    pool.close()
    pool.join()
    print 'Sub-processes done'
Output:
-----------------------
MSG: hello,0
MSG: hello,1
MSG: hello,2
end
MSG: hello,3
end
end
end
Sub-processes done
(2) Using a process pool (blocking)
import time
import multiprocessing

def fun(msg):
    print 'MSG:', msg
    time.sleep(3)
    print 'end'

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    for i in xrange(4):
        msg = 'hello,%d' % i
        pool.apply(fun, (msg,))   # apply() blocks until the task has finished
    print '-------------------------------'
    pool.close()
    pool.join()
    print 'Sub-processes done'
Output:
MSG: hello,0
end
MSG: hello,1
end
MSG: hello,2
end
MSG: hello,3
end
-------------------------------
Sub-processes done
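Pool also offers map(), which distributes an iterable over the workers and returns the results in input order; a minimal sketch (Python 3 syntax, where Pool can be used as a context manager; the square helper is illustrative):

```python
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # the with block closes and joins the pool automatically
    with multiprocessing.Pool(processes=3) as pool:
        print(pool.map(square, range(5)))   # -> [0, 1, 4, 9, 16]
```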
How a pipe works:
A pipe is a buffer managed by the kernel, something like a note we place in memory. One end of the pipe is connected to the output of one process, which writes information into the pipe; the other end is connected to the input of another process, which reads the information back out. The buffer does not need to be large: it is designed as a ring so it can be reused. When the pipe is empty, the reading process waits until the process at the other end writes something; when the pipe is full, a process trying to write waits until the other end takes something out. When both processes terminate, the pipe disappears automatically.
import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in range(10000):
            print 'send:%s' % i
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print 'proc2 recv:', pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print 'proc3 recv:', pipe.recv()
        time.sleep(1)

if __name__ == '__main__':
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))
    p1.start()
    p2.start()
    #p3.start()
    p1.join()
    p2.join()
    #p3.join()
Output:
send:0
proc2 recv: 0
send:1
proc2 recv: 1
send:2
proc2 recv: 2
send:3
proc2 recv: 3
send:4
proc2 recv: 4
send:5
proc2 recv: 5
send:6
proc2 recv: 6