如何優雅地實現Python通用多線程/進程並行模塊

當單線程性能不足時,咱們一般會使用多線程/多進程去加速運行。而這些代碼每每多得使人絕望,須要考慮:git

  • 如何建立線程執行的函數?
  • 如何收集結果?若但願結果從子線程返回主線程,則還要使用隊列
  • 如何取消執行? 直接kill掉全部線程?信號如何傳遞?
  • 是否須要線程池? 不然反覆建立線程的成本太高了

不只如此,若改成多進程或協程,代碼還要繼續修改。若多處使用並行,則這些代碼還會重複不少遍,很是痛苦。github

因而,咱們考慮將並行的全部邏輯封裝到一個模塊以內,向外部提供像串行執行同樣的編程體驗,還能完全解決上面所述的疑難問題。全部代碼不足180行。編程

GitHub地址:多線程

https://github.com/ferventdesert/multi_yielderapp

使用時很是簡潔:異步

def xprint(x): 
    time.sleep(1)  # mock a long time task 
    yield x*x   
i=0
for item in multi_yield(xrange(100)),xprint, process_mode,3:
    i+=1
    print(item)
    if i>10:
        break

上面的代碼會使用三個進程,並行地打印1-10的平方。當打印完10以後,進程自動回收釋放。就像串行程序同樣簡單。async

1. 先實現串行任務

咱們一般會將任務分割爲不少個子塊,從而方便並行。所以能夠將任務抽象爲生成器。相似下面的操做,每一個seed都是任務的種子。編程語言

def get_generator():
    for seed in 100:
        yield seed

任務自己的定義,則能夠經過一個接受種子的函數來實現:分佈式

def worker(seed):
    # some long time task
    return seed*seed # just example

那麼實現串行任務就像這樣:函數式編程

for seed in get_generator(n):
    print worker(seed)

進一步地,能夠將其抽象爲下面的函數:

def serial_yield(genenator,worker):
    for seed in generator():
        yield worker(seed)

該函數經過傳入生成器函數(generator)和任務的定義(worker函數),便可再返回一個生成器。消費時:

for result in serial_yield(your_genenator, your_worker):
    print(result)

咱們看到,經過定義高階函數,serial_yield就像map函數,對seed進行加工後輸出。

2. 定義並行任務

考慮以下場景: boss負責分發任務到任務隊列,多個worker從任務隊列撈數據,處理完以後,再寫入結果隊列。主線程從結果隊列中取結果便可。

咱們定義以下幾種執行模式:

  • async: 異步/多協程
  • thread: 多線程
  • process: 多進程

使用Python建立worker的代碼以下,func是任務的定義(是個函數)

def factory(func, args=None, name='task'):
        if args is None:
            args = ()
        if mode == process_mode:
            return multiprocessing.Process(name=name, target=func, args=args)
        if mode == thread_mode:
            import threading
            t = threading.Thread(name=name, target=func, args=args)
            t.daemon = True
            return t
        if mode == async_mode:
            import gevent
            return gevent.spawn(func, *args)

建立隊列的代碼以下,注意seeds多是無窮流,所以須要限定隊列的長度,當入隊列發現隊列已滿時,則任務須要阻塞。

def queue_factory(size):
        if mode == process_mode:
            return multiprocessing.Queue(size)
        elif mode == thread_mode:
            return Queue(size)
        elif mode == async_mode:
            from gevent import queue
            return queue.Queue(size)

何時任務能夠終止? 咱們羅列以下幾種狀況:

  • 全部的seed都已經被消費完了
  • 外部傳入告終束請求

對第一種狀況,咱們讓boss在seed消費完以後,在隊列裏放入多個Empty標誌,worker收到Empty以後,就會自動退出,下面是boss的實現邏輯:

def _boss(task_generator, task_queue, worker_count):
        for task in task_generator:
            task_queue.put(task)
        for i in range(worker_count):
            task_queue.put(Empty)
        print('worker boss finished')

再定義worker的邏輯:

def _worker(task_queue, result_queue, gene_func):
        import time
        try:
            while not stop_wrapper.is_stop():
                if task_queue.empty():
                    time.sleep(0.01)
                    continue
                task = task.get()
                if task == Empty:
                    result_queue.put(Empty)
                    break
                if task == Stop:
                    break
                for item in gene_func(task):
                    result_queue.put(item)
            print ('worker worker is stop')
        except Exception as e:
            logging.exception(e)
            print ('worker exception, quit')

簡單吧?可是這樣會有問題,這個後面再說,咱們把剩餘的代碼寫完。

再定義multi_yield的主要代碼。 代碼很是好理解,建立任務和結果隊列,再建立boss和worker線程(或進程/協程)並啓動,以後不停地從結果隊列裏取數據就能夠了。

def multi_yield(customer_func, mode=thread_mode, worker_count=1, generator=None, queue_size=10):
        workers = []
        result_queue = queue_factory(queue_size)
        task_queue = queue_factory(queue_size)

        main = factory(_boss, args=(generator, task_queue, worker_count), name='_boss')
        for process_id in range(0, worker_count):
            name = 'worker_%s' % (process_id)
            p = factory(_worker, args=(task_queue, result_queue, customer_func), name=name)
            workers.append(p)
        main.start()

        for r in workers:
            r.start()
        count = 0
        while not should_stop():
            data = result_queue.get()
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

這樣從外部消費時,便可:

def xprint(x):
    time.sleep(1)
    yield x

i=0
for item in multi_yield(xprint, process_mode,3,xrange(100)):
    i+=1
    print(item)
    if i>10:
        break

這樣咱們就實現了一個與serial_yield功能相似的multi_yield。能夠定義多個worker,從隊列中領任務,而不需重複地建立和銷燬,更不須要線程池。固然,代碼不徹底,運行時可能出問題。但以上代碼已經說明了核心的功能。完整的代碼能夠在文末找到。

可是你也會發現很嚴重的問題:

  • 當從外部break時,內部的線程並不會自動中止
  • 咱們沒法判斷隊列的長度,若隊列滿,那麼put操做會永遠卡死在那裏,任務都不會結束。

3. 改進任務中止邏輯

最開始想到的,是經過在multi_yield函數參數中添加一個返回bool的函數,這樣當外部break時,同時將該函數的返回值置爲True,內部檢測到該標誌位後強制退出。僞代碼以下:

_stop=False
def can_stop():
    return _stop

for item in multi_yield(xprint, process_mode,3,xrange(100),can_stop):
    i+=1
    print(item)
    if i>10:
        _stop=True
        break

但這樣並不優雅,引入了更多的函數做爲參數,還必須手工控制變量值,很是繁瑣。在多進程模式下,stop標誌位還如何解決?

咱們但願外部在循環時執行了break後,會自動通知內部的生成器。實現方法彷佛就是with語句,即contextmanager.

咱們實現如下的包裝類:

class Yielder(object):
    def __init__(self, dispose):
        self.dispose = dispose

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.dispose()

它實現了with的原語,參數是dispose函數,做用是退出with代碼塊後的回收邏輯。

因爲值類型的標誌位沒法在多進程環境中傳遞,咱們再建立StopWrapper類,用於管理中止標誌和回收資源:

class Stop_Wrapper():
        def __init__(self):
            self.stop_flag = False
            self.workers=[]

        def is_stop(self):
            return self.stop_flag

        def stop(self):
            self.stop_flag = True
            for process in self.workers:
                if isinstance(process,multiprocessing.Process):
                    process.terminate()

最後的問題是,如何解決隊列滿或空時,put/get的無限等待問題呢?考慮包裝一下put/get:包裝在while True之中,每隔兩秒get/put,這樣即便阻塞時,也能保證能夠檢查退出標誌位。全部線程在主線程結束後,最遲也能在2s內自動退出。

def safe_queue_get(queue, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            data = queue.get(timeout=timeout)
            return data
        except:
            continue


def safe_queue_put(queue, item, is_stop_func=None, timeout=2):
    while True:
        if is_stop_func is not None and is_stop_func():
            return Stop
        try:
            queue.put(item, timeout=timeout)
            return item
        except:
            continue

如何使用呢?咱們只需在multi_yield的yield語句以外加上一行就能夠了:

with Yielder(stop_wrapper.stop):
        # create queue,boss,worker, then start all
        # ignore repeat code
        while not should_stop():
            data = safe_queue_get(result_queue, should_stop)
            if data is Empty:
                count += 1
                if count == worker_count:
                    break
                continue
            if data is Stop:
                break
            else:
                yield data

仔細閱讀上面的代碼, 外部循環時退出循環,則會自動觸發stop_wrapper的stop操做,回收所有資源,而不需經過外部的標誌位傳遞!這樣調用方在心智徹底不需有額外的負擔。

實現生成器和上下文管理器的編程語言,均可以經過上述方式實現自動協程資源回收。筆者也實現了一個C#版本的,有興趣歡迎交流。

這樣,咱們就能像文章開頭那樣,實現並行的迭代器操做了。

4. 結語

完整代碼在:

https://github.com/ferventdesert/multi_yielder/blob/master/src/multi_yielder.py

一些實現的細節頗有趣,咱們藉助在函數中定義函數,能夠不用複雜的類去承擔職責,而僅僅只需函數。而相似的思想,在函數式編程中很是常見。

該工具已經被筆者的流式語言etlpy所集成。可是依然有較多改進的空間,如沒有集成分佈式執行模式。

歡迎留言交流。

相關文章
相關標籤/搜索