異步編程解決方案 - generator

異步編程難點

異常處理

在處理異常時常常用try/catch/final語句塊進行異常捕獲,可是這種異常捕獲對異步編程並非用前端

function async(callback) {
    process.nextTick(callback);
}

try {
    async(function () {
        console.log(a);
    });
} catch (err) {
    // TODO
}

異步代碼分爲兩個過程,提交請求和處理結果,其中代碼在異步處理完成以前返回,而異常不必定在這個過程當中發生,因此try、catch不會有任何做用,調用async時,callback被暫時掛起,等到代碼執行完畢纔會執行,try只能捕獲當前事件循環的異常,對下一次的事件循環沒法處理(nodejs異步時間作了約定,異常必定被當成第一個參數傳回,在調用callback時先判斷是否有異常發生)java

function async(callback) {
    process.nextTick(function () {
        if (err) {
            return callback(err);
        }
        callback(null);

    });
}

try {
    async(function (err) {
        if (!err) {
            console.log(a);
        }
    });
} catch (err) {
    // TODO
}

函數嵌套過深

對於Node和agax調用而言,有時會存在多個異步調用嵌套的場景,好比一個文件目錄的遍歷操做:node

fs.readdir(path.join(__dirname, '..'), function (err, file) {
    files.forEach(function (filename, index) {
        fs.readFile(filename, 'utf8), function (err, file) {
            // TODO
        }
    });
});

或者一個網頁渲染操做:jquery

$(selector).click(function (e) {
    $ajax({
        data: '',
        success: function (data) {
            template.init(data, function (tpl) {
                // TODO
            });
        }
    });
});

上面的代碼邏輯上是沒有問題的,可是並無利用好異步I/O帶來的優點,這是異步編程的典型問題。ajax

多線程編程

若是是多核CPU,單個Node進程實際沒有充分利用多核CPU,瀏覽器提出了Web workers,經過將javascrit執行與UI渲染分離,能夠良好的利用多核CPU。由於前端瀏覽器對標準的滯後,Web workers並無普遍應用起來。express

異步轉同步

習慣同步編程的同窗,並不能從容面對異步編程帶來的副產品,好比嵌套回調、業務分散。Node 提供了絕大部分異步 API 卻不多有同步 API,每每出現同步需求會無所適從,雖然 Node 試圖異步轉同步可是並無原生的支持,須要藉助庫或者編譯實現,對於異步編程經過良好的流程控制,仍是能夠降落幾梳理成順序的形式。編程

異步編程解決方案

事件發佈/訂閱模式

// 訂閱 
emiiiter.on('event', function(message) {
    console.log(message);
})
// 發佈
emitter.emit('event', 'i am a message');

事件監聽是一種高階函數的應用,經過事件能夠把內部數據傳遞給外部的調用者,編程者能夠不用關心組件內部如何執行,只需關注在須要的事件點上便可。注意:promise

  • 若是事件的監聽器過多可能出現過分佔用cup的結果。
  • 若是運行期間觸發了error事件,解釋器會檢查是否對error監聽了事件,若是有就交給監聽器處理,若是沒有則將錯誤拋出。因此應該對error事件作監聽。

利用事件能夠解決雪崩問題:當大量的訪問同時發生時,服務器沒法對全部的訪問作處理,能夠在第一個回調添加狀態鎖控制服務器的訪問數量,同時使用事件(once)把全部請求壓入隊列中。瀏覽器

promise/deferred模式

promise/A 規定了三種狀態,未完成態、完成態和失敗態,未完成態向其餘兩種狀態轉化,不能逆轉;服務器

pedding -> resolved

-> rejected
function call(state, fn, err, arg) {
    if  (state === 'pendding') {
        fn(arg);
    } else {
        fn(err);
    }

}
new Promise = function (fn) {
    this.state = 'pendding';
    this.fn = function() {};
    return fn(this.resolve, this.reject);
}
Promise.prototype.then = function (fn) {
    this.fn = fn;
    return this;
}
Promise.prototype.resolve = function (arg) {
    this.state = 'resolved';
    call(this.state, this.fn, null, arg);
    return this;
}
Promise.prototype.reject = function () {
    this.state = 'rejected';
    var err = "err opened";
    call(this.state, this.fn, err);
    return this;
}
new Promise(function (resolve, reject) {
    setTimeout(function () {
        var value = 'abc';
        resolve(value);
    }, 100);
}).then(function (result) {
    console.log(result);
});

流程控制庫

中間件

使用connect存儲中間件手動調用執行的方式,例如next,一般叫作尾觸發,尾觸發在jquery中很是常見,好比

$get('/get').success().error();

這種方式首先註冊中間件,每一箇中間件包括傳遞請求對象,響應對象和尾觸發函數,經過隊列行程一個處理流,最簡單的中間例如:

function (req, res, err) {
    // 中間件
}

connect核心代碼:

function creatServer() {
    function app(req, res) {
        app.handle(req,res);
    }
    app.stack = [];
    for (var i = 0; i < arguments.length; ++i) {
        app.use(arguments[i]);
    }
    return app;
}

app.use:

app.use = function(router, fn) {
    this.stack.push(fn);
    return this;
}

next:

function handle = function() {
    // ...
    next();
}
function next() {
    // ... next callback ...
    layer = this.stack[index++];
    layer.handle(req, res, next);
}
async

異步的串行執行

async.series([function (callback) {
    callback();
},function (callback) {
    callback();
}], function (err, result) {})

等價於:

function (callback) {
    function (callback) {
        callback();
    }
    callback();
}

異步的並行執行:

async.parallel([function (callback) {
    callback();
}, function (callback) {
    callback();
}], function (err, results) {

});

等價於:

var counter = 2;
var results = [];
var done = function (index, value) {
    results[index] = value;
    if (!--conuter) {
        callback(null, results);
    }
}

function (callback) {
    // var value = ...
    callback();
    done(0, value);
}
function (callback) {
    // var value = ...
    callback();
    done(1, value);
}

依賴處理

當前一個異步的結果是後一個異步的輸入時,async使用waterfall方式處理

async.waterfall([function (callback) {
    callback();
}, function (arg1, callback) {
    callback();
}, function (arg2, callback) {
    callback();
}], function (err, results) {

});

當存在不少依賴關係,有同步有異步時,async使用auto()實現複雜的處理

async.waterfall({
    fun1:function (callback) {
        callback();
    },
    fun2: ['fun1', function (arg1, callback) {
        callback();
    }, function (arg2, callback) {
        callback();
    }]}, function (err, results) {

    });
step

step接受任意數量的任務,全部任務會串行執行:

step(task1, task2, task3);

step使用next把上一步的結果傳遞給下一步做爲參數
在執行多個異步任務時,調用代碼以下:

step(function () {
    fn1(this.parallel());
    fn2(this.parallel());
}, function (err, result1, result2) {

});
wind

wind旨在控制異步流程的邏輯控制,其做用相似generator:

eval(Wind.compile('async', funtion () {
    $await(Wind.Async.sleep(20)); //延遲20ms
    console.log('hello world');
}));

generator

generaor函數

function * maker(){
    var index = 0;
    while (index < 10) {
        yield index++;
    }
}

var g = maker();
// 輸出結果
console.log(g.next().value); // 0
console.log(g.next().value); // 1
console.log(g.next().value); // 2

yeild關鍵字

yield 關鍵字用來暫停和恢復一個生成器函數

[rv] = yield [expression];

yield [[expression]];

rv 返回傳遞給生成器的 next() 方法的可選值,以恢復其執行。

Regenerator

上面這段代碼等價下面代碼:

var _marked = [maker].map(regeneratorRuntime.mark);

function maker() {
    var index;
    return regeneratorRuntime.wrap(function maker$(_context) {
        while (1) {
            switch (_context.prev = _context.next) {
                case 0:
                    index = 0;

                case 1:
                    if (!(index < 10)) {
                        _context.next = 6;
                        break;
                    }

                    _context.next = 4;
                    return index++;

                case 4:
                    _context.next = 1;
                    break;

                case 6:
                case "end":
                    return _context.stop();
            }
        }
    }, _marked[0], this);
}

var g = maker();

console.log(g.next().value); // 0
console.log(g.next().value); // 1
console.log(g.next().value); // 2

編譯機制造了一個狀態機,經過_context.next狀態的裝換完成代碼執行的掛起。
假設狀態是0 -> n(n是最後一個狀態)
0運行第一個yield以前的全部代碼,n運行最後一個yield函數以後的全部代碼,generator的next尾調用經過一個while循環實現,若是_context.next到達最後一個case就退出循環,等待下一次next調用

regenerator是用來生成generetor函數並返回一個迭代器供外界調用的高階函數,功能主要是

  • regenerator-transform: 重寫generator函數把yield重寫成switch case,而且建立_context.next保存上下文環境;
  • 包裝generator函數被返回一個迭代器對象;

通過wrap返回的迭代器:

GeneratorFunctionPrototype {
    _invoke: function invoke(method, arg) { … }
    __proto__: GeneratorFunctionPrototype {
        constructor: function GeneratorFunctionPrototype() {},
        next: function (arg) { … },
        throw: function (arg) { … } 
        …
    }
}

當調用迭代器對象iter.next()方法時,由於有以下代碼,因此會執行_invoke方法,而根據前面wrap方法代碼可知,最終是調用了迭代器對象的 makeInvokeMethod (innerFn, self, context); 方法

makeInvokeMethod方法內容較多,這裏選取部分分析。

function makeInvokeMethod(innerFn, self, context) {
    var state = GenStateSuspendedStart;
    return function invoke(method, arg) {

makeInvokeMethod返回invoke函數,當咱們執行.next方法時,實際調用的是invoke方法中的下面語句

var record = tryCatch(innerFn, self, context);

這裏tryCatch方法中fn爲通過轉換後的example$方法,arg爲上下文對象context,由於invoke函數內部對context的引用造成閉包引用,因此context上下文得以在迭代期間一直保持。

function tryCatch(fn, obj, arg) {
    try {
        return { type: "normal", arg: fn.call(obj, arg) };
    } catch (err) {
        return { type: "throw", arg: err };
    }
}

tryCatch方法會實際調用 example$ 方法,進入轉換後的switch case,執行代碼邏輯。若是獲得的結果是一個普通類型的值,咱們將它包裝成一個可迭代對象格式,而且更新生成器狀態至GenStateCompleted或者GenStateSuspendedYield

var record = tryCatch(innerFn, self, context);
        if (record.type === "normal") {
        // If an exception is thrown from innerFn, we leave state ===
        // GenStateExecuting and loop back for another invocation.
        state = context.done
            ? GenStateCompleted
            : GenStateSuspendedYield;

        var info = {
            value: record.arg,
            done: context.done
        };

僞代碼:

function wrap(innerFn, outerFn, self, tryLocsList) {
    var protoGenerator = outerFn && outerFn.prototype instanceof Generator
        ? outerFn
        : Generator;
    var generator = Object.create(protoGenerator.prototype);
    var context = new Context(tryLocsList || []);

    generator._invoke = makeInvokeMethod(innerFn, self, context);

    return generator;
}

function makeInvokeMethod(innerFn, self, context) {
    var obj = this;
    return function invoke(method, arg) {
        context.method = method;
        // 把next帶入的arg參數賦值給sent
        if (context.method === "next") {
            context.sent = context._sent = context.arg;
        }
        // 實際上調用了mark$,而且帶入了context
        var record = {
            arg: innerFn.call(obj, context)
        };
        // 返回一個能夠迭代的對象
        return {value: record.arg, done: context.done};
    };
}

// 用一個next調用invoke, 若是要進行下一步就傳入next
generator.next = next(arg) {
    generator._invoke('next', arg);
}

cojs處理generator過程

thunk函數

可以獲得一個函數的函數叫thunk函數, thunk函數是一個偏函數,它只帶一個執行參數

function getThunk(number) {
    return function (fn) {
        setTimeout(() => {
            if (number) {
                fn(null, number);
            } else {
                const err = "error open";
                fn(err);
            }
        }, number)
    }
}
cojs-generator的自動執行器
import co from 'co';

co(function * () {
    var a = yield getThunk(100);
    var b = yield getThunk(1000);
    console.log('a:', a);
    console.log('b:', b);
    return [a, b];
})

// 輸出
// a 100
// b 1000
cojs代碼解析
function co2Thunk(fn) {
    return (done) => {
        const ctx = this;
        const g = fn.call(ctx);
        function next(err, res) {
            console.log('next1', res);
            let it = g.next(res);
            if (it.done) {
                done.call(ctx, err, it.value);
            } else {
                it.value(next);
            }
        }
        next();
    }
}

co2Thunk(function * () {
    var a = yield getThunk(10000);
    var b = yield getThunk(1000);
    // console.log('a:', a);
    // console.log('b:', b);
    return [a, b];
})(function (err, args) {
    console.log("callback thunk co : ==========");
    // console.log(err, args);
});

co2Thunk的代碼等價於:

function co2Thunk(fn) {
    return (done) => {
        const ctx = this;
        const g = fn.call(ctx);
        let it0 = g.next();
        it0.value((err, res) => {
            const it1 = g.next(res); // 第一次迭代返回的是getThunk(10000);
            it0.value((err, res) => {
                const it1 = g.next(res); // 第二次迭代返回的是getThunk(1000);
                it1.value((err, res) => {
                    const it2 = g.next(data);
                    // ...
                });
            });
        });
    }
}

// it.value 等價於:
function (fn) {
    setTimeout(() => {
        if (number) {
            fn(null, number);
        } else {
            const err = "error open";
            fn(err);
        }
    }, number)
}
promise版
function co2Promise(fn) {
    return new Promise((resolve, reject) => {
        const ctx = this;
        const g = fn.call(ctx);
        function next(err, res) {
            let it = g.next(res);
            if (it.done) {
                resolve(it.value);
            } else {
                it.value(next);
            }
        }
        next();
    });
}

co2Promise(function * () {
    var a = yield getThunk(100);
    var b = yield getThunk(1000);
    console.log('a:', a);
    console.log('b:', b);
    return [a, b];
}).then(function (args) {
    console.log("callback promise co : ==========");
    console.log(args);
});
thunk升級版
function co2Thunk(fn) {
    return (done) => {
        const ctx = this;
        const g = fn.call(ctx);
        function next(err, res) {
            let it = g.next(res);
            if (it.done) {
                done.call(ctx, err, it.value);
            } else {
                // 增長對其餘類型的處理
                const value = toThunk.call(ctx, it.value);
                // 對於promise 此處應該是 value.then(next)
                value(next);
            }
        }
        next();
    }
}

co2Thunk(function * () {
    var a = getThunk(100);
    var b = getThunk(1000);
    // console.log('a:', a); console.log('b:', b);
    return yield [a, b];
})(function (err, args) {
    console.log("callback thunk co : ==========");
    console.log(err, args);
});

function toThunk(obj) {
    if (isObject(obj) || isArray(obj)) {
        return objectToThunk(obj);
    }
    if (isPromise(obj)) {
        return promiseToThunk.call(ctx, obj);
    }
    return obj;
}

function objectToThunk(obj) {
    return function (done) {
        let keys = Object.keys(obj);
        let length = keys.length;
        let results = new obj.constructor();
        for(let key in keys) {
            const fn = toThunk(obj[key]);
            fn((err, res) => {
                results[key] = res;
                --length || done(null, results);
            }, key);
        }
    }
}

function promiseToThunk(promise){
    return function(done){
        promise.then(function(err,res){
            done(err,res);
        },done)
    }
}

function isObject(obj) {
    return obj && Object == obj.constructor;
}
function isArray(obj) {
    return Array.isArray(obj);
}

function isPromise(obj) {
return obj && 'function' == typeof obj.then;
}

async/await

async function fn(args){
// ...
}

等同於

function fn(args){ 
    return co2Thunk(function*() {
        // ...
    }); 
}
相關文章
相關標籤/搜索