asyncio協程與併發

併發編程

Python的併發實現有三種方法。python

  1. 多線程
  2. 多進程
  3. 協程(生成器)

基本概念

串行:同時只能執行單個任務
並行:同時執行多個任務數據庫

在Python中,雖然嚴格說來多線程與協程都是串行的,但其效率高,在遇到阻塞時會將阻塞任務交給系統執行,經過合理調度任務,使得程序高效。編程

最高效的固然是多進程了,但因爲多進程依賴硬件配置,而且當任務量超過CPU核心數時,多進程會有進程上下文切換開銷,而這個開銷很大,因此不是最佳解決方案。安全

常見耗時場景

  1. CPU計算密集型
  2. 磁盤IO密集型
  3. 網絡IO密集型

CPU計算密集型

多線程對比單線程,因爲GIL的存在,切換線程須要不斷加鎖、釋放鎖,效率反而更低;多進程至關於多個CPU同時工做,所以效率很高。網絡

IO密集型

IO密集型能夠是磁盤IO、網絡IO、數據庫IO等,都屬於計算量小,IO等待浪費高。越是IO等待時間長,則多線程的優點相比單線程越明顯,多進程效率高但依賴配置資源。多線程

結論

單線程老是最慢的,多線程適合在IO密集型場景使用,多進程適合CPU計算要求高的場景下使用,多進程雖然老是最快的,但須要CPU資源支持。併發

多線程

Python建立多線程有兩種方法。app

  1. 函數

用函數建立多線程

from threading import Thread


def func():
    for i in range(2):
        print('Hello world!')
        sleep(1)


th1 = Thread(target=func)
th1.start()
th2 = Thread(target=func)
th2.start()

用類建立多線程

這個類必須繼承Thread,必須重載run()方法框架

from threading import Thread


class MyThread(Thread):
    def __init__(self):
        super().__init__()
        self.name = 'Bob'
    
    def run(self):
        for i in range(2):
            print('Hello world!')
            sleep(1)

th1 = MyThread()
th2 = MyThread()

th1.start()
th2.start()

經常使用方法

  • threading.Thread(target=func, args=())
    • start() # 啓動子線程
    • join() # 阻塞子線程
    • is_alive()/isAlive() # 判斷線程執行狀態,正在執行返回True,不然False
    • daemon # 設置線程是否隨主線程退出而退出,默認False
    • name # 設置線程名

線程鎖

import threading


lock = threading.Lock()    # 生成鎖,全局惟一

lock.acquire()    # 加鎖

lock.release()    # 釋放鎖

加鎖與解鎖必須成對出現,或者使用上下文管理器with來管理鎖。異步

可重入鎖

在Redis分佈式鎖中提到過,用於讓非阻塞線程重複得到鎖來發送或讀取數據,這裏的可重入鎖僅指讓同一線程能夠屢次獲取鎖。

import threading


rlock = threading.RLock()    # 生成可重入鎖

死鎖

死鎖一般有兩種。

  1. 同一線程內嵌套獲取同一把鎖,形成死鎖(解決方案是用可重入鎖)
  2. 多個線程不按順序同時得到多個鎖,形成死鎖(解決方案一是靠編程人員人工識別,二是對鎖排序)

GIL全局鎖

多進程是真正的並行,而多線程是僞並行,實際是多個線程交替執行。

遇到GIL影響性能的狀況,要麼考慮用多進程替代多線程,要麼更換Python解釋器。

線程通訊

經常使用線程通訊方法。

  1. threading.Event
  2. threading.Condition
  3. queue.Queue

Event事件

import threading


event = threading.Event()

event.clear()    # 重置event,使全部該event事件都處於待命狀態

event.wait()    # 等待接收event指令,決定是否阻塞程序執行

evnet.set()    # 發送event指令,全部該event事件的線程開始執行
import time

import threading


class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event
    
    def run(self):
        self.event.wait()    # 等待event.set()才能執行下去
        time.sleep(1)
        print('{} Done'.format(self.name))

threads = []
event = threading.Event()

for i in range(5):
    threads.append(MyThread(event))

event.clear()    # 重置event,使event.wait()生效

for t in threads:
    t.start()

print('Waiting 3s')
time.sleep(3)

print('Awake all threads')
event.set()    # 發送event指令,全部綁定了event的線程開始執行

全部線程在調用start()方法後並不會執行完,而是在event.wait()處停住了,須要發送event.set()指令才能繼續執行。

Condition狀態

import threading


cond = threading.Condition()

cond.acquire()

cond.release()

cond.wait()    # 等待指令觸發,同時臨時釋放鎖,直到調用notify才從新佔有鎖

cond.notify()    # 發送指令

Condition與Event很相似,不過因爲wait()notify()能夠反覆調用,所以通常做爲編程人員可控調用鎖來使用,放在run()方法下。

Queue隊列

隊列是線程安全的,經過put()get()方法來操做隊列。

from queue import Queue

q = Queue(maxsize=0)    # 設置0表示無限長隊列

q.get(timeout=0.5)    # 阻塞程序,等待隊列消息,能夠設置超時時間

q.put()    # 發送消息

q.join()    # 等待全部消息被消費完

# 不經常使用但要了解的方法
q.qsize()    # 返回消息個數
q.empty()    # 返回bool值,隊列是否空
q.full()    # 返回bool值,隊列是否滿

Queue是FIFO隊列,還有queue.LifoQueuequeue.PriorityQueue

線程隔離

兩個線程的變量不能被相互訪問。

一般使用threading.local類來實現,該類的實例是一個字典型對象,直接經過key-value形式存入變量,如threading.local().name = 'bob'

若是想要實現一個線程內的全局變量或實現線程間的信息隔離,就使用local類。

線程池

多線程並非越多越好,由於在切換線程時會切換上下文環境(固然相比多進程的開銷要小的多),在量大時依然會形成CPU的開銷。

所以出現了線程池的概念,即預先建立好合適數量的線程,使任務能馬上使用。

經過concurrent.futures庫的ThreadPoolExecutor類來實現。

import time
import threading
from concurrent.futures import ThreadPoolExecutor


def target():
    for i in range(5):
        print('{}-{}\n'.format(threading.get_ident(), i)
        time.sleep(1)

pool = ThreadPoolExecutor(5)    # 線程池數量限制爲5

for i in range(100):
    pool.submit(target)    # 往線程中提交併運行

協程

學習協程,要先理解生成器,由於Python的協程是從生成器中誕生並演變到如今這個樣子的。

可迭代、迭代器、生成器

可迭代對象,其類或元類都實現了__iter__()方法,而該方法返回一個對象支持迭代,既能夠是string/list/tuple/dict等內置類型的對象,也能夠是本身寫的對象(這個對象的類實現了遍歷元素的__iter__方法)。

迭代器對象,可迭代對象是迭代器的基礎,迭代器只是比可迭代對象多了一個__next__()方法,這個方法讓咱們能夠再也不用for循環來獲取元素。

生成器對象,在迭代器的基礎上,實現了yield,至關於函數中的return,在每次for循環遍歷或調用next()時,都會返回一個值並阻塞等待下一次調用。

可迭代對象、迭代器都是將全部元素放在內存裏,而生成器則是須要時臨時生成元素,因此生成器節省時間、空間。

如何運行/激活生成器

兩個方法。

  1. next()
  2. send(None)

這兩個方法是等價的,但因爲send方法能夠傳值進去,因此在協程中大有用處。

生成器的執行狀態

經過inspect庫的getgeneratorstate方法獲取狀態信息。

  1. GEN_CREATED 等待開始執行
  2. GEN_RUNNING 解釋器正在執行(只有多線程中才能看到)
  3. GEN_SUSPENDED 在yield表達式處暫停
  4. GEN_CLOSED 執行結束

生成器的異常

StopIteration

從生成器過渡到協程:yield

生成器引入了函數暫停執行(yield)功能,後來又引入了向暫停的生成器發送信息的功能(send),並以此催生了協程。

協程是爲非搶佔式多任務產生子程序的計算機程序組件,協程容許不一樣入口點在不一樣位置暫停或開始執行程序。

協程和線程有類似點,多個協程之間與線程同樣,只會交叉串行執行;也有不一樣點,線程之間要頻繁切換,加鎖、解鎖,協程不須要。

協程經過yield暫停生成器,將程序的執行流程交給其它子程序,從而實現不一樣子程序之間的交替執行。

經過例子演示如何向生成器發送信息。

def func(n):
    index = 0
    while index < n:
        num = yield index    # 這裏分紅兩部分,yield index將index return給外部程序, num = yield接受外部send的信息並賦值給num
        if num is None:
            num = 1
        index += num


f = func(5)
print(next(f))    # 0
print(f.send(2))    # 2
print(next(f))    # 3
print(f.send(-1))    # 2

yield from語法

從Python3.3纔出現的語法。

yield from後面須要添加可迭代對象(迭代器、生成器固然知足要求)。

# 拼接一個可迭代對象
# 使用yield
astr = 'ABC'

alist = [1, 2, 3]

adict = dict(name='kct', age=18)

agen = (i for i in range(5))

def gen(*args):
    for item in args:
        for i in item:
            yield i

new_list = gen(astr, alist, adict, agen)

print("use yield:", list(new_list))

# 使用yield from

def gen(*args):
    for item in args:
        yield from item

new_flist = fgen(astr, alist, adict, agen)

print("use yield from:", list(new_flist))

能夠看出,使用yield from能夠直接從可迭代對象中yield全部元素,減小了一個for循環,代碼更簡潔,固然yield from不止作了這件事。

yield from後能夠接生成器,以此造成生成器嵌套,yield from就幫咱們處理了各類異常,讓咱們只需專心於業務代碼便可。

具體講解yield from前先了解幾個概念:

  1. 調用函數:調用委託生成器的代碼
  2. 委託生成器:包含yield from表達式的生成器函數
  3. 子生成器:yield from後接的生成器函數

舉個例子,實時計算平均值

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        num = yield average
        count += 1
        total += num
        average = total/count

# 委託生成器
def proxy_gen():
    while True:
        yield from average_gen()

# 調用函數
def main():
    get_average = proxy_gen()
    next(get_average)    # 第一次調用不傳值,讓子生成器開始運行
    print(get_average.send(10))    # 10
    print(get_average.send(20))    # 15
    print(get_average.send(30))    # 20

委託生成器的做用是在調用函數與子生成器之間創建一個雙向通訊通道,調用函數能夠send消息給子生成器,子生成器yield值也是直接返回給調用函數。

有時會在yield from前做賦值操做,這是用於作結束操做,改造上面的例子。

# 子生成器
def average_gen():
    total = 0
    count = 0
    average = 0
    while True:
        num = yield average
        if num is None:
            break
        count += 1
        total += num
        average = total/count
    return total, count, average    # 當協程結束時,調用return

# 委託生成器
def proxy_gen():
    while True:
        total, count, average = yield from average_gen()    # 只有子生成器的協程結束了纔會進行賦值,後面的語句纔會執行
        print('Count for {} times, Total is {}, Average is {}'.format(count, total, average))

# 調用函數
def main():
    get_average = proxy_gen()
    next(get_average)    # 第一次調用不傳值,讓子生成器開始運行
    print(get_average.send(10))    # 10
    print(get_average.send(20))    # 15
    print(get_average.send(30))    # 20
    get_average.send(None)    # 結束協程,若是後面再調用send,將會另起一協程

爲何不直接調用子生成器?

yield from作了全面的異常處理。直接調用子生成器,首先就要處理StopIteration異常,其次若子生成器不是協程生成器而是迭代器,則會有其它異常拋出,所以須要知道,委託生成器在這之中扮演着重要角色,不可忽略。

asyncio

asyncio是Python3.4引入的標準庫,直接內置對異步IO的支持。

雖然學了yieldyield from,但仍是不知如何入手去作併發,asyncio則是爲了提供這麼個框架來精簡複雜的代碼操做。

如何定義建立協程

經過前面學習,咱們知道調用函數/委託生成器/子生成器這三劍客中,子生成器就是協程,那麼asyncio如何來定義建立協程呢?

asyncio經過在函數定義前增長async關鍵字來定義協程對象,經過isinstance(obj, Coroutine)便可判斷是不是協程,這個協程類從collections.abc包導入。

咱們也知道,生成器是協程的基礎,那麼有什麼辦法將生成器變成協程來使用?

經過@asyncio.coroutine裝飾器能夠標記生成器函數爲協程對象,可是經過isinstance(obj, Generator)isinstance(obj, Coroutine)仍然能夠看到,這個生成器函數只是被標記爲協程了,但其本質依然是生成器。

重要概念

  1. event_loop事件循環,將協程註冊到時間循環中,當知足事件發生時調用相應的協程函數;
  2. coroutine協程,一個使用async關鍵字定義的函數,調用它不會當即執行函數,而是返回一個協程對象,這個協程對象須要註冊到事件循環中,由事件循環調用;
  3. future對象,表明未來執行或沒有執行的任務的結果,和task沒有本質區別;
  4. task任務,一個協程對象就是一個原生能夠掛起的函數,任務則是對協程的進一步包裝,其中包含任務的各類狀態,task對象是future的子類,它將coroutine和future聯繫在一塊兒,將coroutine封裝成一個future對象;
  5. async/await關鍵字,async定義一個協程,await用於掛起阻塞的異步調用接口(做用相似yield但不徹底是)。

協程工做流程

  1. 定義/建立協程對象
  2. 將協程轉換爲task
  3. 定義事件循環容器
  4. 把task任務扔進事件循環中並觸發
import asyncio

async def hello(name):
    print('Hello, ', name)

coroutine = hello('World')

# 建立事件循環
loop = asyncio.get_event_loop()

# 將協程轉換爲任務
task = loop.create_task(coroutine)

# 將任務放入事件循環對象中觸發
loop.run_until_complete(task)

awaityield

這二者都能實現暫停的效果,但功能是不兼容的,在生成器中不能用await,在async定義的協程中不能用yield

而且,yield from後可接可迭代對象、迭代器、生成器、future對象、協程對象,await後只能接future對象、協程對象。

建立future對象

前面咱們知道經過async能夠定義一個協程對象,那麼如何建立一個future對象呢?

答案是經過task,只須要建立一個task對象便可。

# 在前一個例子中,咱們先建立了事件循環,而後經過事件循環建立了task,咱們來測試下
import asyncio
from asyncio.futures import Future


async def hello(name):
    print('Hello, ', name)

coroutine = hello('World')

# 建立事件循環
loop = asyncio.get_event_loop()

# 將協程轉換爲任務
task = loop.create_task(coroutine)

print(isinstance(task, Future))    # 結果是True

# 不創建事件循環的方法
task = asyncio.ensure_future(coroutine)

print(isinstance(task, Future))    # 結果也是True

知道了建立future對象(也便是建立task對象)的方法,那麼咱們驗證下awaityield後接coroutine和future對象。

import sys
import asyncio


async def f1():
    await asyncio.sleep(2)
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


@asyncio.coroutine
def f2():
    yield from asyncio.sleep(2)
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


async def f3():
    await asyncio.ensure_future(asyncio.sleep(2))
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


@asyncio.coroutine
def f4():
    yield from asyncio.ensure_future(asyncio.sleep(2))
    return 'Hello, {}'.format(sys._getframe().f_code.co_name)


tasks = [
    asyncio.ensure_future(f1()),
    asyncio.ensure_future(f2()),
    asyncio.ensure_future(f3()),
    asyncio.ensure_future(f4())
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print(task.result())

loop.close()

綁定回調函數

異步IO都是在IO高的地方掛起,等IO操做結束後再繼續執行,大多數時候,咱們後續的代碼執行都是須要依賴IO的返回值的,此時就要用到回調了。

回調的實現有兩種方式。

第一種,利用同步編程實現的回調

這種方法要求咱們可以取得協程的await的返回值。經過task對象的result()方法能夠得到返回結果。

import time
import asyncio

async def _sleep(x):
    time.sleep(x)
    return 'Stopped {} seconds!'.format(x)

coroutine = _sleep(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

loop.run_until_complete(task)

# 直接經過task獲取任務結果
print('Result: {}'.format(task.result()))

第二種,經過asyncio自帶的添加回調函數功能實現

import time
import asyncio

async def _sleep(x):
    time.sleep(x)
    return 'Stopped {} seconds!'.format(x)

def callback(future):
    print('Result: {}'.format(future.result()))

coroutine = _sleep(2)

loop = asyncio.get_event_loop()

task = asyncio.ensure_future(coroutine)

# 添加回調函數
task.add_done_callback(callback)

loop.run_until_complete(task)

協程中的併發

asyncio實現併發,就須要多個協程來完成任務,前面作awaityield的驗證時就用了併發。

每當有任務阻塞的時候就await,而後其餘協程繼續工做。

第一步,建立多個協程的列表

# 協程函數
async def worker(n):
    print('Waiting: {}'.format(n))
    await asyncio.sleep(n)
    return 'Done {}'.format(n)

# 協程對象
c1 = worker(1)
c2 = worker(2)
c3 = worker(4)

# 協程轉換爲task
tasks = [
    asyncio.ensure_future(c1),
    asyncio.ensure_future(c2),
    asyncio.ensure_future(c3)
    ]

loop = asyncio.get_event_loop()

第二步,將列表註冊到事件循環中

有兩種方法,這兩種方法的區別後面說。

return的結果能夠經過task.result()查看。

# asyncio.wait()
loop.run_until_complete(asyncio.wait(tasks))

# asyncio.gather()
loop.run_until_complete(asyncio.gather(*tasks))    # *不能省略

# 查看結果
for task in tasks:
    print('Result: {}'.format(task.result()))

協程中的嵌套

使用async能夠定義協程,協程用於耗時的IO操做,咱們也能夠封裝更多的IO操做過程,實現一個協程中await另外一個協程,實現協程的嵌套。

# 內部協程函數
async def worker(n):
    print('Waiting: {}'.format(n))
    await asyncio.sleep(n)
    return 'Done {}'.format(n)

# 外部協程函數
async def main():
    c1 = worker(1)
    c2 = worker(2)
    c3 = worker(4)
    
    tasks = [
        asyncio.ensure_future(c1),
        asyncio.ensure_future(c2),
        asyncio.ensure_future(c3)
    ]
    
    dones, pendings = await asyncio.wait(tasks)
    
    for task in tasks:
        print('Result: {}'.format(task.result()))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

若是外部協程使用的asyncio.gather(),那麼做以下替換。

results = await asyncio.gather(*tasks)

for result in results:
    print('Result: {}'.format(result))

協程中的狀態

講生成器時提到了四種狀態,對協程咱們也瞭解一下其狀態(準確地說是future/task對象的狀態)。

  1. Pending:建立Future,還未執行
  2. Running:事件循環正在調用執行任務
  3. Done:任務執行完畢
  4. Cancelled:Task被取消後的狀態

gather和wait

接收的參數不一樣

wait接收的tasks,必須是一個list對象,該list對象中存放多個task,既能夠經過asyncio.ensure_future轉爲task對象也能夠不轉。

gather也能夠接收list對象,但*不能省,也能夠直接將多個task做爲可變長參數傳入,參數能夠是協程對象或future對象。

返回結果不一樣

wait返回donespendings,前者表示已完成的任務,後者表示未完成的任務,須要經過task.result()手工獲取結果。

gather直接將值返回。

協程控制功能

# FIRST_COMPLETED:完成第一個任務就返回
# FIRST_EXCEPTION:產生第一個異常就返回
# ALL_COMPLETED:全部任務完成再返回(默認選項)
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

# 控制運行時間:1秒後返回
dones, pendings = loop.run_until_complete(
    asyncio.wait(tasks, timeout=1))

動態添加協程

asyncio中如何動態添加協程到事件循環中?

兩種方法,一種是同步的,一種是異步的。

import time
import asyncio
from queue import Queue
from threading import Thread

# 在後臺永遠運行的事件循環
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def do_sleep(x, queue, msg=""):
    time.sleep(x)
    queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 動態添加兩個協程
# 這種方法在主線程是同步的
new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First')
new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second')

while True:
    msg = queue.get()
    print('{} is done'.format(msg))
    print(time.ctime())
import time
import asyncio
from queue import Queue
from threading import Thread

# 在後臺永遠運行的事件循環
def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_sleep(x, queue, msg=""):
    await asyncio.sleep(x)
    queue.put(msg)

queue = Queue()

new_loop = asyncio.new_event_loop()

t = Thread(target=start_loop, args=(new_loop,))
t.start()

print(time.ctime())

# 動態添加兩個協程
# 這種方法在主線程是異步的
asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop)
asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop)

while True:
    msg = queue.get()
    print('{} is done'.format(msg))
    print(time.ctime())
相關文章
相關標籤/搜索