《Node.js設計模式》基於回調的異步控制流

本系列文章爲《Node.js Design Patterns Second Edition》的原文翻譯和讀書筆記,在GitHub連載更新,同步翻譯版連接javascript

歡迎關注個人專欄,以後的博文將在專欄同步:html

Asynchronous Control Flow Patterns with Callbacks

Node.js這類語言習慣於同步的編程風格,其CPS風格和異步特性的API是其標準,對於新手來講可能難以理解。編寫異步代碼多是一種不一樣的體驗,尤爲是對異步控制流而言。異步代碼可能讓咱們難以預測在Node.js中執行語句的順序。例如讀取一組文件,執行一串任務,或者等待一組操做完成,都須要開發人員採用新的方法和技術,以免最終編寫出效率低下和不可維護的代碼。一個常見的錯誤是回調地獄,代碼量急劇上升又不可讀,使得簡單的程序也難以閱讀和維護。在本章中,咱們將看到如何經過使用一些規則和一些模式來避免回調,並編寫乾淨、可管理的異步代碼。咱們將看到控制流庫,如async,能夠極大地簡化咱們的問題,提高咱們的代碼可讀性,更易於維護。前端

異步編程的困難

JavaScript中異步代碼的順序錯亂無疑是很容易的。閉包和對匿名函數的定義可使開發人員有更好的編程體驗,而並不須要開發人員手動對異步操做進行管理和跳轉。這是符合KISS原則的。簡單且能保持異步代碼控制流,讓它在更短的時間內工做。但不幸的是,回調嵌套是以犧牲諸如模塊性、可重用性和可維護性,增大整個函數的大小,致使糟糕的代碼結構爲代價的。大多數狀況下,建立閉包在功能上是不須要的,但這更可能是一種約束,而不是與異步編程相關的問題。認識到回調嵌套會使得咱們的代碼變得笨拙,而後根據最適合的解決方案採起相應的方法解決回調地獄,這是新手與專家的區別。java

建立一個簡單的Web爬蟲

爲了解釋上述問題,咱們建立了一個簡單的Web爬蟲,一個命令行應用,其接受一個URL爲輸入,而後能夠把其內容下載到一個文件中。在下列代碼中,咱們會依賴如下兩個npm庫。node

此外,咱們還將引用一個叫作./utilities的本地模塊。git

咱們的應用程序的核心功能包含在一個名爲spider.js的模塊中。以下所示,首先加載咱們所須要的依賴包:github

const request = require('request');
const fs = require('fs');
const mkdirp = require('mkdirp');
const path = require('path');
const utilities = require('./utilities');

接下來,咱們將建立一個名爲spider()的新函數,該函數接受URL爲參數,並在下載過程完成時調用一個回調函數。算法

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  fs.exists(filename, exists => {
    if (!exists) {
      console.log(`Downloading ${url}`);
      request(url, (err, response, body) => {
        if (err) {
          callback(err);
        } else {
          mkdirp(path.dirname(filename), err => {
            if (err) {
              callback(err);
            } else {
              fs.writeFile(filename, body, err => {
                if (err) {
                  callback(err);
                } else {
                  callback(null, filename, true);
                }
              });
            }
          });
        }
      });
    } else {
      callback(null, filename, false);
    }
  });
}

上述函數執行如下任務:數據庫

  • 檢查該URL的文件是否已經下載過,即驗證相應文件是否已經被建立:

fs.exists(filename, exists => ...npm

  • 若是文件尚未被下載,則執行下列代碼進行下載操做:

request(url, (err, response, body) => ...

  • 而後,咱們須要肯定目錄下是否已經包含了該文件:

mkdirp(path.dirname(filename), err => ...

  • 最後,咱們把HTTP請求返回的報文主體寫入文件系統:

mkdirp(path.dirname(filename), err => ...

要完成咱們的Web爬蟲應用程序,只需提供一個URL做爲輸入(在咱們的例子中,咱們從命令行參數中讀取它),咱們只需調用spider()函數便可。

spider(process.argv[2], (err, filename, downloaded) => {
  if (err) {
    console.log(err);
  } else if (downloaded) {
    console.log(`Completed the download of "${filename}"`);
  } else {
    console.log(`"${filename}" was already downloaded`);
  }
});

如今,咱們開始嘗試運行Web爬蟲應用程序,可是首先,確保已有utilities.js模塊和package.json中的全部依賴包已經安裝到你的項目中:

npm install

以後,咱們執行咱們這個爬蟲模塊來下載一個網頁,使用如下命令:

node spider http://www.example.com

咱們的Web爬蟲應用程序要求在咱們提供的URL中老是包含協議類型(例如,http://)。另外,不要指望HTML連接被從新編寫,也不要指望下載像圖片這樣的資源,由於這只是一個簡單的例子來演示異步編程是如何工做的。

回調地獄

看看咱們的spider()函數,咱們能夠發現,儘管咱們實現的算法很是簡單,可是生成的代碼有幾個級別的縮進,並且很難讀懂。使用阻塞式的同步API實現相似的功能是很簡單的,並且不多有機會讓它看起來如此錯誤。然而,使用異步CPS是另外一回事,使用閉包可能會致使出現難以閱讀的代碼。

大量閉包和回調將代碼轉換成不可讀的、難以管理的狀況稱爲回調地獄。它是Node.js中最受承認和最嚴重的反模式之一。通常來講,對於JavaScript而言。受此問題影響的代碼的典型結構以下:

asyncFoo(err => {
  asyncBar(err => {
    asyncFooBar(err => {
      //...
    });
  });
});

咱們能夠看到,用這種方式編寫的代碼是如何造成金字塔形狀的,因爲深嵌的緣由致使的難以閱讀,稱爲「末日金字塔」。

像前面的代碼片斷這樣的代碼最明顯的問題是可讀性差。因爲嵌套太深,幾乎不可能跟蹤回調函數的結束位置和另外一個回調函數開始的位置。

另外一個問題是由每一個做用域中使用的變量名的重疊引發的。一般,咱們必須使用相似甚至相同的名稱來描述變量的內容。最好的例子是每一個回調接收到的錯誤參數。有些人常常嘗試使用相同名稱的變體來區分每一個範圍內的對象,例如,errorerrerr1err2等等。另外一些人則傾向於隱藏在範圍中定義的變量,老是使用相同的名稱。例如,err。這兩種選擇都遠非完美,並且會形成混淆,並增長致使bug的可能性。

此外,咱們必須記住,雖然閉包在性能和內存消耗方面的代價很小。此外,它們還能夠建立不易識別的內存泄漏,由於咱們不該該忘記,由閉包引用的任何上下文變量都不會被垃圾收集所保留。

關於對於V8的閉包工做原理,能夠參考Vyacheslav Egorov的博客文章

若是咱們看一下咱們的spider()函數,咱們會清楚地注意到它即是一個典型的回調地獄的場景,而且在這個函數中有咱們剛纔描述的全部問題。這正是咱們將在本章中學習的模式和技巧所要解決的問題。

使用簡單的JavaScript

既然咱們已經遇到了第一個回調地獄的例子,咱們知道咱們應該避免什麼。然而,在編寫異步代碼時,這並非唯一的關注點。事實上,有幾種狀況下,控制一組異步任務的流須要使用特定的模式和技術,特別是若是咱們只使用普通的JavaScript而沒有任何外部庫的幫助的狀況下。例如,經過按順序應用異步操做來遍歷集合並不像在數組中調用forEach()那樣簡單,但實際上它須要一種相似於遞歸的技術。

在本節中,咱們將學習如何避免回調地獄,以及如何使用簡單的JavaScript實現一些最多見的控制流模式。

回調函數的準則

在編寫異步代碼時,要記住的第一個規則是在定義回調時不要濫用閉包。濫用閉包一時很爽,由於它不須要對諸如模塊化和可重用性這樣的問題進行額外的思考。可是,咱們已經看到,這種作法弊大於利。大多數狀況下,修復回調地獄問題並不須要任何庫、花哨的技術或範式的改變,只是一些常識。

如下是一些基本原則,能夠幫助咱們更少的嵌套,並改進咱們的代碼的組織:

  • 儘量退出外層函數。根據上下文,使用returncontinuebreak,以便當即退出當前代碼塊,而不是使用if...else代碼塊。其餘語句。這將有助於優化咱們的代碼結構。
  • 爲回調建立命名函數,避免使用閉包,並將中間結果做爲參數傳遞。命名函數也會使它們在堆棧跟蹤中更優雅。
  • 代碼儘量模塊化。並儘量將代碼分紅更小的、可重用的函數。

回調調用的準則

爲了展現上述原則,咱們經過重構Web爬蟲應用程序來講明。

對於第一步,咱們能夠經過刪除else語句來重構咱們的錯誤檢查方式。這是在咱們收到錯誤後當即從函數中返回。所以,看如下代碼:

if (err) {
  callback(err);
} else {
  // 若是沒有錯誤,執行該代碼塊
}

咱們能夠經過編寫下面的代碼來改進咱們的代碼結構:

if (err) {
  return callback(err);
}
// 若是沒有錯誤,執行該代碼塊

有了這個簡單的技巧,咱們當即減小了函數的嵌套級別,它很簡單,不須要任何複雜的重構。

在執行咱們剛纔描述的優化時,一個常見的錯誤是在調用回調函數以後忘記終止函數,即return。對於錯誤處理場景,如下代碼是bug的典型來源:

if (err) {
  callback(err);
}
// 若是沒有錯誤,執行該代碼塊

在這個例子中,即便在調用回調以後,函數的執行也會繼續。那麼避免這種狀況的出現,return語句是十分必要的。還要注意,函數返回的輸出是什麼並不重要,實際結果(或錯誤)是異步生成的,並傳遞給回調。異步函數的返回值一般被忽略。該屬性容許咱們編寫以下的代碼:

return callback(...);

不然咱們必須拆成兩條語句來寫:

callback(...);
return;

接下來咱們繼續重構咱們的spider()函數,咱們能夠嘗試識別可複用的代碼片斷。例如,將給定字符串寫入文件的功能能夠很容易地分解爲一個單獨的函數:

function saveFile(filename, contents, callback) {
  mkdirp(path.dirname(filename), err => {
    if (err) {
      return callback(err);
    }
    fs.writeFile(filename, contents, callback);
  });
}

遵循一樣的原則,咱們能夠建立一個名爲download()的通用函數,它將URL文件名做爲輸入,並將URL的內容下載到給定的文件中。在內部,咱們可使用前面建立的saveFile()函數。

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  request(url, (err, response, body) => {
    if (err) {
      return callback(err);
    }
    saveFile(filename, body, err => {
      if (err) {
        return callback(err);
      }
      console.log(`Downloaded and saved: ${url}`);
      callback(null, body);
    });
  });
}

最後,修改咱們的spider()函數:

function spider(url, callback) {
  const filename = utilities.urlToFilename(url);
  fs.exists(filename, exists => {
    if (exists) {
      return callback(null, filename, false);
    }
    download(url, filename, err => {
      if (err) {
        return callback(err);
      }
      callback(null, filename, true);
    })
  });
}

spider()函數的功能和接口仍然是徹底相同的,改變的僅僅是代碼的組織方式。經過應用上述基本原則,咱們可以極大地減小代碼的嵌套,同時增長了它的可重用性和可測試性。實際上,咱們能夠考慮導出saveFile()download(),這樣咱們就能夠在其餘模塊中重用它們。這也使咱們可以更容易地測試他們的功能。

咱們在這一節中進行的重構清楚地代表,大多數時候,咱們所須要的只是一些規則,並確保咱們不濫用閉包和匿名函數。它的工做很是出色,只需最少的工做量,而且只使用原始的JavaScript

順序執行

如今開始探尋異步控制流的執行順序,咱們會經過開始分析一串異步代碼來探尋其控制流。

按順序執行一組任務意味着一次一個接一個地運行它們。執行順序很重要,必須保證其正確性,由於列表中一個任務的結果可能會影響下一個任務的執行。下圖說明了這個概念:

上述異步控制流有一些不一樣的變化:

  • 按順序執行一組已知任務,無需連接或傳遞執行結果
  • 使用任務的輸出做爲下一個輸入(也稱爲chainpipeline,或者waterfall
  • 在每一個元素上運行異步任務時迭代一個集合,一個元素接一個元素

對於順序執行而言,儘管在使用直接樣式阻塞API實現很簡單,但一般狀況下使用異步CPS時會致使回調地獄問題。

按順序執行一組已知的任務

在上一節中實現spider()函數時,咱們已經遇到了順序執行的問題。經過研究以下方式,咱們能夠更好地控制異步代碼。以該代碼爲準則,咱們能夠用如下模式來解決上述問題:

function task1(callback) {
  asyncOperation(() => {
    task2(callback);
  });
}

function task2(callback) {
  asyncOperation(result() => {
    task3(callback);
  });
}

function task3(callback) {
  asyncOperation(() => {
    callback(); //finally executes the callback
  });
}

task1(() => {
  //executed when task1, task2 and task3 are completed
  console.log('tasks 1, 2 and 3 executed');
});

上述模式顯示了在完成一個異步操做後,再調用下一個異步操做。該模式強調任務的模塊化,而且避免在處理異步代碼使用閉包。

順序迭代

咱們前面描述的模式若是咱們預先知道要執行什麼和有多少個任務,這些模式是完美的。這使咱們可以對序列中下一個任務的調用進行硬編碼,可是若是要對集合中的每一個項目執行異步操做,會發生什麼?在這種狀況下,咱們不能對任務序列進行硬編碼。相反的是,咱們必須動態構建它。

Web爬蟲版本2

爲了顯示順序迭代的例子,讓咱們爲Web爬蟲應用程序引入一個新功能。咱們如今想要遞歸地下載網頁中的全部連接。要作到這一點,咱們將從頁面中提取全部連接,而後按順序逐個地觸發咱們的Web爬蟲應用程序。

第一步是修改咱們的spider()函數,以便經過調用一個名爲spiderLinks()的函數觸發頁面全部連接的遞歸下載。

此外,咱們如今嘗試讀取文件,而不是檢查文件是否已經存在,並開始爬取其連接。這樣,咱們就能夠恢復中斷的下載。最後還有一個變化是,咱們確保咱們傳遞的參數是最新的,還要限制遞歸深度。結果代碼以下:

function spider(url, nesting, callback) {
  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, 'utf8', (err, body) => {
    if (err) {
      if (err.code! == 'ENOENT') {
        return callback(err);
      }
      return download(url, filename, (err, body) => {
        if (err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }
    spiderLinks(url, body, nesting, callback);
  });
}
爬取連接

如今咱們能夠建立這個新版本的Web爬蟲應用程序的核心,即spiderLinks()函數,它使用順序異步迭代算法下載HTML頁面的全部連接。注意咱們在下面的代碼塊中定義的方式:

function spiderLinks(currentUrl, body, nesting, callback) {
  if(nesting === 0) {
    return process.nextTick(callback);
  }

  let links = utilities.getPageLinks(currentUrl, body); //[1]
  function iterate(index) { //[2]
    if(index === links.length) {
      return callback();
    }

    spider(links[index], nesting - 1, function(err) { //[3]
      if(err) {
        return callback(err);
      }
      iterate(index + 1);
    });
  }
  iterate(0); //[4]
}

從這個新功能中的重要步驟以下:

  1. 咱們使用utilities.getPageLinks()函數獲取頁面中包含的全部連接的列表。此函數僅返回指向相同主機名的連接。
  2. 咱們使用一個稱爲iterate()的本地函數來遍歷連接,該函數須要下一個連接的索引進行分析。在這個函數中,咱們首先要檢查索引是否等於連接數組的長度,若是等於則是迭代完成,在這種狀況下咱們當即調用callback()函數,由於這意味着咱們處理了全部的項目。
  3. 這時,處理連接已準備就緒。咱們經過遞歸調用spider()函數。
  4. 做爲spiderLinks()函數的最後一步也是最重要的一步,咱們經過調用iterate(0)來開始迭代。

咱們剛剛提出的算法容許咱們經過順序執行異步操做來迭代數組,在咱們的例子中是spider()函數。

咱們如今能夠嘗試這個新版本的Web爬蟲應用程序,並觀看它一個接一個地遞歸地下載網頁的全部連接。要中斷這個過程,若是有不少連接可能須要一段時間,請記住咱們能夠隨時使用Ctrl + C。若是咱們決定恢復它,咱們能夠經過啓動Web爬蟲應用程序並提供與上次結束時相同的URL來恢復執行。

如今咱們的網絡Web爬蟲應用程序可能會觸發整個網站的下載,請仔細考慮使用它。例如,不要設置高嵌套級別或離開爬蟲運行超過幾秒鐘。用數千個請求重載服務器是不道德的。在某些狀況下,這也被認爲是非法的。須要考慮後果!

迭代模式

咱們以前展現的spiderLinks()函數的代碼是一個清楚的例子,說明了如何在應用異步操做時迭代集合。咱們還能夠注意到,這是一種能夠適應任何其餘狀況的模式,咱們須要在集合的元素或一般的任務列表上按順序異步迭代。該模式能夠推廣以下:

function iterate(index) {
  if (index === tasks.length) {
    return finish();
  }
  const task = tasks[index];
  task(function() {
    iterate(index + 1);
  });
}

function finish() {
  // 迭代完成的操做
}

iterate(0);

注意到,若是task()是同步操做,這些類型的算法變得真正遞歸。在這種狀況下,可能形成調用棧的溢出。

咱們剛剛提出的模式是很是強大的,由於它能夠適應幾種狀況。例如,咱們能夠映射數組的值,或者咱們能夠將迭代的結果傳遞給迭代中的下一個,以實現一個reduce算法,若是知足特定的條件,咱們能夠提早退出循環,或者甚至能夠迭代無限數量的元素。

咱們還能夠選擇將解決方案進一步推廣:

iterateSeries(collection, iteratorCallback, finalCallback);

經過建立一個名爲iterator的函數來執行任務列表,該函數調用集合中的下一個可執行的任務,並確保在當前任務完成時調用迭代器結束的回調函數。

並行

在某些狀況下,一組異步任務的執行順序並不重要,咱們只須要在全部這些運行的任務完成時通知咱們。使用並行執行流更好地處理這種狀況,以下圖所示:

若是咱們認爲Node.js是單線程的話,這可能聽起來很奇怪,可是若是咱們記住咱們在第一章中討論過的內容,咱們意識到即便咱們只有一個線程,咱們仍然能夠實現併發,因爲Node.js的非阻塞性質。實際上,在這種狀況下,並行字不正確地使用,由於這並不意味着任務同時運行,而是它們的執行由底層的非阻塞API執行,並由事件循環進行交織。

咱們知道,當一個任務容許事件循環執行另外一個任務時,或者是說一個任務容許控制回到事件循環。這種工做流的名稱爲併發,但爲了簡單起見,咱們仍然會使用並行。

下圖顯示了兩個異步任務能夠在Node.js程序中並行運行:

經過上圖,咱們有一個Main函數執行兩個異步任務:

  1. Main函數觸發Task 1Task 2的執行。因爲這些觸發異步操做,這兩個函數會當即返回,並將控制權返還給主函數,以後等到事件循環完成再通知主線程。
  2. Task 1的異步操做完成時,事件循環給與其線程控制權。當Task 1同步操做完成時,它通知Main函數。
  3. Task 2的異步操做完成時,事件循環給與其線程控制權。當Task 2同步操做完成時,它再次通知Main函數。在這一點上,Main函數知曉Task 1Task 2都已經執行完畢,因此它能夠繼續執行其後操做或將操做的結果返回給另外一個回調函數。

簡而言之,這意味着在Node.js中,咱們只能執行並行異步操做,由於它們的併發性由非阻塞API在內部處理。在Node.js中,同步阻塞操做不能同時運行,除非它們的執行與異步操做交錯,或者經過setTimeout()setImmediate()延遲。咱們將在第九章中更詳細地看到這一點。

Web爬蟲版本3

上邊的Web爬蟲在並行異步操做上彷佛也算表現得很完美。到目前爲止,應用程序正在遞歸地執行連接頁面的下載。但性能不是最佳的,想要提高這個應用的性能很容易。

要作到這一點,咱們只須要修改spiderLinks()函數,確保spider()任務只執行一次,當全部任務都執行完畢後,調用最後的回調,因此咱們對spiderLinks()作以下修改:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;

  function done(err) {
    if (err) {
      hasErrors = true;
      return callback(err);
    }
    if (++completed === links.length && !hasErrors) {
      return callback();
    }
  }
  links.forEach(link => {
    spider(link, nesting - 1, done);
  });
}

上述代碼有何變化?,如今spider()函數的任務所有同步啓動。能夠經過簡單地遍歷連接數組和啓動每一個任務,咱們沒必要等待前一個任務完成再進行下一個任務:

links.forEach(link => {
  spider(link, nesting - 1, done);
});

而後,使咱們的應用程序知曉全部任務完成的方法是爲spider()函數提供一個特殊的回調函數,咱們稱之爲done()。當爬蟲任務完成時,done()函數設定一個計數器。當完成的下載次數達到連接數組的大小時,調用最終回調:

function done(err) {
  if (err) {
    hasErrors = true;
    return callback(err);
  }
  if (++completed === links.length && !hasErrors) {
    callback();
  }
}

經過上述變化,若是咱們如今試圖對網頁運行咱們的爬蟲,咱們將注意到整個過程的速度有很大的改進,由於每次下載都是並行執行的,而沒必要等待以前的連接被處理。

模式

此外,對於並行執行流程,咱們能夠提取咱們方案,以便適應於不一樣的狀況提升代碼的可複用性。咱們可使用如下代碼來表示模式的通用版本:

const tasks = [ /* ... */ ];
let completed = 0;
tasks.forEach(task => {
  task(() => {
    if (++completed === tasks.length) {
      finish();
    }
  });
});

function finish() {
  // 全部任務執行完成後調用
}

經過小的修改,咱們能夠調整模式,將每一個任務的結果累積到一個list中,以便過濾或映射數組的元素,或者一旦完成了一個或必定數量的任務便可調用finish()回調。

注意:若是是沒有限制的狀況下,並行執行的一組異步任務,而後等待全部異步任務完成後執行回調這種方式,其方法是計算它們的執行完成的數目。

用併發任務修復競爭條件

當使用阻塞I/O與多線程組合的方式時,並行運行一組任務可能會致使一些問題。可是,咱們剛剛看到,在Node.js中卻不同,並行運行多個異步任務實際上在資源方面消耗較低。這是Node.js最重要的優勢之一,所以在Node.js中並行化成爲一種常見的作法,並且這並是多麼複雜的技術。

Node.js的併發模型的另外一個重要特徵是咱們處理任務同步和競爭條件的方式。在多線程編程中,這一般使用諸如鎖,互斥條件,信號量和觀察器之類的構造來實現,這些是多線程語言並行化的最複雜的方面之一,對性能也有很大的影響。在Node.js中,咱們一般不須要一個花哨的同步機制,由於全部運行在單個線程上!可是,這並不意味着咱們沒有競爭條件。相反,他們能夠至關廣泛。問題的根源在於異步操做的調用與其結果通知之間的延遲。舉一個具體的例子,咱們能夠再次參考咱們的Web爬蟲應用程序,特別是咱們建立的最後一個版本,其實際上包含一個競爭條件。

問題在於在開始下載相應的URL的文檔以前,檢查文件是否已經存在的spider()函數:

function spider(url, nesting, callback) {
  if(spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);

  const filename = utilities.urlToFilename(url);
  fs.readFile(filename, 'utf8', function(err, body) {
    if(err) {
      if(err.code !== 'ENOENT') {
        return callback(err);
      }

      return download(url, filename, function(err, body) {
        if(err) {
          return callback(err);
        }
        spiderLinks(url, body, nesting, callback);
      });
    }

    spiderLinks(url, body, nesting, callback);
  });
}

如今的問題是,在同一個URL上操做的兩個爬蟲任務可能會在兩個任務之一完成下載並建立一個文件,致使第二個任務開始下載以前,在同一個文件上調用fs.readFile()的結果不對,導致下載兩次。這種狀況以下圖所示:

上圖顯示了Task 1Task 2如何在Node.js的單個線程中交錯執行,以及異步操做如何實際引入競爭條件。在咱們的狀況下,兩個爬蟲任務最終會下載相同的文件。
咱們如何解決這個問題?答案比咱們想象的要簡單得多。實際上,咱們所須要的只是一個變量(互斥變量),能夠相互排除運行在同一個URL上的多個spider()任務。這能夠經過如下代碼來實現:

const spidering = new Map();

function spider(url, nesting, callback) {
  if (spidering.has(url)) {
    return process.nextTick(callback);
  }
  spidering.set(url, true);
  // ...
}

並行執行頻率限制

一般,若是不控制並行任務頻率,並行任務就會致使過載。想象一下,有數千個文件要讀取,訪問的URL或數據庫查詢並行運行。在這種狀況下,常見的問題是系統資源不足,例如,當嘗試一次打開太多文件時,利用可用於應用程序的全部文件描述符。在Web應用程序中,它還可能會建立一個利用拒絕服務(DoS)攻擊的漏洞。在全部這種狀況下,最好限制同時運行的任務數量。這樣,咱們能夠爲服務器的負載增長一些可預測性,並確保咱們的應用程序不會耗盡資源。下圖描述了一個狀況,咱們將五個任務並行運行併發限制爲兩段:

從上圖能夠清楚咱們的算法如何工做:

  1. 咱們能夠執行儘量多的任務,而不超過併發限制。
  2. 每當任務完成時,咱們再執行一個或多個任務,同時確保任務數量達不到限制。

併發限制

咱們如今提出一種模式,以有限的併發性並行執行一組給定的任務:

const tasks = ...
let concurrency = 2, running = 0, completed = 0, index = 0;

function next() {
  while (running < concurrency && index < tasks.length) {
    task = tasks[index++];
    task(() => {
      if (completed === tasks.length) {
        return finish();
      }
      completed++, running--;
      next();
    });
    running++;
  }
}
next();

function finish() {
  // 全部任務執行完成
}

該算法能夠被認爲是順序執行和並行執行之間的混合。事實上,咱們可能會注意到咱們以前介紹的兩種模式的類似之處:

  1. 咱們有一個迭代器函數,咱們稱之爲next(),有一個內部循環,並行執行儘量多的任務,同時保持併發限制。
  2. 咱們傳遞給每一個任務的回調檢查是否完成了列表中的全部任務。若是還有任務要運行,它會調用next()來執行下一個任務。

全局併發限制

咱們的Web爬蟲應用程序很是適合應用咱們所學到的限制一組任務的併發性。事實上,爲了不同時爬上數千個連接的狀況,咱們能夠經過在併發下載數量上增長一些措施來限制併發量。

0.11以前的Node.js版本已經將每一個主機的併發HTTP鏈接數限制爲5.然而,這能夠改變以適應咱們的須要。請查看官方文檔http://nodejs.org/docs/v0.10.... axsockets中的更多內容。從Node.js 0.11開始,併發鏈接數沒有默認限制。

咱們能夠將咱們剛剛學到的模式應用到咱們的spiderLinks()函數,可是咱們將得到的只是限制一個頁面中的一組連接的併發性。若是咱們選擇了併發量爲2,咱們最多能夠爲每一個頁面並行下載兩個連接。然而,因爲咱們能夠一次下載多個連接,所以每一個頁面都會產生另外兩個下載,這樣遞歸下去,其實也沒有徹底作到併發量的限制。

使用隊列

咱們真正想要的是限制咱們能夠並行運行的全局下載操做數量。咱們能夠略微修改以前展現的模式,可是咱們寧願把它做爲一個練習,由於咱們想借此機會引入另外一個機制,它利用隊列來限制多個任務的併發性。讓咱們看看這是如何工做的。

咱們如今要實現一個名爲TaskQueue類,它將隊列與咱們以前提到的算法相結合。咱們建立一個名爲taskQueue.js的新模塊:

class TaskQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.running = 0;
    this.queue = [];
  }
  pushTask(task) {
    this.queue.push(task);
    this.next();
  }
  next() {
    while (this.running < this.concurrency && this.queue.length) {
      const task = this.queue.shift();
      task(() => {
        this.running--;
        this.next();
      });
      this.running++;
    }
  }
};

上述類的構造函數只做爲輸入的併發限制,但除此以外,它初始化運行和隊列的變量。前一個變量是用於跟蹤全部正在運行的任務的計數器,然後者是將用做隊列以存儲待處理任務的數組。

pushTask()方法簡單地將新任務添加到隊列中,而後經過調用this.next()來引導任務的執行。

next()方法從隊列中生成一組任務,確保它不超過併發限制。

咱們可能會注意到,這種方法與限制咱們前面提到的併發性的模式有一些類似之處。它基本上從隊列開始儘量多的任務,而不超過併發限制。當每一個任務完成時,它會更新運行任務的計數,而後再次調用next()來啓動另外一輪任務。 TaskQueue類的有趣屬性是它容許咱們動態地將新的項目添加到隊列中。另外一個優勢是,如今咱們有一箇中央實體負責限制咱們任務的併發性,這能夠在函數執行的全部實例中共享。在咱們的例子中,它是spider()函數,咱們將在稍後看到。

Web爬蟲版本4

如今咱們有一個通用的隊列來執行有限的並行流程中的任務,咱們能夠在咱們的Web爬蟲應用程序中直接使用它。咱們首先加載新的依賴關係並經過將併發限制設置爲2來建立TaskQueue類的新實例:

const TaskQueue = require('./taskQueue');
const downloadQueue = new TaskQueue(2);

接下來,咱們使用新建立的downloadQueue更新spiderLinks()函數:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  let completed = 0,
    hasErrors = false;
  links.forEach(link => {
    downloadQueue.pushTask(done => {
      spider(link, nesting - 1, err => {
        if (err) {
          hasErrors = true;
          return callback(err);
        }
        if (++completed === links.length && !hasErrors) {
          callback();
        }
        done();
      });
    });
  });
}

這個函數的這種新的實現是很是容易的,它與這本章前面提到的無限並行執行的算法很是類似。這是由於咱們將併發控制委託給TaskQueue對象,咱們惟一要作的就是檢查全部任務是否完成。看上述代碼中如何定義咱們的任務:

  • 咱們經過提供自定義回調來運行spider()函數。
  • 在回調中,咱們檢查與spiderLinks()函數執行相關的全部任務是否完成。當這個條件爲真時,咱們調用spiderLinks()函數的最後回調。
  • 在咱們的任務結束時,咱們調用了done()回調,以便隊列能夠繼續執行。

在咱們進行這些小的變化以後,咱們如今能夠嘗試再次運行Web爬蟲應用程序。這一次,咱們應該注意到,同時不會有兩個以上的下載。

async庫

若是咱們到目前爲止咱們分析的每個控制流程模式看一下,咱們能夠看到它們能夠用做構建可重用和更通用的解決方案的基礎。例如,咱們能夠將無限制的並行執行算法包裝到一個接受任務列表的函數中,並行運行它們,而且當它們都完成時調用給定的回調函數。將控制流算法轉化爲可重用功能的這種方式能夠致使更具聲明性和表達性的方式來定義異步控制流,這正是async所作的。async庫是一個很是流行的解決方案,在Node.jsJavaScript中來講,用於處理異步代碼。它提供了一組功能,能夠大大簡化不一樣配置中一組任務的執行,併爲異步處理集合提供了有用的幫助。即便有其餘幾個具備類似目標的庫,因爲它的受歡迎程度,所以asyncNode.js中的一個事實上的標準。

順序執行

async庫能夠在實現複雜的異步控制流程時大大幫助咱們,可是一個難題就是選擇正確的庫來解決問題。例如,對於順序執行,有大約20個不一樣的函數可供選擇,包括eachSeries(), mapSeries(), filterSeries(), rejectSeries(), reduce(), reduceRight(), detectSeries(), concatSeries(), series(), whilst(), doWhilst(), until(), doUntil(), forever(), waterfall(), compose(), seq(), applyEachSeries(), iterator(), 和timesSeries()

選擇正確的函數是編寫更穩固和可讀的代碼的重要一步,但這也須要一些經驗和實踐。在咱們的例子中,咱們將僅介紹其中的一些狀況,但它們仍將爲理解和有效地使用庫的其他部分提供堅實的基礎。

下面,經過例子說明async庫如何工做,咱們將用於咱們的Web爬蟲應用程序。咱們直接從版本2開始,按順序遞歸地下載全部的連接。

可是,首先咱們確保將async庫安裝到咱們當前的項目中:

npm install async

而後咱們須要從spider.js模塊加載新的依賴項:

const async = require('async');

已知一組任務的順序執行

咱們先修改download()函數。以下所示,它依次作了如下三件事:

  1. 下載URL的內容。
  2. 建立一個新目錄(若是尚不存在)。
  3. URL的內容保存到文件中。

async.series()能夠實現順序執行一組任務:

async.series(tasks, [callback])

async.series()接受一個任務列表和一個在全部任務完成後調用的回調函數做爲參數。每一個任務只是一個接受回調函數的函數,當任務完成執行時,這個回調函數被調用:

function task(callback) {}

async的優點是它使用與Node.js相同的回調約定,它會自動處理錯誤傳播。因此,若是任何一個任務調用它的回調而且產生了一個錯誤,async將跳過列表中剩餘的任務,直接跳轉到最後的回調。

考慮到這一點,讓咱們看看如何經過使用async來修改上述的download()函數:

function download(url, filename, callback) {
  console.log(`Downloading ${url}`);
  let body;
  async.series([
    callback => {
      request(url, (err, response, resBody) => {
        if (err) {
          return callback(err);
        }
        body = resBody;
        callback();
      });
    },
    mkdirp.bind(null, path.dirname(filename)),
    callback => {
      fs.writeFile(filename, body, callback);
    }
  ], err => {
    if (err) {
      return callback(err);
    }
    console.log(`Downloaded and saved: ${url}`);
    callback(null, body);
  });
}

對比起這段代碼的回調地獄版本,使用async方式使咱們可以更好地組織咱們的異步任務。而且不會嵌套回調,由於咱們只須要提供一個的任務列表,一般對於用於每一個異步操做,而後異步任務將依次執行:

  1. 首先是下載URL的內容。咱們將響應體保存到一個閉包變量(body)中,以便它能夠與其餘任務共享。
  2. 建立並保存下載的頁面的目錄。咱們經過執行mkdirp()函數實現,並和建立的目錄路徑綁定。這樣,咱們能夠節省幾行代碼並增長其可讀性。
  3. 最後,咱們將下載的URL的內容寫入文件。在這種狀況下,咱們沒法執行部分應用程序(就像咱們在第二個任務中所作的那樣),由於變量body只在系列中的下載任務完成後纔可用。可是,經過將任務的回調直接傳遞到fs.writeFile()函數,咱們仍然能夠經過利用異步的自動錯誤管理來保存一些代碼行。

4.完成全部任務後,將調用async.series()的最後回調。在咱們的例子中,咱們只是作一些錯誤管理,而後返回body變量來回調download()函數。

對於上述狀況,async.series()的一個可替代的方法是async.waterfall(),它仍然按順序執行任務,但另外還提供每一個任務的輸出做爲下一個輸入。在咱們的狀況下,咱們可使用這個特徵來傳播body變量直到序列結束。

順序迭代

在前面講了如何按順序執行一組任務。上面的例子async.series()來作到這一點。可使用相同的功能來實現Web爬蟲版本2spiderLinks()函數。然而,async爲特定的狀況提供了一個更合適的API,遍歷一個集合,這個APIasync.eachSeries()。咱們來使用它來從新實現咱們的spiderLinks()函數(版本2,串行下載),以下所示:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  async.eachSeries(links, (link, callback) => {
    spider(link, nesting - 1, callback);
  }, callback);
}

若是咱們將使用async的上述代碼與使用純JavaScript模式實現的相同功能的代碼進行比較,咱們將注意到async在代碼組織和可讀性方面給咱們帶來的巨大優點。

並行執行

async不具備處理並行流的功能,其中能夠找到each()map()filter()reject()detect()some()every()concat()parallel()applyEach()times()。它們遵循與咱們已經看到的用於順序執行的功能相同的邏輯,區別在於所提供的任務是並行執行的。

爲了證實這一點,咱們能夠嘗試應用上述功能之一來實現咱們的Web爬蟲應用程序的第三版,即便用無限制的並行流程來執行下載。

若是咱們記住咱們以前使用的代碼來實現spiderLinks()函數的順序版本,那麼調整它使其並行工做就比較簡單:

function spiderLinks(currentUrl, body, nesting, callback) {
  // ...
  async.each(links, (link, callback) => {
    spider(link, nesting - 1, callback);
  }, callback);
}

這個函數與咱們用於順序下載的功能徹底相同,可是使用的是async.each()而非async.eachSeries()。這清楚地代表了使用庫(例如async)抽象異步流的功能。代碼再也不綁定到特定的執行流程了,沒有專門爲此寫的代碼。大多數只是應用邏輯。

限制並行執行

若是你想知道async還能夠用來限制並行任務的併發性,答案是確定的。咱們有一些咱們可使用的函數,即eachLimit()mapLimit()parallelLimit()queue()cargo()

咱們試圖利用其中的一個來實現Web爬蟲應用程序的第4版,以有限的併發性並行執行連接的下載。幸運的是,asyncasync.queue(),它的工做方式與本章前面建立的TaskQueue相似。 async.queue()函數建立一個新的隊列,它使用一個worker()函數來執行一組具備指定併發限制的任務:

const q = async.queue(worker, concurrency);

worker()函數做爲輸入接收要運行的任務和一個回調函數做爲參數,當任務完成時執行回調:

function worker(task, callback);

咱們應該注意到在這個例子中 task 能夠是任何類型,而不只僅只能是函數。實際上, worker有責任以最適當的方式處理任務。新建任務,能夠經過q.push(task, callback)將任務添加到隊列中。一個任務處理完後,關聯一個任務的回調函數必須被worker調用。

如今,咱們再次修改咱們的代碼實現一個全面並行的有併發限制的執行流,利用async.queue(),首先,咱們須要建立一個隊列:

const downloadQueue = async.queue((taskData, callback) => {
  spider(taskData.link, taskData.nesting - 1, callback);
}, 2);

代碼很簡單。咱們正在建立一個併發限制爲2的新隊列,讓一個工做人員只需使用與任務關聯的數據調用咱們的spider()函數。接下來,咱們實現spiderLinks()函數:

function spiderLinks(currentUrl, body, nesting, callback) {
  if (nesting === 0) {
    return process.nextTick(callback);
  }
  const links = utilities.getPageLinks(currentUrl, body);
  if (links.length === 0) {
    return process.nextTick(callback);
  }
  const completed = 0,
    hasErrors = false;
  links.forEach(function(link) {
    const taskData = {
      link: link,
      nesting: nesting
    };
    downloadQueue.push(taskData, err => {
      if (err) {
        hasErrors = true;
        return callback(err);
      }
      if (++completed === links.length && !hasErrors) {
        callback();
      }
    });
  });
}

前面的代碼應該看起來很是熟悉,由於它幾乎和使用TaskQueue對象來實現相同流程的代碼相同。此外,在這種狀況下,要分析的重要部分是將新任務推入隊列的位置。在這一點上,咱們確保咱們傳遞一個回調,使咱們可以檢查當前頁面的全部下載任務是否完成,並最終調用最終回調。

辛虧有async.queue(),咱們能夠輕鬆地複製咱們的TaskQueue對象的功能,再次證實了經過async,咱們能夠避免從頭開始編寫異步控制流模式,減小咱們的工做量,代碼量更加簡潔。

總結

在本章開始的時候,咱們說Node.js的編程可能很難由於它的異步性,特別是對於之前在其餘平臺上開發的人而言。然而,在本章中,咱們展現了異步API如何能夠從簡單原生JavaScript開始,從而爲咱們分析更復雜的技術奠基了基礎。而後咱們看到,除了爲每一種口味提供編程風格,咱們所掌握的工具確實是多樣化的,併爲咱們大部分的問題提供了很好的解決方案。例如,咱們能夠選擇async庫來簡化最多見的流程。

還有更爲先進的技術,如PromiseGenerator函數,這將是下一章的重點。當了解全部這些技術時,可以根據需求選擇最佳解決方案,或者在同一個項目中使用多種技術。

相關文章
相關標籤/搜索