聊一聊協程(上):從Node.js談起

本文已收錄 【修煉內功】躍遷之路

聊一聊協程(上).jpg

林中小舍.png

在上一篇文章 JVM 細說線程中已經介紹了應用程序常見的一些線程模型,本篇就上篇說起的協程作簡單的介紹

談到併發/異步,首先想到的可能即是線程/進程,Java在近20年的發展中從JDK1.2以後便採用1:1線程模型,Java在覈心類庫中提供了衆多異步API,可使多線程應用發揮強大的併發能力並得到不錯的性能javascript

現在,在不少高併發的場景下(如I/O密集型)操做系統的線程調度成爲了性能的瓶頸,每每cpu使用率及內存使用率還穩如泰山,但系統load已經堵到不行html

那,協程可以爲I/O密集型的場景帶來什麼幫助?本篇就從Node.js的異步API聊起前端

總有人會說,協程其實就是線程,只不過是換了一種寫法的語法糖,就如同Java8中的Lambda表達式,也總有人會說Lambda表達式只不過是匿名類的語法糖而已(見Java8 Lambda到底是不是匿名類的語法糖),然,非也vue

如上篇文章所述,N:M線程模型能夠解決N:1模型中阻塞問題,同時也能充分利用CPU的多核優點,這也是大部分協程實現的基礎java

N能夠理解爲用戶線程數,其數量根據業務邏輯須要而定,M能夠理解爲內核線程數,其數量固定(或相對固定),每個用戶線程都須要放到內核線程中才能執行,用戶線程的調度由應用程序管理(甚至能夠交由編程人員經過編寫程序管理)。而協程則能夠理解爲上述的用戶線程,一種更爲輕量級的線程node

Node.js架構特色

爲何恰恰選Node.js聊協程?這要從Node.js的架構特色提及~ git

Node.js是單線程麼?是,也不是~es6

nodejs_system.png

Node.js使用事件驅動及非阻塞I/O實現異步模型github

Node.js is a platform built on Chrome's JavaScript runtime for easily building fast, scalable network applications. Node.js uses an event-driven, non-blocking I/O model that makes it lightweight and efficient, perfect for data-intensive real-time applications that run across distributed devices.

APPLICATION爲咱們所編寫的應用層,其JS的解釋執行由V8引擎負責,這裏的執行線程只有一個,這也就是一般所說Node.js爲單線程的緣由,在編寫Node.js程序時沒有辦法建立子線程,同時若是有部分邏輯阻塞或者長時間運行,則會影響整個運行時npm

Node.js的高併發則依賴事件驅動(EVENT LOOP)及非阻塞I/O(LIBUV),在進行I/O操做時Node.js將任務交由UV線程池中的線程執行,並在事件隊列中註冊回調,在I/O操做完成時觸發回調繼續後續的動做,在整個I/O操做的過程當中並不會阻塞JS的解釋執行

  • Node.js的JavaScript解釋執行只有一個線程,阻塞或長時運行的邏輯會影響整個應用層的運行
  • I/O操做交由UV線程池處理(經過addones也能夠將CPU密集型的計算邏輯放到LIBUV線程池中執行),經過事件機制回調將結果返回給應用層處理

Node.js的工做線程數固定(可經過環境變量UV_THREADPOOL_SIZE指定),每一個工做線程對應一個內核線程,工做線程數能夠理解爲N:M線程模型中的M

Node.js應用層的異步任務由開發人員編寫,每一個異步任務能夠理解爲用戶線程,任務數對應於N:M線程模型中的N

因爲Node.js上述的特色(單執行線程,多工做線程),沒有過多的干擾,很是適合用來說述協程的概念及應用

異步編程

爲了按部就班地理解協程的概念,咱們從異步的常規實現方式一一提及,來說述協程的演變過程(對經常使用異步編程無感的可直接跳到協程一節)

回調

將函數作爲另外一個函數的入參傳入,並由調用方在合適的時機(處理完成或失敗)進行調用

// 獲取個人信息
let request = $.get(
  "user/info/me", 
  // 回調函數
  function(data) { 
    /* do something */
    }
);

// 2秒後取消請求(若是請求仍未返回)
setTimeout(
  // 回調函數
  function() {
    if (!!request) {
        request.abort()
    }
    }, 
  2000
);

回調能夠在必定程度上將主流程與異步邏輯分離,異步邏輯的處理不會阻塞主流程的執行,但回調也帶了一些問題

  • 異步執行的結果沒法有效地返回到主邏輯流程中(Java中可使用Future.get以阻塞方式等待異步結果)
  • 不良好的編程習慣,容易造成回調地獄Callback Hell

如,將文件夾下的全部圖片拼接成一張圖片

fs.readdir(source, function(err, files) {
  if (err) {
    console.error('Error finding files: ', err);
  } else {
    files.forEach(function(filename, fileIndex) {
      console.info(filename);
      gm(source + filename).size(function(err, values) {
        if (err) {
          console.error('Error identifying file size: ', err);
        } else {
          console.info(filename + ' : ' + values);
          aspect = values.width / values.height;
          widths.forEach(function(width, widthIndex) {
            height = Math.round(width / aspect);
            console.info('resizing ' + filename + 'to ' + height + 'x' + height);
            let destFile = dest + "w" + width + '_' + filename;
            this.resize(width, height).write(destFile, function(err) {
              if (err) {
                console.error('Error writing file: ', err);
              } else {
                console.info('Writing file to: ', destFile);
              }
            });
          });
        }
      });
    });
  }
});

大量的回調函數嵌套在一塊兒,可閱讀性和可維護性都並不高

再如,多個http請求存在先後依賴關係,前一個請求的返回值做爲後一個請求的參數

$.get("step/1", (data1) => {
    $.get(`step/2/${data1}`, (data2) => {
        $.get(`step/3/${data2}`, (data3) => {
            /* do the final thing */
        })
    })
})

化解Callback Hell的方法有不少,其中最簡單的方式即是將代碼模塊化、扁平化

function step(url, then) {
    $.get(url, (data) => {
        then(data);
    });
}

function doFinalThing(data) {
  /* do the final thing */
}

// Thunk 化
function proxyStep(url, then) {
    return (data) => {
        step(`${url}/${data}`, then);
    }
}

let step3Proxy = proxyStep('step/3', doFinalThing);
let step2Proxy = proxyStep('step/2', step3Proxy);

step('step/1', step2Proxy);

Thunk的概念在下文會有介紹,在將邏輯進行抽象化、模塊化以後,代碼則會變得清晰起來

事件

事件是回調的另外一種形式,在代碼邏輯的分離上作的更爲完全

回調的事件化

// 定義事件
class EventGet {
    constructor(url = '') {
        this.url = url;
          // 事件完成時的回調
        this.onComplete = (data) => {}
    }
    
      // 觸發事件
    emitGet() {
        $.get(this.url, this.onComplete)
    }
}

let step1 = new EventGet('step/1')
let step2 = new EventGet('step/2')
let step3 = new EventGet('step/3')

step1.onComplete = (data) => {
  // step1完成時觸發step2
  step2.url += `/${data}`;
  step2.emitGet();
}

step2.onComplete = (data) => {
  // step2完成時觸發step3
  step3.url += `/${data}`;
  step3.emitGet();
}

step3.onComplete = (data) => {
  /* do the final thing */
}

// 觸發step1
step1.emitGet();

事件相比於單純的回調更爲語義化,也更容易表達程序所要執行的邏輯

這裏是否讓你想起了Java中的CompletableFuture?但請再次注意,Node.js中應用層的解釋執行只有一個線程,每次GET請求並不是建立了一個內核線程去執行,而是交給了UV線程池,由事件機制來回調onComplete函數處理請求的結果

其實,若是作過前端開發,隨處均可以看到回調或事件的使用

<nz-button-group>
  <button nz-button nzType="default" (click)="cancle()">取消</button>
  <button nz-button nzType="danger" (click)="delete()">刪除</button>
</nz-button-group>
function clickHandler() {
  /* do something when clicking */
}
var btn = document.getElementById("btn");
btn.onclick = clickHandler;
btn.addEventListener("click", clickHandler, false);

Promise

Promise 是異步編程的一種解決方案,比傳統的解決方案(回調函數和事件)更合理、更強大

Promise有三個狀態,pending(進行中)、fulfilled(已成功)和rejected(已失敗),pending能夠轉換爲fulfilledrejected其中之一,且狀態一旦轉換就不會再變

promise.png

promise的使用詳見ex6-promise,這裏不贅述,若是將上述回調或事件的示例轉爲promise的方式,能夠編寫以下

function promiseGet(url) {
    return new Promise((resolve, reject) => {
        $.get(url, resolve, reject);
    });
}

// 回調嵌套改成鏈式調用
promiseGet(
  "step/1"
).then((data1) => {
    return promiseGet(`step/2/${data1}`)  
}).then((data2) => {
  return promiseGet(`step/3/${data2}`)
}).then((data3) => {
  /* do the final thing*/
}).catch((e) => {
  /* handle exception */
}).finally(() => {
    /* do finnaly */
});

promise可將先後有依賴關係的異步處理轉換爲鏈式調用的形式,一樣是回調,卻能夠大大避免Callback Hell,並使調用邏輯更加清晰

同時,promise還能夠輕鬆編寫並行代碼

function promiseGet(url) {
    return new Promise((resolve, reject) => {
        $.get(url, resolve, reject);
    });
}

Promise.all([
    promiseGet("user/info/張三"),
    promiseGet("user/info/李四"),
    promiseGet("user/info/趙五")
]).then((users) => {
    /* do something */
}).catch((e) => {
    /* handle exception */
}).finally(() => {
    /* do finnaly */
})

響應式

響應式編程做爲近幾年很火的一種編程範式,以一種流(Stream)的方式處理數據,響應式的概念十分龐大,這裏不作詳述,如下以一個rxjs示例展現響應式編程如何解耦異步邏輯

function getOnObserver(url, observer) {
  $.get(
    'user/info/張三', 
    (data) => {
      observer.next(data);
      observer.complete();
    },
    (error) => {
      observer.error(error)
    }
  );
}

// 建立流
let observable = Observable.create((observer) => {
    getOnObserver('user/info/張三',  observer);
    getOnObserver('user/info/李四',  observer);
    getOnObserver('user/info/趙五',  observer);
});

// 訂閱/消費流
observable.subscribe((data) => {
    /* do something for each result */
});

響應式編程的威力遠不止此,流式處理有豐富的api(可簡單參考Java8中Stream API)、背壓保護策略等等,經過其事件回調機制能夠在I/O密集型應用中一展身手

協程

在粗略瞭解了幾種常規的異步編程方式以後,從本節內容開始真正進入協程的範疇

generator

子程序(或者稱爲函數),在全部語言中都是層級調用,嚴格遵循線程棧的入棧出棧,子程序調用老是一個入口一個返回,調用順序是明確的

而協程的調用和子程序不一樣,協程看上去也是子程序,但執行過程當中協程內部可中斷,而後轉而執行別的子程序/協程,在適當的時候再返回來接着執行

generator 函數是 ES6 提供的一種異步編程解決方案,語法行爲與傳統函數徹底不一樣

function* helloGenerator() {
    yield console.log('hello');
    yield console.log('I\'m');
}

function* worldGenerator() {
    yield console.log('world');
    yield console.log('ManerFan');
}

let hello = helloGenerator();
let world = worldGenerator();

// 交替執行helloGenerator及worldGenerator
hello.next();
world.next();
hello.next();
world.next();

運行結果

hello
world
I'm
ManerFan

按照常理,在同一個線程中順序調用helloGeneratorworldGenerator,兩個函數均會按照調用順序完整的執行,按預期應該輸出

hello
I'm
world
ManerFan

在使用generator時,其next方法會在方法體內遇到yield關鍵字時暫停執行,交回該函數的執行權,相似於線程的掛起,所以generator也被稱之爲暫停函數

generator.png

generator函數能夠在內部使用yield關鍵字標識暫停點,generator函數的暫停、恢復執行可由應用程序靈活控制(內核線程的調度由系統控制),這與傳統函數的執行規則徹底不一樣,generator函數的調度權徹底交給了應用層

yield關鍵字除了標識暫停點以外,還能夠在恢復執行的時候傳值進來(generator更高階的用法詳見es6-generator

function* foo(x) {
    var y = 2 * (yield (x + 1));
    var z = yield (y / 3);
    return (x + y + z);
}

let f = foo(5);
let step1 = f.next();
console.log(step1); // { value:6, done:false }
let step2 = f.next(12);
console.log(step2); // { value:8, done:false }
let step3 = f.next(13); 
console.log(step3); // { value:42, done:true }

不論事件仍是Promise亦或響應式都離不開回調,事件將主流程與異步回調分離,Promise將異步回調轉爲鏈式回調,響應式將異步回調轉爲流式回調,當generator遇到異步回調會發生什麼?

如下,模擬定義$.get函數以下

let $ = {
  get(url, callback) {
    setTimeout(() => callback(url.substring(5)), 500);
  }
}

以上文回調嵌套爲例

// 回調方式
$.get("step/1", (data1) => {
  $.get(`step/2/${data1}`, (data2) => {
      $.get(`step/3/${data2}`, (data3) => {
          /* do the final thing */
      })
  })
})

利用generator可暫停、可恢復的能力,可在異步回調邏輯中觸發恢復下一步的動做,並將當前的異步處理結果帶回,以此將回調嵌套拉平,將異步回調邏輯寫出同步的順滑感,咱們稱之爲異步邏輯的「同步化」(同步的寫法,異步的執行

// 封裝異步調用
function get(url) {
  $.get(url, (data) => {
    // 觸發後續流程,並將數據代入後續流程
    req.next(data)
  })
}

// generator 異步邏輯同步化
function* asyncAsSync() {
  // 同步的寫法,異步的執行
  let result1 = yield get('step/1');
  let result2 = yield get(`step/2/${result1}`);
  let result3 = yield get(`step/3/${result2}`);
  console.log(result3);
  /* do the final thing */
}

// 生成generator
var req = asyncAsSync();
// 觸發一次
req.next();

// do something at the same time
console.log('do something at the same time when excute gets');

輸出

do something at the same time when excute gets
3/2/1

asyncAsSync函數中看似是同步的邏輯,實則每個yield get()都是一次異步調用,異步的結果經過req.next()帶回,而且asyncAsSync函數的調用並不會阻塞最後一行console.log的執行

generator_callback.png

generator的自動執行

在使用generator的過程當中其實並非很方便,generator函數的暫停與恢復須要使用程序控制,這對於編寫程序來講門檻會提升,那有沒有一種方法能夠自動的執行generator的next函數呢?

首先介紹generator自動執行使用的一種函數變形方式,柯里化(Thunk)

thunk函數的定義

// fn(arg1, arg2, arg3, ..., callback)
const thunk = function(fn) { 
  return function (...args) {
    return function (callback) {
      return fn.call(this, ...args, callback);
    }
  };
};

首先,被柯里化函數的入參知足如下規則

  • 由兩部分組成,參數及回調函數
  • 參數能夠有多個,回調函數只能有一個
  • 回調函數在入參的最後一個位置

柯里化將函數拆解成了兩部分,一部分爲只要參數入參的函數,一部分爲只有回調函數入參的函數,其使用方法以下

// $.get爲一函數,其入參爲url及回調
// 對$.get進行柯里化

// getThunk爲一函數,其入參爲url
const getThunk = thunk($.get);
// getInfoMe爲一函數,其入參爲回調函數
const getInfoMe = getThunk('user/info/me');
getInfoMe(function(data) => { 
    /* do something */ 
})

對thunk實現比較完美的庫參見npm-thunkify

使用thunk實現generator的自動執行器

// 自動執行器
function co(fn) {
  var gen = fn();

  // nextCallBack(data)函數的執行,來自
  // 1. 下方的首次顯示觸發
  // 2. yield值的回調觸發
  function nextCallBack(data) {
    // result的值爲一個接收回調函數的函數
    var result = gen.next(data); 
    if (result.done) return;
    // 執行result值中的函數,並將回調參數設置爲nextCallBack(data),異步遞歸回調
    // 當回調的時候繼續執行nextCallBack(data),並將執行結果代入
    result.value(nextCallBack); 
  }

  nextCallBack();
}
let getThunk = thunk($.get);

// generator 異步邏輯同步化
function* asyncAsSync() {
  let result1 = yield getThunk('step/1');
  let result2 = yield getThunk(`step/2/${result1}`);
  let result3 = yield getThunk(`step/3/${result2}`);
  console.log(result3);
  /* do the final thing */
}

co(asyncAsSync);

// do something at the same time
console.log('do something at the same time when excute gets');

輸出

do something at the same time when excute gets
3/2/1

getThunk(url)的執行結果爲一個函數,咱們記爲getStep(callback),該函數的入參爲一個回調函數callback,而callbak即$.get的回調函數(這裏很重要,參見thunk定義)

關鍵在於co中的nextCallBack(data),咱們記爲co#nextCallBack(data)

co#nextCallBack(data)會觸發一次yieldyeild的結果即getThunk(url)的執行結果getStep(callback),而getStep(callback)的入參又被設爲co#nextCallBack(data),執行異步遞歸回調

上述代碼的執行邏輯爲

  1. gen.next(data)

    執行 yield getThunk('step/1')獲得result,其值爲函數 getStep(callback)
  2. result.value(nextCallBack)

    執行 getStep(callback),並將callback入參設置爲 co#nextCallBack
  3. getStep(co#nextCallBack)

    執行 $.get('step/1'),當數據返回時執行回調函數 co#nextCallBack(data),其中data爲 $.get('step/1')的結果 1
  4. gen.next(1)

    執行 yield getThunk('step/2/1')獲得函數 getStep(callback)
  5. result.value(nextCallBack)

    執行 getStep(callback),並將callback入參設置爲 co#nextCallBack(data)
  6. getStep(co#nextCallBack)

    執行 $.get('step/2/1'),當數據返回時執行回調函數 co#nextCallBack(data),其中data爲 $.get('step/2/1')的結果 2/1
  7. 以此類推

generator的自動執行關鍵在於如下幾點

  • co#nextCallBack中觸發generator的next
  • yield的結果再也不是具體的值,而是柯里化後的函數(接收回調函數入參的函數)
  • 將上述柯里化後函數的入參設置爲co#nextCallBack,執行並遞歸回調
  • 遞歸回調會執行co#nextCallBack並將執行結果代入
  • 繼續觸發generator的next,恢復generator的執行,並將上一步的直接結果代入

generator的實際應用(koa)

將generator的精髓用到極致的還要當屬koa(koa2已經使用async改寫,再也不使用generator),它將http server端異步middleware的書寫體驗整個提高了一個層級

middleware相似於java servlet中的filter,其執行過程相似於剝洋蔥

filter.png

而當全部的middleware(包括核心core)都是異步的話,整個處理邏輯在各middleware之間的跳轉就變得複雜起來

koa使用generator的特性,巧妙實現了請求處理邏輯在各異步middleware間的靈活跳轉執行

如下,簡單模擬koa-middleware的實現邏輯

// 定義app
let app = {
  middlewares: [],

  core: function* (next) {
    console.log("excute core!");
    // yield 異步操做
    yield* next;
  },

  // 將多個middleware組合成鏈式結構
  compose(middlewares) {
    function* noop() {}
    return function* (next){
      var i = middlewares.length;
      var prev = next || noop();
      var curr;
  
      while (i--) {
        curr = middlewares[i];
        prev = curr.call(this, prev);
      }
  
      yield* prev;
    }
  },

  // 添加middleware
  use(middleware) {
    this.middlewares.push(middleware);
  },

  run() {
    let chain = this.compose([...this.middlewares, this.core]);
    co(chain);
  }
}
app.use(function* (next) {
  console.log("before middleware1");
  // yield 異步操做
  yield* next;
  console.log("after middleware1");
  // yield 異步操做
});

app.use(function* (next) {
  console.log("before middleware2");
  // yield 異步操做
  yield* next;
  console.log("after middleware2");
  // yield 異步操做
});

app.run();

輸出

before middleware1
before middleware2
excute core!
after middleware2
after middleware1

generator趕上Promise

以上,generator的自動執行依賴於thunk化,而thunk又很是生澀難懂,若是將generator與Promise結合,或許會更容易理解一些

function promiseGet(url) {
  return new Promise((resolve, _) => {
      $.get(url, resolve);
  });
}

function * asyncAsSync() {
  let result1 = yield promiseGet('step/1');
  let result2 = yield promiseGet(`step/2/${result1}`);
  let result3 = yield promiseGet(`step/3/${result2}`);
  console.log(result3);
  /* do the final thing */
}

function co(fn) {
  var g = fn();

  // nextCallBack(data)函數的執行,來自
  // 1. 下方的首次顯示觸發
  // 2. yield值的回調觸發
  function nextCallBack(data){
    // result的值爲Promise對象
    var result = g.next(data);
    if (result.done) return result.value;
    result.value.then(function(data){
      // Promise的then回調中,遞歸執行nextCallBack(data),並將執行結果代入
      nextCallBack(data);
    });
  }

  nextCallBack();
}

co(asyncAsSync);

// do something at the same time
console.log('do something at the same time when excute gets');

對generator自動執行封裝較好的看tj大神的tj-co

async/await

generator(function*)? yield? thunk? co? 想使用generator實在不要再複雜,不過好在es2017開始提供了async/await

function promiseGet(url) {
  return new Promise((resolve, _) => {
      $.get(url, resolve);
  });
}

// 異步代碼同步化
async function asyncAsSync() {
  let result1 = await promiseGet('step/1');
  let result2 = await promiseGet(`step/2/${result1}`);
  let result3 = await promiseGet(`step/3/${result2}`);
  console.log(result3);
  /* do the final thing */
}

asyncAsSync();

// do something at the same time
console.log('do something at the same time when excute gets');

輸出

do something at the same time when excute gets
3/2/1

沒有了生澀的thunk,沒有了燒腦的自動執行器,代碼得以變得更加清爽

簡單來說,async其實就是generator的語法糖

  • 使用async替代generator的標星函數function*
  • 使用await替代yield
  • await後可跟普通值、普通函數及Promise對象
  • async自帶自動執行器

async/await相比generator + thunk/Promise + co的方案,更加語義化,也更容易理解

藉助Promise的能力,還能夠異步並行處理數據

function promiseGet(url) {
    return new Promise((resolve, reject) => {
        $.get(url, resolve, reject);
    });
}

async function asyncRun() {
    let names = await Promise.all([
        promiseGet("user/info/張三"),
        promiseGet("user/info/李四"),
        promiseGet("user/info/趙五")
    ])
    names.forEach((name) => console.log(name));
}

asyncRun().catch((e) => { /* handle exception */})

// do something at the same time
console.log('do something at the same time when excute gets');

輸出

do something at the same time when excute gets
張三
李四
趙五

更多async的用法詳見async/await

使用async/awaitkoa middleware處理邏輯能夠簡單模擬以下

// 定義app
let app = {
  middlewares: [],

  core: async (next) => {
    console.log("excute core!");
    // await 異步操做
    await next;
  },

  // 將多個middleware組合成鏈式結構
  compose(middlewares) {
      var i = middlewares.length;
      var prev = Promise.resolve();
      var curr;
  
      while (i--) {
        curr = middlewares[i];
        prev = curr.bind(this, prev);
      }

      return prev;
  },

  // 添加middleware
  use(middleware) {
    this.middlewares.push(middleware);
  },

  run() {
    let chain = this.compose([...this.middlewares, this.core]);
    chain();
  }
}
app.use(async (next) => {
  console.log("before middleware1");
  // await 異步操做
  await next();
  console.log("after middleware1");
  // await 異步操做
});

app.use(async (next) => {
  console.log("before middleware2");
  // await 異步操做
  await next();
  console.log("after middleware2");
  // await 異步操做
});

app.run();

輸出

before middleware1
before middleware2
excute core!
after middleware2
after middleware1

總結

  • 傳統異步編程須要藉助同步阻塞等待、回調函數、事件等方式獲取異步執行結果
  • generator(協程)可將子程序的暫停、恢復等調度權交給應用層,而且能夠在同一個協程上下文中將子程序放到不一樣的內核線程中執行(端應用場景較多,將UI渲染與後臺計算的執行線程隔離,避免後臺計算阻塞UI渲染,形成假死)
  • 使用自動執行器(generator + thunk + co / async + await + promise),可將回調編碼方式拉平,以同步的寫法編寫異步執行邏輯
  • 使用N:M線程模型,固定(或相對固定)內核線程數,避免內核線程的建立、銷燬、調度、上下文切換等帶來的系統消耗,同時也打破了單進程可建立有限線程數的限制,以此提高系統吞吐率

訂閱號

相關文章
相關標籤/搜索