python併發編程之asyncio協程(三)

協程實現了在單線程下的併發,每一個協程共享線程的幾乎全部的資源,除了協程本身私有的上下文棧;協程的切換屬於程序級別的切換,對於操做系統來講是無感知的,所以切換速度更快、開銷更小、效率更高,在有多IO操做的業務中能極大提升效率。html

系列文章

asyncio模塊建立協程

asyncio在python3.4後被內置在python中,使得python的協程建立變得更加方便。session

import asyncio
import os

# async 關鍵字定義一個協程
async def target_func1():
    print('the func start')
    print(os.getpid())
    print('the func end')

def run():
    # 建立一個協程對象
    coroutine = target_func1()
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coroutine) # 將協程對象添加到事件循環,運行直到結束
    print(os.getpid())
    loop.close() # 關閉事件循環

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    # 建立一個協程對象
    coroutine = target_func1(loop)
    loop.create_task(coroutine) # 建立一個任務並添加到事件循環中
    loop.run_forever()  # 開啓無限循環,須要在異步函數中調用stop()使中止
    loop.close()

if __name__ == '__main__':
    run()

# 結果
the func start
4876
the func end
4876

以上可知,全部的代碼段都是在一個進程的單線程中執行。多線程

asyncio模塊分析

  • coroutine

被async修飾的函數調用後會生成協程函數,能夠經過send喚醒執行。併發

async def target_func1():
    print('the func start')
    print(os.getpid())
    print('the func end')
coroutine = target_func1()

try:
    coroutine.send(None) # 喚醒協程
except StopIteration:
    print('xx')
    coroutine.close() # 關閉
  • async

async關鍵字能夠定義一個協程對象,被async修飾的函數變成了一個協程對象而不是一個普通的函數。

async def target_func1():
    pass

coroutine = target_func1()
print(coroutine)
  • await

await用於控制事件的執行順序,它只能在異步函數中使用,即被async關鍵字定義的協程函數,不然報錯。當執行到await時,當前協程掛起,轉而去執行await後面的協程,完畢後再回到當前協程繼續往下。

# async 關鍵字定義一個協程
async def target_func1():
    print('the func start')
    x = await target_func2() # 當前協程掛起
    print(x)
    print('the func end')
    return 1

async def target_func2():
    """
    目標函數2
    :return:
    """
    time.sleep(2)
    print('the func end2')
    return 0
  • 主要方法
asyncio.get_event_loop():建立一個事件循環,全部的異步函數都須要在事件循環中運行;
asyncio.ensure_future():建立一個任務
asyncio.gather(*fs):添加並行任務
asyncio.wait(fs):添加並行任務,能夠是列表
loop.run_until_complete(func):添加協程函數同時啓動阻塞直到結束
loop.run_forever():運行事件無限循環,直到stop被調用
loop.create_task():建立一個任務並添加到循環
loop.close():關閉循環
loop.time():循環開始後到當下的時間
loop.stop():中止循環
loop.is_closed() # 判斷循環是否關閉
loop.create_future():建立一個future對象,推薦使用這個函數而不要直接建立future實例
loop.call_soon() # 設置回調函數,不能接受返回的參數,須要用到future對象,當即回調
loop.call_soon_threadsafe() # 線程安全的對象
loop.call_later() # 異步返回後開始算起,延遲迴調
loop.call_at() # 循環開始多少s回調
loop.call_exception_handler() # 錯誤處理
  • 主要的類

Future:主要用來保存任務的狀態; Task:Future的子類,擴展了Future的功能;

# Future
from asyncio import Future
# future = Future()
# future.result() # 獲取任務的結果
# future.remove_done_callback(fn) # 刪除全部的回調函數並返回個數
# future.set_result('result') # 設置任務的結果,必須在result()以前執行,不然報錯
# future.exception() # 獲取任務的錯誤信息
# future.set_exception('bad') # 設置任務的錯誤信息
# future.add_done_callback('fn') # 添加回調函數

# Task
current_task():返回循環當前的任務,類方法
all_tasks():返回事件循環全部的任務
get_stack():獲取其餘協程的堆棧列表
print_stack:輸出其餘協程的堆棧列表
cancel:取消任務

實例

  • 添加多個任務到事件循環
async def target_func3(name):
    """
    :return:
    """
    await asyncio.sleep(1)
    print(name)
    return 0

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    x = loop.run_until_complete(asyncio.gather(target_func3('A'),target_func3('B'),target_func3('C'),))
    print(x) # 等待返回結果,一個列表,按照事件添加的順序,可是計算的順序是不定的
    loop.close()

if __name__ == '__main__':
    run1()
  • 使用run_forever啓動循環獲取異步計算結果

run_forever()不能直接獲得異步函數的返回結果,須要使用Future類來做爲第三方保存結果,同時設置回調函數;

from asyncio import Future
from functools import partial

async def target_func0(name, future):
    """
    目標函數2
    :return:
    """
    time.sleep(1)
    print(name)
    future.set_result(name) # 設置返回結果

def got_result(loop, future):
    print(future.result()) # 處理結果
    loop.stop() # 循環中止

def run():
    loop = asyncio.get_event_loop()
    future = Future(loop=loop)
    res = asyncio.ensure_future(target_func0('A', future)) # 生成一個Task任務
    print(res)
    future.add_done_callback(partial(got_result, loop)) # 回調函數默認只能有一個參數future,必須使用偏函數
    # print(future.result()) # future上下文必須先調用future.set_result。
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    run()
  • 鏈協程

協程裏調用等待另外的協程完成後才能返回。

import asyncio
import time
# async 關鍵字定義一個協程
async def target_func1():
    print('the func start')
    x = await target_func2() # 等待協程完成,控制執行順序
    print(x)
    print('the func end')
    return 1

async def target_func2():
    """
    目標函數2
    :return:
    """
    time.sleep(2)
    print('the func end2')
    return 0

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    x = loop.run_until_complete(target_func1())
    print(x)
    loop.close()
if __name__ == '__main__':
    run()
  • 普通回調實例
import asyncio
import time
from functools import partial

# async 關鍵字定義一個協程
async def target_func1():
    print('the func end')
    return 1

def get_res(loop):
    print('xxxx')
    loop.stop()

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    loop.create_task(target_func1())
    # loop.call_soon(partial(get_res, loop)) # 設置回調函數,不能接受返回的參數,須要用到future對象
    # loop.call_soon_threadsafe() # 線程安全的對象
    # loop.call_later(delay=5, callback=partial(get_res, loop)) # 異步返回後開始算起,延遲5秒回調
    # loop.call_at(when=8000,callback=partial(get_res, loop)) # 循環開始第8秒回調
    # loop.call_exception_handler() # 錯誤處理
    loop.run_forever()
    loop.close()

if __name__ == '__main__':
    run1()

本地IO和網絡IO的異步使用

使用協程的目的是在系統發生io阻塞的時候,能夠交出CUP的控制權,讓其去執行其餘的任務。實際使用時通常的場景有本地IO和網絡IO。

  • 網絡IO
# 使用asyncio+aiohttp,若是想異步化,網絡請求須要拋棄requests包
import asyncio
import time
from aiohttp import ClientSession

async def target2():
    print('start2')
    async with ClientSession() as session:
        async with session.get(url='http://www.baidu.com') as rsp:
            data = await rsp.read()
    print('end2')
    return data

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    tasks = [target2() for i in range(100)]
    ts = asyncio.gather(*tasks)
    t = time.time()
    loop.run_until_complete(ts)
    print(time.time()-t)
    loop.close()

if __name__ == '__main__':
    run1()
  • 本地io

核心思想:將文件讀寫的while循環換成事件循環。

可參考:https://github.com/lyyyuna/script_collection/blob/master/aysncfile/asyncfile.py

協程Queue

asyncio模塊也有本身的queue實現生產消費模式,只要有三種隊列:Queue(先進先出),PriorityQueue(優先級隊列),LifoQueue(棧),可是Queue不是線程安全的類,也就是說在多進程或多線程的狀況下不要使用這個隊列。

import asyncio
import time
from asyncio import Queue

# async 關鍵字定義一個協程
async def target_func1(q:Queue):
    for i in range(100):
        await q.put(i)

async def target_func2(q:Queue):
    for i in range(100):
        x = await q.get()
        print(x)

def run1():
    # 建立一個事件循環
    loop = asyncio.get_event_loop()
    q = Queue(100)
    task = asyncio.gather(target_func1(q), target_func2(q))
    loop.run_until_complete(task)
    loop.close()

if __name__ == '__main__':
    run1()

Queue的get(),join(),put()方法返回的都是協程,須要使用await關鍵字。

相關文章
相關標籤/搜索