對於併發任務,一般是用生成消費模型,對隊列的處理能夠使用相似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。redis
爲了簡單起見,而且協程更適合單線程的方式,咱們的主線程用來監聽隊列,子線程用於處理隊列。這裏使用redis的隊列。主線程中有一個是無限循環,用戶消費隊列。併發
也即:異步
在主線程裏,一個無限循環,一個不斷加入的新任務協程:async
一個loop.run_forever(),一個async def do_sleep2(x, queue, msg=""):函數
子線程做消費者。(代碼裏沒有演示,只是用子線程有循環事件和異步加入協程,主線程循環結果)oop
import time import asyncio from queue import Queue from threading import Thread from asyncio.futures import Future from collections.abc import Coroutine, Generator """ 只要在一個生成器函數頭部用上 @asyncio.coroutine 裝飾器 就能將這個函數對象,【標記】爲協程對象。注意這裏是【標記】,劃重點。 實際上,它的本質仍是一個生成器。 標記後,它實際上已經能夠當成協程使用。 """ @asyncio.coroutine def hello2(): yield from asyncio.sleep(1) coroutine2 = hello2() print(isinstance(coroutine2, Generator)) print(isinstance(coroutine2, Coroutine)) # True # False """ 只要在一個函數前面加上 async 關鍵字,這個函數對象是一個協程, 經過isinstance函數,它確實是Coroutine類型。 """ async def hello(name): print("Hello, ", name) time.sleep(2) return 'stop 2 seconds.' # 定義協程對象 coroutine = hello("world") print(isinstance(coroutine, Coroutine)) # True # 定義事件循環對象容器 loop = asyncio.get_event_loop() # 將協程轉爲task任務 # task = asyncio.ensure_future(coroutine) task = loop.create_task(coroutine) print(isinstance(task, Future)) # True # 將task任務扔進事件循環對象中並觸發 loop.run_until_complete(task) # task.result() 能夠取得返回結果 print('return value: {}'.format(task.result())) # Hello, world # return value: stop 2 seconds. # 協程函數 async def do_some_work(x): print('waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) # 協程對象 coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine4 = do_some_work(4) # 將協程轉成task,並組成list tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine4), ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) # loop.run_until_complete(asyncio.gather(*tasks)) for task in tasks: print('Task ret: ', task.result()) # waiting: 1 # waiting: 2 # waiting: 4 # Task ret: Done after 1s # Task ret: Done after 2s # Task ret: Done after 4s # 外部的協程函數 async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine4 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine4), ] # 【重點】:await 一個task列表(協程) # dones:表示已經完成的任務 # pendings:表示未完成的任務 dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) loop = asyncio.get_event_loop() loop.run_until_complete(main()) # waiting: 1 # waiting: 2 # waiting: 4 # Task ret: Done after 1s # Task ret: Done after 2s # Task ret: Done after 4s """ 協程中的狀態 Pending:懸而未決的狀態 Running:事件循環正在調用執行任務 Done:任務執行完畢 Cancelled:Task被取消後的狀態 asyncio.wait:接收形式少點,控制性強,手工循環結果。 asyncio.gather:接收形式普遍,直接返回結果。 """ def start_loop(loop): # 一個在後臺永遠運行的事件循環 asyncio.set_event_loop(loop) loop.run_forever() def start_loop2(loop): # 一個在後臺永遠運行的事件循環 asyncio.set_event_loop(loop) loop.run_forever() def do_sleep(x, queue, msg=""): time.sleep(x) queue.put(msg) async def do_sleep2(x, queue, msg=""): await asyncio.sleep(x) queue.put(msg) queue = Queue() queue2 = Queue() new_loop = asyncio.new_event_loop() new_loop2 = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop, )) t.start() t2 = Thread(target=start_loop2, args=(new_loop2, )) t2.start() print(time.ctime()) # 動態添加兩個協程 # 這種方法,在主線程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一個") new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第二個") # 動態添加兩個協程 # 這種方法,在主線程是異步的 asyncio.run_coroutine_threadsafe(do_sleep2(6, queue, "第1個"), new_loop2) asyncio.run_coroutine_threadsafe(do_sleep2(6, queue, "第2個"), new_loop2) while True: msg = queue.get() print("{} 協程運行完成。。。".format(msg)) print(time.ctime()) # Thu Dec 27 19:51:00 2018 # 第一個 協程運行完成。。。 # Thu Dec 27 19:51:06 2018 # 第二個 協程運行完成。。。 # Thu Dec 27 19:51:12 2018 while True: msg = queue2.get() print("{} 協程運行完成。。。".format(msg)) print(time.ctime()) # 第1個 協程運行完成。。。 # Thu Dec 27 20:02:10 2018 # 第2個 協程運行完成。。。 # Thu Dec 27 20:02:10 2018