Python自定義進程池(生產者/消費者模型)

代碼說明一切:html

#encoding=utf-8
#author: walker
#date: 2014-05-21
#summary: 自定義進程池遍歷目錄下文件

from multiprocessing import Process, Queue, Lock
import time, os

#消費者
class Consumer(Process):
	def __init__(self, queue, ioLock):
		super(Consumer, self).__init__()
		self.queue = queue
		self.ioLock = ioLock
		
	def run(self):
		while True:
			task = self.queue.get()	#隊列中無任務時,會阻塞進程
			if isinstance(task, str) and task == 'quit':
				break;
			time.sleep(1)	#假定任務處理須要1秒鐘
			self.ioLock.acquire()
			print( str(os.getpid()) + '  ' + task)
			self.ioLock.release()
		self.ioLock.acquire()
		print 'Bye-bye'
		self.ioLock.release()

#生產者		
def Producer():
	queue = Queue()    #這個隊列是進程/線程安全的
	ioLock = Lock()
	subNum = 4	#子進程數量
	workers = build_worker_pool(queue, ioLock, subNum)
	start_time = time.time()
	
	for parent, dirnames, filenames in os.walk(r'D:\test'):
		for filename in filenames:
			queue.put(filename)
			ioLock.acquire()	
			print('qsize:' + str(queue.qsize()))
			ioLock.release()
			while queue.qsize() > subNum * 10:	#控制隊列中任務數量
				time.sleep(1)
			
	for worker in workers:
		queue.put('quit')
		
	for worker in workers:
		worker.join()
	
	ioLock.acquire()	
	print('Done! Time taken: {}'.format(time.time() - start_time))
	ioLock.release()

#建立進程池
def build_worker_pool(queue, ioLock, size):
	workers = []
	for _ in range(size):
		worker = Consumer(queue, ioLock)
		worker.start()
		workers.append(worker)
	return workers
	
if __name__ == '__main__':
	Producer()

ps:python

self.ioLock.acquire()	
...
self.ioLock.release()

可用git

with self.ioLock:
    ...

替代。github


#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一個子進程生產,一個子進程消費
 
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#生產
def Produce(q):
	print('Produce %d ...' % os.getpid())
	for i in range(1, 20):
		while q.full():
			print('sleep %d/%d ...' % (i, q.qsize()))
			time.sleep(1)
		q.put(i)
		
	q.put(0)		#用0通知結束

#消費
def Consume(q):
	print('Consume %d ...' % os.getpid())
	while True:
		num = q.get()
		if 0 == num:	#收到結束信號
			print('receive 0')
			break
		print('Consumer ' + str(num))
		time.sleep(2)
		print('Consumer end ' + str(num))
 
if __name__ == '__main__': 	
	q = Queue(10)				#可用
	q = Manager().Queue(10)		#可用
	
	print(os.getpid())
	
	producerProcess = Process(target=Produce, args=(q,))		#生產進程
	consumerProcess = Process(target=Consume, args=(q,))		#消費進程
	
	producerProcess.start()
	consumerProcess.start()
	
	producerProcess.join()
	consumerProcess.join()

#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 一個子進程生產,進程池消費
 
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#生產
def Produce(q, poolSize):
	print('Produce ...')
	for i in range(1, 100):
		while q.full():
			print('sleep %d/%d ...' % (i, q.qsize()))
			time.sleep(1)
		q.put(i)
		
	for _ in range(0, poolSize):
		q.put(0)		#用0通知結束

#消費
def Consume(q):
	print('Consume ...')
	while True:
		num = q.get()
		if 0 == num:	#收到結束信號
			print('receive 0')
			break
		print('Consumer ' + str(num))
		time.sleep(2)
		print('Consumer end ' + str(num))
 
if __name__ == '__main__': 	
	#q = Queue(10)				#不可用
	q = Manager().Queue(10)		#可用
	
	poolSize = 4
	producerProcess = Process(target=Produce, args=(q, poolSize))		#生產進程
	consumerPool = Pool(processes=poolSize)	#消費進程池,默認子進程個數爲os.cpu_count()
	for _ in range(0, poolSize):
		consumerPool.apply_async(func=Consume, args=(q,))
	
	producerProcess.start()
	consumerPool.close()
	
	producerProcess.join()
	consumerPool.join()	

#encoding=utf-8
#author: walker
#date: 2017-03-14
#summary: 主進程生產,進程池消費
 
import os, sys, time
from multiprocessing import Process, Pool, Queue, Manager

#消費
def Consume(q):
	print('Consume ...')
	num = q.get()
	print('Consume %d ...' % num)
	time.sleep(2)
	print('Consumer %d over' % num)	
	 
if __name__ == '__main__':	 
	#q = Queue(10)				#不可用
	q = Manager().Queue(10)		#可用
	
	pool = Pool(processes = 4)
	for i in range(1, 100):		#生產
		while q.full():
			print('sleep %d ...' % q.qsize())
			time.sleep(1)
		q.put(i)
		print(i)
		pool.apply_async(Consume, (q,))
		 
	pool.close()
	pool.join()

*** Updated 2016-01-06 ***web

一個好玩的例子:segmentfault

#encoding=utf-8
#author: walker
#date: 2016-01-06
#summary: 一個多進程的好玩例子

import os, sys, time
from multiprocessing import Pool

cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__))

g_List = ['a']

#修改全局變量g_List
def ModifyDict_1():
    global g_List
    g_List.append('b')

#修改全局變量g_List    
def ModifyDict_2():
    global g_List
    g_List.append('c')
 
#處理一個
def ProcOne(num):
    print('ProcOne ' + str(num) + ', g_List:' + repr(g_List))

#處理全部
def ProcAll():    
    pool = Pool(processes = 4)
    for i in range(1, 20):
        #ProcOne(i)
        #pool.apply(ProcOne, (i,))
        pool.apply_async(ProcOne, (i,))
        
    pool.close()
    pool.join()  
    
ModifyDict_1()  #修改全局變量g_List
    
if __name__ == '__main__':
    ModifyDict_2()  #修改全局變量g_List
    
    print('In main g_List :' + repr(g_List))
    
    ProcAll()

Windows7 下運行的結果:安全

λ python3 demo.py
In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b']
ProcOne 2, g_List:['a', 'b']
ProcOne 3, g_List:['a', 'b']
ProcOne 4, g_List:['a', 'b']
ProcOne 5, g_List:['a', 'b']
ProcOne 6, g_List:['a', 'b']
ProcOne 7, g_List:['a', 'b']
ProcOne 8, g_List:['a', 'b']
ProcOne 9, g_List:['a', 'b']
ProcOne 10, g_List:['a', 'b']
ProcOne 11, g_List:['a', 'b']
ProcOne 12, g_List:['a', 'b']
ProcOne 13, g_List:['a', 'b']
ProcOne 14, g_List:['a', 'b']
ProcOne 15, g_List:['a', 'b']
ProcOne 16, g_List:['a', 'b']
ProcOne 17, g_List:['a', 'b']
ProcOne 18, g_List:['a', 'b']
ProcOne 19, g_List:['a', 'b']

Ubuntu 14.04下運行的結果:
app

In main g_List :['a', 'b', 'c']
ProcOne 1, g_List:['a', 'b', 'c']
ProcOne 2, g_List:['a', 'b', 'c']
ProcOne 3, g_List:['a', 'b', 'c']
ProcOne 5, g_List:['a', 'b', 'c']
ProcOne 4, g_List:['a', 'b', 'c']
ProcOne 8, g_List:['a', 'b', 'c']
ProcOne 9, g_List:['a', 'b', 'c']
ProcOne 7, g_List:['a', 'b', 'c']
ProcOne 11, g_List:['a', 'b', 'c']
ProcOne 6, g_List:['a', 'b', 'c']
ProcOne 12, g_List:['a', 'b', 'c']
ProcOne 13, g_List:['a', 'b', 'c']
ProcOne 10, g_List:['a', 'b', 'c']
ProcOne 14, g_List:['a', 'b', 'c']
ProcOne 15, g_List:['a', 'b', 'c']
ProcOne 16, g_List:['a', 'b', 'c']
ProcOne 17, g_List:['a', 'b', 'c']
ProcOne 18, g_List:['a', 'b', 'c']
ProcOne 19, g_List:['a', 'b', 'c']

  能夠看見Windows7下第二次修改沒有成功,而Ubuntu下修改爲功了。據uliweb做者limodou講,緣由是Windows下是重啓實現的子進程;Linux下是fork實現的。async


相關閱讀:ide

0、官方多進程文檔

一、Python 並行任務技巧

二、python中的多進程處理

三、python的threading和multiprocessing模塊

四、python下使用ctypes獲取threading線程id 


*** walker * 2014-05-21 ***

相關文章
相關標籤/搜索