Coroutines in Python (Part 3)

asyncio

What is asyncio for?

  • Asynchronous network operations
  • Concurrency
  • Coroutines

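In the Python 3.0 era, the asynchronous networking module in the standard library was select (very low-level), and the main third-party asynchronous networking library was Tornado. Python 3.4 introduced asyncio, which supports TCP and subprocesses.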

Today many libraries build on asyncio, such as aiohttp, aiodns, and aioredis. https://github.com/aio-libs lists the projects that are already supported and is updated continuously.

Of course, asyncio is not the only way to get coroutines: Tornado and gevent implement similar functionality.

Some key asyncio terms:

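  • event_loop (event loop): the program starts an infinite loop and registers functions on it; when the event a function is waiting for occurs, the loop calls the corresponding coroutine function.

  • coroutine: a coroutine object. Calling a function defined with the async keyword does not run the function immediately; it returns a coroutine object, which must be registered with the event loop and is then run by the loop.

  • task: a coroutine object is natively a function that can be suspended; a task wraps a coroutine further and tracks the task's various states.

  • future: represents the result of a task that will run or has not yet run. There is no essential difference between it and a task (Task is a subclass of Future).

  • async/await keywords: introduced in Python 3.5 for defining coroutines. async defines a coroutine, and await suspends the coroutine at a blocking asynchronous call.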
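The relationship between these terms can be checked directly. The following is a minimal sketch, assuming Python 3.7+ so that asyncio.run is available (the listings in this article use the older get_event_loop style). The names add and inspect are hypothetical and exist only for illustration.

```python
import asyncio


async def add(a, b):
    # A coroutine function: calling it only builds a coroutine object.
    await asyncio.sleep(0)
    return a + b


async def inspect():
    coro = add(1, 2)
    is_coroutine = asyncio.iscoroutine(coro)      # not executed yet
    task = asyncio.ensure_future(coro)            # wrap the coroutine in a Task
    is_future = isinstance(task, asyncio.Future)  # Task is a Future subclass
    result = await task                           # suspend until the task is done
    return is_coroutine, is_future, result


summary = asyncio.run(inspect())
print(summary)
```

The tuple reports whether the object was a coroutine, whether the task is also a Future, and the value the coroutine returned.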

Event Loop

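  • Event loop: think of it as the central executor of asyncio.

Now let's see how all of this fits together. As mentioned earlier, asynchronous code runs in a single thread.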

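As the figure shows:

1. The message loop runs in a thread.

2. It takes tasks from a queue.

3. Each task executes its next step inside a coroutine.

4. If one coroutine calls another (await <coroutine_name>), a context switch is triggered: the current coroutine is suspended, its state (variables, status) is saved, and the called coroutine is loaded.

5. If a coroutine reaches a blocking part (blocking I/O, sleep), it is suspended and control returns to the thread's message loop, which then runs the next task from the queue, and so on.

6. After all tasks in the queue have run, the message loop returns to the first task.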
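The interleaving described in these steps can be observed by recording the order in which two coroutines start and finish. This is a rough sketch, assuming Python 3.7+ (asyncio.run); the worker coroutine and the order list are hypothetical names, and the delays are arbitrary.

```python
import asyncio

order = []


async def worker(name, delay):
    order.append(name + " start")
    # Suspends this coroutine and hands control back to the event loop.
    await asyncio.sleep(delay)
    order.append(name + " end")


async def main():
    # Both workers are scheduled on the same loop, in the same thread.
    await asyncio.gather(worker("a", 0.05), worker("b", 0.01))


asyncio.run(main())
print(order)
```

Worker "a" starts first, but because it suspends at the sleep, "b" gets to start and, having the shorter delay, also finishes first.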

Defining a coroutine

import time
import asyncio

now = lambda: time.time()
start = now()


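# 0. Define a coroutine with the async keyword; calling it does not run the body, it returns a coroutine object
async def do_some_work(x):
    print("waiting:", x)

# 1. This is a coroutine object; do_some_work has not been executed at this point
coro = do_some_work(2)  # returns a coroutine object and assigns it
print(coro)  # <coroutine object do_some_work at 0x000001FFF0E10D00>
# 2. Get an event loop
loop = asyncio.get_event_loop()
# 3. Add the coroutine to the event loop and run it to completion
loop.run_until_complete(coro)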

print("Time:", now()-start)

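In the code above we define a coroutine with the async keyword. A coroutine cannot run by itself; it has to be added to an event loop. asyncio.get_event_loop obtains an event loop, and run_until_complete registers the coroutine with that loop and runs the loop until the coroutine finishes.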
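That calling the coroutine function does not run its body can be verified with a small side effect. A minimal sketch, assuming Python 3.7+ and using asyncio.run in place of get_event_loop/run_until_complete; the calls list is a hypothetical helper used only to record execution.

```python
import asyncio

calls = []


async def do_some_work(x):
    calls.append(x)  # side effect that proves the body has run
    return x * 2


coro = do_some_work(2)       # only creates the coroutine object
ran_before = list(calls)     # still empty: nothing has executed yet
result = asyncio.run(coro)   # the event loop actually drives the coroutine
ran_after = list(calls)

print(ran_before, result, ran_after)
```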

Creating a task

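A coroutine object cannot run directly. When it is registered with the event loop, run_until_complete actually wraps the coroutine in a task object. Task is a subclass of Future; it stores the state of the coroutine as it runs and is used to retrieve the coroutine's result later.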

import asyncio
import time

now = lambda: time.time()
start = now()

async def do_some_work(x):  # 0
    print("waiting:", x)

coroutine = do_some_work(2)  # 1
loop = asyncio.get_event_loop()  # 2
task = loop.create_task(coroutine)  # 3. Create a task object
print(task)  # <Task pending coro=<do_some_work() running at H:/python/project_workspace/web/協程/task_test.py:7>>
loop.run_until_complete(task)  # 4
print(task)  # <Task finished coro=<do_some_work() done, defined at H:/python/project_workspace/web/協程/task_test.py:7> result=None> 
print("Time:", now()-start)

After the task is created, it is in the pending state until the event loop runs it; once it completes, its state is finished.
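The same transition can be checked programmatically with task.done() instead of reading the printed repr. A short sketch, assuming Python 3.7+ (asyncio.run); the variable names are illustrative.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(0)
    return "Done after {}s".format(x)


async def main():
    task = asyncio.ensure_future(do_some_work(2))
    before = task.done()   # the task is scheduled but has not run yet
    result = await task
    after = task.done()    # the task has finished and holds its result
    return before, after, result


states = asyncio.run(main())
print(states)
```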

Besides loop.create_task(coroutine), a task can also be created with asyncio.ensure_future(coroutine).

The official documentation for these two calls: https://docs.python.org/3/library/asyncio-task.html#asyncio.ensure_future

asyncio.ensure_future(coro_or_future, *, loop=None)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.

  https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.create_task

AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
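The two documented behaviours of ensure_future (wrapping a coroutine in a Task, and returning an existing future unchanged) can be sketched as follows. This assumes Python 3.7+ for asyncio.run and asyncio.get_running_loop; noop is a hypothetical placeholder coroutine.

```python
import asyncio


async def noop():
    return None


async def main():
    loop = asyncio.get_running_loop()
    task = loop.create_task(noop())

    # A Task is already a Future, so it is returned as-is.
    same_object = asyncio.ensure_future(task) is task

    # A bare coroutine object is wrapped in a new Task.
    wrapped = asyncio.ensure_future(noop())
    is_task = isinstance(wrapped, asyncio.Task)

    await asyncio.gather(task, wrapped)
    return same_object, is_task


checks = asyncio.run(main())
print(checks)
```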

Binding a callback

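Binding a callback lets you obtain the result when the task completes. The last argument passed to the callback is the future object, through which the coroutine's return value can be retrieved.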

import time
import asyncio

now = lambda: time.time()
start = now()

async def do_some_work(x):
    print("waiting:", x)
    return "Done after {}s".format(x)

def callback(future):
    print("callback:", future.result())

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
task.add_done_callback(callback)  # bind the callback; the finished task is passed to callback as its argument
print(task)
loop.run_until_complete(task)
print(task)

print("Time:", now()-start)

Output:
<Task pending coro=<do_some_work() running at H:/python/project_workspace/web/協程/綁定回調.py:7>>
<Task pending coro=<do_some_work() running at H:/python/project_workspace/web/協程/綁定回調.py:7> cb=[callback() at H:/python/project_workspace/web/協程/綁定回調.py:11]>
waiting: 2
callback: Done after 2s
<Task finished coro=<do_some_work() done, defined at H:/python/project_workspace/web/協程/綁定回調.py:7> result='Done after 2s'>
Time: 0.0010023117065429688

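add_done_callback attaches a callback to the task; when the task (that is, the coroutine) finishes, the callback is invoked and receives the result through its future parameter. The task we created and the future object in the callback are in fact the same object.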
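Because the future is always the last argument, extra arguments can be bound in front of it with functools.partial. The sketch below assumes Python 3.7+ (asyncio.run); the label argument and the seen list are hypothetical additions for illustration.

```python
import asyncio
import functools

seen = []


def callback(label, future):
    # The future arrives as the last positional argument.
    seen.append((label, future))


async def do_some_work(x):
    return "Done after {}s".format(x)


async def main():
    task = asyncio.ensure_future(do_some_work(2))
    task.add_done_callback(functools.partial(callback, "job-1"))
    await task
    await asyncio.sleep(0)  # give the loop a turn to run pending callbacks
    return task


task = asyncio.run(main())
label, future = seen[0]
print(label, future.result(), future is task)
```

The final comparison shows that the object handed to the callback is the task itself.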

Blocking and await

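async defines a coroutine, and await suspends it at a time-consuming operation; like yield in a generator, the function gives up control. When a coroutine reaches await, the event loop suspends it and runs other coroutines until they too are suspended or finish, and then moves on to the next coroutine.

Time-consuming operations are usually I/O, such as network requests or file reads. We use asyncio.sleep to simulate an I/O operation. The purpose of coroutines is precisely to make such I/O asynchronous.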

import asyncio
import time

now = lambda :time.time()

async def do_some_work(x):
    print("waiting:",x)
    # the time-consuming operation goes after await
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print("Task ret:", task.result())
print("Time:", now() - start)

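At await asyncio.sleep(x), the sleep simulates a blocking or time-consuming operation, so the coroutine gives up control here. In other words, when a coroutine reaches a call that would block, await hands control back so that the loop can run other coroutines.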
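The difference between a call that really blocks and one that yields through await can be measured. This is a rough sketch, assuming Python 3.7+ (asyncio.run); blocking_worker, cooperative_worker, and timed are hypothetical names, and the 0.2 s delay is arbitrary.

```python
import asyncio
import time


async def blocking_worker(delay):
    time.sleep(delay)           # blocks the whole event loop thread


async def cooperative_worker(delay):
    await asyncio.sleep(delay)  # yields control back to the event loop


async def timed(worker, delay):
    start = time.perf_counter()
    await asyncio.gather(worker(delay), worker(delay))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_worker, 0.2))
cooperative_elapsed = asyncio.run(timed(cooperative_worker, 0.2))
print(blocking_elapsed, cooperative_elapsed)
```

With time.sleep the two workers run back to back, so the total is roughly the sum of the delays; with await asyncio.sleep they overlap and the total is roughly one delay.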

Achieving concurrency with await

# Tools:Pycharm 2017.3.2
# author ="wlx"
__date__ = '2018/9/18 19:43'
import asyncio
import time

now = lambda :time.time()

async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)  # while this coroutine waits, the loop switches to another one
    return "Done after {}s".format(x)

start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))  # with multiple tasks, the list cannot be passed directly; wrap it with asyncio.wait()

for task in tasks:
    print("Task ret:",task.result())

print("Time:",now()-start)

Output:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004154920578003

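The total time is about 4s. The 4s wait of the third coroutine is long enough for the first two to finish. Run synchronously one after another, the tasks would take at least 7s. Here asyncio gives us concurrency. asyncio.wait(tasks) can also be replaced with asyncio.gather(*tasks): the former takes a list of tasks, the latter takes the tasks as separate positional arguments.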
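The two calls also differ in what they return, which matters when collecting results. A minimal sketch, assuming Python 3.7+ (asyncio.run) and short arbitrary delays so it finishes quickly:

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main():
    tasks = [asyncio.ensure_future(do_some_work(x)) for x in (0.03, 0.01, 0.02)]

    # wait() takes a collection and returns two sets of tasks.
    done, pending = await asyncio.wait(tasks)

    # gather() takes the tasks unpacked and returns results in argument order.
    gathered = await asyncio.gather(*tasks)
    return done, pending, gathered


done, pending, gathered = asyncio.run(main())
print(len(done), len(pending), gathered)
```

gather keeps the order in which the tasks were passed, even though the task with the 0.01 s delay finishes first; the sets returned by wait carry no ordering.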

The official documentation for asyncio.gather and asyncio.wait:

https://docs.python.org/3/library/asyncio-task.html#asyncio.gather

Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.

https://docs.python.org/3/library/asyncio-task.html#asyncio.wait

Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending).

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return.
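As an illustration of return_when, the sketch below returns as soon as the first task finishes and cancels the rest. It assumes Python 3.7+ (asyncio.run); the delays and variable names are arbitrary choices for the example.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main():
    fast = asyncio.ensure_future(do_some_work(0.01))
    slow = asyncio.ensure_future(do_some_work(5))

    done, pending = await asyncio.wait(
        [fast, slow], return_when=asyncio.FIRST_COMPLETED
    )

    # The slow task is still pending; cancel it so it does not linger.
    for task in pending:
        task.cancel()
    return fast in done, slow in pending


first_done = asyncio.run(main())
print(first_done)
```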

Nested coroutines

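async defines coroutines, which are used for time-consuming I/O. We can also wrap several I/O steps together, which gives nested coroutines: one coroutine awaits another, and they are chained together in this way.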

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

    # results = await asyncio.gather(*tasks)
    # for result in results:
    #     print("Task ret:",result)


start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

If we replace this part of the code above:

    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print("Task ret:", task.result())

with:

    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret:",result)

we get a list of results instead.

If main does not process the results itself but returns what it awaited, the outermost run_until_complete returns the result of the main coroutine. Change the code above to:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())
for result in results:
    print("Task ret:",result)

print("Time:", now()-start)

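Alternatively, suspend the coroutine with asyncio.wait and return its result.

Change return await asyncio.gather(*tasks)

to: return await asyncio.wait(tasks)

In that case run_until_complete returns a (done, pending) tuple of task sets rather than a list of results, so each result has to be read with task.result().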

asyncio's as_completed can also be used:

import asyncio
import time


now = lambda: time.time()

async def do_some_work(x):
    print("waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)
    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print("Task ret: {}".format(result))

start = now()

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print("Time:", now()-start)

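As the examples above show, coroutines can be called and combined very flexibly, mainly in how results are handled: how they are returned and how the coroutine is suspended.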

Stopping coroutines

A future object has several states:

  • Pending
  • Running
  • Done
  • Cancelled

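When a future is created, the task is pending; while the event loop is running it, it is running; when it finishes, it is done. To stop the event loop, the tasks must be cancelled first. asyncio.Task.all_tasks() returns the tasks of the event loop (this classmethod was removed in Python 3.9; asyncio.all_tasks() replaces it).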

import asyncio
import time


now = lambda :time.time()


async def do_some_work(x):
    print("Waiting:",x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)

coroutine1 =do_some_work(1)
coroutine2 =do_some_work(2)
coroutine3 =do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3),
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:",now()-start)

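Press Ctrl+C shortly after the event loop starts and run_until_complete raises KeyboardInterrupt. The handler then loops over asyncio.Task.all_tasks() and cancels each future. The output looks like this: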

Waiting: 1
Waiting: 2
Waiting: 2
^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}
False
True
True
True
Time: 1.0707225799560547

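True means the cancel succeeded. After loop.stop(), the event loop has to be started once more before it is finally closed; otherwise an exception is still raised.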
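The False/True pattern in the output comes from the return value of cancel(): it is False for a task that has already finished and True for one that is still pending. A small sketch without Ctrl+C, assuming Python 3.7+ (asyncio.run); the variable names are illustrative.

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main():
    finished = asyncio.ensure_future(do_some_work(0))
    waiting = asyncio.ensure_future(do_some_work(10))
    await finished

    already_done = finished.cancel()  # nothing left to cancel
    still_pending = waiting.cancel()  # cancellation is requested

    try:
        await waiting
    except asyncio.CancelledError:
        pass  # the cancelled task raises CancelledError when awaited
    return already_done, still_pending, waiting.cancelled()


cancel_results = asyncio.run(main())
print(cancel_results)
```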

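Looping over the tasks and cancelling them one by one is one approach. But as above, we can wrap the task list in a main function and drive the event loop from outside main. main then acts as the outermost task, so handling the wrapping main task is enough.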
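One way to read this: if main awaits its children through asyncio.gather, cancelling the single outer task propagates to the children. The sketch below illustrates that idea under stated assumptions, not necessarily the approach the author had in mind. It assumes Python 3.7+ (asyncio.run); supervisor and the children list are hypothetical names.

```python
import asyncio

children = []


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


async def main():
    children.extend(asyncio.ensure_future(do_some_work(x)) for x in (1, 2))
    return await asyncio.gather(*children)


async def supervisor():
    outer = asyncio.ensure_future(main())
    await asyncio.sleep(0.01)  # let main start its child tasks
    outer.cancel()             # cancel only the outermost task
    try:
        await outer
    except asyncio.CancelledError:
        pass
    return outer.cancelled(), [t.cancelled() for t in children]


outer_cancelled, children_cancelled = asyncio.run(supervisor())
print(outer_cancelled, children_cancelled)
```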

Multithreaded event loop

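Often the event loop is used to register coroutines, and some coroutines need to be added to it dynamically. A simple way to do this is with multiple threads: the current thread creates an event loop, then starts a new thread and runs the event loop in it. The current thread is not blocked.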

import asyncio
from threading import Thread
import time

now = lambda :time.time()

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)  # schedules a plain function; use run_coroutine_threadsafe to register coroutine objects
new_loop.call_soon_threadsafe(more_work, 3)

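After this code starts, the current thread is not blocked. The new thread runs the more_work calls registered with call_soon_threadsafe in order. Because time.sleep is synchronous and blocking, running both more_work calls takes roughly 6 + 3 seconds.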
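The listing above leaves the loop running forever, so the program never exits on its own. A possible way to shut it down, sketched here with shorter delays, is to schedule loop.stop through the same thread-safe call and then join the thread. The finished list is a hypothetical helper for observing the order of execution.

```python
import asyncio
import time
from threading import Thread

finished = []


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def more_work(x):
    time.sleep(x)  # synchronous: blocks the loop thread for x seconds
    finished.append(x)


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

# Callbacks run one after another, in the order they were scheduled.
new_loop.call_soon_threadsafe(more_work, 0.2)
new_loop.call_soon_threadsafe(more_work, 0.1)
new_loop.call_soon_threadsafe(new_loop.stop)  # stop after the work is done

t.join()
new_loop.close()
print(finished)
```

The callbacks complete in scheduling order, not in order of their delays, because each one blocks the loop thread until it returns.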

Multithreaded asynchronous event loop

import asyncio
import time
from threading import Thread

now = lambda :time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)  # non-blocking wait: work in the loop thread now runs concurrently instead of one after another
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

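In this example the main thread creates new_loop and then starts an endless event loop in a separate child thread. The main thread registers new coroutine objects with run_coroutine_threadsafe. The child thread can then run them concurrently on its event loop while the main thread is not blocked. The total running time is about 6s.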
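run_coroutine_threadsafe returns a concurrent.futures.Future, so the main thread can wait for the results if it needs them. A rough sketch with shortened, arbitrary delays; the timeout values and the shutdown sequence at the end are additions for illustration.

```python
import asyncio
import time
from threading import Thread


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()

start = time.perf_counter()
future_a = asyncio.run_coroutine_threadsafe(do_some_work(0.3), new_loop)
future_b = asyncio.run_coroutine_threadsafe(do_some_work(0.2), new_loop)

# result() blocks the calling thread until the coroutine has finished.
results = [future_a.result(timeout=5), future_b.result(timeout=5)]
elapsed = time.perf_counter() - start

new_loop.call_soon_threadsafe(new_loop.stop)
t.join()
new_loop.close()
print(results, elapsed)
```

Because both coroutines wait concurrently on the loop thread, the elapsed time is close to the longer delay rather than the sum of the two.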
