代碼說明一切:
#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: a hand-rolled process pool that walks the files under a directory.

from multiprocessing import Process, Queue, Lock
import os
import time

# Sentinel that tells a Consumer to exit. None can never be a real task,
# unlike the old string 'quit', which is also a perfectly valid file name.
_QUIT = None


class Consumer(Process):
    """Worker process that takes file names off a shared queue and "processes" them.

    Args:
        queue: multiprocessing.Queue shared with the producer.
        ioLock: multiprocessing.Lock used to serialise printing so lines from
            different workers do not interleave.
    """

    def __init__(self, queue, ioLock):
        super(Consumer, self).__init__()
        self.queue = queue
        self.ioLock = ioLock

    def run(self):
        while True:
            task = self.queue.get()  # blocks while the queue is empty
            if task is _QUIT:
                break
            time.sleep(1)  # pretend each task takes one second of work
            with self.ioLock:
                print(str(os.getpid()) + ' ' + task)
        with self.ioLock:
            print('Bye-bye')


def Producer(root=r'D:\test', subNum=4):
    """Walk ``root`` recursively and hand every file name to a pool of Consumers.

    Args:
        root: directory to walk (defaults to the original hard-coded Windows
            test directory).
        subNum: number of worker processes to start.
    """
    # Bounded queue: put() blocks once subNum * 10 tasks are pending. This
    # replaces the old qsize()/sleep(1) polling loop for back-pressure.
    queue = Queue(maxsize=subNum * 10)
    ioLock = Lock()
    workers = build_worker_pool(queue, ioLock, subNum)
    start_time = time.time()
    try:
        for _, _, filenames in os.walk(root):
            for filename in filenames:
                queue.put(filename)
                with ioLock:
                    # NOTE: Queue.qsize() raises NotImplementedError on macOS.
                    print('qsize:' + str(queue.qsize()))
    finally:
        # One sentinel per worker, sent even if the walk failed, so every
        # worker exits and join() cannot hang.
        for _ in workers:
            queue.put(_QUIT)
        for worker in workers:
            worker.join()
    with ioLock:
        print('Done! Time taken: {}'.format(time.time() - start_time))


def build_worker_pool(queue, ioLock, size):
    """Start ``size`` Consumer processes sharing ``queue`` and ``ioLock``; return them."""
    workers = []
    for _ in range(size):
        worker = Consumer(queue, ioLock)
        worker.start()
        workers.append(worker)
    return workers


if __name__ == '__main__':
    Producer()
PS:
self.ioLock.acquire() ... self.ioLock.release()
可用
with self.ioLock: ...
替代。
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: one child process produces, another child process consumes.

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager


def Produce(q):
    """Producer: put the integers 1..19 on ``q``, then a 0 end-of-stream marker.

    While ``q`` reports itself full, print a status line and retry once a second.
    """
    print('Produce %d ...' % os.getpid())
    item = 1
    while item < 20:
        while q.full():
            print('sleep %d/%d ...' % (item, q.qsize()))
            time.sleep(1)
        q.put(item)
        item += 1
    q.put(0)  # 0 tells the consumer to stop


def Consume(q):
    """Consumer: take items off ``q`` until the 0 marker arrives (each item "takes" 2 s)."""
    print('Consume %d ...' % os.getpid())
    value = q.get()
    while 0 != value:
        print('Consumer ' + str(value))
        time.sleep(2)
        print('Consumer end ' + str(value))
        value = q.get()
    print('receive 0')


if __name__ == '__main__':
    q = Queue(10)  # Manager().Queue(10) works here as well
    print(os.getpid())
    producerProcess = Process(target=Produce, args=(q,))
    consumerProcess = Process(target=Consume, args=(q,))
    children = (producerProcess, consumerProcess)
    for child in children:
        child.start()
    for child in children:
        child.join()
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: one child process produces, a process pool consumes.

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager


def Produce(q, poolSize):
    """Put 1..99 on ``q``, then one 0 end marker per pool worker.

    Args:
        q: queue shared with the consumers.
        poolSize: number of consumers; each one needs its own 0 to stop.
    """
    print('Produce ...')
    for i in range(1, 100):
        while q.full():
            print('sleep %d/%d ...' % (i, q.qsize()))
            time.sleep(1)
        q.put(i)
    for _ in range(poolSize):
        q.put(0)  # 0 tells one consumer to stop


def Consume(q):
    """Take items off ``q`` until a 0 arrives; each item "takes" 2 seconds."""
    print('Consume ...')
    while True:
        num = q.get()
        if 0 == num:  # end marker
            print('receive 0')
            break
        print('Consumer ' + str(num))
        time.sleep(2)
        print('Consumer end ' + str(num))


if __name__ == '__main__':
    # A plain multiprocessing.Queue cannot be pickled into Pool task
    # arguments (it may only be shared by inheritance), so use a Manager
    # queue proxy. The with-block shuts the manager process down at the end.
    with Manager() as manager:
        q = manager.Queue(10)
        poolSize = 4
        producerProcess = Process(target=Produce, args=(q, poolSize))
        # Pool defaults to os.cpu_count() workers; here one per consumer task.
        consumerPool = Pool(processes=poolSize)
        results = [consumerPool.apply_async(func=Consume, args=(q,))
                   for _ in range(poolSize)]
        producerProcess.start()
        consumerPool.close()
        producerProcess.join()
        consumerPool.join()
        # apply_async swallows task exceptions unless the result is fetched;
        # get() re-raises any failure in the parent instead of losing it.
        for result in results:
            result.get()
#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: the main process produces, a process pool consumes.

import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager


def Consume(q, delay=2):
    """Take exactly one item off ``q`` and simulate work on it.

    Args:
        q: queue shared with the main process.
        delay: seconds of simulated work per item (default 2, as before).
    """
    print('Consume ...')
    num = q.get()
    print('Consume %d ...' % num)
    time.sleep(delay)
    print('Consumer %d over' % num)


if __name__ == '__main__':
    # A plain multiprocessing.Queue cannot be pickled into Pool task
    # arguments, so use a Manager queue proxy. The with-block shuts the
    # manager process down at the end.
    with Manager() as manager:
        q = manager.Queue(10)
        pool = Pool(processes=4)
        results = []
        for i in range(1, 100):  # produce
            while q.full():
                print('sleep %d ...' % q.qsize())
                time.sleep(1)
            q.put(i)
            print(i)
            # One task per item, so every queued item is consumed exactly once.
            results.append(pool.apply_async(Consume, (q,)))
        pool.close()
        pool.join()
        # Surface task exceptions that apply_async would otherwise drop.
        for result in results:
            result.get()
*** Updated 2016-01-06 ***
一個好玩的例子:
#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: a fun multiprocessing example - which changes to a global list
#         are visible inside pool workers?

import os, sys, time
from multiprocessing import Pool

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

g_List = ['a']


def ModifyDict_1():
    """Append 'b' to the module-level list g_List (in place, so no `global` needed)."""
    g_List.append('b')


def ModifyDict_2():
    """Append 'c' to the module-level list g_List (in place)."""
    g_List.append('c')


def ProcOne(num):
    """Handle one task: print its number and the g_List this process sees."""
    print('ProcOne {}, g_List:{}'.format(str(num), repr(g_List)))


def ProcAll():
    """Run ProcOne for 1..19 on a 4-process pool, then wait for all of them."""
    workers = Pool(processes = 4)
    # Alternatives: call ProcOne(n) directly, or workers.apply(ProcOne, (n,)).
    for n in range(1, 20):
        workers.apply_async(ProcOne, (n,))
    workers.close()
    workers.join()


# Module-level call: runs whenever this module is executed or imported,
# unlike ModifyDict_2 below, which only runs under the __main__ guard.
ModifyDict_1()

if __name__ == '__main__':
    ModifyDict_2()
    print('In main g_List :' + repr(g_List))
    ProcAll()
Windows 7 下運行的結果:
λ python3 demo.py In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b'] ProcOne 2, g_List:['a', 'b'] ProcOne 3, g_List:['a', 'b'] ProcOne 4, g_List:['a', 'b'] ProcOne 5, g_List:['a', 'b'] ProcOne 6, g_List:['a', 'b'] ProcOne 7, g_List:['a', 'b'] ProcOne 8, g_List:['a', 'b'] ProcOne 9, g_List:['a', 'b'] ProcOne 10, g_List:['a', 'b'] ProcOne 11, g_List:['a', 'b'] ProcOne 12, g_List:['a', 'b'] ProcOne 13, g_List:['a', 'b'] ProcOne 14, g_List:['a', 'b'] ProcOne 15, g_List:['a', 'b'] ProcOne 16, g_List:['a', 'b'] ProcOne 17, g_List:['a', 'b'] ProcOne 18, g_List:['a', 'b'] ProcOne 19, g_List:['a', 'b']
Ubuntu 14.04 下運行的結果:
In main g_List :['a', 'b', 'c'] ProcOne 1, g_List:['a', 'b', 'c'] ProcOne 2, g_List:['a', 'b', 'c'] ProcOne 3, g_List:['a', 'b', 'c'] ProcOne 5, g_List:['a', 'b', 'c'] ProcOne 4, g_List:['a', 'b', 'c'] ProcOne 8, g_List:['a', 'b', 'c'] ProcOne 9, g_List:['a', 'b', 'c'] ProcOne 7, g_List:['a', 'b', 'c'] ProcOne 11, g_List:['a', 'b', 'c'] ProcOne 6, g_List:['a', 'b', 'c'] ProcOne 12, g_List:['a', 'b', 'c'] ProcOne 13, g_List:['a', 'b', 'c'] ProcOne 10, g_List:['a', 'b', 'c'] ProcOne 14, g_List:['a', 'b', 'c'] ProcOne 15, g_List:['a', 'b', 'c'] ProcOne 16, g_List:['a', 'b', 'c'] ProcOne 17, g_List:['a', 'b', 'c'] ProcOne 18, g_List:['a', 'b', 'c'] ProcOne 19, g_List:['a', 'b', 'c']
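可以看見,Windows 7 下第二次修改沒有成功,而 Ubuntu 下修改成功了。據 uliweb 作者 limodou 講,原因是 Windows 下的子進程是通過重新啟動解釋器並重新導入主模塊(spawn)實現的,而 Linux 下是通過 fork 實現的。因此在 Windows 下,子進程只會執行模塊頂層的 ModifyDict_1(),不會執行 `if __name__ == '__main__':` 之下的 ModifyDict_2(),所以看到的是 ['a', 'b'];而 fork 出來的子進程直接繼承了父進程中已被修改兩次的 ['a', 'b', 'c']。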
相關閱讀:
0、官方多進程文檔。
三、python的threading和multiprocessing模塊
四、python下使用ctypes獲取threading線程id
*** walker * 2014-05-21 ***