async源碼之series

前言

最近在看Node設計模式之異步編程的順序異步迭代,簡單的實現以下:html

function series(tasks, callback) {
    let results = [];
    function iterate(index) {
        if (index === tasks.length) {
            return finish();
        }
        const task = tasks[index];
        task(function(err, res) {
            results.push(res);
            iterate(index + 1);
        });
    }

    function finish() {
        // 迭代完成的操做
        callback(null, results);
    }

    iterate(0);
}

series(
    [
        callback => {
            setTimeout(function() {
                console.log(456);
                callback(null, 1);
            }, 500);
        },
        callback => {
            console.log(123);
            callback(null, 2);
        }
    ],
    function(err, results) {
        console.log(results);
    }
);

// 456
// 123
// [1, 2]

而async庫是一個很是流行的解決方案,在Node.js和JavaScript中來講,用於處理異步代碼。它提供了一組功能,能夠大大簡化不一樣配置中一組任務的執行,併爲異步處理集合提供了有用的幫助。git

async庫能夠在實現複雜的異步控制流程時大大幫助咱們,可是一個難題就是選擇正確的庫來解決問題。例如,對於順序執行,有大約20個不一樣的函數可供選擇。github

好奇心起來,就想看看一個成熟的庫跟咱們簡單實現的代碼區別有多大。編程

series

按順序運行任務集合中的函數,每一個函數在前一個函數完成後運行。若是系列中的任何函數將錯誤傳遞給其回調函數,則不會運行更多函數,並當即使用錯誤值調用回調函數。不然,回調會在任務完成時收到一系列結果。設計模式

const async = require('async');

async.series({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback){
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    console.log(results);
    // results is now equal to: {one: 1, two: 2}
});

咱們來看看源碼,找到series方法,能夠看到:數組

function series(tasks, callback) {
    _parallel(eachOfSeries, tasks, callback);
}

除了咱們本身傳的兩個參數之外,默認還傳了一個eachOfSeries,接着往下看:promise

function _parallel(eachfn, tasks, callback) {
    // noop:空的函數
    callback = callback || noop;
    // isArrayLike:檢查'value'是否與array類似
    var results = isArrayLike(tasks) ? [] : {};

    eachfn(tasks, function (task, key, callback) {
        // wrapAsync:包裝成異步
        wrapAsync(task)(function (err, result) {
            if (arguments.length > 2) {
                result = slice(arguments, 1);
            }
            results[key] = result;
            callback(err);
        });
    }, function (err) {
        callback(err, results);
    });
}

這裏咱們能夠看到,_parallel方法其實就是eachOfSeries方法的調用。閉包

先解釋一下eachOfSeries這三個參數:app

  • 第一個參數就是要執行的函數的集合。
  • 第二個參數能夠當作每一個函數的執行(wrapAsync能夠先忽略掉,直接當作這一個函數)。
  • 第三個參數就是全部函數執行完後的回調。

讓咱們來看看eachOfSeries是如何的實現:異步

var eachOfSeries = doLimit(eachOfLimit, 1);

function eachOfLimit(coll, limit, iteratee, callback) {
    _eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}

function doLimit(fn, limit) {
    return function (iterable, iteratee, callback) {
        return fn(iterable, limit, iteratee, callback);
    };
}

咱們把上面進行轉換,這樣看起來更明瞭些:

var eachOfSeries = function(iterable, iteratee, callback) {
    return _eachOfLimit(1)(iterable, wrapAsync(iteratee), callback);
};

Soga,最終就是調用_eachOfLimit完成的:

// limit:一次異步操做的最大數量,傳1能夠當作串行,一個函數執行完才進行下一個
function _eachOfLimit(limit) {
    return function (obj, iteratee, callback) {
        // once:函數只運行一次
        callback = once(callback || noop);
        if (limit <= 0 || !obj) {
            return callback(null);
        }
        // iterator:迭代器,有根據類型分類,這邊簡單拿數組迭代器createArrayIterator來分析
        var nextElem = iterator(obj);
        var done = false;
        var running = 0;
        var looping = false;

        function iterateeCallback(err, value) {
            running -= 1;
            if (err) {
                done = true;
                callback(err);
            }
            else if (value === breakLoop || (done && running <= 0)) {
                done = true;
                return callback(null);
            }
            else if (!looping) {
                replenish();
            }
        }

        function replenish () {
            looping = true;
            while (running < limit && !done) {
                var elem = nextElem();
                if (elem === null) {
                    done = true;
                    if (running <= 0) {
                        callback(null);
                    }
                    return;
                }
                running += 1;
                // onlyOnce:函數只運行一次
                iteratee(elem.value, elem.key, onlyOnce(iterateeCallback));
            }
            looping = false;
        }
        
        // 遞歸
        replenish();
    };
}

function once(fn) {
    return function() {
        if (fn === null) return;
        var callFn = fn;
        fn = null;
        callFn.apply(this, arguments);
    };
}

// 閉包大法,拿取集合中的函數
function createArrayIterator(coll) {
    var i = -1;
    var len = coll.length;
    return function next() {
        return ++i < len ? {value: coll[i], key: i} : null;
    }
}

終於,看到series的真身了。實現其實就是replenish()的遞歸大法。由於要實現串行,因此在replenish()中控制running數爲1,取出集合中一個函數執行,而後回調iterateeCallback(),running數減1,再調用replenish(),這樣就能控制每一個函數在前一個函數完成後運行。

提及來這流程仍是比較簡單,可是在異步編程裏仍是不太好理解,咱們先來了解一下js執行機制,再舉一個例子來看:

js執行機制
  • 同步的進入主線程,異步的進入Event Table並註冊函數。
  • 當指定的事情完成時,Event Table會將這個函數移入Event Queue。
  • 主線程內的任務執行完畢爲空,會去Event Queue讀取對應的函數,進入主線程執行。
  • 上述過程會不斷重複,也就是常說的Event Loop(事件循環)。

普通版

function a() {
    setTimeout(function() {
        console.log(456);
    }, 500);
}

function b() {
    console.log(123);
}

function c() {
    setTimeout(function() {
        console.log(789);
    }, 0);
}

a();
b();
c();

// 123
// 789
// 456

按順序執行能夠看到

  • a()中setTimeout進入Event Table,註冊回調函數。
  • b(),執行console.log(123)。
  • c()中setTimeout進入Event Table,註冊回調函數。
  • c()中setTimeout先完成,回調函數進入Event Queue。
  • c()中setTimeout 500ms後完成,回調函數進入Event Queue。
  • 主線程從Event Queue讀取回調函數並執行。

series版

const async = require('async');

async.series(
    [
        callback => {
            setTimeout(function() {
                console.log(456);
                callback(null, 1);
            }, 500);
        },
        callback => {
            console.log(123);
            callback(null, 2);
        },
        callback => {
            setTimeout(function() {
                console.log(789);
                callback(null, 3);
            }, 0);
        }
    ],
    function(err, results) {
        console.log(results);
    }
);

// 456
// 123
// 789
// [ 2, 1, 3 ]

按我本身的理解,主線程和Event Loop都執行完稱爲一輪:

  1. 第一輪

    • 按照上面流程,主線程走到_eachOfLimit(),調用replenish()。根據while循環(運行數running < 一次異步操做的最大數量 limit),running += 1,進入集合中第一個函setTimeout數的調用,setTimeout進入Event Table,註冊回調函數。
    • 回到while循環,running=limit,結束循環,結束主線程。
    • setTimeout事件完成,回調函數進入Event Queue。
    • 主線程從Event Queue讀取回調函數並執行,回調iterateeCallback,running -= 1,調用replenish()。
  2. 第二輪

    • 重複第一輪。只要的區別在於集合中的第二個函數是同步的,全部是主線程一路執行下來。
  3. 第三輪

    • 重複第一輪。
  4. 第四輪

    • 集合中的三個函數已經都執行完了,經過iterator()閉包拿到是null,回調最終結果。
wrapAsync
function wrapAsync(asyncFn) {
    return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}

var supportsSymbol = typeof Symbol === 'function';

function isAsync(fn) {
    return supportsSymbol && fn[Symbol.toStringTag] === 'AsyncFunction';
}

wrapAsync()先判斷是否異步函數,若是是es7 Async Functions的話調用asyncify,不然返回原函數。

function asyncify(func) {
    return initialParams(function (args, callback) {
        var result;
        try {
            result = func.apply(this, args);
        } catch (e) {
            return callback(e);
        }
        // if result is Promise object
        if (isObject(result) && typeof result.then === 'function') {
            result.then(function(value) {
                invokeCallback(callback, null, value);
            }, function(err) {
                invokeCallback(callback, err.message ? err : new Error(err));
            });
        } else {
            callback(null, result);
        }
    });
}

var initialParams = function (fn) {
    return function (/*...args, callback*/) {
        var args = slice(arguments);
        var callback = args.pop();
        fn.call(this, args, callback);
    };
};

採用同步功能並將其設置爲異步,並將其返回值傳遞給回調函數。若是傳遞給asyncify的函數返回一個Promise,則該Promise的resolved/rejected狀態將用於調用回調,而不單單是同步返回值。

總結

平日用慣async-await、promise,用起來簡單,但也致使缺乏思考。而嘗試用原生js去模擬,閱讀源碼,卻能帶來更多的收穫。

github地址,喜歡的支持star一下,Thanks♪(・ω・)ノ。

相關文章
相關標籤/搜索