如何在 Pool 中使用 Queue:Stack Overflow 的回答,戳這裏。
其實官方文檔看一遍,應該就大部分都懂了。
須要注意的是:在使用多進程的時候,進程函數的傳入參數必須是 pickle-able 的,也就是參數必須能夠被 pickle 保存下來。multiprocessing.Queue 對象不能傳遞給 pool.apply_*() 等函數,須要使用 multiprocessing.Manager().Queue() 產生的對象。
貼一下代碼:
# -*- coding: UTF-8 -*-
"""Demos of sharing queues between processes with multiprocessing.

method_1/method_2 use a Pool with map_async / apply_async callbacks,
method_3 passes a Manager().Queue() into pool workers (a plain
multiprocessing.Queue cannot be pickled into pool.apply_* calls),
and method_4 uses a JoinableQueue with a daemon consumer process.
"""
from multiprocessing import Process, Pool, Queue, Manager, JoinableQueue
import time
import os

# Results appended by apply_async callbacks (callbacks run in the parent).
res = []


def put_task(_=None):
    """Simulate 5 seconds of work and return this worker's pid 50 times.

    Accepts one ignored argument so it can also be used with
    pool.map_async(put_task, iterable); the original no-arg signature
    raised TypeError in every worker under map_async.
    """
    msg = []
    for _i in range(50):  # xrange was Python 2 only
        time.sleep(0.1)
        msg.append(str(os.getpid()))
    return ','.join(msg)


def collect_results(result):
    """apply_async callback: store one worker result in the parent."""
    res.append(result)


def take_task(queue):
    """Print items from *queue* forever (blocking get; never returns)."""
    while 1:
        print(queue.get(True))


def task_put(name, que):
    """Producer for method_3: put 10 progress messages, one per second."""
    for i in range(10):
        time.sleep(1)
        que.put("%d is done" % name)


def task_take_queue(que, n):
    """Consumer for method_3: print exactly *n* items, then return."""
    i = 0
    while i < n:
        print(que.get(True))
        i += 1


def consumer(input_q):
    """JoinableQueue consumer: process items and ack each with task_done()."""
    while True:
        item = input_q.get(True)
        # Process the item; replace the print with useful work.
        print(item)  # "print item" was Python 2 syntax — a SyntaxError here
        # Signal that this item has been fully processed.
        input_q.task_done()


def producer(output_q):
    """Put a short fixed sequence of items onto the queue, one per second."""
    sequence = [1, 2, 3, 4]  # range(5)[1:5]
    for item in sequence:
        # Put the item onto the queue.
        time.sleep(1)
        output_q.put(item)


def method_1():
    """Fan out put_task over range(5) with map_async and print the results."""
    pool = Pool()
    results = pool.map_async(put_task, range(5))
    pool.close()
    pool.join()
    from pprint import pprint
    pprint(results.get())


def method_2():
    """Run put_task three times, collecting results via a callback."""
    pool = Pool()
    pool.apply_async(put_task, callback=collect_results)
    pool.apply_async(put_task, callback=collect_results)
    pool.apply_async(put_task, callback=collect_results)
    pool.close()
    pool.join()
    from pprint import pprint
    pprint(res)


def method_3():
    """Share a Manager().Queue() between 5 producers and 1 consumer in a Pool."""
    pool = Pool(processes=10)
    m = Manager()
    q = m.Queue()
    for i in range(5):
        pool.apply_async(task_put, (i, q))
    # 5 producers x 10 messages each = 50 items to consume.
    pool.apply_async(task_take_queue, (q, 50))
    pool.close()
    pool.join()


def method_4():
    """Producer/consumer over a JoinableQueue with a daemon consumer."""
    q = JoinableQueue()
    # Start the consumer process.
    cons_p = Process(target=consumer, args=(q,))
    # Daemonic: when the main process exits, it terminates this child,
    # so the consumer's infinite loop does not keep the program alive.
    cons_p.daemon = True
    cons_p.start()
    # Produce items; in real code this could be a generator or other source.
    producer(q)
    # Wait until every produced item has been task_done()'d.
    q.join()


if __name__ == '__main__':
    method_4()
import multiprocessing
import os
import time


def pool_init(q):
    """Pool initializer: store the queue as a global inside each worker.

    A multiprocessing.Queue cannot be passed as a task argument to
    pool.apply_async (it is not picklable for that path); handing it to
    the workers via Pool(initializer=..., initargs=...) is the supported way.
    """
    global queue  # make queue global in workers
    queue = q


def task():
    """Worker: push this process's pid onto the shared queue five times."""
    for _ in range(5):
        time.sleep(1)
        queue.put(os.getpid())


def take_task():
    """Consumer: print items from the shared queue until a None sentinel.

    The original version looped forever, so run() blocked forever on its
    AsyncResult.get() and the program could never exit; the sentinel gives
    it a clean shutdown condition.
    """
    while 1:
        item = queue.get(True)
        if item is None:
            break
        print(item)


def run(pool):
    """Start one consumer plus one producer task per CPU, then shut down.

    Waits for every producer first, then sends the sentinel so the
    consumer drains the queue and returns.
    """
    consumer = pool.apply_async(take_task)
    producers = [pool.apply_async(task) for _ in range(os.cpu_count())]
    # Wait for all producers before signalling the consumer to stop.
    for p in producers:
        print(p.get())
    queue.put(None)  # sentinel: no more items are coming
    print(consumer.get())


if __name__ == '__main__':
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(initializer=pool_init, initargs=(queue,))
    run(pool)
    pool.close()
    pool.join()