asyncio -- 協程庫

asyncio -- 協程庫

event_loop: 事件循環,至關一個無線循環,將函數註冊到這個時間循環中,知足條件時,調用適當的處理方法
            每個線程都有一個事件循環,主線程調用asyncio.get_event_loop()時建立循環,把異步任務放入run_until_complete()方法中,事件循環會安排協同程序的執行
coroutine: 協程對象類型,將協程對象註冊到時間循環中,它會被事件循環調用。能夠使用async關鍵字來定義一個方法,這個方法調用時不會當即執行,而是返回一個協程對象
task: 任務,對協程對象的進一步封裝,包含了任務的各個狀態,running,finished等。
    
# 關鍵字
async: 定義一個協程
await: 用來掛起阻塞方法的執行

例:

import asyncio
import time

async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)  # 也是一個協程
    print('共用時%s'%(time.time() - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(do_task(3))
'''
結果: 
start....
共用時3.0001718997955322
'''

回調

# 例如IO操做,須要獲得返回的結果,能夠往future中添加回調來實現
import asyncio
import time

async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    print('共用時%s'%(time.time() - start))

def done_callback(result):   # result至關於一個對象
    print('執行回調函數')

result = asyncio.ensure_future(do_task(3))
result.add_done_callback(done_callback)

loop = asyncio.get_event_loop()
loop.run_until_complete(result)

多協程

# 先把協程存入列表中
coros = [do_task(1),do_task(2)]
# 而後把多個協程交給loop
loop.run_until_complete(asyncio.gather(*coros))
# 例
async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    print('共用時%s'%(time.time() - start))

coros = [do_task(2),do_task(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*coros))
'''
兩個協程併發,結束時間以耗時較長的協程爲準
start....
start....
共用時2.0001144409179688
共用時3.003171682357788
'''

多協程結果獲取

# 沒有loop的狀況
async def do_task(x):
    print('start....')
    start=time.time()
    await asyncio.sleep(x)
    # print('共用時%s'%(time.time() - start))
    return '共用時%s'%(time.time() - start)

futus = [asyncio.ensure_future(do_task(2)),asyncio.ensure_future(do_task(3))]
loop = asyncio.get_event_loop()
print(loop.run_until_complete(asyncio.gather(*futus)))

'''
結果:['共用時2.0001144409179688', '共用時3.003171682357788']
'''
# 已經有loop 的狀況  ?????
futus = [asyncio.ensure_future(do_task(2)),asyncio.ensure_future(do_task(3))]
result = await asyncio.gather(*futus)
print(str(result))

run_until_complete 和 run_forever

1. run_until_complete: 執行完當即結束 (內部也是調用run_forever)
2. run_forever : 執行完程序不結束,除非在協程調用中加 loop.stop()

import asyncio
import time
async def task(i):
    start = time.time()
    await asyncio.sleep(3)
    print('耗時%s'%(time.time()-start))
    loop.stop()

loop = asyncio.get_event_loop()
coro = task(3)
# loop.run_until_complete(coro)
asyncio.ensure_future(coro)
loop.run_forever()   # 協程中加loop.stop()


# 多個協程使用 run_forever問題: 第二個協程沒完,loop 就中止了
# 解決: gather 將多個協程合成一個future,添加回調,裏面終止loop
import asyncio
import time
async def task(i):
    start = time.time()
    await asyncio.sleep(3)
    print('耗時%s'%(time.time()-start))

def done_callback(loop,futu):
    print('callback 函數')
    loop.stop()
    
loop = asyncio.get_event_loop()
futus = asyncio.gather(task(2),task(3))  # 用gather合併
futus.add_done_callback(functools.partial(done_callback,loop))  # 調價回調
loop.run_forever()

把容易阻塞的函數task 定義成可調出協程

# 兩次task之間相差4秒
async def task():
    time.sleep(4)
    return datetime.datetime.now()
tasks = [
    task(),
    task()
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
print(results)
'''
結果:
[datetime.datetime(2019, 10, 24, 15, 8, 26, 758582), datetime.datetime(2019, 10, 24, 15, 8, 22, 758353)]
'''
# 把容易阻塞的函數task 定義成可調出協程
def task():
    time.sleep(4)
    return datetime.datetime.now()

async def fetch_async(func,*args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None,func,*args)
    result = await future
    return result

tasks = [
    fetch_async(task),
    fetch_async(task)
]
loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
print(results)

'''
結果:兩個task 幾乎同時結束
[datetime.datetime(2019, 10, 24, 15, 11, 28, 260963), datetime.datetime(2019, 10, 24, 15, 11, 28, 260963)]
'''

call_later()

# call_soon 來調用display_date
# display_data中用 call_later 每隔一分鐘調用dispaly_data,直到條件不知足
import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop) # (delay,func,*args)
    else:
        loop.stop()
loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

其餘

# loop.close
loop.run_until_complete(do_task(3))
loop.close()
loop.run_until_complete(do_task(2))  # 此處異常,loop已經關閉
相關文章
相關標籤/搜索