當單線程性能不足時,咱們一般會使用多線程/多進程去加速運行。而這些代碼每每多得使人絕望,須要考慮:git
不只如此,若改成多進程或協程,代碼還要繼續修改。若多處使用並行,則這些代碼還會重複不少遍,很是痛苦。github
因而,咱們考慮將並行的全部邏輯封裝到一個模塊以內,向外部提供像串行執行同樣的編程體驗,還能完全解決上面所述的疑難問題。全部代碼不足180行。編程
GitHub地址:多線程
使用時很是簡潔:異步
def xprint(x): time.sleep(1) # mock a long time task yield x*x i=0 for item in multi_yield(xrange(100)),xprint, process_mode,3: i+=1 print(item) if i>10: break
上面的代碼會使用三個進程,並行地打印1-10的平方。當打印完10以後,進程自動回收釋放。就像串行程序同樣簡單。async
咱們一般會將任務分割爲不少個子塊,從而方便並行。所以能夠將任務抽象爲生成器。相似下面的操做,每一個seed都是任務的種子。編程語言
def get_generator(): for seed in 100: yield seed
任務自己的定義,則能夠經過一個接受種子的函數來實現:分佈式
def worker(seed): # some long time task return seed*seed # just example
那麼實現串行任務就像這樣:函數式編程
for seed in get_generator(n): print worker(seed)
進一步地,能夠將其抽象爲下面的函數:
def serial_yield(genenator,worker): for seed in generator(): yield worker(seed)
該函數經過傳入生成器函數(generator)和任務的定義(worker函數),便可再返回一個生成器。消費時:
for result in serial_yield(your_genenator, your_worker): print(result)
咱們看到,經過定義高階函數,serial_yield就像map函數,對seed進行加工後輸出。
考慮以下場景: boss負責分發任務到任務隊列,多個worker從任務隊列撈數據,處理完以後,再寫入結果隊列。主線程從結果隊列中取結果便可。
咱們定義以下幾種執行模式:
使用Python建立worker的代碼以下,func是任務的定義(是個函數)
def factory(func, args=None, name='task'): if args is None: args = () if mode == process_mode: return multiprocessing.Process(name=name, target=func, args=args) if mode == thread_mode: import threading t = threading.Thread(name=name, target=func, args=args) t.daemon = True return t if mode == async_mode: import gevent return gevent.spawn(func, *args)
建立隊列的代碼以下,注意seeds多是無窮流,所以須要限定隊列的長度,當入隊列發現隊列已滿時,則任務須要阻塞。
def queue_factory(size): if mode == process_mode: return multiprocessing.Queue(size) elif mode == thread_mode: return Queue(size) elif mode == async_mode: from gevent import queue return queue.Queue(size)
何時任務能夠終止? 咱們羅列以下幾種狀況:
對第一種狀況,咱們讓boss在seed消費完以後,在隊列裏放入多個Empty標誌,worker收到Empty以後,就會自動退出,下面是boss的實現邏輯:
def _boss(task_generator, task_queue, worker_count): for task in task_generator: task_queue.put(task) for i in range(worker_count): task_queue.put(Empty) print('worker boss finished')
再定義worker的邏輯:
def _worker(task_queue, result_queue, gene_func): import time try: while not stop_wrapper.is_stop(): if task_queue.empty(): time.sleep(0.01) continue task = task.get() if task == Empty: result_queue.put(Empty) break if task == Stop: break for item in gene_func(task): result_queue.put(item) print ('worker worker is stop') except Exception as e: logging.exception(e) print ('worker exception, quit')
簡單吧?可是這樣會有問題,這個後面再說,咱們把剩餘的代碼寫完。
再定義multi_yield的主要代碼。 代碼很是好理解,建立任務和結果隊列,再建立boss和worker線程(或進程/協程)並啓動,以後不停地從結果隊列裏取數據就能夠了。
def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10): workers = [] result_queue = queue_factory(queue_size) task_queue = queue_factory(queue_size) main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss') for process_id in range(0, worker_count): name = 'worker_%s' % (process_id) p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name) workers.append(p) main.start() for r in workers: r.start() count = 0 while not should_stop(): data = result_queue.get() if data is Empty: count += 1 if count == worker_count: break continue if data is Stop: break else: yield data
這樣從外部消費時,便可:
def xprint(x): time.sleep(1) yield x i=0 for item in multi_yield(xprint, process_mode,3,xrange(100)): i+=1 print(item) if i>10: break
這樣咱們就實現了一個與serial_yield
功能相似的multi_yield
。能夠定義多個worker,從隊列中領任務,而不需重複地建立和銷燬,更不須要線程池。固然,代碼不徹底,運行時可能出問題。但以上代碼已經說明了核心的功能。完整的代碼能夠在文末找到。
可是你也會發現很嚴重的問題:
最開始想到的,是經過在multi_yield
函數參數中添加一個返回bool的函數,這樣當外部break時,同時將該函數的返回值置爲True,內部檢測到該標誌位後強制退出。僞代碼以下:
_stop=False def can_stop(): return _stop for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop): i+=1 print(item) if i>10: _stop=True break
但這樣並不優雅,引入了更多的函數做爲參數,還必須手工控制變量值,很是繁瑣。在多進程模式下,stop標誌位還如何解決?
咱們但願外部在循環時執行了break後,會自動通知內部的生成器。實現方法彷佛就是with語句,即contextmanager.
咱們實現如下的包裝類:
class Yielder(object): def __init__(self, dispose): self.dispose = dispose def __enter__(self): pass def __exit__(self, exc_type, exc_val, exc_tb): self.dispose()
它實現了with的原語,參數是dispose函數,做用是退出with代碼塊後的回收邏輯。
因爲值類型的標誌位沒法在多進程環境中傳遞,咱們再建立StopWrapper類,用於管理中止標誌和回收資源:
class Stop_Wrapper(): def __init__(self): self.stop_flag = False self.workers=[] def is_stop(self): return self.stop_flag def stop(self): self.stop_flag = True for process in self.workers: if isinstance(process,multiprocessing.Process): process.terminate()
最後的問題是,如何解決隊列滿或空時,put/get的無限等待問題呢?考慮包裝一下put/get:包裝在while True
之中,每隔兩秒get/put,這樣即便阻塞時,也能保證能夠檢查退出標誌位。全部線程在主線程結束後,最遲也能在2s內自動退出。
def safe_queue_get(queue, is_stop_func=None, timeout=2): while True: if is_stop_func is not None and is_stop_func(): return Stop try: data = queue.get(timeout=timeout) return data except: continue def safe_queue_put(queue, item, is_stop_func=None, timeout=2): while True: if is_stop_func is not None and is_stop_func(): return Stop try: queue.put(item, timeout=timeout) return item except: continue
如何使用呢?咱們只需在multi_yield的yield語句以外加上一行就能夠了:
with Yielder(stop_wrapper.stop): # create queue,boss,worker, then start all # ignore repeat code while not should_stop(): data = safe_queue_get(result_queue, should_stop) if data is Empty: count += 1 if count == worker_count: break continue if data is Stop: break else: yield data
仔細閱讀上面的代碼, 外部循環時退出循環,則會自動觸發stop_wrapper的stop操做,回收所有資源,而不需經過外部的標誌位傳遞!這樣調用方在心智徹底不需有額外的負擔。
實現生成器和上下文管理器的編程語言,均可以經過上述方式實現自動協程資源回收。筆者也實現了一個C#版本的,有興趣歡迎交流。
這樣,咱們就能像文章開頭那樣,實現並行的迭代器操做了。
完整代碼在:
https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py
一些實現的細節頗有趣,咱們藉助在函數中定義函數,能夠不用複雜的類去承擔職責,而僅僅只需函數。而相似的思想,在函數式編程中很是常見。
該工具已經被筆者的流式語言etlpy
所集成。可是依然有較多改進的空間,如沒有集成分佈式執行模式。
歡迎留言交流。