《Node.js設計模式》基於ES2015+的回調控制流

本系列文章爲《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版連接javascript

歡迎關注個人專欄,以後的博文將在專欄同步:html

Asynchronous Control Flow Patterns with ES2015 and Beyond

在上一章中,咱們學習瞭如何使用回調處理異步代碼,以及如何解決如回調地獄代碼等異步問題。回調是JavaScriptNode.js中的異步編程的基礎,可是如今,其餘替代方案已經出現。這些替代方案更復雜,以便可以以更方便的方式處理異步代碼。前端

在本章中,咱們將探討一些表明性的替代方案,PromiseGenerator。以及async await,這是一種創新的語法,可在高版本的JavaScript中提供,其也做爲ECMAScript 2017發行版的一部分。java

咱們將看到這些替代方案如何簡化處理異步控制流的方式。最後,咱們將比較全部這些方法,以瞭解全部這些方法的全部優勢和缺點,並可以明智地選擇最適合咱們下一個Node.js項目要求的方法。node

Promise

咱們在前面的章節中提到,CPS風格不是編寫異步代碼的惟一方法。事實上,JavaScript生態系統爲傳統的回調模式提供了有趣的替代方案。最着名的選擇之一是Promise,特別是如今它是ECMAScript 2015的一部分,而且如今能夠在Node.js中可用。mysql

什麼是Promise?

Promise是一種抽象的對象,咱們一般容許函數返回一個名爲Promise的對象,它表示異步操做的最終結果。一般狀況下,咱們說當異步操做還沒有完成時,咱們說Promise對象處於pending狀態,當操做成功完成時,咱們說Promise對象處於resolve狀態,當操做錯誤終止時,咱們說Promise對象處於reject狀態。一旦Promise處於resolvereject,咱們認爲當前異步操做結束。git

爲了接收到異步操做的正確結果或錯誤捕獲,咱們可使用Promisethen方法:github

promise.then([onFulfilled], [onRejected])
複製代碼

在前面的代碼中,onFulfilled()是一個函數,最終會收到Promise的正確結果,而onRejected()是另外一個函數,它將接收產生異常的緣由(若是有的話)。兩個參數都是可選的。redis

要了解Promise如何轉換咱們的代碼,讓咱們考慮如下幾點:算法

asyncOperation(arg, (err, result) => {
  if (err) {
    // 錯誤處理
  }
  // 正常結果處理
});
複製代碼

Promise容許咱們將這個典型的CPS代碼轉換成更好的結構化和更優雅的代碼,以下所示:

asyncOperation(arg)
  .then(result => {
    // 錯誤處理
  }, err => {
    // 正常結果處理
  });
複製代碼

then()方法的一個關鍵特徵是它同步地返回另外一個Promise對象。若是onFulfilled()onRejected()函數中的任何一個函數返回x,則then()方法返回的Promise對象將以下所示:

  • 若是x是一個值,則這個Promise對象會正確處理(resolve)x
  • 若是x是一個Promise對象或thenable,則會正確處理(resolve)x
  • 若是x是一個異常,則會捕獲異常(reject)x

注:thenable是一個具備then方法的相似於Promise的對象(Promise-like)。

這個特色使咱們可以鏈式構建Promise,容許輕鬆排列組合咱們的異步操做。另外,若是咱們沒有指定一個onFulfilled()onRejected()處理程序,則正確結果或異常捕獲將自動轉發到Promise鏈的下一個Promise。例如,這容許咱們在整個鏈中自動傳播錯誤,直到被onRejected()處理程序捕獲。隨着Promise鏈,任務的順序執行忽然變成簡單多了:

asyncOperation(arg)
  .then(result1 => {
    // 返回另外一個Promise
    return asyncOperation(arg2);
  })
  .then(result2 => {
    // 返回一個值
    return 'done';
  })
  .then(undefined, err => {
    // 捕獲Promise鏈中的異常
  });
複製代碼

下圖展現了鏈式Promise如何工做:

Promise的另外一個重要特性是onFulfilled()onRejected()函數是異步調用的,如同上述的例子,在最後那個then函數resolve一個同步的Promise,它也是同步的。這種模式避免了Zalgo(參見Chapter2-Node.js Essential Patterns),使咱們的異步代碼更加一致和穩健。

若是在onFulfilled()onRejected()處理程序中拋出異常(使用throw語句),則then()方法返回的Promise將被自動地reject,拋出異常做爲reject的緣由。這相對於CPS來講是一個巨大的優點,由於它意味着有了Promise,異常將在整個鏈中自動傳播,而且throw語句終於可使用。

在之前,許多不一樣的庫實現了Promise,大多數時候它們之間不兼容,這意味着不可能在使用不一樣Promise庫的thenable鏈式傳播錯誤。

JavaScript社區很是努力地解決了這個限制,這些努力致使了Promises / A +規範的建立。該規範詳細描述了then方法的行爲,提供了一個可互兼容的基礎,這使得來自不一樣庫的Promise對象可以彼此兼容,開箱即用。

有關Promises / A +規範的詳細說明,能夠參考Promises / A + 官方網站

Promise / A + 的實施

JavaScript中以及Node.js中,有幾個實現Promises / A +規範的庫。如下是最受歡迎的:

真正區別他們的是在Promises / A +標準之上提供的額外功能。正如咱們上述所說的那樣,該標準定義了then()方法和Promise解析過程的行爲,但它沒有指定其餘功能,例如,如何從基於回調的異步函數建立Promise

在咱們的示例中,咱們將使用由ES2015Promise,由於Promise對象自Node.js 4後便可使用,而不須要任何庫來實現。

做爲參考,如下是ES2015Promise提供的API:

constructor(new Promise(function(resolve, reject){})):建立了一個新的Promise,它基於做爲傳遞兩個類型爲函數的參數來決定resolvereject。構造函數的參數解釋以下:

  • resolve(obj)resolve一個Promise,並帶上一個參數obj,若是obj是一個值,這個值就是傳遞的異步操做成功的結果。若是obj是一個Promise或一個thenable,則會進行正確處理。
  • reject(err)reject一個Promise,並帶上一個參數err。它是Error對象的一個實例。

Promise對象的靜態方法

  • Promise.resolve(obj): 將會建立一個resolvePromise實例
  • Promise.reject(err): 將會建立一個rejectPromise實例
  • Promise.all(iterable):返回一個新的Promise實例,而且在iterable中所 有Promise狀態爲reject時, 返回的Promise實例的狀態會被置爲reject,若是iterable中至少有一個Promise狀態爲reject時, 返回的Promise實例狀態也會被置爲reject,而且reject的緣由是第一個被rejectPromise對象的reject緣由。
  • Promise.race(iterable):返回一個Promise實例,當iterable中任何一個Promiseresolve或被reject時, 返回的Promise實例以一樣的緣由resolvereject

Promise實例方法

  • Promise.then(onFulfilled, onRejected):這是Promise的基本方法。它的行爲與咱們以前描述的Promises / A +標準兼容。
  • Promise.catch(onRejected):這只是Promise.then(undefined,onRejected)的語法糖。

值得一提的是,一些Promise實現提供了另外一種機制來建立新的Promise,稱爲deferreds。咱們不會在這裏描述,由於它不是ES2015標準的一部分,可是若是您想了解更多信息,能夠閱讀Q文檔 (github.com/kriskowal/q…) 或When.js文檔 (github.com/cujojs/when…) 。

Promisifying一個Node.js回調風格的函數

JavaScript中,並非全部的異步函數和庫都支持開箱即用的Promise。大多數狀況下,咱們必須將一個典型的基於回調的函數轉換成一個返回Promise的函數,這個過程也被稱爲promisification

幸運的是,Node.js中使用的回調約定容許咱們建立一個可重用的函數,咱們經過使用Promise對象的構造函數來簡化任何Node.js風格的API。讓咱們建立一個名爲promisify()的新函數,並將其包含到utilities.js模塊中(以便稍後在咱們的Web爬蟲應用程序中使用它):

module.exports.promisify = function(callbackBasedApi) {
  return function promisified() {
    const args = [].slice.call(arguments);
    return new Promise((resolve, reject) => {
      args.push((err, result) => {
        if (err) {
          return reject(err);
        }
        if (arguments.length <= 2) {
          resolve(result);
        } else {
          resolve([].slice.call(arguments, 1));
        }
      });
      callbackBasedApi.apply(null, args);
    });
  }
};
複製代碼

前面的函數返回另外一個名爲promisified()的函數,它表示輸入中給出的callbackBasedApipromisified版本。如下展現它是如何工做的:

  1. promisified()函數使用Promise構造函數建立一個新的Promise對象,並當即將其返回給調用者。
  2. 在傳遞給Promise構造函數的函數中,咱們確保傳遞給callbackBasedApi,這是一個特殊的回調函數。因爲咱們知道回調老是最後調用的,咱們只需將回調函數附加到提供給promisified()函數的參數列表裏(args)。
  3. 在特殊的回調中,若是咱們收到錯誤,咱們當即reject這個Promise
  4. 若是沒有收到錯誤,咱們使用一個值或一個數組值來resolve這個Promise,具體取決於傳遞給回調的結果數量。
  5. 最後,咱們只需使用咱們構建的參數列表調用callbackBasedApi

大部分的Promise已經提供了一個開箱即用的接口來將一個Node.js風格的API轉換成一個返回Promise的API。例如,Q有Q.denodeify()和Q.nbind(),Bluebird有Promise.promisify(),而When.js有node.lift()。

順序執行

在一些必要的理論以後,咱們如今準備將咱們的Web爬蟲應用程序轉換爲使用Promise的形式。讓咱們直接從版本2開始,直接下載一個Web網頁的連接。

spider.js模塊中,第一步是加載咱們的Promise實現(咱們稍後會使用它)和Promisifying咱們打算使用的基於回調的函數:

const utilities = require('./utilities');
const request = utilities.promisify(require('request'));
const mkdirp = utilities.promisify(require('mkdirp'));
const fs = require('fs');
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
複製代碼

如今,咱們開始更改咱們的download函數:

function download(url, filename) {
  console.log(`Downloading ${url}`);
  let body;
  return request(url)
    .then(response => {
      body = response.body;
      return mkdirp(path.dirname(filename));
    })
    .then(() => writeFile(filename, body))
    .then(() => {
      console.log(`Downloaded and saved: ${url}`);
      return body;
    });
}
複製代碼

這裏要注意的到的最重要的是咱們也爲readFile()返回的Promise註冊 一個onRejected()函數,用來處理一個網頁沒有被下載的狀況(或文件不存在)。 還有,看咱們如何使用throw來傳遞onRejected()函數中的錯誤的。

既然咱們已經更改咱們的spider()函數,咱們這麼修改它的調用方式:

spider(process.argv[2], 1)
  .then(() => console.log('Download complete'))
  .catch(err => console.log(err));
複製代碼

注意咱們是如何第一次使用Promise的語法糖catch來處理源自spider()函數的任何錯誤狀況。若是咱們再看看迄今爲止咱們所寫的全部代碼,那麼咱們會驚喜的發現,咱們沒有包含任何錯誤傳播邏輯,由於咱們在使用回調函數時會被迫作這樣的事情。這顯然是一個巨大的優點,由於它極大地減小了咱們代碼中的樣板文件以及丟失任何異步錯誤的機會。

如今,完成咱們惟一缺失的Web爬蟲應用程序的第二版的spiderLinks()函數,咱們將在稍後實現它。

順序迭代

到目前爲止,Web爬蟲應用程序代碼庫主要是對Promise是什麼以及如何使用的概述,展現了使用Promise實現順序執行流程的簡單性和優雅性。可是,咱們如今考慮的代碼只涉及到一組已知的異步操做的執行。因此,完成咱們對順序執行流程的探索的缺失部分是看咱們如何使用Promise來實現迭代。一樣,網絡蜘蛛第二版的spiderLinks()函數也是一個很好的例子。

讓咱們添加缺乏的這一塊:

function spiderLinks(currentUrl, body, nesting) {
  let promise = Promise.resolve();
  if (nesting === 0) {
    return promise;
  }
  const links = utilities.getPageLinks(currentUrl, body);
  links.forEach(link => {
    promise = promise.then(() => spider(link, nesting - 1));
  });
  return promise;
}
複製代碼

爲了異步迭代一個網頁的所有連接,咱們必須動態建立一個Promise的迭代鏈。

  1. 首先,咱們定義一個空的Promiseresolveundefined。這個Promise只是用來做爲Promise的迭代鏈的起始點。
  2. 而後,咱們經過在循環中調用鏈中前一個Promisethen()方法得到的新的Promise來更新Promise變量。這就是咱們使用Promise的異步迭代模式。

這樣,循環的結束,promise變量會包含循環中最後一個then()返回的Promise對象,因此它只有當Promise的迭代鏈中所有Promise對象被resolve後才能被resolve

注:在最後調用了這個then方法來resolve這個Promise對象

經過這個,咱們已使用Promise對象重寫了咱們的Web爬蟲應用程序。咱們如今應該能夠運行它了。

順序迭代模式

爲了總結這個順序執行的部分,讓咱們提取一個模式來依次遍歷一組Promise

let tasks = [ /* ... */ ]
let promise = Promise.resolve();
tasks.forEach(task => {
  promise = promise.then(() => {
    return task();
  });
});
promise.then(() => {
  // 全部任務都完成
});
複製代碼

使用reduce()方法來替代forEach()方法,容許咱們寫出更爲簡潔的代碼:

let tasks = [ /* ... */ ]
let promise = tasks.reduce((prev, task) => {
  return prev.then(() => {
    return task();
  });
}, Promise.resolve());

promise.then(() => {
  //All tasks completed
});
複製代碼

與往常同樣,經過對這種模式的簡單調整,咱們能夠將全部任務的結果收集到一個數組中,咱們能夠實現一個mapping算法,或者構建一個filter等等。

上述這個模式使用循環動態地創建一個鏈式的Promise。

並行執行

另外一個適合用Promise的執行流程是並行執行流程。實際上,咱們須要作的就是使用內置的Promise.all()。這個方法創造了另外一個Promise對象,只有在輸入中的全部Promiseresolve時才能resolve。這是一個並行執行,由於在其參數Promise對象的之間沒有執行順序可言。

爲了演示這一點,咱們來看咱們的Web爬蟲應用程序的第三版,它將頁面中的全部連接並行下載。讓咱們再次使用Promise更新spiderLinks()函數來實現並行流程:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return Promise.resolve();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  const promises = links.map(link => spider(link, nesting - 1));
  return Promise.all(promises);
}
複製代碼

這裏的模式在elements.map()迭代中產生一個數組,存放全部異步任務,以後便於同時啓動spider()任務。這一次,在循環中,咱們不等待之前的下載完成,而後開始一個新的下載任務:全部的下載任務在一個循環中一個接一個地開始。以後,咱們利用Promise.all()方法,它返回一個新的Promise對象,當數組中的全部Promise對象都被resolve時,這個Promise對象將被resolve。換句話說,全部的下載任務完成,這正是咱們想要的。

限制並行執行

不幸的是,ES2015Promise API並無提供一種原生的方式來限制併發任務的數量,可是咱們老是能夠依靠咱們所學到的有關用普通JavaScript來限制併發。事實上,咱們在TaskQueue類中實現的模式能夠很容易地被調整來支持返回Promise的任務。這很容易經過修改next()方法來完成:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }

  pushTask(task) {
    this.queue.push(task);
    this.next();
  }

  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task().then(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
}
複製代碼

不一樣於使用一個回調函數來處理任務,咱們簡單地調用Promisethen()

讓咱們回到spider.js模塊,並修改它以支持咱們的新版本的TaskQueue類。首先,咱們確保定義一個TaskQueue的新實例:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);
複製代碼

而後,是咱們的spiderLinks()函數。這裏的修改也是很簡單:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return Promise.resolve();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  // 咱們須要以下代碼,用於建立Promise對象
  // 若是沒有下列代碼,當任務數量爲0時,將永遠不會resolve
  if (links.length === 0) {
    return Promise.resolve();
  }
  return new Promise((resolve, reject) => {
    let completed = 0;
    let errored = false;
    links.forEach(link => {
      let task = () => {
        return spider(link, nesting - 1)
          .then(() => {
            if (++completed === links.length) {
              resolve();
            }
          })
          .catch(() => {
            if (!errored) {
              errored = true;
              reject();
            }
          });
      };
      downloadQueue.pushTask(task);
    });
  });
}
複製代碼

在上述代碼中有幾點值得咱們注意的:

  • 首先,咱們須要返回使用Promise構造函數建立的新的Promise對象。正如咱們將看到的,這使咱們可以在隊列中的全部任務完成時手動resolve咱們的Promise對象。
  • 而後,咱們應該看看咱們如何定義任務。咱們所作的是將一個onFulfilled()回調函數的調用添加到由spider()返回的Promise對象中,因此咱們能夠計算完成的下載任務的數量。當完成的下載量與當前頁面中連接的數量相同時,咱們知道任務已經處理完畢,因此咱們能夠調用外部Promiseresolve()函數。

Promises / A +規範規定,then()方法的onFulfilled()和onRejected()回調函數只能調用一次(僅調用onFulfilled()和onRejected())。Promise接口的實現確保即便咱們屢次手動調用resolve或reject,Promise也僅能夠被resolve或reject一次。

如今,使用PromiseWeb爬蟲應用程序的第4版應該已經準備好了。咱們可能再次注意到下載任務如何並行運行,併發數量限制爲2。

在公有API中暴露回調函數和Promise

正如咱們在前面所學到的,Promise能夠被用做回調函數的一個很好的替代品。它們使咱們的代碼更具可讀性和易於理解。雖然Promise帶來了許多優勢,但也要求開發人員理解許多不易於理解的概念,以便正確和熟練地使用。因爲這個緣由和其餘緣由,在某些狀況下,比起Promise來講,不少開發者更偏向於回調函數。

如今讓咱們想象一下,咱們想要構建一個執行異步操做的公共庫。咱們須要作什麼?咱們是建立了一個基於回調函數的API仍是一個面向PromiseAPI?仍是二者均有?

這是許多知名的庫所面臨的問題,至少有兩種方法值得一提,使咱們可以提供一個多功能的API

requestredismysql這樣的庫所使用的第一種方法是提供一個簡單的基於回調函數的API,若是須要,開發人員能夠選擇公開函數。其中一些庫提供工具函數來Promise化異步回調,但開發人員仍然須要以某種方式將暴露的API轉換爲可以使用Promise對象。

第二種方法更透明。它還提供了一個面向回調的API,但它使回調參數可選。每當回調做爲參數傳遞時,函數將正常運行,在完成時或失敗時執行回調。當回調未被傳遞時,函數將當即返回一個Promise對象。這種方法有效地結合了回調函數和Promise,使得開發者能夠在調用時選擇採用什麼接口,而不須要提早進行Promise化。許多庫,如mongoosesequelize,都支持這種方法。

咱們來看一個簡單的例子。假設咱們要實現一個異步執行除法的模塊:

module.exports = function asyncDivision(dividend, divisor, cb) {
  return new Promise((resolve, reject) => { // [1]
    process.nextTick(() => {
      const result = dividend / divisor;
      if (isNaN(result) || !Number.isFinite(result)) {
        const error = new Error('Invalid operands');
        if (cb) {
          cb(error); // [2]
        }
        return reject(error);
      }
      if (cb) {
        cb(null, result); // [3]
      }
      resolve(result);
    });
  });
};
複製代碼

該模塊的代碼很是簡單,可是有一些值得強調的細節:

  • 首先,返回使用Promise的構造函數建立的新承諾。咱們在構造函數參數函數內定義所有邏輯。
  • 在發生錯誤的狀況下,咱們reject這個Promise,但若是回調函數在被調用時做爲參數傳遞,咱們也執行回調來進行錯誤傳播。
  • 在計算結果以後,咱們resolve了這個Promise,可是若是有回調函數,咱們也會將結果傳播給回調函數。

咱們如今看如何用回調函數和Promise來使用這個模塊:

// 回調函數的方式
asyncDivision(10, 2, (error, result) => {
  if (error) {
    return console.error(error);
  }
  console.log(result);
});

// Promise化的調用方式
asyncDivision(22, 11)
  .then(result => console.log(result))
  .catch(error => console.error(error));
複製代碼

應該很清楚的是,即將開始使用相似於上述的新模塊的開發人員將很容易地選擇最適合本身需求的風格,而無需在但願利用Promise時引入外部promisification功能。

Generators

ES2015規範引入了另一種機制,除了其餘新功能外,還能夠用來簡化Node.js應用程序的異步控制流程。咱們正在談論Generator,也被稱爲semi-coroutines。它們是子程序的通常化,能夠有不一樣的入口點。在一個正常的函數中,實際上咱們只能有一個入口點,這個入口點對應着函數自己的調用。Generator與通常函數相似,可是能夠暫停(使用yield語句),而後在稍後繼續執行。在實現迭代器時,Generator特別有用,由於咱們已經討論瞭如何使用迭代器來實現重要的異步控制流模式,如順序執行和限制並行執行。

Generators基礎

在咱們探索使用Generator來實現異步控制流程以前,學習一些基本概念是很重要的。咱們從語法開始吧。能夠經過在函數關鍵字以後附加*(星號)運算符來聲明Generator函數:

function* makeGenerator() {
  // body
}
複製代碼

makeGenerator()函數內部,咱們可使用關鍵字yield暫停執行並返回給調用者傳遞給它的值:

function* makeGenerator() {
  yield 'Hello World';
  console.log('Re-entered');
}
複製代碼

在前面的代碼中,Generator經過yield一個字符串Hello World暫停當前函數的執行。當Generator恢復時,執行將從下列語句開始:

console.log('Re-entered');
複製代碼

makeGenerator()函數本質上是一個工廠,它在被調用時返回一個新的Generator對象:

const gen = makeGenerator();
複製代碼

生成器對象的最重要的方法是next(),它用於啓動/恢復Generator的執行,並返回以下形式的對象:

{
  value: <yielded value>
  done: <true if the execution reached the end>
}
複製代碼

這個對象包含Generator yield的值和一個指示Generator是否已經完成執行的符號。

一個簡單的例子

爲了演示Generator,咱們來建立一個名爲fruitGenerator.js的新模塊:

function* fruitGenerator() {
  yield 'apple';
  yield 'orange';
  return 'watermelon';
}
const newFruitGenerator = fruitGenerator();
console.log(newFruitGenerator.next()); // [1]
console.log(newFruitGenerator.next()); // [2]
console.log(newFruitGenerator.next()); // [3]
複製代碼

前面的代碼將打印下面的輸出:

{ value: 'apple', done: false }
{ value: 'orange', done: false }
{ value: 'watermelon', done: true }
複製代碼

咱們能夠這麼解釋上述現象:

  • 第一次調用newFruitGenerator.next()時,Generator函數開始執行,直到達到第一個yield語句爲止,該命令暫停Generator函數執行,並將值apple返回給調用者。
  • 在第二次調用newFruitGenerator.next()時,Generator函數恢復執行,從第二個yield語句開始,這又使得執行暫停,同時將orange返回給調用者。
  • newFruitGenerator.next()的最後一次調用致使Generator函數的執行從其最後的yield恢復,一個返回語句,它終止Generator函數,返回watermelon,並將結果對象中的done屬性設置爲true

Generators做爲迭代器

爲了更好地理解爲何Generator函數對實現迭代器很是有用,咱們來構建一個例子。在咱們將調用iteratorGenerator.js的新模塊中,咱們編寫下面的代碼:

function* iteratorGenerator(arr) {
  for (let i = 0; i < arr.length; i++) {
    yield arr[i];
  }
}
const iterator = iteratorGenerator(['apple', 'orange', 'watermelon']);
let currentItem = iterator.next();
while (!currentItem.done) {
  console.log(currentItem.value);
  currentItem = iterator.next();
}
複製代碼

此代碼應按以下所示打印數組中的元素:

apple
orange
watermelon
複製代碼

在這個例子中,每次咱們調用iterator.next()時,咱們都會恢復Generator函數的for循環,經過yield數組中的下一個項來運行另外一個循環。這演示瞭如何在函數調用過程當中維護Generator的狀態。當繼續執行時,循環和全部變量的值與Generator函數執行暫停時的狀態徹底相同。

傳值給Generators

如今咱們繼續研究Generator的基本功能,首先學習如何將值傳遞迴Generator函數。這其實很簡單,咱們須要作的只是爲next()方法提供一個參數,而且該值將做爲Generator函數內的yield語句的返回值提供。

爲了展現這一點,咱們來建立一個新的簡單模塊:

function* twoWayGenerator() {
  const what = yield null;
  console.log('Hello ' + what);
}
const twoWay = twoWayGenerator();
twoWay.next();
twoWay.next('world');
複製代碼

當執行時,前面的代碼會輸出Hello world。咱們作以下的解釋:

  • 第一次調用next()方法時,Generator函數到達第一個yield語句,而後暫停。
  • next('world')被調用時,Generator函數從上次中止的位置,也就是上次的yield語句點恢復,可是此次咱們有一個值傳遞到Generator函數。這個值將被賦值到what變量。生成器而後執行console.log()指令並終止。

用相似的方式,咱們能夠強制Generator函數拋出異常。這能夠經過使用Generator函數的throw方法來實現,以下例所示:

const twoWay = twoWayGenerator();
twoWay.next();
twoWay.throw(new Error());
複製代碼

在這個最後這段代碼,twoWayGenerator()函數將在yield函數返回的時候拋出異常。這就好像從Generator函數內部拋出了一個異常同樣,這意味着它能夠像使用try ... catch塊同樣進行捕獲和處理異常。

Generator實現異步控制流

你必定想知道Generator函數如何幫助咱們處理異步操做。咱們能夠經過建立一個接受Generator函數做爲參數的特殊函數來演示這一點,並容許咱們在Generator函數內部使用異步代碼。這個函數在異步操做完成時要注意恢復Generator函數的執行。咱們將調用這個函數asyncFlow()

function asyncFlow(generatorFunction) {
  function callback(err) {
    if (err) {
      return generator.throw(err);
    }
    const results = [].slice.call(arguments, 1);
    generator.next(results.length > 1 ? results : results[0]);
  }
  const generator = generatorFunction(callback);
  generator.next();
}
複製代碼

前面的函數取一個Generator函數做爲輸入,而後當即調用:

const generator = generatorFunction(callback);
generator.next();
複製代碼

generatorFunction()接受一個特殊的回調函數做爲參數,當generator.throw()若是接收到一個錯誤,便當即返回。另外,經過將在回調函數中接收的results傳值回Generator函數繼續Generator函數的執行:

if (err) {
  return generator.throw(err);
}
const results = [].slice.call(arguments, 1);
generator.next(results.length > 1 ? results : results[0]);
複製代碼

爲了說明這個簡單的輔助函數的強大,咱們建立一個叫作clone.js的新模塊,這個模塊只是建立它自己的克隆。粘貼咱們剛纔建立的asyncFlow()函數,核心代碼以下:

const fs = require('fs');
const path = require('path');
asyncFlow(function*(callback) {
  const fileName = path.basename(__filename);
  const myself = yield fs.readFile(fileName, 'utf8', callback);
  yield fs.writeFile(`clone_of_${filename}`, myself, callback);
  console.log('Clone created');
});
複製代碼

明顯地,有了asyncFlow()函數的幫助,咱們能夠像咱們書寫同步阻塞函數同樣用同步的方式來書寫異步代碼了。而且這個結果背後的原理顯得很清楚。一旦異步操做結束,傳遞給每一個異步函數的回調函數將繼續Generator函數的執行。沒有什麼複雜的,可是結果確實很使人意外。

這個技術有其餘兩個變化,一個是Promise的使用,另一個則是thunks

在基於Generator的控制流中使用的thunk只是一個簡單的函數,它除了回調以外,部分地應用了原始函數的全部參數。返回值是另外一個只接受回調做爲參數的函數。例如,fs.readFile()的thunkified版本以下所示:

function readFileThunk(filename, options) {
  return function(callback) {
    fs.readFile(filename, options, callback);
  }
}
複製代碼

thunkPromise都容許咱們建立不須要回調的Generator函數做爲參數傳遞,例如,使用thunkasyncFlow()版本以下:

function asyncFlowWithThunks(generatorFunction) {
  function callback(err) {
    if (err) {
      return generator.throw(err);
    }
    const results = [].slice.call(arguments, 1);
    const thunk = generator.next(results.length > 1 ? results : results[0]).value;
    thunk && thunk(callback);
  }
  const generator = generatorFunction();
  const thunk = generator.next().value;
  thunk && thunk(callback);
}
複製代碼

這個技巧是讀取generator.next()的返回值,返回值中包含thunk。下一步是經過注入特殊的回調函數調用thunk自己。這容許咱們寫下面的代碼:

asyncFlowWithThunk(function*() {
  const fileName = path.basename(__filename);
  const myself = yield readFileThunk(__filename, 'utf8');
  yield writeFileThunk(`clone_of_${fileName}`, myself);
  console.log("Clone created")
});
複製代碼

使用co的基於Gernator的控制流

你應該已經猜到了,Node.js生態系統會藉助Generator函數來提供一些處理異步控制流的解決方案,例如,suspend是其中一個最老的支持PromisethunksNode.js風格回調函數和正常風格的回調函數的 庫。還有,大部分咱們以前分析的Promise庫都提供工具函數使得GeneratorPromise能夠一塊兒使用。

咱們選擇co做爲本章節的例子。它支持不少類型的yieldables,其中一些是:

  • Thunks
  • Promises
  • Arrays(並行執行)
  • Objects(並行執行)
  • Generators(委託)
  • Generator函數(委託)

還有不少框架或庫是基於co生態系統的,包括如下一些:

  • Web框架,最流行的是koa
  • 實現特定控制流模式的庫
  • 包裝流行的API兼容co的庫

咱們使用co從新實現咱們的Generator版本的Web爬蟲應用程序

爲了將Node.js風格的函數轉換成thunks,咱們將會使用一個叫作thunkify的庫。

順序執行

讓咱們經過修改Web爬蟲應用程序的版本2開始咱們對Generator函數和co的實際探索。咱們要作的第一件事就是加載咱們的依賴包,並生成咱們要使用的函數的thunkified版本。這些將在spider.js模塊的最開始進行:

const thunkify = require('thunkify');
const co = require('co');
const request = thunkify(require('request'));
const fs = require('fs');
const mkdirp = thunkify(require('mkdirp'));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);
複製代碼

看上述代碼,咱們能夠注意到與本章前面promisify化的API的代碼的一些類似之處。在這一點上,有意思的是,若是咱們使用咱們的promisified版本的函數來代替thunkified的版本,代碼將保持徹底同樣,這要歸功於co支持thunkPromise對象做爲yieldable對象。事實上,若是咱們想,甚至能夠在同一個應用程序中使用thunkPromise,即便在同一個Generator函數中。就靈活性而言,這是一個巨大的優點,由於它使咱們可以使用基於Generator函數的控制流來解決咱們應用程序中的問題。

好的,如今讓咱們開始將download()函數轉換爲一個Generator函數:

function* download(url, filename) {
  console.log(`Downloading ${url}`);
  const response = yield request(url);
  const body = response[1];
  yield mkdirp(path.dirname(filename));
  yield writeFile(filename, body);
  console.log(`Downloaded and saved ${url}`);
  return body;
}
複製代碼

經過使用Generatorco,咱們的download()函數變得簡單多了。當咱們須要作異步操做的時候,咱們使用異步的Generator函數做爲thunk來把以前的內容轉化到Generator函數,並使用yield子句。

而後咱們開始實現咱們的spider()函數:

function* spider(url, nesting) {
  cost filename = utilities.urlToFilename(url);
  let body;
  try {
    body = yield readFile(filename, 'utf8');
  } catch (err) {
    if (err.code !== 'ENOENT') {
      throw err;
    }
    body = yield download(url, filename);
  }
  yield spiderLinks(url, body, nesting);
}
複製代碼

從上述代碼中一個有趣的細節是咱們可使用try...catch語句塊來處理異常。咱們還可使用throw來傳播異常。另一個細節是咱們yield咱們的download()函數,而這個函數既不是一個thunk,也不是一個promisified函數,只是另外的一個Generator函數。這也毫無問題,因爲co也支持其餘Generators做爲yieldables

最後轉換spiderLinks(),在這個函數中,咱們遞歸下載一個網頁的連接。在這個函數中使用Generators,顯得簡單多了:

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  for (let i = 0; i < links.length; i++) {
    yield spider(links[i], nesting - 1);
  }
}
複製代碼

看上述代碼。雖然順序迭代沒有什麼模式能夠展現。Generatorco輔助咱們作了不少,方便了咱們可使用同步方式開書寫異步代碼。

看最重要的部分,程序的入口:

co(function*() {
  try {
    yield spider(process.argv[2], 1);
    console.log(`Download complete`);
  } catch (err) {
    console.log(err);
  }
});
複製代碼

這是惟一一處須要調用co(...)來封裝的一個Generator。實際上,一旦咱們這麼作,co會自動封裝咱們傳遞給yield語句的任何Generator函數,而且這個過程是遞歸的,因此程序的剩餘部分與咱們是否使用co是徹底無關的,雖然是被co封裝在裏面。

如今應該能夠運行使用Generator函數改寫的Web爬蟲應用程序了。

並行執行

不幸的是,雖然Generator很方便地進行順序執行,可是不能直接用來並行化執行一組任務,至少不能僅僅使用yieldGenerator。以前,在種狀況下咱們使用的模式只是簡單地依賴於一個基於回調或者Promise的函數,但使用了Generator函數後,一切會顯得更簡單。

幸運的是,若是不限制併發數的並行執行,co已經能夠經過yield一個Promise對象、thunkGenerator函數,甚至包含Generator函數的數組來實現。

考慮到這一點,咱們的Web爬蟲應用程序第三版能夠經過重寫spiderLinks()函數來作以下改動:

function* spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  const links = utilities.getPageLinks(currentUrl, body);
  const tasks = links.map(link => spider(link, nesting - 1));
  yield tasks;
}
複製代碼

可是上述函數所作的只是拿到全部的任務,這些任務本質上都是經過Generator函數來實現異步的,若是在cothunk內對一個包含Generator函數的數組使用yield,這些任務都會並行執行。外層的Generator函數會等到yield子句的全部異步任務並行執行後再繼續執行。

接下來咱們看怎麼用一個基於回調函數的方式來解決相同的並行流。咱們用這種方式重寫spiderLinks()函數:

function spiderLinks(currentUrl, body, nesting) {
  if (nesting === 0) {
    return nextTick();
  }
  // 返回一個thunk
  return callback => {
    let completed = 0,
      hasErrors = false;
    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
      return process.nextTick(callback);
    }

    function done(err, result) {
      if (err && !hasErrors) {
        hasErrors = true;
        return callback(err);
      }
      if (++completed === links.length && !hasErrors) {
        callback();
      }
    }
    for (let i = 0; i < links.length; i++) {
      co(spider(links[i], nesting - 1)).then(done);
    }
  }
}
複製代碼

咱們使用co並行運行spider()函數,調用Generator函數返回了一個Promise對象。這樣,等待Promise完成後調用done()函數。一般,基於Generator控制流的庫都有這一功能,所以若是須要,你老是能夠將一個Generator轉換成一個基於回調或基於Promise的函數。

爲了並行開啓多個下載任務,咱們只要重用在前面定義的基於回調的並行執行的模式。咱們應該也注意到咱們將spiderLinks()轉換成一個thunk(而再也不是一個Generator函數)。這使得當所有並行任務完成時,咱們有一個回調函數能夠調用。

上面講到的是將一個Generator函數轉換爲一個thunk的模式,使之可以支持其餘的基於回調或基於Promise的控制流算法,並能夠經過同步阻塞的代碼風格書寫異步代碼。

限制並行執行

如今咱們知道如何處理異步執行流程,應該很容易規劃咱們的Web爬蟲應用程序的第四版的實現,這個版本對併發下載任務的數量施加了限制。咱們有幾個方案能夠用來作到這一點。其中一些方案以下:

  • 使用先前實現的基於回調的TaskQueue類。咱們只須要thunkify咱們的Generator函數和其提供的回調函數便可。
  • 使用基於PromiseTaskQueue類,並確保每一個做爲任務的Generator函數都被轉換成一個返回Promise對象的函數。
  • 使用asyncthunkify咱們打算使用的工具函數,此外還須要把咱們用到的Generator函數轉化爲基於回調的模式,以便於可以被這個庫較好地使用。
  • 使用基於co的生態系統中的庫,特別是專門爲這種場景的庫,如co-limiter
  • 實現基於生產者 - 消費者模型的自定義算法,這與co-limiter的內部實現原理相同。

爲了學習,咱們選擇最後一個方案,甚至幫助咱們能夠更好地理解一種常常與協程(也和線程和進程)同步相關的模式。

生產者 - 消費者模式

咱們的目標是利用隊列來提供固定數量的workers,與咱們想要設置的併發級別同樣多。爲了實現這個算法,咱們將基於本章前面定義的TaskQueue類改寫:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.taskQueue = [];
    this.consumerQueue = [];
    this.spawnWorkers(concurrency);
  }
  pushTask(task) {
    if (this.consumerQueue.length !== 0) {
      this.consumerQueue.shift()(null, task);
    } else {
      this.taskQueue.push(task);
    }
  }
  spawnWorkers(concurrency) {
    const self = this;
    for (let i = 0; i < concurrency; i++) {
      co(function*() {
        while (true) {
          const task = yield self.nextTask();
          yield task;
        }
      });
    }
  }
  nextTask() {
    return callback => {
      if (this.taskQueue.length !== 0) {
        return callback(null, this.taskQueue.shift());
      }
      this.consumerQueue.push(callback);
    }
  }
}
複製代碼

讓咱們分析這個TaskQueue類的新實現。首先是在構造函數中。須要調用一次this.spawnWorkers(),由於這是啓動worker的方法。

咱們的worker很簡單,它們只是用co()包裝的當即執行的Generator函數,因此每一個Generator函數能夠並行執行。在內部,每一個worker正在運行在一個死循環(while(true){})中,一直阻塞(yield)到新任務在隊列中可用時(yield self.nextTask()),一旦能夠執行新任務,yield這個異步任務直到其完成。您可能想知道咱們如何可以限制並行執行,並讓下一個任務在隊列中處於等待狀態。答案是在nextTask()方法中。咱們來詳細地看看在這個方法的原理:

nextTask() {
  return callback => {
    if (this.taskQueue.length !== 0) {
      return callback(null, this.taskQueue.shift());
    }
    this.consumerQueue.push(callback);
  }
}
複製代碼

咱們看這個函數內部發生了什麼,這纔是這個模式的核心:

  1. 這個方法返回一個對於co而言是一個合法的yieldablethunk
  2. 只要taskQueue類生成的實例中還有下一個任務,thunk的回調函數會被當即調用。回調函數調用時,立馬解鎖一個worker的阻塞狀態,yield這一個任務。
  3. 若是隊列中沒有任務了,回調函數自己會被放入consumerQueue中。經過這種作法,咱們將一個worker置於空閒(idle)的模式。一旦咱們有一個新的任務來要處理,在consumerQueue隊列中的回調函數會被調用,立馬喚醒咱們這一worker進行異步處理。

如今,爲了理解consumerQueue隊列中的空閒worker是如何恢復工做的,咱們須要分析pushTask()方法。若是當前有回調函數可用的話,pushTask()方法將調用consumerQueue隊列中的第一個回調函數,從而將取消對worker的鎖定。若是沒有可用的回調函數,這意味着全部的worker都是工做狀態,只須要添加一個新的任務到taskQueue任務隊列中。

TaskQueue類中,worker充當消費者的角色,而調用pushTask()函數的角色能夠被認爲是生產者。這個模式向咱們展現了一個Generator函數實際上能夠跟一個線程或進程相似。實際上,生產者 - 消費者之間問題是研究進程間通訊和同步時最多見的問題,但正如咱們已經提到的那樣,它對於進程和線程來講,也是一個常見的例子。

限制下載任務的併發量

既然咱們已經使用Generator函數和生產者 - 消費者模型實現一個限制並行算法,而且已經在Web爬蟲應用程序第四版應用它來限制中下載任務的併發數。 首先,咱們加載和初始化一個TaskQueue對象:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);
複製代碼

而後,修改spiderLinks()函數。和以前不限制併發的版本相似,因此這裏咱們只展現修改的部分,主要是經過調用新版本的TaskQueue類生成的實例的pushTask()方法來限制並行執行:

function spiderLinks(currentUrl, body, nesting) {
  //...
  return (callback) => {
    //...
    function done(err, result) {
      //...
    }
    links.forEach(function(link) {
      downloadQueue.pushTask(function*() {
        yield spider(link, nesting - 1);
        done();
      });
    });
  }
}
複製代碼

在每一個任務中,咱們在下載完成後當即調用done()函數,所以咱們能夠計算下載了多少個連接,而後在完成下載時通知thunk的回調函數執行。

配合Babel使用Async await新語法

回調函數、PromiseGenerator函數都是用於處理JavaScriptNode.js異步問題的方式。正如咱們所看到的,Generator的真正意義在於它提供了一種方式來暫停一個函數的執行,而後等待前面的任務完成後再繼續執行。咱們可使用這樣的特性來書寫異步代碼,而且讓開發者用同步阻塞的代碼風格來書寫異步代碼。等到異步操做的結果返回後才恢復當前函數的執行。

Generator函數是更多的是用來處理迭代器,然而迭代器在異步代碼的使用顯得有點笨重。代碼可能難以理解,致使代碼易讀性和可維護性差。

但在不遠的未來會有一種更加簡潔的語法。實際上,這個提議即將引入到ESMASCript 2017的規範中,這項規範定義了async函數語法。

async函數規範引入兩個關鍵字(asyncawait)到原生的JavaScript語言中,改進咱們書寫異步代碼的方式。

爲了理解這項語法的用法和優點爲,咱們看一個簡單的例子:

const request = require('request');

function getPageHtml(url) {
  return new Promise(function(resolve, reject) {
    request(url, function(error, response, body) {
      resolve(body);
    });
  });
}
async function main() {
  const html = await getPageHtml('http://google.com');
  console.log(html);
}

main();
console.log('Loading...');
複製代碼

在上述代碼中,有兩個函數:getPageHtmlmain。第一個函數的做用是提取給定URL的一個遠程網頁的HTML文檔代碼。值得注意的是,這個函數返回一個Promise對象。

重點在於main函數,由於在這裏使用了asyncawait關鍵字。首先要注意的是函數要以async關鍵字爲前綴。意思是這個函數執行的是異步代碼而且容許它在函數體內使用await關鍵字。await關鍵字在getPageHtml調用以前,告訴JavaScript解釋器在繼續執行下一條指令以前,等待getPageHtml返回的Promise對象的結果。這樣,main函數內部哪部分代碼是異步的,它會等待異步代碼的完成再繼續執行後續操做,而且不會阻塞這段程序其他部分的正常執行。實際上,控制檯會打印字符串Loading...,隨後是Google主頁的HTML代碼。

是否是這種方法的可讀性更好而且更容易理解呢? 不幸地是,這個提議還沒有定案,即便經過這個提議,咱們須要等下一個版本 的ECMAScript規範出來並把它集成到Node.js後,才能使用這個新語法。 因此咱們今天作了什麼?只是漫無目的地等待?不是,固然不是!咱們已經能夠在咱們的代碼中使用async await語法,只要咱們使用Babel

安裝與運行Babel

Babel是一個JavaScript編譯器(或翻譯器),可以使用語法轉換器將高版本的JavaScript代碼轉換成其餘JavaScript代碼。語法轉換器容許例如咱們書寫並使用ES2015ES2016JSX和其它的新語法,來翻譯成日後兼容的代碼,在JavaScript運行環境如瀏覽器或Node.js中均可以使用Babel

在項目中使用npm安裝Babel,命令以下:

npm install --save-dev babel-cli
複製代碼

咱們還須要安裝插件以支持async await語法的解釋或翻譯:

npm install --save-dev babel-plugin-syntax-async-functions babel-plugin-tranform-async-to-generator
複製代碼

如今假設咱們想運行咱們以前的例子(稱爲index.js)。咱們須要經過如下命令啓動:

node_modules/.bin/babel-node --plugins "syntax-async-functions,transform-async-to-generator" index.js
複製代碼

這樣,咱們使用支持async await的轉換器動態地轉換源代碼。Node.js運行的實際是保存在內存中的日後兼容的代碼。

Babel也能被配置爲一個代碼構建工具,保存翻譯或解釋後的代碼到本地文件系統中,便於咱們部署和運行生成的代碼。

關於如何安裝和配置Babel,能夠到官方網站 babeljs.io 查閱相關文檔。

幾種方式的比較

如今,咱們應該對於怎麼處理JavaScript的異步問題有了一個更好的認識和總結。在下面的表格中總結幾大機制的優點和劣勢:

值得一提的是,咱們選擇在本章中僅介紹處理異步控制流程的最受歡迎的解決方案,或者是普遍使用的解決方案,可是例如Fibers( npmjs.org/package/fib… )和Streamline( npmjs.org/p ackage/streamline )也是值得一看的。

總結

在本章中,咱們分析了一些處理異步控制流的方法,分析了PromiseGenerator函數和即將到來的async await語法。

咱們學習瞭如何使用這些方法編寫更簡潔,更具備可讀性的異步代碼。咱們討論了這些方法的一些最重要的優勢和缺點,並認識到即便它們很是有用,也須要一些時間來掌握。這就是這幾種方式也沒有徹底取代在許多狀況下仍然很是有用的回調的緣由。做爲一名開發人員,應該按照實際狀況分析決定使用哪一種解決方案。若是您正在構建執行異步操做的公共庫,則應該提供易於使用的API,即便對於只想使用回調的開發人員也是如此。

在下一章中,咱們將探討另外一個與異步代碼執行相關的機制,這也是整個Node.js生態系統中的另外一個基本構建塊:streams

相關文章
相關標籤/搜索