深刻Asyncio(二)從線程到協程

線程的真相

多線程並非一無可取,在實際問題中,要權衡優劣勢來選擇多線程、多進程或是協程。協程爲多線程的某些問題提供了一種解決方案,因此學習協程首先要對線程有必定了解。python


多線程優勢

  1. 代碼可讀性
    多線程代碼即便是併發執行的,但依然能夠線性閱讀,可讀性高。
  2. 共享內存
    在多核CPU中仍然共享內存數據,這對解決某些問題很重要,避免了數據通訊。
  3. 很容易對現有代碼進行改造
    有不少多線程編程的實例,也有不少阻塞程序依賴多線程的代碼參考。

在Python中,因爲GIL的存在,並行執行依然是不可能的(CPython解釋器)。GIL帶來的一個影響就是,全部的線程只能用到一個CPU核心,這幾乎斷絕了多線程並行的優點。編程

使用多線程的最佳方式就是使用concurrent.futures庫的ThreadPoolExecutor(),將全部的數據丟到submit()方法中。bash

from concurrent.futures import ThreadPoolExecutor as Executor

def worker(data):
    pass

with Executor(max_workers=10) as exe:
    future = exe.submit(worker, data)

要關閉線程則執行Executor.shutdown(wait=True),它容許等待1-2秒來等線程執行完畢。對於上述例子,要儘量地在worker()函數中不使用全局變量。多線程


多線程的背景

爲了文章的完整性,即便讀者早就有所瞭解,在這裏仍是要補充:
1. 多線程編程中出現的bug是最難修復的bug,根據經驗,設計一款新軟件可能不容易出現這些問題,但對於一些現存軟件的維護,即便專家也沒辦法解決這些問題;
2. 線程是資源密集型的,須要額外的系統資源來建立,如爲每一個線程預分配內存棧同時會提早消耗進程的虛擬內存,能夠經過threading.stack_size([size])修改棧大小,但對於函數遞歸嵌套調用的棧深度有影響;
3. 在高併發環境中,因爲上下文切換成本,會對吞吐量形成影響;
4. 多線程不夠靈活,全部的線程共享CPU時間,而無論線程是否準備工做。併發

總之,多線程編程讓代碼bug難查,而且對於高併發場景並不高效。異步


例子:機器人與餐具

假設你開了一家餐館,你的僱員都是機器人,如今就僱員與餐具構建一個簡單的程序。async

# Robot
import threading
from queue import Queue

class ThreadBot(threading.Thread):  # 線程子類,繼承start/join等方法
    def __init__(self):
        super().__init__(target=self.manage_table)  # 目標函數,下面定義
        self.cutlery = Cutlery(knives=0, forks=0)   # 每一個機器人攜帶的餐具
        self.tasks = Queue()    # 機器人接收的任務被添加到任務隊列

    def manage_table(self): 
        while True:   # 機器人只接受三種工做
            task = self.tasks.get()
            if task == 'prepare table':
                kitchen.give(to=self.cutlery, knives=4, forks=4)
            elif task == 'clear table':
                self.cutlery.give(to=kitchen, knives=4, forks=4)
            elif task == 'shutdown':
                return
# Cutlery
from attr import attrs, attrib  # 開源庫,不影響線程或協程,使實例屬性的初始化更輕鬆

@attrs
class Cutlery:
    knives = attrib(default=0)
    forks = attrib(default=0)

    def give(self, to: 'Cutlery', knives=0, forks=0):   # 用於與其它實例交互
        self.change(-knives, -forks)
        to.change(knives, forks)

    def change(self, knives, forks):
        self.knives += knives
        self.forks += forks

kitchen = Cutlery(knives=100, forks=100)
bots = [ThreadBot() for i in range(10)]    # 建立了10個線程機器人

import sys

for bot in bots:
    for i in range(int(sys.argv[1])):   # 從命令行獲取桌子的數量,而後給每一個機器人安排全部桌子的任務
        bot.tasks.put('prepare table')
        bot.tasks.put('clear table')
    bot.tasks.put('shutdown')

print(f'Kitchen inventory before service: {kitchen}')
for bot in bots: bot.start()
for bot in bots: bot.join()
print(f'Kitchen inventory after service: {kitchen}')

指望是通過程序運行後,全部的刀叉都應該回到廚房而且數量與初始同樣。函數

λ  python test.py 100
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=100, forks=100)

λ  python test.py 10000
Kitchen inventory before service: Cutlery(knives=100, forks=100)
Kitchen inventory after service: Cutlery(knives=104, forks=80)

能夠看到,在提供10000張桌子後,結果出現了嚴重的錯誤,實際上即便嘗試屢次都不樂觀。咱們知道這些機器人構造良好,也不會出現錯誤,那麼是什麼地方錯了呢?高併發

回憶下場景:代碼簡單易讀,邏輯沒錯,甚至用100張桌子進行了測試,但10000張桌子測試就失敗了,而且錯誤每次都不同。oop

其實這是典型的競態條件bug,錯誤出如今這一段代碼中:

def change(self, knives, forks):
    self.knives += knives
    self.forks += forks

自加在C解釋器中運行並非原子的,它被分爲幾步:
1. 讀取原變量(self.knives)值到臨時變量;
2. 將knives值加到臨時變量;
3. 將終值賦值給原變量。

搶佔式多任務(多線程)會打亂上述步驟,可經過加鎖的方式修復bug:

def change(self, knives, forks):
    with self.lock:
        self.knives += knives
        self.forks += forks

但加鎖須要瞭解多線程代碼中,什麼地方會發生數據共享,若是是我的寫的程序比較好控制,但若是有第三方的代碼夾雜進來,就會很頭痛了。

光看源碼很難找出有競態條件,這主要是由於源碼裏沒有指出什麼時候何處切換線程,即便指出也沒用,由於切換是由操做系統決定的,它可能發生在任什麼時候間任何位置。

解決問題的一個辦法就是讓機器人不處理餐具,而是交由某一個單獨的線程去處理。

不過在協程中,咱們能夠顯式地知道上下文在什麼時候切換,由於await關鍵字很顯眼。

import asyncio

class CoroBot:  # 單線程管理多個機器人實例
    def __init__(self):
        self.cutlery = Cutlery(knives=0, forks=0)
        self.tasks = asyncio.Queue()    # 使用異步隊列

    async def manage_table(self):
        while True:
            task = await self.tasks.get()   # 關鍵點,協程惟一能夠切換的位置
            if task == 'prepare table':
                kitchen.give(to=self.cutlery, knives=4, forks=4)
            elif task == 'clear table':
                self.cutlery.give(to=kitchen, knives=4, forks=4)
            elif task == 'shutdown':
                return

from attr import attrs, attrib

@attrs
class Cutlery:
    knives = attrib(default=0)
    forks = attrib(default=0)

    def give(self, to: 'Cutlery', knives=0, forks=0):
        self.change(-knives, -forks)
        to.change(knives, forks)

    def change(self, knives, forks):
        self.knives += knives
        self.forks += forks

kitchen = Cutlery(knives=100, forks=100)
bots = [CoroBot() for i in range(10)]    # 建立了10個協程機器人,但它們是由一個線程管理的

import sys

for bot in bots:
    for i in range(int(sys.argv[1])):
        bot.tasks.put_nowait('prepare table')   # 異步寫入隊列
        bot.tasks.put_nowait('clear table')
    bot.tasks.put_nowait('shutdown')

print(f'Kitchen inventory before service: {kitchen}')
loop = asyncio.get_event_loop()
tasks = [loop.create_task(bot.manage_table()) for bot in bots]
task_group = asyncio.gather(*tasks)
loop.run_until_complete(task_group)
print(f'Kitchen inventory after service: {kitchen}')

因爲只有一個位置提供切換協程,所以在運行過程當中不存在競態條件,結果也是明顯的,不管多少測試都能經過。

相關文章
相關標籤/搜索