python---異步IO(asyncio)協程

簡單瞭解

在py3中內置了asyncio模塊。其編程模型就是一個消息循環。

模塊查看:

from .base_events import *
from .coroutines import *  #協程模塊,能夠將函數裝飾爲協程
from .events import *  #事件模塊,事件循環和任務調度都將使用到他
from .futures import *   #異步併發模塊,該模塊對task封裝了許多方法,表明未來執行或沒有執行的任務的結果。它和task上沒有本質上的區別
from .locks import *  #異步保證資源同步
from .protocols import *
from .queues import *
from .streams import *
from .subprocess import *
from .tasks import *  #建立任務,是對協程的封裝,能夠查看協程的狀態。能夠將任務集合
from .transports import *

調用步驟:

1.當咱們給一個函數添加了async關鍵字,或者使用asyncio.coroutine裝飾器裝飾,就會把它變成一個異步函數。 
2.每一個線程有一個事件循環,主線程調用asyncio.get_event_loop時會建立事件循環

3.將任務封裝爲集合asyncio.gather(*args)
,以後一塊兒傳入事件循環中

4.要把異步的任務丟給這個循環的run_until_complete方法,事件循環會安排協同程序的執行。和方法名字同樣,該方法會等待異步的任務徹底執行纔會結束。

簡單使用:

import asyncio,time

@asyncio.coroutine  #設爲異步函數
def func1(num):
print(num,'before---func1----')
yield from asyncio.sleep(5)
print(num,'after---func1----')

task = [func1(1),func1(2)]

if __name__ == "__main__":
begin = time.time()
loop = asyncio.get_event_loop()  #進入事件循環
loop.run_until_complete(asyncio.gather(*task))  #將協同程序註冊到事件循環中
loop.close()
end = time.time()
print(end-begin)
1 before---func1----
2 before---func1----
1 after---func1----
2 after---func1----
5.00528621673584
輸出結果

定義一個協程(不一樣於上面的實例)

import asyncio,time

async def func1(num):  #使用async關鍵字定義一個協程,協程也是一種對象,不能直接運行,須要加入事件循環中,才能被調用。
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine)
    loop.close()
    end = time.time()
    print(end-begin)
    func1(2)  #因爲使用async異步關鍵字,因此不能直接運行

    D:/MyPython/day25/mq/multhread.py:15: RuntimeWarning: coroutine 'func1' was never awaited
    func1(2)html

    print(type(func1),type(coroutine))  #<class 'function'> <class 'coroutine'>

同:python---await/async關鍵字python

咱們可使用send(None)調用協程(這裏不這麼使用),這裏是將協程放入事件循環中進行處理

    coroutine = func1(2)
    try:
        coroutine.send(None)
    except StopIteration:
        pass

建立一個任務(對協程進一步封裝,能夠查看狀態等)

協程對象不能直接運行在註冊事件循環的時候實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象.編程

task對象是Future類的子類,保存了協程運行後的狀態,用於將來獲取協程的結果session

run_until_complete方法查看:

class BaseEventLoop(events.AbstractEventLoop):

   def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        WARNING: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        self._check_closed()

        new_task = not futures.isfuture(future)
        future = tasks.ensure_future(future, loop=self) if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

由源碼能夠知道,在協程註冊後會被自動封裝爲task任務。因此咱們不是必須傳入task。可是去建立一個task對象,有利於咱們理解協程的狀態。

import asyncio,time

async def func1(num):
    print(num,'before---func1----')

if __name__ == "__main__":
    begin = time.time()

    coroutine = func1(2)

    loop = asyncio.get_event_loop()

    task = loop.create_task(coroutine)  #建立了任務
    print(task) #pending

    loop.run_until_complete(task)
    loop.close()
    print(task) #finished
    end = time.time()
    print(end-begin)

對於協程的4種狀態:python---協程理解併發

    print(task) #pending
    print(getcoroutinestate(coroutine))

    loop.run_until_complete(task)
    loop.close()
    print(task) #finished
    print(getcoroutinestate(coroutine))
CORO_CREATED
2 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:4> result=None>
CORO_CLOSED

 深刻了解:關於Task,create_task(),ensure_future均可以用來建立任務,那麼應該使用哪一個?app

條件使用ensure_future,他是最外層函數,其中調用了create_task()方法,功能全面,而Task官方不推薦直接使用異步

asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)均可以建立一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。 isinstance(task, asyncio.Future)將會輸出True。

綁定回調add_done_callback

async def func1(num):
print(num,'before---func1----')
return "recv num %s"%num

def callback(future):
print(future.result())

if __name__ == "__main__":
begin = time.time()

coroutine1 = func1(1)
loop = asyncio.get_event_loop()
task1=asyncio.ensure_future(coroutine1)
task1.add_done_callback(callback)
loop.run_until_complete(task1)
loop.close()
end = time.time()
print(end-begin)

1 before---func1----
recv num 1
0.004000186920166016async

能夠看到,coroutine執行結束時候會調用回調函數。並經過參數future獲取協程執行的結果。咱們建立的task和回調裏的future對象,其實是同一個對象。ide

我也能夠不使用回調函數,單純獲取返回值

當task狀態爲finished時候,咱們能夠直接使用result方法(在future模塊)獲取返回值

async def func1(num):
    print(num,'before---func1----')
    return "recv num %s"%num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(1)
    loop = asyncio.get_event_loop()
    task1=asyncio.ensure_future(coroutine1)
    loop.run_until_complete(task1)
    print(task1)
    print(task1.result())
    loop.close()
    end = time.time()
    print(end-begin)
1 before---func1----
<Task finished coro=<func1() done, defined at D:/MyPython/day25/mq/multhread.py:6> result='recv num 1'>
recv num 1
0.0030002593994140625

阻塞和await

使用async關鍵字定義的協程對象,使用await能夠針對耗時的操做進行掛起(是生成器中的yield的替代,可是本地協程函數不容許使用),讓出當前控制權。協程遇到await,事件循環將會掛起該協程,執行別的協程,直到其餘協程也掛起,或者執行完畢在進行下一個協程的執行函數

使用asyncio.sleep模擬阻塞操做。

import asyncio,time


async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num) return "recv num %s"%num


if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    loop = asyncio.get_event_loop()
    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.gather(*[task1,task2])    #gather能夠實現同時註冊多個任務,實現併發操做。wait方法使用一致
    loop.run_until_complete(tasks)
    loop.close()
    end = time.time()
    print(end-begin)

 

併發:使用gather或者wait能夠同時註冊多個任務,實現併發

gather:Return a future aggregating results from the given coroutines or futures.  返回結果

    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.gather(*[task1,task2])
    loop.run_until_complete(tasks)

wait:Returns two sets of Future: (done, pending).   #返回dones是已經完成的任務,pending是未完成的任務,都是集合類型

    task1=asyncio.ensure_future(coroutine1)
    task2=asyncio.ensure_future(coroutine2)
    tasks = asyncio.wait([task1,task2])
    loop.run_until_complete(tasks)

 

Usage:

done, pending = yield from asyncio.wait(fs)

wait是接收一個列表,然後gather是接收一堆任務數據。

二者的返回值也是不一樣的

協程嵌套,將多個協程封裝到一個主協程中

import asyncio,aiohttp

async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)
            print(await resp.text())

tasks = [fetch_async('http://www.baidu.com/'), fetch_async('http://www.cnblogs.com/ssyfj/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
關於aiohttp模塊的協程嵌套,嵌套更加明顯
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    dones, pendings = await asyncio.wait(tasks) for task in dones:  #對已完成的任務集合進行操做
        print("Task ret: ",task.result())

if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
Task ret:  recv num 4
Task ret:  recv num 5
Task ret:  recv num 3
5.000285863876343

也能夠直接使用gather直接獲取值

    results = await asyncio.gather(*tasks)
    for result in results:
        print("Task ret: ",result)

咱們也能夠不在main中處理結果,而是返回到主調用方進行處理

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    return await asyncio.gather(*tasks) if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
 results = loop.run_until_complete(main()) for result in results: print("Task ret: ",result)
    loop.close()
    end = time.time()
    print(end-begin)

或者使用wait掛起

    return await asyncio.wait(tasks)

----------------------------------------------------
    dones,pendings = loop.run_until_complete(main())
    for task in dones:
        print("Task ret: ",task.result())

或者使用asyncio中的as_completed方法

Return an iterator whose values are coroutines.  #返回一個可迭代的協程函數值

    When waiting for the yielded coroutines you'll get the results (or
    exceptions!) of the original Futures (or coroutines), in the order
    in which and as soon as they complete.
import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    for task in asyncio.as_completed(tasks): result = await task print("Task ret: ",result) if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    loop.close()
    end = time.time()
    print(end-begin)

協程中止

future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cacelled

 建立future的時候,task爲pending,

事件循環調用執行的時候固然就是running,

調用完畢天然就是done,

若是須要中止事件循環,就須要先把task取消。

可使用asyncio.Task獲取事件循環的task

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        print(asyncio.Task.all_tasks())
        for task in asyncio.Task.all_tasks():  #獲取全部任務
            print(task.cancel())  #單個任務取消
        loop.stop()    #須要先stop循環
        loop.run_forever()  #須要在開啓事件循環 finally:
        loop.close()  #統一關閉
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
{<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loc
als>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <
Task pending coro=<wait() running at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks
.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>, <Task pending coro=<func1() running at multhread.py:5> wait
_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Loca
l\Programs\Python\Python35\lib\asyncio\tasks.py:428]>, <Task pending coro=<func1() running at multhread.py:5> wait_f
or=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at C:\Users\Administrator\AppData\Local\
Programs\Python\Python35\lib\asyncio\tasks.py:428]>}  #未處理,剛剛掛起爲pending狀態
True  #返回True,表示cancel取消成功
True
True
True
3.014172315597534

True表示cannel成功,loop stop以後還須要再次開啓事件循環,最後在close,否則還會拋出異常:

Task was destroyed but it is pending!

由於cancel後task的狀態依舊是pending

        for task in asyncio.Task.all_tasks():
            print(task)
            print(task.cancel())
            print(task)
<Task pending coro=<func1() running at multhread.py:5> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<loca
ls>._on_completion() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]> True <Task pending coro=<func1() running at multhread.py:5> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion
() at C:\Users\Administrator\AppData\Local\Programs\Python\Python35\lib\asyncio\tasks.py:428]>

或者使用協程嵌套,main協程至關於最外層task,處理main函數便可

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

async def main():
    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]

    dones,pendings=await asyncio.wait(tasks)

    for task in dones:
        print("Task ret: ",task.result())


if __name__ == "__main__":
    begin = time.time()

    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(main())
    try:
        loop.run_until_complete(task)
    except KeyboardInterrupt as e:
        print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())  #咱們只是把上面的單個寫成了全部任務集合取消,和協程嵌套關係不大。上面也能夠這樣寫。不過協程嵌套能夠簡化代碼
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
<class 'asyncio.tasks._GatheringFuture'>
True
3.008172035217285

感受有點多餘...

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        print(asyncio.gather(*tasks).cancel())  
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
    end = time.time()
    print(end-begin)
5 before---func1----
3 before---func1----
4 before---func1----
True
3.008171796798706

上面討論的都是在同一線程下的事件循環,下面來談談不一樣線程的事件循環

在當前線程中建立一個事件循環(不啓用,單純獲取標識),開啓一個新的線程,在新的線程中啓動事件循環。在當前線程依據事件循環標識,能夠向事件中添加協程對象。當前線程不會因爲事件循環而阻塞了。

上面在一個線程中執行的事件循環,只有咱們主動關閉事件close,事件循環纔會結束,會阻塞。

同一線程:

import asyncio,time

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3),
    ]


 loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.run_forever()
    end = time.time()
    print(end-begin)

不一樣線程事件循環(不涉及協程):

import asyncio,time,threading

def func1(num):
    print(num,'before---func1----')
    time.sleep(num)
    return "recv num %s"%num

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

if __name__ == "__main__":
    begin = time.time()

    new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用)
    t = threading.Thread(target=start_loop,args=(new_loop,))    #開啓新的線程去啓動事件循環
    t.start()

    new_loop.call_soon_threadsafe(func1,3)
    new_loop.call_soon_threadsafe(func1,2)
    new_loop.call_soon_threadsafe(func1,6)

    end = time.time()
    print(end-begin)    #當前線程未阻塞,耗時0.02800154685974121
3 before---func1----
0.02800154685974121
2 before---func1----
6 before---func1----

新線程協程:

import asyncio,time,threading

async def func1(num):
    print(num,'before---func1----')
    await asyncio.sleep(num)
    return "recv num %s"%num

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

if __name__ == "__main__":
    begin = time.time()

    coroutine1 = func1(5)
    coroutine2 = func1(3)
    coroutine3 = func1(4)

    new_loop = asyncio.new_event_loop() #在當前線程下建立時間循環,(未啓用)
    t = threading.Thread(target=start_loop,args=(new_loop,))    #開啓新的線程去啓動事件循環
    t.start()

    asyncio.run_coroutine_threadsafe(coroutine1,new_loop)  #傳參必須是協程對象
    asyncio.run_coroutine_threadsafe(coroutine2,new_loop)
    asyncio.run_coroutine_threadsafe(coroutine3,new_loop)

    end = time.time()
    print(end-begin)    #當前線程未阻塞,耗時0.010000467300415039
5 before---func1----
3 before---func1----
4 before---func1----
0.010000467300415039

主線程經過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的併發操做,同時主線程又不會被block。

推文:Python黑魔法 --- 異步IO( asyncio) 協程

相關文章
相關標籤/搜索