簡單談談協程

前提:html

  1. 默認你已經對協程有了基本瞭解。
  2. 寫此文時,我自覺得我對協程瞭解仍是很不深刻,更多的是作個記錄,依據我的經驗,我相信再過段時間我會不忍看這篇文章的。
  3. 水平有限(太菜了),若是你發現文中有表述不當的地方,歡迎指正啦~

協程定義

協程的提出最先能夠追溯到 Melvin E. Conway 於 1963 年發佈的一篇論文《Design of a Separable Transition-Diagram Compiler》python

在這篇論文中,他提出將編譯器組織爲一組協程,這使得在調試中使用單獨的通道,而後在生產中運行單傳遞編譯器成爲可能。文章中有這樣一段話:c++

coroutines are subroutines all at the same level, each acting as if it were the master program when in fact there is no master program.git

這也許是協程最先的 描述了,不過,遺憾的是就目前來講,協程這一律念彷佛仍是缺失清晰明確並被廣泛承認的定義。所以你能看到各類語言/系統在具體實現時基於不一樣的理解和預期要求,方案也是不盡相同。在這裏,咱們結合 Marlin 博士在論文中總結的協程特徵來定義它:程序員

  • 協程的本地數據在連續調用中能得以存留
  • 當發生控制轉移時,原協程的執行被掛起,只有當控制權在稍後的某個階段從新回到進入該協程時,纔會從原先離開的地方繼續執行。

固然了,這種基於表象的描述仍是沒有明確其內在構造,留下了一些疑問,由這些疑問產生了對協程在不一樣角度的分類。github

  1. 基於協程間控制轉移機制的不一樣,分爲對稱協程和非對稱協程。
  2. 基於協程是否被實現爲堆棧結構,分爲堆棧協程 (stackful coroutine) 和無堆棧協程(stackless coroutine)。
  3. 另外還有基於其在語言中是做爲 first-class 對象提供仍是做爲受約束的結構來分類。

在這篇文章中,咱們主要關注於 stackful coroutine 和 stackless coroutine 的區別。編程

Why Coroutine?

咱們以生產者消費者模型來講明協程的使用場景,要注意的是,系統層級的協程爲了與語言中的實現加以區分,一般會叫另一個名字:纖程(fiber),這裏咱們仍是統稱協程了。promise

傳統的生產者消費者模型是假設一個進程擁有兩個線程,一個線程是消費者,一個線程是生產者,它們公用一份地址空間。咱們能夠經過這種方式簡單的實現生產者消費者模型。可是這種方式很是繁瑣也消耗性能,由於你須要不斷地進行生產者和消費者的上下文切換。而且這種線程的切換是由 OS 調度的,計算機上可能運行着不少的程序,調度器並不知道生產者和消費者之間的關係,極可能生產者早就完成了數據的生成,消費者卻遲遲不能獲得調度。瀏覽器

對於上述問題,一種解決方案是直接把它們放到一個線程上下文裏,寫一堆相互依賴檢查的代碼,但這反卻是一種回退了:markdown

  1. 咱們用多線程模型去解決生產者消費者問題,就是爲了實現生產者和消費者之間解耦和異步,但願它們能儘量的獨立工做而不考慮對方。
  2. 這種作法也並不天然,生產者和消費者更應該是一種相互協做的狀態,而不是有明確的 caller 和 callee

協程和線程的概念很類似,實際上在 OS 中,協程也被叫作用戶態線程,不過不一樣於線程的搶佔式調度執行,協程的主動讓出式調度執行起來更爲高效,由於從用戶態轉換到內核態的切換成本,須要維持的上下文也少得多。

import asyncio
import random

async def produce(queue, n):
    for item in range(n):
        # 生產一個項目,使用sleep模擬I/O操做
        print('producing item {} ->'.format(item)) 
        await asyncio.sleep(random.random())
        # 將項目放入隊列
        await queue.put(item)
    # 指示生產完畢
    await queue.put(None)

async def consume(queue):
    while True:
        # 等待來自生產者的項目
        item = await queue.get()
        if item is None:
            break
        # 消費這個項目,使用sleep模擬I/O操做
        print('consuming item {} <-'.format(item))
        await asyncio.sleep(random.random()) 

async def main():
    queue = asyncio.Queue()
    task1 = asyncio.create_task(produce(queue, 10))
    task2 = asyncio.create_task(consume(queue))
    await task1
    await task2

asyncio.run(main())
複製代碼

有棧協程和無棧協程

以咱們上面對協程的定義來講,你會發現實現協程的關鍵點在於數據流和控制流如何在跳轉中保持,咱們先不談保持,如何實現控制流的轉移呢?

  1. 調用一個新的函數

  2. 從當前函數返回

stackful

方案一就是有棧協程的思路,也是很是天然的一種想法,咱們讓每一個協程都擁有本身的堆棧和控制塊,在協程掛起以前,當前活動的協程的非易失性寄存器被存儲在協程的控制塊中。在恢復新激活的協程以前,從其關聯的控制塊中恢復,協程上下文切換經過棧空間的交換來實現。實際上你在 glibc/ucontext.h 中能看到 getcontextsetcontextswapcontextmakecontext 這幾個函數聲明,一些開源協程庫就是基於此實現的。

這裏牽扯到一個問題,棧空間如何分配?

一種方案是獨立內存棧,即給每一個協程分配一塊固定大小的棧空間,可是分配多少呢,少了容易棧溢出,多了容易浪費內存,交給程序員分配又增長了心智負擔。爲了防止爆棧,咱們每每要給每一個任務都分配知足上限的棧空間,任務切換時直接調整棧指針,可是一個線程同一時刻只有一個協程在運行呀。固然你也能夠放到堆上,可是顯然程序調用的開銷也會顯著增長。

另外一種方案是 共享內存棧,其作法是,咱們預先分配一塊佔用內存較大的棧空間做爲共享棧內存,在suspend 或者說 transfer 時,咱們會基於當前協程實際所用空間對其進行備份,也就是動態分配。共享棧相比獨立棧的劣勢時任務切換時須要進行較爲複雜的拷貝。

貼下:雲風的協程庫 coroutine 註釋版: github.com/chenyahui/A…

stackless

無棧協程採用的就是第二種方案了,你能夠將其理解基於對象模型實現的,這時協程的上下文就是對象的成員變量了。固然,你也能夠將其理解爲基於狀態機模型去實現的,以下:

struct coroutine {
    int i;
    int value = 0;
    void next() {
        switch(value) {
        case 0:
            return frist();
        case 1:
            return second();
    }
    void frist() {
        i = 0;
        value = 1;
    }
    void second() {
        i++;
        value = 2;
    }
};
複製代碼

因爲不須要切換棧幀,無棧協程的性能相比有棧協程會更好一點,內存空間的佔用也好得多,並且它執行的是傳統的函數調用函數返回,不須要手動修改棧指針,不會破壞 return stack buffer 跳轉預測,可是其實現須要編譯器的支持,總體上來講兼容性不若有棧協程。

誤解、補充

棧協程和無棧協程的區別點容易被誤解。

這裏的有棧、無棧是指協程是否被實現爲堆棧結構,換句話說當某個協程被掛起時,其是否存留在一個棧結構中。你能在不少文章以及論文中看到他們認爲有棧協程和無棧協程的差別在於可否在嵌套中掛起,實際上這是不許確的。

如何理解呢?

設想一下,咱們調用了一個 coroutine c,當它調用了常規例程 f 時,f 的調用幀要去哪呢?若是對 f() 的調用致使另外一個協程掛起,那麼f()自己的調用幀必須和 c() 的幀一塊兒保存。

基於 stackful 的協程方案,咱們是能夠作到這一點的,好比採用 Cactus Stack 的方案(以下圖) ,可是仍是會帶來不小的複雜性。 所以,很多語言的有棧協程也對其作了限制:要求每一個協程都放在詞法嵌套的最外層聲明。這個時候協程棧是徹底互不相交的,好比採用 stackful coroutine 的 Modula-2 就是如此。

仙人掌棧

而 stackless 的方案,協程 suspend 的時候根本就沒有常規的調用棧結構,要實現嵌套調用怎麼整呢。。。。很難

generator 和 async/await

迴歸到 js , 不少人會說 generator 和 async/await 就是無棧協程,其實這種說法不是太準確。

這點 python 的 pep 規範中也到了體現:www.python.org/dev/peps/pe…

引入 async/await 目的是實現一個心智模型簡單的,易用的並儘量接近同步的編程模型,更好的實現併發編程,而且明確的和生成器區別開來,消除生成器與協程之間的歧義。

生成器就是生成器,或者你叫它半協程(由於你能夠基於生成器去實現協程),咱們使用生成器更多的是側重於它提供的暫停和恢復執行函數的能力,叫協程就不太恰當了。

至於 async/await 是否是 generator 語法糖的問題,徹底看編譯器實現了。

咱們能夠經過 d8 --print-bytecode [path].js 看看 v8 是如何處理的 js 中 generator 函數的。

好比下面這段代碼:

function* testGenerator (){
  yield 1;
  yield 'a string'
}

function main(){
   let gen = testGenerator()
   gen.next()
   gen.next()
}

main()
複製代碼

生成的字節碼太長,這裏只截出部分(函數 testGenerator 部分字節碼 ),字節碼命令相關解釋,能夠直接看 源碼註釋

image-20210512152432445

經過 invokeInstrinsic[_GreateJSGeneratorObject] 建立 JSGenerator 實例,類 JSGenerator 的定義 戳這裏

咱們要關注的是 SuspendGeneratorResumeGenerator 這兩個命令。

IGNITION_HANDLER(SuspendGenerator, InterpreterAssembler) {
  Node* generator = LoadRegisterAtOperandIndex(0);
  TNode<FixedArray> array = CAST(LoadObjectField(
      generator, JSGeneratorObject::kParametersAndRegistersOffset));
  Node* closure = LoadRegister(Register::function_closure());
  Node* context = GetContext();
  RegListNodePair registers = GetRegisterListAtOperandIndex(1);
  Node* suspend_id = BytecodeOperandUImmSmi(3);

  Node* shared =
      LoadObjectField(closure, JSFunction::kSharedFunctionInfoOffset);
  TNode<Int32T> formal_parameter_count = UncheckedCast<Int32T>(
      LoadObjectField(shared, SharedFunctionInfo::kFormalParameterCountOffset,
                      MachineType::Uint16()));

  ExportParametersAndRegisterFile(array, registers, formal_parameter_count);
  StoreObjectField(generator, JSGeneratorObject::kContextOffset, context);
  StoreObjectField(generator, JSGeneratorObject::kContinuationOffset,
                   suspend_id);

  Node* offset = SmiTag(BytecodeOffset());
  StoreObjectField(generator, JSGeneratorObject::kInputOrDebugPosOffset,
                   offset);

  UpdateInterruptBudgetOnReturn();
  Return(GetAccumulator());
}
複製代碼

簡單來講,SuspendGenerator 處理函數會調用 LoadRegisterGetContext LoadObjectFieldStoreObjectField 等來保存狀態,並記錄字節碼的偏移量,而後直接把累加器(V8 是經過模擬物理機器來執行字節碼的,而且基於寄存器設計)的值給 return 了,也就是當前 generator 函數棧幀已經退出了,那暫停是怎麼實現的呢?

很簡單,你會發現字節碼處理函數最後一行基本都會調用 Dispatch 函數,執行該函數會取出當前函數生成的下一條字節碼來執行,SuspendGenerator 裏沒有執行 Dispatch,至於暫停位置,則是經過偏移量來記錄的。

IGNITION_HANDLER(ResumeGenerator, InterpreterAssembler) {
  Node* generator = LoadRegisterAtOperandIndex(0);
  Node* closure = LoadRegister(Register::function_closure());
  RegListNodePair registers = GetRegisterListAtOperandIndex(1);

  Node* shared =
      LoadObjectField(closure, JSFunction::kSharedFunctionInfoOffset);
  TNode<Int32T> formal_parameter_count = UncheckedCast<Int32T>(
      LoadObjectField(shared, SharedFunctionInfo::kFormalParameterCountOffset,
                      MachineType::Uint16()));

  ImportRegisterFile(
      CAST(LoadObjectField(generator,
                           JSGeneratorObject::kParametersAndRegistersOffset)),
      registers, formal_parameter_count);

  SetAccumulator(
      LoadObjectField(generator, JSGeneratorObject::kInputOrDebugPosOffset));

  Dispatch();
}
} 
複製代碼

ResumeGenerator 就基本是一個相反的過程了,恢復以前保存的狀態,調用 Dispatch 取出下一條字節碼繼續執行 generator 函數。

v8 中 async/await 是在 Generator 基礎上另外實現的。await 也會生成 SuspendGeneratorResumeGenerator 這兩條字節碼,

JSAsyncGeneratorObject 類 繼承了 JSGeneratorObject 類。

除了 Generator 這個地方還和 microtask 有關,簡單來講(這個地方實在看不下去代碼了,搜的),對於以下代碼,編譯器會建立一個 fulfilled 狀態的 promise,加入一個 microtask 到隊列中,保存執行狀態,暫停,而後遍歷微任務隊列時,就會恢復狀態並執行,放到瀏覽器中,其調度又會受到瀏覽器事件循環的影響。

async function testAsync(){
    const res = await 0
    return res
}

testAsync()
複製代碼

參考

相關文章
相關標籤/搜索