Python 多進程+協程的例子

爲何要多進程+協程?由於這能夠得到極高的性能,建議先通讀 Python黑魔法 --- 異步IO( asyncio) 協程 一文。python

廢話很少說,上代碼。微信

<!--more-->app

import asyncio
import multiprocessing
import os
import time
from multiprocessing import Manager


# 業務類
class BaiJiaHao():

    async def get_author(self, rec):
        """
        協程代碼
        """
        print('enter get author,wait for: %d' % rec['num'])
        # 模擬IO操做,耗時根據傳進來的num決定
        await asyncio.sleep(rec['num'])
        # 返回協程任務完成後的結果
        return rec


    def run(self):
        # 假定咱們有11個任務要跑,每一個任務耗時爲num秒,串行的話須要43秒。
        # 但咱們這個demo跑完只須要這些任務中的最大值:8秒
        list = [{'title': 'title1', 'num': 2},
                {'title': 'title2', 'num': 1},
                {'title': 'title3', 'num': 3},
                {'title': 'title4', 'num': 8},
                {'title': 'title5', 'num': 2},
                {'title': 'title6', 'num': 5},
                {'title': 'title7', 'num': 7},
                {'title': 'title8', 'num': 3},
                {'title': 'title9', 'num': 4},
                {'title': 'title10', 'num': 3},
                {'title': 'title11', 'num': 5},
                ]
        result = run_get_author_in_multi_process(list)
        print('result', result)


def get_chunks(iterable, chunks=1):
    """
    此函數用於分割若干任務到不一樣的進程裏去
    """
    lst = list(iterable)
    return [lst[i::chunks] for i in range(chunks)]


def run_get_author(lists, queue):
    """
    這個就是子進程運行的函數,接收任務列表和用於進程間通信的Queue
    """
    print('exec run_get_author.child process id : %s, parent process id : %s' % (os.getpid(), os.getppid()))
    # 每一個子進程分配一個新的loop
    loop = asyncio.new_event_loop()
    # 初始化業務類,轉成task或future
    spider = BaiJiaHao()
    tasks = [loop.create_task(spider.get_author(rec)) for rec in lists]
    # 協程走起
    loop.run_until_complete(asyncio.wait(tasks))
    # 往queue寫入每一個任務的結果
    for task in tasks:
        queue.put(task.result())


def run_get_author_in_multi_process(task_lists):
    """
    父進程函數,主要是分割任務並初始化進程池,啓動進程並返回結果
    """
    # process_count = len(tasks) % 2
    # 進程數這裏我用機器上的核心數,注意:未考慮核心數比任務多的狀況
    process_count = multiprocessing.cpu_count()
    print('process_count: %d' % process_count)
    split_lists = get_chunks(task_lists, process_count)
    pool = multiprocessing.Pool(process_count)
    queue = Manager().Queue()
    for lists in split_lists:
        pool.apply_async(run_get_author, args=(lists, queue,))
    pool.close()
    pool.join()
    result = []
    # 從子進程讀取結果並返回
    while not queue.empty():
        result.append(queue.get())
    return result

now = lambda : time.time()

if __name__ == '__main__':
    start = now()
    spider = BaiJiaHao()
    spider.run()
    print('done','TIME: ', now() - start)

運行結果:異步

微信截圖_20201210111954.png

相關文章
相關標籤/搜索