async/await套路編程

對於併發任務,一般是用生成消費模型,對隊列的處理能夠使用相似master-worker的方式,master主要用戶獲取隊列的msg,worker用戶處理消息。redis

爲了簡單起見,而且協程更適合單線程的方式,咱們的主線程用來監聽隊列,子線程用於處理隊列。這裏使用redis的隊列。主線程中有一個是無限循環,用戶消費隊列。併發

也即:異步

在主線程裏,一個無限循環,一個不斷加入的新任務協程:async

一個loop.run_forever(),一個async def do_sleep2(x, queue, msg=""):函數

子線程做消費者。(代碼裏沒有演示,只是用子線程有循環事件和異步加入協程,主線程循環結果)oop

import time
import asyncio
from queue import Queue
from threading import Thread
from asyncio.futures import Future
from collections.abc import Coroutine, Generator

"""
只要在一個生成器函數頭部用上 @asyncio.coroutine 裝飾器
就能將這個函數對象,【標記】爲協程對象。注意這裏是【標記】,劃重點。
實際上,它的本質仍是一個生成器。
標記後,它實際上已經能夠當成協程使用。
"""


@asyncio.coroutine
def hello2():
    yield from asyncio.sleep(1)


coroutine2 = hello2()
print(isinstance(coroutine2, Generator))
print(isinstance(coroutine2, Coroutine))
# True
# False
"""
只要在一個函數前面加上 async 關鍵字,這個函數對象是一個協程,
經過isinstance函數,它確實是Coroutine類型。
"""


async def hello(name):
    print("Hello, ", name)
    time.sleep(2)
    return 'stop 2 seconds.'


# 定義協程對象
coroutine = hello("world")
print(isinstance(coroutine, Coroutine))
# True

# 定義事件循環對象容器
loop = asyncio.get_event_loop()

# 將協程轉爲task任務
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
print(isinstance(task, Future))
# True
# 將task任務扔進事件循環對象中並觸發
loop.run_until_complete(task)

# task.result() 能夠取得返回結果
print('return value: {}'.format(task.result()))
# Hello,  world
# return value: stop 2 seconds.


# 協程函數
async def do_some_work(x):
    print('waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine4 = do_some_work(4)

# 將協程轉成task,並組成list
tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine4),
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# loop.run_until_complete(asyncio.gather(*tasks))

for task in tasks:
    print('Task ret: ', task.result())
# waiting:  1
# waiting:  2
# waiting:  4
# Task ret:  Done after 1s
# Task ret:  Done after 2s
# Task ret:  Done after 4s


# 外部的協程函數
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine4 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine4),
    ]

    # 【重點】:await 一個task列表(協程)
    # dones:表示已經完成的任務
    # pendings:表示未完成的任務
    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
        print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# waiting:  1
# waiting:  2
# waiting:  4
# Task ret:  Done after 1s
# Task ret:  Done after 2s
# Task ret:  Done after 4s

"""
協程中的狀態

Pending:懸而未決的狀態
Running:事件循環正在調用執行任務
Done:任務執行完畢
Cancelled:Task被取消後的狀態

asyncio.wait:接收形式少點,控制性強,手工循環結果。

asyncio.gather:接收形式普遍,直接返回結果。
"""


def start_loop(loop):
    # 一個在後臺永遠運行的事件循環
    asyncio.set_event_loop(loop)
    loop.run_forever()


def start_loop2(loop):
    # 一個在後臺永遠運行的事件循環
    asyncio.set_event_loop(loop)
    loop.run_forever()


def do_sleep(x, queue, msg=""):
    time.sleep(x)
    queue.put(msg)


async def do_sleep2(x, queue, msg=""):
    await asyncio.sleep(x)
    queue.put(msg)


queue = Queue()
queue2 = Queue()
new_loop = asyncio.new_event_loop()
new_loop2 = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop, ))
t.start()

t2 = Thread(target=start_loop2, args=(new_loop2, ))
t2.start()

print(time.ctime())

# 動態添加兩個協程
# 這種方法,在主線程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第一個")
new_loop.call_soon_threadsafe(do_sleep, 6, queue, "第二個")

# 動態添加兩個協程
# 這種方法,在主線程是異步的
asyncio.run_coroutine_threadsafe(do_sleep2(6, queue, "第1個"), new_loop2)
asyncio.run_coroutine_threadsafe(do_sleep2(6, queue, "第2個"), new_loop2)

while True:
    msg = queue.get()
    print("{} 協程運行完成。。。".format(msg))
    print(time.ctime())

# Thu Dec 27 19:51:00 2018
# 第一個 協程運行完成。。。
# Thu Dec 27 19:51:06 2018
# 第二個 協程運行完成。。。
# Thu Dec 27 19:51:12 2018

while True:
    msg = queue2.get()
    print("{} 協程運行完成。。。".format(msg))
    print(time.ctime())

# 第1個 協程運行完成。。。
# Thu Dec 27 20:02:10 2018
# 第2個 協程運行完成。。。
# Thu Dec 27 20:02:10 2018
相關文章
相關標籤/搜索