async 異步協程進階

協程經過 async/await 語法進行聲明,是編寫異步應用的推薦方式html

例如新定義一個協程(coroutine object)python

async def foo():
return 42

首先先來介紹下:django

認識aysn和asyncio都有哪些函數方法:json

建立一個future 對象:session

task = asyncio.create_task(foo())
或者使用
task=asyncio.ensure_future(foo())
那麼如何判斷建立的task究竟是不是future 對象呢?
async def exetask():
# task = asyncio.create_task(foo())
task = asyncio.ensure_future(foo())
if isinstance(task,asyncio.Future):
print("yes")
else:
print("no")
s=await task
asyncio.run(exetask())
結果以下:

  yes併發

答案是確定的。

2.如何運行一個協程:

要真正運行一個協程,asyncio 提供了三種主要機制:less

第一種:異步

  • asyncio.run() 函數用來運行最高層級的入口點 "main()" 函數 (參見上面的示例。)async

第二種:函數

經過loop對象實現:

loop=asyncio.get_event_loop()
async  def  jobs():
i=10
while i>0:
# time.sleep(0.5)
i=i-1
return 0
tasks=[asyncio.ensure_future(jobs(k)) for k in range(1,4)]
res=loop.run_until_complete(asyncio.gather(*tasks))
print(res)
loop.close()

3.併發

第1種併發運行:
當一個協程經過  等函數被打包爲一個 任務(coro)將 coro 協程 打包爲一個  排入日程準備執行,
返回 Task 對象。此函數 在 Python 3.7 中被加入。在 Python 3.7 以前,of course你也能夠改用低層級的  函數, This works in all Python versions but is less readable
asyncio.create_task()asyncio.create_taskTaskasyncio.ensure_future()
#### ordinal job async
async def jobs():
i=10
while i>0:
# time.sleep(0.5)
i=i-1
return 0
async def get_status():
r=await jobs()
return r
loop=event=asyncio.get_event_loop()
tasks = [asyncio.create_task(get_status()) for k in range(1,4)]
for task in tasks:
    r=loop.run_until_complete(asyncio.wait_for(task,timeout=10))
    print(r)

  這裏我採用了比較低級的loop事件對象來調用run_until_complete() 方法來實現:

事實上開發者通常更喜歡採用高級用法asyncio.wait()或者asyncio.wait_for()來實現這寫異步任務調用:

在進行下面介紹以前我想你應該先了解:asyncio.wait_for(awtimeout*loop=None) 

  等待 aw 可等待對象 完成,指定 timeout 秒數後超時。

  若是 aw 是一個協程,它將自動做爲任務加入日程。

  timeout 能夠爲 None,也能夠爲 float 或 int 型數值表示的等待秒數。若是 timeout 爲 None,則等待直到完成。

  若是發生超時,任務將取消並引起 asyncio.TimeoutError.

  要避免任務 取消,能夠加上 shield()

  函數將等待直到目標對象確實被取消,因此總等待時間可能超過 timeout 指定的秒數。

  若是等待被取消,則 aw 指定的對象也會被取消。

  loop 參數已棄用,計劃在 Python 3.10 中移除。

asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

併發運行 aws 指定的 可等待對象 並阻塞線程直到知足 return_when 指定的條件。

若是 aws 中的某個可等待對象爲協程,它將自動做爲任務加入日程。直接向 wait() 傳入協程對象已棄用,由於這會致使 使人迷惑的行爲

返回兩個 Task/Future 集合: (done, pending)

用法:

done, pending = await asyncio.wait(aws) 

loop 參數已棄用,計劃在 Python 3.10 中移除。

如指定 timeout (float 或 int 類型) 則它將被用於控制返回以前等待的最長秒數。

請注意此函數不會引起 asyncio.TimeoutError。當超時發生時,未完成的 Future 或 Task 將在指定秒數後被返回。

return_when 指定此函數應在什麼時候返回。它必須爲如下常數之一:

常數

描述

FIRST_COMPLETED

函數將在任意可等待對象結束或取消時返回。

FIRST_EXCEPTION

函數將在任意可等待對象因引起異常而結束時返回。當沒有引起任何異常時它就至關於 ALL_COMPLETED

ALL_COMPLETED

函數將在全部可等待對象結束或取消時返回。

與 wait_for() 不一樣,wait() 在超時發生時不會取消可等待對象。


如何驗證wait 不取消,wait_for 取消aw對象呢:

咱們來看個例子:

先看wait_for:

async def foo(k=0):
await asyncio.sleep(30)
return k

async def exetask():
task=asyncio.create_task(foo(k=1))
try:
await asyncio.wait_for(task,timeout=1)
print(task.cancelled())
#3.7 改成當 aw 因超時被取消,wait_for 會等待 aw 被取消,3.7以前直接報異常,
except asyncio.TimeoutError:
print("timeout ")
# task.cancel()
print(task.cancelled())
asyncio.run(exetask())
結果:

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
timeout
True

再看wait,這裏注意因爲waitexpect a list of futures, not Task,我換種寫法:

async def foo(k=0):
await asyncio.sleep(30)
return k


async def exetask():
task = asyncio.create_task(foo(k=1))
try:
#請注意wait函數不會引起 asyncio.TimeoutError
await asyncio.wait([task], timeout=1)
## 3.7 改成當 aw 因超時被取消,wait_for 會等待 aw 被取消,3.7以前直接報異常,
# await asyncio.wait_for(task,timeout=1)

except asyncio.TimeoutError:
print("timeout ")
print(task.cancelled())


asyncio.run(exetask())

結果:

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
False

Process finished with exit code 0

 這裏完美的展示了阻塞的魅力!!!!!!!

 

 
第2種併發運行:
awaitable (*awsloop=Nonereturn_exceptions=False)
asyncio.gather

併發 運行 aws 序列中的 可等待對象

下面來看個簡單的例子:

async def count(k):
    print(f"start job{k} {time.asctime()} ")
    await asyncio.sleep(0.6)
    print(f"end job{k} {time.asctime()}")
    return k
async def mayns():
    r=await asyncio.gather(count(1),count(2))
    return r
def test():
    import time
    st=time.perf_counter()
    results=asyncio.run(mayns())
    elapsed=time.perf_counter()-st
    print(f"result return :{results}  execute in {elapsed:0.2} seconds.")

運行結果:

start job1 Fri Dec 13 23:00:38 2019
start job2 Fri Dec 13 23:00:38 2019
end job1 Fri Dec 13 23:00:39 2019
end job2 Fri Dec 13 23:00:39 2019
result return :[1, 2] execute in 0.6 seconds.

gather直接返回的是調用的task的全部result列表

固然你也能夠這樣蒐集任務:

 

最後介紹一下如何實現異步http請求:
# request 庫同步阻塞,aiohttp纔是異步的請求,pip install aiohttp
from aiohttp import ClientSession as session
async def  test2(k):
    r=await other_test(k)
    return r
async def  other_test(k):
    print("start await job %s,%s"%(k,time.asctime()))
    urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
    async with session() as request:
        async with request.get(urls[0]) as rq:
            r=await  rq.read()
            res=r.decode("utf-8")
    print("end await job %s,%s"%(k,time.asctime()))
    return res

def test_aiohttp():
    klist=[100,50,88]
    loop=asyncio.get_event_loop()
    tasks=[asyncio.ensure_future(test2(k)) for k in klist]
    # loop.run_until_complete(asyncio.wait(tasks))
    #可經過asyncio.gather(*tasks)將響應所有收集起來
    res=loop.run_until_complete(asyncio.gather(*tasks))
    print(res)
    loop.close()

  結果:

C:\Python37\python.exe D:/workspace/AutoFate/src/commonutils/asyncutils.py
start await job 100,Fri Dec 13 23:54:31 2019
start await job 50,Fri Dec 13 23:54:31 2019
start await job 88,Fri Dec 13 23:54:31 2019
end await job 88,Fri Dec 13 23:54:31 2019
end await job 50,Fri Dec 13 23:54:31 2019
end await job 100,Fri Dec 13 23:54:31 2019
['{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}', '{"user": "test001", "msg": "this is test index view "}']

Process finished with exit code 0


服務是本身用djangO起的:
from django.shortcuts import render# Create your views here.from django.http import HttpResponseimport jsonfrom . models import Student,Gradefrom django.db import modelsdef index(request):    data={"user":"test001","msg":"this is test index view "}    js=json.dumps(data)    return HttpResponse(js)def stuTable(request):    import time    time.sleep(3)    # stu_object=Student.objects.all()    # return  render(request,template_name="index.html",context={"student_object":stu_object})    return HttpResponse(json.dumps({"A":888,"NN":899}))
相關文章
相關標籤/搜索