Node.js之異步流控制

前言

在沒有深度使用函數回調的經驗的時候,去看這些內容還是有一點吃力的。因爲Node.js獨特的異步特性,才出現了「回調地獄」的問題。這篇文章中,我比較詳細地記錄了如何解決異步流問題。

文章會很長,並且這篇是對異步流模式的解釋。文中會使用一個簡單的網絡蜘蛛的例子,它的作用是抓取指定URL的網頁內容並保存在項目中。在文章的最後,可以找到整篇文章中的源碼demo。

1.原生JavaScript模式

本篇不針對初學者,所以會省略掉大部分基礎內容的講解:

(spider_v1.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

/// Downloads `url` into a local file unless that file is already present.
/// The callbacks are deliberately left fully nested: this version exists to show
/// the "callback hell" shape that the later versions flatten.
///
/// url      - address of the page to fetch.
/// callback - called with (err) on failure, or (null, filename, downloaded), where
///            `downloaded` is false when the file already existed and nothing was fetched.
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);

    // fs.exists is deprecated; fs.access reports a missing/inaccessible file through
    // an error-first callback, so "has an error" means "not there yet".
    fs.access(filename, missing => {
        if (missing) {
            console.log(`Downloading ${url}`);

            request(url, (err, response, body) => {
                if (err) {
                    callback(err);
                } else {
                    // Make sure the target directory exists before writing the file.
                    mkdirp(path.dirname(filename), err => {
                        if (err) {
                            callback(err);
                        } else {
                            fs.writeFile(filename, body, err => {
                                if (err) {
                                    callback(err);
                                } else {
                                    callback(null, filename, true);
                                }
                            });
                        }
                    });
                }
            });
        } else {
            callback(null, filename, false);
        }
    });
}

/// Entry point: crawl the URL passed as the first command-line argument and
/// print what happened (error, fresh download, or file already present).
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        return console.log(err);
    }
    const outcome = downloaded
        ? `Completed the download of ${filename}`
        : `${filename} was already downloaded`;
    console.log(outcome);
});

上邊的代碼的流程大概是這樣的:

  • 把url轉換成filename
  • 判斷該文件是否已存在,若存在直接返回,否則進入下一步
  • 發請求,獲取body
  • 把body寫入到文件中

這是一個非常簡單版本的蜘蛛,它只能抓取一個url的內容,可以看到上邊的回調多麼令人頭疼。那麼我們開始進行優化。

首先,if else 這種方式可以進行優化,這個很簡單,不用多說,放一個對比效果:

/// before: failure and success share an if/else, which costs one nesting level
if (err) {
    callback(err);
} else {
    callback(null, filename, true);
}

/// after: bail out early on error so the success path stays un-nested
if (err) {
    return callback(err);
}
callback(null, filename, true);

代碼這麼寫,嵌套就會少一層。但經驗豐富的程序員會認爲,這樣寫過於強調了error,我們編程的重點應該放在處理正確的數據上,在可讀性上也存在這樣的要求。

另外一個優化是函數拆分:上邊代碼中的spider函數,可以把下載文件和保存文件拆分出去。

(spider_v2.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

/// Writes `contents` to `filename`, creating the containing directory first.
/// callback - receives the mkdirp error if directory creation fails; otherwise it
///            is handed straight to fs.writeFile.
function saveFile(filename, contents, callback) {
    const targetDir = path.dirname(filename);

    const onDirectoryReady = mkdirErr => {
        if (!mkdirErr) {
            return fs.writeFile(filename, contents, callback);
        }
        return callback(mkdirErr);
    };

    mkdirp(targetDir, onDirectoryReady);
}

/// Fetches `url` and stores the response body in `filename`.
/// callback - called with (err) if the request or the save fails, otherwise
///            with (null, body).
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);

    // Second step: persist the fetched body, then report it to the caller.
    function persist(body) {
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                return callback(saveErr);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    }

    request(url, (requestErr, response, body) =>
        (requestErr ? callback(requestErr) : persist(body)));
}

/// Downloads `url` into a local file unless that file is already present.
///
/// url      - address of the page to fetch.
/// callback - called with (err) on failure, or (null, filename, downloaded), where
///            `downloaded` is false when the file already existed.
function spider(url, callback) {
    const filename = utilities.urlToFilename(url);
    console.log(`filename: ${filename}`);

    // fs.exists is deprecated; fs.access signals a missing/inaccessible file
    // through its error-first callback instead of a boolean.
    fs.access(filename, missing => {
        if (!missing) {
            return callback(null, filename, false);
        }
        download(url, filename, err => {
            if (err) {
                return callback(err);
            }
            callback(null, filename, true);
        });
    });
}

/// Entry point: crawl the URL passed as the first command-line argument and
/// print what happened (error, fresh download, or file already present).
spider(process.argv[2], (err, filename, downloaded) => {
    if (err) {
        return console.log(err);
    }
    const outcome = downloaded
        ? `Completed the download of ${filename}`
        : `${filename} was already downloaded`;
    console.log(outcome);
});

上邊的代碼基本上是採用原生方式優化後的結果,但這個蜘蛛的功能太過簡單。我們現在需要抓取某個網頁中的所有url,這樣才會引申出串行和並行的問題。

(spider_v3.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

/// Writes `contents` to `filename`, creating the containing directory first.
/// callback - receives the mkdirp error if directory creation fails; otherwise it
///            is handed straight to fs.writeFile.
function saveFile(filename, contents, callback) {
    const targetDir = path.dirname(filename);

    const onDirectoryReady = mkdirErr => {
        if (!mkdirErr) {
            return fs.writeFile(filename, contents, callback);
        }
        return callback(mkdirErr);
    };

    mkdirp(targetDir, onDirectoryReady);
}

/// Fetches `url` and stores the response body in `filename`.
/// callback - called with (err) if the request or the save fails, otherwise
///            with (null, body).
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);

    // Second step: persist the fetched body, then report it to the caller.
    function persist(body) {
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                return callback(saveErr);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    }

    request(url, (requestErr, response, body) =>
        (requestErr ? callback(requestErr) : persist(body)));
}

/// The key takeaway: how to walk an array with an asynchronous step per element.
/// Crawls the links found in `body` one after another (serially).
///
/// nesting  - remaining crawl depth; at 0 nothing is crawled and `callback` is
///            deferred with process.nextTick.
/// callback - called with no arguments after the last link, or with the first
///            error, which also stops the iteration.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);

    // Handle the link at `position`, then move on to the next one from its callback.
    const visit = position => {
        if (position === links.length) {
            return callback();
        }
        spider(links[position], nesting - 1, err =>
            (err ? callback(err) : visit(position + 1)));
    };

    visit(0);
}

/// Crawls `url`: reuses the locally saved copy when it can be read, downloads
/// the page when the file is missing (ENOENT), then follows the page's links.
///
/// nesting  - how many link levels below this page may still be followed.
/// callback - receives any read/download error, or whatever spiderLinks reports.
function spider(url, nesting, callback) {
    const filename = utilities.urlToFilename(url);
    const followLinks = pageBody => spiderLinks(url, pageBody, nesting, callback);

    fs.readFile(filename, "utf8", (readErr, cachedBody) => {
        if (!readErr) {
            return followLinks(cachedBody);
        }
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        // File not there yet: fetch it, then crawl the fresh body.
        return download(url, filename, (downloadErr, freshBody) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            followLinks(freshBody);
        });
    });
}

/// Entry point: crawl the URL passed as the first command-line argument,
/// following links two levels deep.
/// This version of spider() only ever passes an error (or nothing) to its callback,
/// so there is no filename/downloaded pair to report; branching on them always
/// printed "undefined was already downloaded".
spider(process.argv[2], 2, err => {
    if (err) {
        return console.log(err);
    }
    console.log('Download complete');
});

上邊的代碼相比以前的代碼多了兩個核心功能,首先是經過輔助類獲取到了某個body中的links:

// Ask the ./utilities helper for the links of the current page (its implementation is not shown in this article).
const links = utilities.getPageLinks(currentUrl, body);

內部實現就不解釋了,另外一個核心代碼就是:

/// The key takeaway: how to walk an array with an asynchronous step per element.
/// Crawls the links found in `body` one after another (serially).
///
/// nesting  - remaining crawl depth; at 0 nothing is crawled and `callback` is
///            deferred with process.nextTick.
/// callback - called with no arguments after the last link, or with the first
///            error, which also stops the iteration.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);

    // Handle the link at `position`, then move on to the next one from its callback.
    const visit = position => {
        if (position === links.length) {
            return callback();
        }
        spider(links[position], nesting - 1, err =>
            (err ? callback(err) : visit(position + 1)));
    };

    visit(0);
}

可以說上邊這一小段代碼,就是採用原生JavaScript實現異步串行的pattern了。除此之外,還引入了nesting的概念,通過設置這個參數,可以控制抓取的層次。

到這裏咱們就完整的實現了串行的功能,考慮到性能,咱們要開發並行抓取的功能。

(spider_v4.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");

/// Writes `contents` to `filename`, creating the containing directory first.
/// callback - receives the mkdirp error if directory creation fails; otherwise it
///            is handed straight to fs.writeFile.
function saveFile(filename, contents, callback) {
    const targetDir = path.dirname(filename);

    const onDirectoryReady = mkdirErr => {
        if (!mkdirErr) {
            return fs.writeFile(filename, contents, callback);
        }
        return callback(mkdirErr);
    };

    mkdirp(targetDir, onDirectoryReady);
}

/// Fetches `url` and stores the response body in `filename`.
/// callback - called with (err) if the request or the save fails, otherwise
///            with (null, body).
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);

    // Second step: persist the fetched body, then report it to the caller.
    function persist(body) {
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                return callback(saveErr);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    }

    request(url, (requestErr, response, body) =>
        (requestErr ? callback(requestErr) : persist(body)));
}

/// Crawls all links found in `body` at once: every spider() call is started in
/// the same forEach pass and a shared counter detects when the last one is done.
///
/// nesting  - remaining crawl depth; at 0 (or when the page has no links)
///            `callback` is deferred with process.nextTick.
/// callback - called exactly once: with the first error reported by any link, or
///            with no arguments after every link has completed.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }

    let completed = 0;
    let hasErrors = false;

    function done(err) {
        // The other links are still in flight after a failure; without this guard
        // each further failing link would invoke `callback` again.
        if (hasErrors) {
            return;
        }
        if (err) {
            hasErrors = true;
            return callback(err);
        }
        if (++completed === links.length) {
            return callback();
        }
    }

    links.forEach(link => {
        spider(link, nesting - 1, done);
    });
}

/// URLs already handed to spider(); a URL found here is not crawled a second time.
const spidering = new Map();

/// Crawls `url` once: reuses the locally saved copy when it can be read, downloads
/// the page when the file is missing (ENOENT), then follows the page's links.
/// Because links are crawled concurrently, the file check alone could let the same
/// URL be downloaded repeatedly; the `spidering` map is what prevents that.
///
/// nesting  - how many link levels below this page may still be followed.
/// callback - deferred with process.nextTick when the URL was seen before; otherwise
///            receives any read/download error, or whatever spiderLinks reports.
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);

    const filename = utilities.urlToFilename(url);
    const followLinks = pageBody => spiderLinks(url, pageBody, nesting, callback);

    fs.readFile(filename, "utf8", (readErr, cachedBody) => {
        if (!readErr) {
            return followLinks(cachedBody);
        }
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        // File not there yet: fetch it, then crawl the fresh body.
        return download(url, filename, (downloadErr, freshBody) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            followLinks(freshBody);
        });
    });
}

/// Entry point: crawl the URL passed as the first command-line argument,
/// following links two levels deep.
/// This version of spider() only ever passes an error (or nothing) to its callback,
/// so there is no filename/downloaded pair to report; branching on them always
/// printed "undefined was already downloaded".
spider(process.argv[2], 2, err => {
    if (err) {
        return console.log(err);
    }
    console.log('Download complete');
});

這段代碼一樣很簡單,也有兩個核心內容。一個是如何實現併發:

/// Crawls all links found in `body` at once: every spider() call is started in
/// the same forEach pass and a shared counter detects when the last one is done.
///
/// nesting  - remaining crawl depth; at 0 (or when the page has no links)
///            `callback` is deferred with process.nextTick.
/// callback - called exactly once: with the first error reported by any link, or
///            with no arguments after every link has completed.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }

    let completed = 0;
    let hasErrors = false;

    function done(err) {
        // The other links are still in flight after a failure; without this guard
        // each further failing link would invoke `callback` again.
        if (hasErrors) {
            return;
        }
        if (err) {
            hasErrors = true;
            return callback(err);
        }
        if (++completed === links.length) {
            return callback();
        }
    }

    links.forEach(link => {
        spider(link, nesting - 1, done);
    });
}

上邊的代碼可以說是實現併發的一個pattern,利用循環遍歷來實現。另外一個核心是:既然是併發的,那麼僅靠檢查文件是否存在(例如fs.exists)來判斷是否已下載就會存在問題,可能會重複下載同一文件。這裏的解決方案是:

  • 使用Map緩存某一url,url應該做爲key

如今咱們又有了新的需求,要求限制同時併發的最大數,那麼在這裏就引進了一個我認爲最重要的概念:隊列。

(task-Queue.js)

/// Runs queued tasks with at most `concurrency` of them in flight at a time.
/// A task is a function that receives a single `done` callback and must call it
/// when its work has finished.
class TaskQueue {
    /// concurrency - maximum number of tasks allowed to run simultaneously.
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.running = 0;
        this.queue = [];
    }

    /// Appends `task` to the queue and immediately tries to start work.
    pushTask(task) {
        this.queue.push(task);
        this.next();
    }

    /// Starts queued tasks until the concurrency limit is reached or the queue is empty.
    next() {
        while (this.running < this.concurrency && this.queue.length) {
            const task = this.queue.shift();
            let finished = false;

            // Claim the slot BEFORE invoking the task: a task that calls `done`
            // synchronously would otherwise decrement first, drive `running`
            // negative and let the re-entrant next() start too many tasks.
            this.running++;
            task(() => {
                // A task calling `done` twice must not free two slots.
                if (finished) {
                    return;
                }
                finished = true;
                this.running--;
                this.next();
            });
        }
    }
}

module.exports = TaskQueue;

上邊的代碼就是隊列的實現,核心是next()方法。可以看出,當task加入隊列後,next()會被立刻調用;但這並不意味着該任務一定會馬上執行——只有當正在運行的任務數小於concurrency時,它才會被取出執行。

(spider_v5.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const TaskQueue = require("./task-Queue");
const downloadQueue = new TaskQueue(2);

/// Writes `contents` to `filename`, creating the containing directory first.
/// callback - receives the mkdirp error if directory creation fails; otherwise it
///            is handed straight to fs.writeFile.
function saveFile(filename, contents, callback) {
    const targetDir = path.dirname(filename);

    const onDirectoryReady = mkdirErr => {
        if (!mkdirErr) {
            return fs.writeFile(filename, contents, callback);
        }
        return callback(mkdirErr);
    };

    mkdirp(targetDir, onDirectoryReady);
}

/// Fetches `url` and stores the response body in `filename`.
/// callback - called with (err) if the request or the save fails, otherwise
///            with (null, body).
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);

    // Second step: persist the fetched body, then report it to the caller.
    function persist(body) {
        saveFile(filename, body, saveErr => {
            if (saveErr) {
                return callback(saveErr);
            }
            console.log(`Downloaded and saved: ${url}`);
            callback(null, body);
        });
    }

    request(url, (requestErr, response, body) =>
        (requestErr ? callback(requestErr) : persist(body)));
}

/// Crawls the links found in `body` through `downloadQueue`, so that only as many
/// run at once as the queue's concurrency limit allows.
///
/// nesting  - remaining crawl depth; at 0 (or when the page has no links)
///            `callback` is deferred with process.nextTick.
/// callback - called exactly once: with the first error reported by any link, or
///            with no arguments after every link has completed.
///
/// NOTE(review): a queued spider() holds its slot while its own child links wait in
/// the same queue; with nesting > 1 this looks like it can starve the queue — confirm.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }

    let completed = 0;
    let hasErrors = false;

    links.forEach(link => {
        // A task is a function taking one argument, `done`, which it must call
        // when it has finished so the queue can start the next task.
        downloadQueue.pushTask(done => {
            spider(link, nesting - 1, err => {
                // Always hand the slot back, also on failure; returning early
                // without done() leaves the slot occupied and stalls the queue.
                done();

                // Report only the first failure; later completions are ignored.
                if (hasErrors) {
                    return;
                }
                if (err) {
                    hasErrors = true;
                    return callback(err);
                }
                if (++completed === links.length) {
                    callback();
                }
            });
        });
    });
}

/// URLs already handed to spider(); a URL found here is not crawled a second time.
const spidering = new Map();

/// Crawls `url` once: reuses the locally saved copy when it can be read, downloads
/// the page when the file is missing (ENOENT), then follows the page's links.
/// Because links are crawled concurrently, the file check alone could let the same
/// URL be downloaded repeatedly; the `spidering` map is what prevents that.
///
/// nesting  - how many link levels below this page may still be followed.
/// callback - deferred with process.nextTick when the URL was seen before; otherwise
///            receives any read/download error, or whatever spiderLinks reports.
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);

    const filename = utilities.urlToFilename(url);
    const followLinks = pageBody => spiderLinks(url, pageBody, nesting, callback);

    fs.readFile(filename, "utf8", (readErr, cachedBody) => {
        if (!readErr) {
            return followLinks(cachedBody);
        }
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        // File not there yet: fetch it, then crawl the fresh body.
        return download(url, filename, (downloadErr, freshBody) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            followLinks(freshBody);
        });
    });
}

/// Entry point: crawl the URL passed as the first command-line argument,
/// following links two levels deep.
/// This version of spider() only ever passes an error (or nothing) to its callback,
/// so there is no filename/downloaded pair to report; branching on them always
/// printed "undefined was already downloaded".
spider(process.argv[2], 2, err => {
    if (err) {
        return console.log(`error: ${err}`);
    }
    console.log('Download complete');
});

所以,爲了限制併發的個數,只需在spiderLinks方法中,把task遍歷放入隊列就能夠了。這相對來講很簡單。

到這裏爲止,咱們使用原生JavaScript實現了一個有相對完整功能的網絡蜘蛛,既能串行,也能併發,還能夠控制併發個數。

2.使用async庫

把不一樣的功能放到不一樣的函數中,會給咱們帶來巨大的好處,async庫十分流行,它的性能也不錯,它內部基於callback。

(spider_v6.js)

const request = require("request");
const fs = require("fs");
const mkdirp = require("mkdirp");
const path = require("path");
const utilities = require("./utilities");
const series = require("async/series");
const eachSeries = require("async/eachSeries");

/// Fetches `url` and stores the response body in `filename`, expressed as three
/// steps handed to async's series(): request the page, create the target
/// directory, write the file.
/// callback - called with (err) as soon as a step fails, otherwise with (null, body).
function download(url, filename, callback) {
    console.log(`Downloading ${url}`);

    // Shared between the steps: filled in by the request, read by the write.
    let pageBody;

    const fetchPage = next => {
        request(url, (requestErr, response, resBody) => {
            if (requestErr) {
                return next(requestErr);
            }
            pageBody = resBody;
            next();
        });
    };
    const ensureDirectory = mkdirp.bind(null, path.dirname(filename));
    const writePage = next => {
        fs.writeFile(filename, pageBody, next);
    };

    series([fetchPage, ensureDirectory, writePage], seriesErr => {
        if (seriesErr) {
            return callback(seriesErr);
        }
        console.log(`Downloaded and saved: ${url}`);
        callback(null, pageBody);
    });
}

/// Crawls the links found in `body` one at a time by delegating the iteration
/// to async's eachSeries().
///
/// nesting  - remaining crawl depth; at 0 (or when the page has no links)
///            `callback` is deferred with process.nextTick.
/// callback - passed to eachSeries as its completion callback.
function spiderLinks(currentUrl, body, nesting, callback) {
    if (nesting === 0) {
        return process.nextTick(callback);
    }

    const links = utilities.getPageLinks(currentUrl, body);
    if (links.length === 0) {
        return process.nextTick(callback);
    }

    const crawlLink = (link, next) => {
        spider(link, nesting - 1, next);
    };
    eachSeries(links, crawlLink, callback);
}

/// URLs already handed to spider(); a URL found here is not crawled a second time.
const spidering = new Map();

/// Crawls `url` once: reuses the locally saved copy when it can be read, downloads
/// the page when the file is missing (ENOENT), then follows the page's links.
///
/// nesting  - how many link levels below this page may still be followed.
/// callback - deferred with process.nextTick when the URL was seen before; otherwise
///            receives any read/download error, or whatever spiderLinks reports.
function spider(url, nesting, callback) {
    if (spidering.has(url)) {
        return process.nextTick(callback);
    }
    spidering.set(url, true);

    const filename = utilities.urlToFilename(url);
    const followLinks = pageBody => spiderLinks(url, pageBody, nesting, callback);

    fs.readFile(filename, "utf8", (readErr, cachedBody) => {
        if (!readErr) {
            return followLinks(cachedBody);
        }
        if (readErr.code !== 'ENOENT') {
            return callback(readErr);
        }
        // File not there yet: fetch it, then crawl the fresh body.
        return download(url, filename, (downloadErr, freshBody) => {
            if (downloadErr) {
                return callback(downloadErr);
            }
            followLinks(freshBody);
        });
    });
}

/// Entry point: crawl the URL passed as the first command-line argument,
/// following links one level deep.
/// This version of spider() only ever passes an error (or nothing) to its callback,
/// so there is no filename/downloaded pair to report; branching on them always
/// printed "undefined was already downloaded".
spider(process.argv[2], 1, err => {
    if (err) {
        return console.log(err);
    }
    console.log('Download complete');
});

在上邊的代碼(以及spider_v7.js)中,我們只使用了async的三個功能:

const series = require("async/series"); // run a list of tasks one after another (serial)
const eachSeries = require("async/eachSeries"); // iterate a collection serially, one item at a time (not parallel)
const queue = require("async/queue"); // task queue with a concurrency limit (used in spider_v7.js)

因爲比較簡單,就不作解釋了。async中的隊列的代碼在(spider_v7.js)中,和上邊咱們自定義的隊列很類似,也不作更多解釋了。

3.Promise

Promise是一個協議,有很多庫實現了這個協議,我們用的是ES6的實現。簡單來說,promise就是一個約定:如果完成了,就調用它的resolve方法;失敗了,就調用它的reject方法。它內部實現了then方法,then會返回一個新的promise,這樣就形成了調用鏈。

其實Promise的內容有不少,在實際應用中是如何把普通的函數promise化。這方面的內容在這裏也不講了,我本身也未入流

(spider_v8.js)

const utilities = require("./utilities");
const request = utilities.promisify(require("request"));
const fs = require("fs");
const readFile = utilities.promisify(fs.readFile);
const writeFile = utilities.promisify(fs.writeFile);
const mkdirp = utilities.promisify(require("mkdirp"));
const path = require("path");


/// Writes `contents` to `filename`, creating the containing directory first, and
/// reports the outcome through a node-style callback.
///
/// In this file `mkdirp` and `writeFile` are the promisified versions, so they are
/// consumed as promises here; calling the promisified `mkdirp` callback-style, as
/// this helper used to, presumably never reaches the callback (depends on
/// utilities.promisify — TODO confirm).
///
/// callback - called with (null) on success or with the error of the failing step.
function saveFile(filename, contents, callback) {
    mkdirp(path.dirname(filename))
        .then(() => writeFile(filename, contents))
        .then(() => callback(null), callback);
}

/// Fetches `url` and stores the response body in `filename`.
/// Returns a promise that resolves with the body once the file is written and
/// rejects with the error of whichever step failed.
function download(url, filename) {
    console.log(`Downloading ${url}`);

    // Captured in the first step so the later steps of the chain can use it.
    let body;

    return request(url)
        .then(response => {
            body = response.body;
            return mkdirp(path.dirname(filename));
        })
        .then(() => writeFile(filename, body))
        .then(() => {
            // Message typo fixed ("adn" -> "and") to match the other versions.
            console.log(`Downloaded and saved: ${url}`);
            return body;
        });
}

/// Promises remove the need to thread callbacks through every function: the
/// intermediate promise lets asynchronous steps be written in a sequential style.
/// Crawls the links found in `body` one after another by chaining each spider()
/// call onto the promise of the previous one.
/// Returns a promise that settles when the chain has finished; it is already
/// resolved when `nesting` is 0.
function spiderLinks(currentUrl, body, nesting) {
    if (nesting === 0) {
        return Promise.resolve();
    }

    return utilities
        .getPageLinks(currentUrl, body)
        .reduce(
            (chain, link) => chain.then(() => spider(link, nesting - 1)),
            Promise.resolve()
        );
}

/// Crawls `url`: reuses the locally saved copy when it can be read, downloads the
/// page when the file is missing (ENOENT), then follows the page's links.
/// Returns the promise of that whole chain; any other read error is rethrown so a
/// single catch at the end of the chain can handle it.
function spider(url, nesting) {
    const filename = utilities.urlToFilename(url);
    const crawl = body => spiderLinks(url, body, nesting);

    const recoverFromReadError = err => {
        if (err.code === 'ENOENT') {
            return download(url, filename).then(crawl);
        }
        throw err;
    };

    return readFile(filename, "utf8").then(crawl, recoverFromReadError);
}

/// Entry point: crawl the URL passed as the first command-line argument one level
/// deep; a single catch at the end handles an error from anywhere in the chain.
spider(process.argv[2], 1)
    .then(() => console.log('Download complete'))
    .catch(err => console.log(err));

能夠看到上邊的代碼中的函數都是沒有callback的,只須要在最後catch就能夠了。

在設計api的時候,應該同時支持兩種方式:既支持callback,又支持promise。

/// Divides `dividend` by `divisor` asynchronously, usable both callback-style and
/// promise-style.
///
/// cb - optional node-style callback: (error) when the result is NaN or infinite,
///      otherwise (null, result).
/// Returns a promise that resolves with the result or rejects with the same
/// "Invalid operands" error.
function asyncDivision(dividend, divisor, cb) {
    const promise = new Promise((resolve, reject) => {
        process.nextTick(() => {
            const result = dividend / divisor;
            // Number.isFinite is false for NaN as well as for +/-Infinity, so it
            // covers both invalid cases without the coercing global isNaN.
            if (!Number.isFinite(result)) {
                const error = new Error("Invalid operands");
                if (cb) {
                    cb(error);
                }
                return reject(error);
            }

            if (cb) {
                cb(null, result);
            }
            resolve(result);
        });
    });

    // A callback-style caller normally ignores the returned promise; the error was
    // already delivered through `cb`, so mark the rejection as handled instead of
    // letting it surface as an unhandled rejection. The returned promise itself
    // still rejects for anyone who does chain on it.
    if (cb) {
        promise.catch(() => {});
    }
    return promise;
}

/// Callback style: the outcome (error or result) arrives through the third argument.
asyncDivision(10, 2, (err, result) => {
    console.log(err ? err : result);
});

/// Promise style: no callback is passed; the returned promise is consumed instead.
asyncDivision(22, 11)
    .then(result => {
        console.log(result);
    })
    .catch(err => {
        console.log(err);
    });

4.Generator

Generator很有意思,它可以讓函數暫停和恢復。利用thunkify和co這兩個庫,我們下邊的代碼實現起來非常酷。

(spider_v9.js)

const thunkify = require("thunkify");
const co = require("co");
const path = require("path");
const utilities = require("./utilities");

const request = thunkify(require("request"));
const fs = require("fs");
const mkdirp = thunkify(require("mkdirp"));
const readFile = thunkify(fs.readFile);
const writeFile = thunkify(fs.writeFile);
const nextTick = thunkify(process.nextTick);

/// Generator version of download: fetches `url` and stores the body in `filename`.
/// Each `yield` hands a thunk to the runner and resumes with its result.
/// Returns (as the generator's return value) the downloaded body.
function* download(url, filename) {
    console.log(`Downloading ${url}`);

    // What comes back from the yielded request is indexed like an array, with the
    // body at position 1. The leftover debug dump of that whole value was removed.
    const result = yield request(url);
    const body = result[1];

    yield mkdirp(path.dirname(filename));
    yield writeFile(filename, body);

    console.log(`Downloaded and saved ${url}`);
    return body;
}

/// Generator version of spider: obtains the page body (from the saved file, or by
/// downloading when the file is missing) and then crawls the page's links.
/// Read errors other than ENOENT are rethrown to the caller of the generator.
function* spider(url, nesting) {
    const filename = utilities.urlToFilename(url);

    // Inner generator so the "cached copy or download" decision reads as one
    // expression; yield* forwards its yields to whoever drives spider().
    function* loadBody() {
        try {
            return yield readFile(filename, "utf8");
        } catch (readErr) {
            if (readErr.code === 'ENOENT') {
                return yield download(url, filename);
            }
            throw readErr;
        }
    }

    const pageBody = yield* loadBody();
    yield spiderLinks(url, pageBody, nesting);
}

/// Generator version of spiderLinks: yields one spider() run per link found in
/// `body`, so the links are crawled one after another.
/// When `nesting` is 0 nothing is crawled and the generator returns nextTick()'s value.
function* spiderLinks(currentUrl, body, nesting) {
    if (nesting === 0) {
        return nextTick();
    }

    const childDepth = nesting - 1;
    for (const link of utilities.getPageLinks(currentUrl, body)) {
        yield spider(link, childDepth);
    }
}

/// co takes care of the callbacks automatically: a `yield` evaluates directly to the
/// arguments the callback received, collected into an array, with the err argument
/// stripped out. Failures are handled by the try/catch below instead.
co(function* () {
    try {
        yield spider(process.argv[2], 1);
        console.log('Download complete');
    } catch (err) {
        console.log(err);
    }
});

總結

我並沒有寫promise和generator併發的代碼。以上這些內容來自於這本書:nodejs-design-patterns(https://github.com/agelessman/MyBooks)

demo下載

相關文章
相關標籤/搜索