Python concurrent.futures: a usage tutorial and a first look at the source
It's been a while since I last blogged, and I decided I couldn't keep slacking off, so I set myself a goal: write something about concurrent.futures. Hence this post, a chat about the concurrent.futures module introduced in Python 3.2.
Meet Xiao Ming, a Python developer. During an interview he is handed a requirement: request a handful of websites and fetch their data. Easy, Xiao Ming thinks, and in a flurry of keystrokes he produces the following code
import multiprocessing
import time

def request_url(query_url: str):
    time.sleep(3)  # stand-in for the actual request logic

if __name__ == '__main__':
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url,)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]
Easy. Fine, says the interviewer, here's a new requirement: now we want the result of each request. What then? Xiao Ming thinks for a bit and writes the following
import multiprocessing
import time

def request_url(query_url: str, result_dict: dict):
    time.sleep(3)  # stand-in for the actual request logic
    result_dict[query_url] = {}  # store the result

if __name__ == '__main__':
    process_manager = multiprocessing.Manager()
    result_dict = process_manager.dict()
    url_list = ["abc.com", "xyz.com"]
    task_list = [multiprocessing.Process(target=request_url, args=(url, result_dict)) for url in url_list]
    [task.start() for task in task_list]
    [task.join() for task in task_list]
    print(result_dict)
Not bad, says the interviewer, but let me tweak the problem once more: this time we must not block the main process, and the main process needs to check each task's state (finished or not) and pick up its result as soon as it is ready. How would you change the code? Xiao Ming ponders: maybe have each task send the parent process a signal when it completes, and brute-force it from there? Is there a simpler way? Apparently not. The interviewer thinks "naive" to himself, smiles without a word, and tells Xiao Ming he'll hear back later.
Xiao Ming's predicament exposes a real problem: the modules we reach for most often, multiprocessing and threading, are a little unfriendly for asynchronous-task scenarios. We usually need a fair amount of extra plumbing before an asynchronous requirement can be implemented cleanly. To fix this, in October 2009 Brian Quinlan proposed PEP 3148, which suggested wrapping the familiar multiprocessing and threading modules at a higher level to support asynchronous operations properly. The proposal landed in Python 3.2, as the concurrent.futures module we're covering today.
Before we formally dig into the new module, we need some background on the Future pattern.

First of all: what is the Future pattern?

A Future is really an extension of the producer-consumer model. In the producer-consumer model, the producer doesn't care when the consumer finishes processing the data, nor about the result of that processing. For example, we often write code like the following
import multiprocessing
import os
from random import randint
from time import sleep

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            self.queue.put('one product')
            # note: qsize() is not implemented on some platforms (e.g. macOS)
            print(multiprocessing.current_process().name + str(os.getpid()) +
                  ' produced one product, the no of queue now is: %d' % self.queue.qsize())
            sleep(randint(1, 3))

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            d = self.queue.get()
            if d is not None:
                print(multiprocessing.current_process().name + str(os.getpid()) +
                      ' consumed %s, the no of queue now is: %d' % (d, self.queue.qsize()))
                sleep(randint(1, 4))
                continue
            else:
                break

if __name__ == "__main__":
    print('Excited!')
    # create the shared queue
    queue = multiprocessing.Queue(40)
    # create three producers and three consumers sharing the queue
    processes = []
    for i in range(3):
        processes.append(Producer(queue))
        processes.append(Consumer(queue))
    # start processes
    for p in processes:
        p.start()
    # join processes
    for p in processes:
        p.join()
That's a bare-bones implementation of the producer-consumer model: a Queue from multiprocessing serves as the communication channel, the producers put data into the queue, and the consumers take data out and process it. But as noted above, in this pattern the producers neither know when the consumers finish processing nor see the results. With a Future, by contrast, the producer can wait for the processing to complete and, when needed, fetch the result of the computation.

For instance, take a look at the following piece of Java code
package concurrent;

import java.util.concurrent.Callable;

public class DataProcessThread implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(10000); // simulate data processing
        return "data returned";
    }
}
This is the code responsible for processing the data.
package concurrent;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class MainThread {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        DataProcessThread dataProcessThread = new DataProcessThread();
        FutureTask<String> future = new FutureTask<String>(dataProcessThread);

        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);

        Thread.sleep(10000); // simulate doing other work in the meantime
        while (true) {
            if (future.isDone()) {
                System.out.println(future.get());
                break;
            }
        }
        executor.shutdown();
    }
}
And this is our main thread. As you can see, it can conveniently query the state of the data-processing task and fetch its result.
As mentioned earlier, from Python 3.2 onward concurrent.futures ships with the standard library, so we can use it directly.

Note: if you need concurrent.futures on Python 2.7, install the backport with pip:

pip install futures

With that squared away, let's see how to actually use the thing.
from concurrent.futures import ProcessPoolExecutor
import time

def return_future_result(message):
    time.sleep(2)
    return message

if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=2)  # create a pool that runs at most 2 tasks at once
    future1 = pool.submit(return_future_result, "hello")  # submit a task to the pool
    future2 = pool.submit(return_future_result, "world")  # submit another task
    print(future1.done())  # has task 1 finished?
    time.sleep(3)
    print(future2.done())  # has task 2 finished?
    print(future1.result())  # fetch task 1's result
    print(future2.result())  # fetch task 2's result
First, from concurrent.futures import ProcessPoolExecutor pulls in ProcessPoolExecutor, the process pool that will handle our work. (concurrent.futures provides two kinds of Executor: the ProcessPoolExecutor we're using here and ThreadPoolExecutor. They expose identical interfaces, so pick whichever suits your actual needs.)
Next we initialize a pool with a maximum capacity of two, then call the pool's submit method to hand it a task. Here's where it gets interesting: submit returns a special object, an instance of the Future class, which represents an operation that will complete at some point in the future. In other words, when submit returns the Future, the task may not have finished yet. We can call the Future's done method to check the task's current state, and once it has finished, fetch the return value via its result method. If, while running our subsequent logic, we decide for some reason to abandon a task, we can call its cancel method; a short sketch of that follows.
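Here is a small, hedged sketch of cancel in action. Whether a particular cancel call succeeds depends on timing: a task already handed to a worker can no longer be cancelled, while one still waiting in the pending queue usually can. The function name slow_task is made up for this example.

from concurrent.futures import ProcessPoolExecutor
import time

def slow_task(x):
    time.sleep(5)
    return x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=1)
    futures = [pool.submit(slow_task, i) for i in range(5)]
    print(futures[0].cancel())   # likely False: already picked up by a worker
    print(futures[-1].cancel())  # likely True: still pending, so it can be cancelled
    pool.shutdown()              # waits for the tasks that were not cancelled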
Now the next question: what if we want to submit a whole batch of tasks? concurrent.futures gives us the map method precisely for adding tasks in bulk.
import concurrent.futures
import requests

task_url = [('http://www.baidu.com', 40), ('http://example.com/', 40), ('https://www.github.com/', 40)]

def load_url(params: tuple):
    return requests.get(params[0], timeout=params[1]).text

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        for url, data in zip(task_url, executor.map(load_url, task_url)):
            print('%r page is %d bytes' % (url, len(data)))
The map method provided by the thread/process pools in concurrent.futures is used exactly like the map function in the standard library; a quick side-by-side comparison is sketched below.
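A minimal sketch of the analogy (using ThreadPoolExecutor so that a lambda can be passed; ProcessPoolExecutor would need a picklable, module-level function):

from concurrent.futures import ThreadPoolExecutor

nums = [1, 2, 3]
squares = list(map(lambda x: x * x, nums))  # the built-in map

with ThreadPoolExecutor(max_workers=2) as executor:
    parallel = list(executor.map(lambda x: x * x, nums))  # same calling convention

print(squares == parallel)  # True: results come back in input order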
Having seen how concurrent.futures is used, the natural question is how it implements the Future pattern, and how a task gets associated with its result internally. Let's start from the submit method and take a quick look at how ProcessPoolExecutor is implemented.
First, when a ProcessPoolExecutor is created, its __init__ method initializes a handful of crucial variables.
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = SimpleQueue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
Now for today's entry point, the submit method
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f
The incoming parameter fn is our processing function, while args and kwargs are the arguments fn will be called with. At the top of submit, the values of _broken and _shutdown_thread are checked to determine the state of the pool's worker processes and of the pool itself. If a worker process has died unexpectedly, or the pool has already been shut down, an exception is raised to signal that no new submit operations are accepted. That second behavior is easy to observe directly, as the sketch below shows.
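A minimal sketch of the shutdown check (the message matches the raise we just read):

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    pool = ProcessPoolExecutor(max_workers=1)
    pool.shutdown()
    try:
        pool.submit(print, 'too late')
    except RuntimeError as e:
        print(e)  # cannot schedule new futures after shutdown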
If those checks pass, the Future class is instantiated first, and then that instance, together with the processing function and its arguments, is used to instantiate a _WorkItem. The instance w is then stored into _pending_work_items as the value, with _queue_count as the key. Finally, submit calls the _start_queue_management_thread method to start the pool's management thread.
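As an aside, there is nothing magical about _WorkItem. In the same vintage of the source this article reads from, it is just a small container binding the Future to the callable and its arguments:

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

With that in mind, here is _start_queue_management_thread: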
    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue
This part is straightforward: run the _adjust_process_count method, then start a daemon thread running the _queue_management_worker function. Let's look at _adjust_process_count first.
    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes[p.pid] = p
Based on the _max_workers value set in __init__, it spins up the corresponding number of processes, each running the _process_worker function.

Following the thread of the story, let's look at _process_worker first, shall we?
def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will written
            to by the worker.
        shutdown: A multiprocessing.Event that will be set as a signal to the
            worker that it should exit when call_queue is empty.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            result_queue.put(_ResultItem(call_item.work_id, exception=exc))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))
First, an infinite loop. Inside it, we fetch a _CallItem instance from call_queue; if the value we get is None, it means no more tasks are coming, so we put the current process's pid into the result queue and end the process.

If we did receive a task, we execute it. Whether it raises an exception along the way or produces a final result, the outcome is wrapped in a _ResultItem instance and put onto the result queue.
OK, now back to the _start_queue_management_thread function we were halfway through (see the listing above).
After the _adjust_process_count function runs, the pool's _processes variable (a dict) maps pids to the worker processes. We then start a background daemon thread running the _queue_management_worker function, passing it several things: _processes, our process map; _pending_work_items, which holds the tasks awaiting processing; plus _call_queue and _result_queue. That leaves one argument you may find puzzling: this weakref.ref(self, weakref_cb) business.
Python is a garbage-collected language, and having GC (garbage collection) means that most of the time we don't need to think much about allocating and freeing memory. In Python, when an object gets reclaimed is determined by its reference count: once the count drops to 0, the object is collected. In some situations, though, cross-references or other quirks keep the count from ever reaching 0, so the object can never be reclaimed, causing a memory leak. Hence, alongside ordinary references, Python offers another mechanism called the weak reference: a variable can hold an object without increasing that object's reference count. So weakref.ref(self, weakref_cb) is, for most purposes, equivalent to self. (Why a weak reference is needed here is a story for a separate post; a tiny demonstration of the mechanism itself follows.)
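A tiny demonstration of the idea (in CPython, where reference counting frees objects immediately):

import weakref

class Foo:
    pass

obj = Foo()
ref = weakref.ref(obj)  # holds obj without increasing its reference count
print(ref() is obj)     # True: calling the weakref dereferences it
del obj                 # drop the only strong reference
print(ref())            # None: the referent has been collected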
With that covered, let's see how _queue_management_worker is implemented
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        process: A list of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    executor = None

    def shutting_down():
        return _shutdown or executor is None or executor._shutdown_thread

    def shutdown_worker():
        # This is an upper bound
        nb_children_alive = sum(p.is_alive() for p in processes.values())
        for i in range(0, nb_children_alive):
            call_queue.put_nowait(None)
        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some multiprocessing.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    reader = result_queue._reader

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None
A familiar big loop. Its first step uses the _add_call_item_to_queue function to move tasks from the pending map into the call queue. Let's look at that part first
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue
First it checks whether the call queue is already full; if so, it gives up on this round. Otherwise it takes a work_id off the work_ids queue, then looks up the corresponding _WorkItem instance among the pending tasks. Next it calls the set_running_or_notify_cancel method of the Future bound to that item to update the task's state, and then throws the item onto the call queue.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (though calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')
This part is simple: if the future is still in the pending state, it is flipped to running and True is returned; if it is in the cancelled state, False is returned, and back in _add_call_item_to_queue the already-cancelled _WorkItem is removed from the pending tasks. You can watch these transitions from the outside, as sketched below.
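The sketch below pokes at the private _state attribute purely for illustration; it mirrors the branches of the method we just read.

from concurrent.futures import Future

f = Future()
print(f._state)                           # PENDING
print(f.set_running_or_notify_cancel())   # True: PENDING -> RUNNING
f.set_result(42)
print(f._state, f.result())               # FINISHED 42

g = Future()
print(g.cancel())                         # True: a PENDING future can be cancelled
print(g.set_running_or_notify_cancel())   # False: it was already cancelled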
Right, let's get back into the _queue_management_worker function (the full listing is above; we'll walk through it piece by piece).
First, you might be puzzled by this bit
        sentinels = [p.sentinel for p in processes.values()]
        assert sentinels
        ready = wait([reader] + sentinels)
What on earth is this wait, and what is this reader? One step at a time. The earlier line reader = result_queue._reader probably raised questions too: our result_queue is the SimpleQueue from multiprocessing, and surely that has no _reader attribute... or does it?
class SimpleQueue(object):
    def __init__(self, *, ctx):
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._poll = self._reader.poll
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()
The snippet above is part of SimpleQueue's code, and it shows plainly that SimpleQueue is essentially built on a Pipe for inter-process communication, with _reader being the handle that reads from that Pipe (a minimal Pipe sketch follows the note below).

Note: now is a good time to brush up on the other inter-process communication mechanisms.
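If Pipe itself is unfamiliar, here is a minimal sketch of the primitive SimpleQueue is built on; the two ends work even within a single process:

from multiprocessing import Pipe

reader, writer = Pipe(duplex=False)  # returns (read end, write end)
writer.send({'work_id': 1, 'result': 42})
print(reader.recv())                 # {'work_id': 1, 'result': 42}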
Once that part makes sense, let's look at the wait method.
def wait(object_list, timeout=None):
    '''
    Wait till an object in object_list is ready/readable.

    Returns list of those objects in object_list which are ready/readable.
    '''
    with _WaitSelector() as selector:
        for obj in object_list:
            selector.register(obj, selectors.EVENT_READ)

        if timeout is not None:
            deadline = time.time() + timeout

        while True:
            ready = selector.select(timeout)
            if ready:
                return [key.fileobj for (key, events) in ready]
            else:
                if timeout is not None:
                    timeout = deadline - time.time()
                    if timeout < 0:
                        return ready
This code is simple: it registers each object we want to read with a selector, and when timeout is None it blocks until at least one of them becomes readable. (The public helper multiprocessing.connection.wait behaves the same way; a quick sketch follows.)
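A quick sketch of the semantics _queue_management_worker relies on, using the public multiprocessing.connection.wait:

from multiprocessing import Pipe
from multiprocessing.connection import wait

r1, w1 = Pipe(duplex=False)
r2, w2 = Pipe(duplex=False)
w2.send('hello')

ready = wait([r1, r2], timeout=1)  # only the readable connections come back
print(ready == [r2])               # True: only r2 has data waiting
print(ready[0].recv())             # hello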
Now back to the _queue_management_worker function, and this piece of code
        ready = wait([reader] + sentinels)
        if reader in ready:
            result_item = reader.recv()
        else:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = True
                executor._shutdown_thread = True
                executor = None
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(
                    BrokenProcessPool(
                        "A process in the process pool was "
                        "terminated abruptly while the future was "
                        "running or pending."
                    ))
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
We use the wait function to watch a collection of objects, with no timeout set. When we get back the list of readable objects, if result_queue._reader is not in that list, it means some worker process's sentinel became ready instead, i.e. a worker was closed unexpectedly; in that case the subsequent statements shut the pool down. If the reader is in the list, we read from it and obtain the result_item variable.
Now look at the following code
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
First, the case where the result_item variable is an int. Do you remember this piece of logic in the _process_worker function?
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
When a worker pulls None off the call queue (the sentinel meaning no new tasks are coming), it puts its own pid into result_queue and exits. So if result_item is an int, it means the earlier processing work is complete and a worker has shut down cleanly, at which point we clean up and close the pool.
If result_item is neither an int nor None, it must be a _ResultItem instance. We pop the corresponding _WorkItem by its work_id, and bind the resulting exception or value to the Future inside that _WorkItem (the very object submit handed back to us earlier).

Finally we delete the work_item, and that's a wrap. A small end-to-end recap is sketched below.
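To tie the whole pipeline together, here is a toy, thread-based re-creation of the submit → worker → set_result flow we just traced. It is a teaching sketch, not the real implementation: MiniExecutor is an invented name, and a real pool also needs shutdown handling and the management thread we saw above.

import queue
import threading
from concurrent.futures import Future

class MiniExecutor:
    def __init__(self, max_workers=2):
        self._call_queue = queue.Queue()  # plays the role of _call_queue
        for _ in range(max_workers):
            threading.Thread(target=self._worker, daemon=True).start()

    def submit(self, fn, *args, **kwargs):
        f = Future()  # the object handed back to the caller
        self._call_queue.put((f, fn, args, kwargs))
        return f

    def _worker(self):
        while True:
            f, fn, args, kwargs = self._call_queue.get()
            if not f.set_running_or_notify_cancel():
                continue  # the future was cancelled while still pending
            try:
                result = fn(*args, **kwargs)
            except BaseException as e:
                f.set_exception(e)    # mirrors _process_worker's error path
            else:
                f.set_result(result)  # mirrors the management thread's happy path

if __name__ == '__main__':
    ex = MiniExecutor()
    fut = ex.submit(pow, 2, 10)
    print(fut.result())  # 1024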
So there's my long, rambling article; I hope you don't mind. What we've seen is that the implementation of concurrent.futures doesn't rely on any deep black magic, yet its details reward careful tasting, so let's stop here for now. If there's a chance later, we'll look at the rest of the concurrent.futures code, where there's plenty more worth savoring.