在沒有深度使用函數回調的經驗的時候,去看這些內容仍是有一點吃力的。因爲Node.js獨特的異步特性,纔出現了「回調地獄」的問題,這篇文章中,我比較詳細的記錄瞭如何解決異步流問題。javascript
文章會很長,並且這篇是對異步流模式的解釋。文中會使用一個簡單的網絡蜘蛛的例子,它的做用是抓取指定URL的網頁內容並保存在項目中,在文章的最後,能夠找到整篇文章中的源碼demo。java
本篇不針對初學者,所以會省略掉大部分的基礎內容的講解:node
(spider_v1.js)git
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (!exists) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { callback(err); } else { mkdirp(path.dirname(filename), err => { if (err) { callback(err); } else { fs.writeFile(filename, body, err => { if (err) { callback(err); } else { callback(null, filename, true); } }); } }); } }); } else { callback(null, filename, false); } }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
上邊的代碼的流程大概是這樣的:程序員
這是一個很是簡單版本的蜘蛛,他只能抓取一個url的內容,看到上邊的回調多麼使人頭疼。那麼咱們開始進行優化。github
首先,if else 這種方式能夠進行優化,這個很簡單,不用多說,放一個對比效果:編程
/// before if (err) { callback(err); } else { callback(null, filename, true); } /// after if (err) { return callback(err); } callback(null, filename, true);
代碼這麼寫,嵌套就會少一層,但經驗豐富的程序員會認爲,這樣寫太重強調了error,咱們編程的重點應該放在處理正確的數據上,在可讀性上也存在這樣的要求。api
另外一個優化是函數拆分,上邊代碼中的spider函數中,能夠把下載文件和保存文件拆分出去。數組
(spider_v2.js)promise
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } function spider(url, callback) { const filename = utilities.urlToFilename(url); console.log(`filename: ${filename}`); fs.exists(filename, exists => { if (exists) { return callback(null, filename, false); } download(url, filename, err => { if (err) { return callback(err); } callback(null, filename, true); }) }); } spider(process.argv[2], (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
上邊的代碼基本上是採用原生優化後的結果,但這個蜘蛛的功能太過簡單,咱們如今須要抓取某個網頁中的全部url,這樣纔會引伸出串行和並行的問題。
(spider_v3.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); } function spider(url, nesting, callback) { const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
上邊的代碼相比以前的代碼多了兩個核心功能,首先是經過輔助類獲取到了某個body中的links:
const links = utilities.getPageLinks(currentUrl, body);
內部實現就不解釋了,另外一個核心代碼就是:
/// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); function iterate(index) { if (index === links.length) { return callback(); } spider(links[index], nesting - 1, err => { if (err) { return callback(err); } iterate((index + 1)); }) } iterate(0); }
能夠說上邊這一小段代碼,就是採用原生實現異步串行的pattern了。除了這些以外,還引入了nesting的概念,經過這是這個屬性,能夠控制抓取層次。
到這裏咱們就完整的實現了串行的功能,考慮到性能,咱們要開發並行抓取的功能。
(spider_v4.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
這段代碼一樣很簡單,也有兩個核心內容。一個是如何實現併發:
/// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; function done(err) { if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { return callback(); } } links.forEach(link => { spider(link, nesting - 1, done); }); }
上邊的代碼能夠說是實現併發的一個pattern。利用循環遍從來實現。另外一個核心是,既然是併發的,那麼利用fs.exists
就會存在問題,可能會重複下載同一文件,這裏的解決方案是:
如今咱們又有了新的需求,要求限制同時併發的最大數,那麼在這裏就引進了一個我認爲最重要的概念:隊列。
(task-Queue.js)
class TaskQueue { constructor(concurrency) { this.concurrency = concurrency; this.running = 0; this.queue = []; } pushTask(task) { this.queue.push(task); this.next(); } next() { while (this.running < this.concurrency && this.queue.length) { const task = this.queue.shift(); task(() => { this.running--; this.next(); }); this.running++; } } } module.exports = TaskQueue;
上邊的代碼就是隊列的實現代碼,核心是next()
方法,能夠看出,當task加入隊列中後,會馬上執行,這不是說這個任務必定立刻執行,而是指的是next會馬上調用。
(spider_v5.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); const TaskQueue = require("./task-Queue"); const downloadQueue = new TaskQueue(2); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename, callback) { console.log(`Downloading ${url}`); request(url, (err, response, body) => { if (err) { return callback(err); } saveFile(filename, body, err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); }) } /// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } let completed = 0, hasErrors = false; links.forEach(link => { /// 給隊列出傳遞一個任務,這個任務首先是一個函數,其次該函數接受一個參數 /// 當調用任務時,觸發該函數,而後給函數傳遞一個參數,告訴該函數在任務結束時幹什麼 downloadQueue.pushTask(done => { spider(link, nesting - 1, err => { /// 這裏表示,只要發生錯誤,隊列就會退出 if (err) { hasErrors = true; return callback(err); } if (++completed === links.length && !hasErrors) { callback(); } done(); }); }); }); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); /// In this pattern, there will be some issues. /// Possible problems to download the same url again and again。 fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 2, (err, filename, downloaded) => { if (err) { console.log(`error: ${err}`); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
所以,爲了限制併發的個數,只需在spiderLinks
方法中,把task遍歷放入隊列就能夠了。這相對來講很簡單。
到這裏爲止,咱們使用原生JavaScript實現了一個有相對完整功能的網絡蜘蛛,既能串行,也能併發,還能夠控制併發個數。
把不一樣的功能放到不一樣的函數中,會給咱們帶來巨大的好處,async庫十分流行,它的性能也不錯,它內部基於callback。
(spider_v6.js)
const request = require("request"); const fs = require("fs"); const mkdirp = require("mkdirp"); const path = require("path"); const utilities = require("./utilities"); const series = require("async/series"); const eachSeries = require("async/eachSeries"); function download(url, filename, callback) { console.log(`Downloading ${url}`); let body; series([ callback => { request(url, (err, response, resBody) => { if (err) { return callback(err); } body = resBody; callback(); }); }, mkdirp.bind(null, path.dirname(filename)), callback => { fs.writeFile(filename, body, callback); } ], err => { if (err) { return callback(err); } console.log(`Downloaded and saved: ${url}`); callback(null, body); }); } /// 最大的啓發是實現瞭如何異步循環遍歷數組 function spiderLinks(currentUrl, body, nesting, callback) { if (nesting === 0) { return process.nextTick(callback); } const links = utilities.getPageLinks(currentUrl, body); if (links.length === 0) { return process.nextTick(callback); } eachSeries(links, (link, cb) => { "use strict"; spider(link, nesting - 1, cb); }, callback); } const spidering = new Map(); function spider(url, nesting, callback) { if (spidering.has(url)) { return process.nextTick(callback); } spidering.set(url, true); const filename = utilities.urlToFilename(url); fs.readFile(filename, "utf8", (err, body) => { if (err) { if (err.code !== 'ENOENT') { return callback(err); } return download(url, filename, (err, body) => { if (err) { return callback(err); } spiderLinks(url, body, nesting, callback); }); } spiderLinks(url, body, nesting, callback); }); } spider(process.argv[2], 1, (err, filename, downloaded) => { if (err) { console.log(err); } else if (downloaded) { console.log(`Completed the download of ${filename}`); } else { console.log(`${filename} was already downloaded`); } });
在上邊的代碼中,咱們只使用了async的三個功能:
const series = require("async/series"); // 串行 const eachSeries = require("async/eachSeries"); // 並行 const queue = require("async/queue"); // 隊列
因爲比較簡單,就不作解釋了。async中的隊列的代碼在(spider_v7.js)中,和上邊咱們自定義的隊列很類似,也不作更多解釋了。
Promise是一個協議,有不少庫實現了這個協議,咱們用的是ES6的實現。簡單來講promise就是一個約定,若是完成了,就調用它的resolve方法,失敗了就調用它的reject方法。它內有實現了then方法,then返回promise自己,這樣就造成了調用鏈。
其實Promise的內容有不少,在實際應用中是如何把普通的函數promise化。這方面的內容在這裏也不講了,我本身也未入流
(spider_v8.js)
const utilities = require("./utilities"); const request = utilities.promisify(require("request")); const fs = require("fs"); const readFile = utilities.promisify(fs.readFile); const writeFile = utilities.promisify(fs.writeFile); const mkdirp = utilities.promisify(require("mkdirp")); const path = require("path"); function saveFile(filename, contents, callback) { mkdirp(path.dirname(filename), err => { if (err) { return callback(err); } fs.writeFile(filename, contents, callback); }); } function download(url, filename) { console.log(`Downloading ${url}`); let body; return request(url) .then(response => { "use strict"; body = response.body; return mkdirp(path.dirname(filename)); }) .then(() => writeFile(filename, body)) .then(() => { "use strict"; console.log(`Downloaded adn saved: ${url}`); return body; }); } /// promise編程的本質就是爲了解決在函數中設置回調函數的問題 /// 經過中間層promise來實現異步函數同步化 function spiderLinks(currentUrl, body, nesting) { let promise = Promise.resolve(); if (nesting === 0) { return promise; } const links = utilities.getPageLinks(currentUrl, body); links.forEach(link => { "use strict"; promise = promise.then(() => spider(link, nesting - 1)); }); return promise; } function spider(url, nesting) { const filename = utilities.urlToFilename(url); return readFile(filename, "utf8") .then( body => spiderLinks(url, body, nesting), err => { "use strict"; if (err.code !== 'ENOENT') { /// 拋出錯誤,這個方便與在整個異步鏈的最後經過呢catch來捕獲這個鏈中的錯誤 throw err; } return download(url, filename) .then(body => spiderLinks(url, body, nesting)); } ); } spider(process.argv[2], 1) .then(() => { "use strict"; console.log('Download complete'); }) .catch(err => { "use strict"; console.log(err); });
能夠看到上邊的代碼中的函數都是沒有callback的,只須要在最後catch就能夠了。
在設計api的時候,應該支持兩種方式,及支持callback,又支持promise
function asyncDivision(dividend, divisor, cb) { return new Promise((resolve, reject) => { "use strict"; process.nextTick(() => { const result = dividend / divisor; if (isNaN(result) || !Number.isFinite(result)) { const error = new Error("Invalid operands"); if (cb) { cb(error); } return reject(error); } if (cb) { cb(null, result); } resolve(result); }); }); } asyncDivision(10, 2, (err, result) => { "use strict"; if (err) { return console.log(err); } console.log(result); }); asyncDivision(22, 11) .then((result) => console.log(result)) .catch((err) => console.log(err));
Generator頗有意思,他可讓暫停函數和恢復函數,利用thunkify和co這兩個庫,咱們下邊的代碼實現起來很是酷。
(spider_v9.js)
const thunkify = require("thunkify"); const co = require("co"); const path = require("path"); const utilities = require("./utilities"); const request = thunkify(require("request")); const fs = require("fs"); const mkdirp = thunkify(require("mkdirp")); const readFile = thunkify(fs.readFile); const writeFile = thunkify(fs.writeFile); const nextTick = thunkify(process.nextTick); function* download(url, filename) { console.log(`Downloading ${url}`); const response = yield request(url); console.log(response); const body = response[1]; yield mkdirp(path.dirname(filename)); yield writeFile(filename, body); console.log(`Downloaded and saved ${url}`); return body; } function* spider(url, nesting) { const filename = utilities.urlToFilename(url); let body; try { body = yield readFile(filename, "utf8"); } catch (err) { if (err.code !== 'ENOENT') { throw err; } body = yield download(url, filename); } yield spiderLinks(url, body, nesting); } function* spiderLinks(currentUrl, body, nesting) { if (nesting === 0) { return nextTick(); } const links = utilities.getPageLinks(currentUrl, body); for (let i = 0; i < links.length; i++) { yield spider(links[i], nesting - 1); } } /// 經過co就自動處理了回調函數,直接返回了回調函數中的參數,把這些參數放到一個數組中,可是去掉了err信息 co(function* () { try { yield spider(process.argv[2], 1); console.log('Download complete'); } catch (err) { console.log(err); } });
我並無寫promise和generator併發的代碼。以上這些內容來自於這本書nodejs-design-patternshttps://github.com/agelessman/MyBooks。