【深刻淺出Node.js系列十四】Nodejs異步流程控制Async

#0 系列目錄#javascript

「流程控制」原本是件比較簡單的事,可是因爲Nodejs的異步架構的實現方法,對於須要同步的業務邏輯,實現起來就比較麻煩。嵌套3-4層,代碼就會變得的支離破碎了!今天就遇到了一個業務邏輯,連續對數據庫操做,先後有依賴。讓咱們看看Async是如何解決問題的。java

#1 Async介紹# Async是一個流程控制工具包,提供了直接而強大的異步功能。基於Javascript爲Node.js設計,同時也能夠直接在瀏覽器中使用。node

Async提供了大約20個函數,包括經常使用的 map, reduce, filter, forEach 等,異步流程控制模式包括,串行(series),並行(parallel),瀑布(waterfall)等mysql

項目地址:https://github.com/caolan/asynclinux

#2 Async安裝# 安裝async有兩個方式:git

  1. 獨立安裝asyncgithub

  2. 下載async demo代碼安裝web

##2.1 獨立安裝async##redis

~ D:\workspace\javascript>mkdir nodejs-async && cd nodejs-async
~ D:\workspace\javascript\nodejs-async>npm install async
npm http GET https://registry.npmjs.org/async
npm http 304 https://registry.npmjs.org/async
async@0.2.9 node_modules\async

打開網頁,參照示例學習:https://github.com/bsspirit/async_demosql

##2.2 下載async demo代碼安裝##

~ D:\workspace\javascript>git clone git@github.com:bsspirit/async_demo.git nodejs-async
~ D:\workspace\javascript>cd nodejs-async
~ D:\workspace\javascript\nodejs-async>npm install
npm http GET https://registry.npmjs.org/moment
npm http GET https://registry.npmjs.org/async
npm http 304 https://registry.npmjs.org/moment
npm http 304 https://registry.npmjs.org/async
async@0.2.9 node_modules\async
moment@2.1.0 node_modules\moment

這套demo示例,比較全面的介紹了async的使用,有中文註釋。

#3 Async函數介紹# 基於async的0.2.9版本。async主要實現了三個部分的流程控制功能:

集合: Collections

流程控制: Control Flow

工具類: Utils

##3.1 集合: Collections##

each: 若是想對同一個集合中的全部元素都執行同一個異步操做。

map: 對集合中的每個元素,執行某個異步操做,獲得結果。全部的結果將彙總到最終的callback裏。與each的區別是,each只關心操做無論最後的值,而map關心的最後產生的值。

filter: 使用異步操做對集合中的元素進行篩選, 須要注意的是,iterator的callback只有一個參數,只能接收true或false。

reject: reject跟filter正好相反,當測試爲true時則拋棄

reduce: 可讓咱們給定一個初始值,用它與集合中的每個元素作運算,最後獲得一個值。reduce從左向右來遍歷元素,若是想從右向左,可以使用reduceRight。

detect: 用於取得集合中知足條件的第一個元素。

sortBy: 對集合內的元素進行排序,依據每一個元素進行某異步操做後產生的值,從小到大排序。

some: 當集合中是否有至少一個元素知足條件時,最終callback獲得的值爲true,不然爲false.

every: 若是集合裏每個元素都知足條件,則傳給最終回調的result爲true,不然爲false

concat: 將多個異步操做的結果合併爲一個數組。

##3.2 流程控制: Control Flow##

series: 串行執行,一個函數數組中的每一個函數,每個函數執行完成以後才能執行下一個函數。

parallel: 並行執行多個函數,每一個函數都是當即執行,不須要等待其它函數先執行。傳給最終callback的數組中的數據按照tasks中聲明的順序,而不是執行完成的順序。

whilst: 至關於while,但其中的異步調用將在完成後纔會進行下一次循環。

doWhilst: 至關於do…while, doWhilst交換了fn,test的參數位置,先執行一次循環,再作test判斷。

until: until與whilst正好相反,當test爲false時循環,與true時跳出。其它特性一致。

doUntil: doUntil與doWhilst正好相反,當test爲false時循環,與true時跳出。其它特性一致。

forever: 不管條件循環執行,若是不出錯,callback永遠不被執行。

waterfall: 按順序依次執行一組函數。每一個函數產生的值,都將傳給下一個。

compose: 建立一個包括一組異步函數的函數集合,每一個函數會消費上一次函數的返回值。把f(),g(),h()異步函數,組合成f(g(h()))的形式,經過callback獲得返回值。

applyEach: 實現給一數組中每一個函數傳相同參數,經過callback返回。若是隻傳第一個參數,將返回一個函數對象,我能夠傳參調用。

queue: 是一個串行的消息隊列,經過限制了worker數量,再也不一次性所有執行。當worker數量不夠用時,新加入的任務將會排隊等候,直到有新的worker可用。

cargo: 一個串行的消息隊列,相似於queue,經過限制了worker數量,再也不一次性所有執行。不一樣之處在於,cargo每次會加載滿額的任務作爲任務單元,只有任務單元中所有執行完成後,纔會加載新的任務單元。

auto: 用來處理有依賴關係的多個任務的執行。

iterator: 將一組函數包裝成爲一個iterator,初次調用此iterator時,會執行定義中的第一個函數並返回第二個函數以供調用。

apply: 可讓咱們給一個函數預綁定多個參數並生成一個可直接調用的新函數,簡化代碼。

nextTick: 與nodejs的nextTick同樣,再最後調用函數。

times: 異步運行,times能夠指定調用幾回,並把結果合併到數組中返回。

timesSeries: 與time相似,惟一不一樣的是同步執行。

##3.3 工具類: Utils##

memoize: 讓某一個函數在內存中緩存它的計算結果。對於相同的參數,只計算一次,下次就直接拿到以前算好的結果。

unmemoize: 讓已經被緩存的函數,返回不緩存的函數引用。

log: 執行某異步函數,並記錄它的返回值,日誌輸出。

dir: 與log相似,不一樣之處在於,會調用瀏覽器的console.dir()函數,顯示爲DOM視圖。

noConflict: 若是以前已經在全局域中定義了async變量,當導入本async.js時,會先把以前的async變量保存起來,而後覆蓋它。僅僅用於瀏覽器端,在nodejs中沒用,這裏沒法演示。

async_demo使用介紹,詳細使用請參考github源代碼:https://github.com/bsspirit/async_demo每一個函數的用法,有很是詳細的實例!!

#4 場景:對數據庫的連續操做# 原場景中,對數據串行操做,增刪改查(CRUD),代碼以下:

var mysql = require('mysql');
var conn = mysql.createConnection({
    host: 'localhost',
    user: 'nodejs',
    password: 'nodejs',
    database: 'nodejs',
    port: 3306
});
conn.connect();

var insertSQL = 'insert into t_user(name) values("conan"),("fens.me")';
var selectSQL = 'select * from t_user limit 10';
var deleteSQL = 'delete from t_user';
var updateSQL = 'update t_user set name="conan update"  where name="conan"';

//delete
conn.query(deleteSQL, function (err0, res0) {
    if (err0) console.log(err0);
    console.log("DELETE Return ==> ");
    console.log(res0);

    //insert
    conn.query(insertSQL, function (err1, res1) {
        if (err1) console.log(err1);
        console.log("INSERT Return ==> ");
        console.log(res1);

        //query
        conn.query(selectSQL, function (err2, rows) {
            if (err2) console.log(err2);

            console.log("SELECT ==> ");
            for (var i in rows) {
                console.log(rows[i]);
            }

            //update
            conn.query(updateSQL, function (err3, res3) {
                if (err3) console.log(err3);
                console.log("UPDATE Return ==> ");
                console.log(res3);

                //query
                conn.query(selectSQL, function (err4, rows2) {
                    if (err4) console.log(err4);

                    console.log("SELECT ==> ");
                    for (var i in rows2) {
                        console.log(rows2[i]);
                    }
                });
            });
        });
    });
});

//conn.end();

爲了實現了串行操做,全部的調用都是在callback中實現的,5層嵌套結構。這種代碼已經變得不能夠維護了。因此,須要用async庫,對上面的代碼結構進行重寫!

var mysql = require('mysql');
var async = require('async');

var conn = mysql.createConnection({
    host: 'localhost',
    user: 'nodejs',
    password: 'nodejs',
    database: 'nodejs',
    port: 3306
});

var sqls = {
    'insertSQL': 'insert into t_user(name) values("conan"),("fens.me")',
    'selectSQL': 'select * from t_user limit 10',
    'deleteSQL': 'delete from t_user',
    'updateSQL': 'update t_user set name="conan update"  where name="conan"'
};

var tasks = ['deleteSQL', 'insertSQL', 'selectSQL', 'updateSQL', 'selectSQL'];
async.eachSeries(tasks, function (item, callback) {
    console.log(item + " ==> " + sqls[item]);
    conn.query(sqls[item], function (err, res) {
        console.log(res);
        callback(err, res);
    });
}, function (err) {
    console.log("err: " + err);
});

控制檯輸出:

deleteSQL ==> delete from t_user
{ fieldCount: 0,
  affectedRows: 0,
  insertId: 0,
  serverStatus: 34,
  warningCount: 0,
  message: '',
  protocol41: true,
  changedRows: 0 }
insertSQL ==> insert into t_user(name) values("conan"),("fens.me")
{ fieldCount: 0,
  affectedRows: 2,
  insertId: 45,
  serverStatus: 2,
  warningCount: 0,
  message: '&Records: 2  Duplicates: 0  Warnings: 0',
  protocol41: true,
  changedRows: 0 }
selectSQL ==> select * from t_user limit 10
[ { id: 45,
    name: 'conan',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) },
  { id: 46,
    name: 'fens.me',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) } ]
updateSQL ==> update t_user set name="conan update"  where name="conan"
{ fieldCount: 0,
  affectedRows: 1,
  insertId: 0,
  serverStatus: 2,
  warningCount: 0,
  message: '(Rows matched: 1  Changed: 1  Warnings: 0',
  protocol41: true,
  changedRows: 1 }
selectSQL ==> select * from t_user limit 10
[ { id: 45,
    name: 'conan update',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) },
  { id: 46,
    name: 'fens.me',
    create_date: Fri Sep 13 2013 12:24:51 GMT+0800 (中國標準時間) } ]
err: null

代碼一下讀性就加強了許多倍,這就是高效的開發。

#5 Async多任務時間管理# 作服務器端開發時,常常會遇到時間管理的功能需求,好比每2秒刷新一次,每三分鐘作一次統計計算,週一至週五9點30啓動一個定時任務等等。不少時候咱們會把這些定時任務,交給linux系統的Crontab來實現。不過,有時爲了增長系統的靈活性,咱們須要在服務器後臺實現。

對於單線程的Nodejs,如何控制多任務的時間管理呢?

##5.1 需求描述## 基於Nodejs的express3構建的web框架,須要在週一至週五,早上9點15分時,分別啓動程序A和程序B,程序C。下午16點程序A,B,C中止。

程序A: 每1秒去redis取數據一次,保留在Nodejs的全局變量G中。

程序B: 每10秒去mysql取數據一次,經過websocket直接訪問給客戶端。

程序C: 每5秒對全局變量G,進行平均值計算,而後經過websocket直接訪問給客戶端。

##5.2 Nodejs的實現方案setInterval## 初始化項目:

~ cd D:\workspace\javascript\nodejs-async\demo
~ express -e timers
~ cd timers && npm install
~ npm install moment
~ npm install twix

編輯文件:app.js,在文件最面下增長新代碼

...

//moment,twix時間工具
var  moment = require('moment')
    ,twix = require('twix');

//判斷程序啓動時間
function isTime(){
    var hms = 'HHmmss';
    return moment("091500",hms).twix(moment("160000",hms)).contains(moment());
}

//打印日誌
if(isTime()){
    console.log("===============Working time===================");
}

//日誌時間格式化
function now() {
    return moment().format("HH:mm:ss");
}

//全局變量G
var G = 0;

//模擬程序A
function A() {
    console.log(now() + " A(s1)=> {G:" + (G++) + "} Cache G");
}

//模擬程序B
function B() {
    console.log(now() + " B(s10)=> {B:10} TO client");
}

//模擬程序C
function C() {
    console.log(now() + " C(s5)=> {G:" + (G / 5) + "} TO client");
    G = 0;
}

//分別對A,B,C程序進行時間管理
setInterval(function () {
    if(isTime()){
        A()
    };
}, 1000);

setInterval(function () {
    if(isTime()){
        C();
    }
}, 5 * 1000);

setInterval(function () {
    if(isTime()) {
        B();
    }
}, 10 * 1000);

運行nodejs,查看日誌輸出:

D:\workspace\javascript\nodejs-async\demo\timers>node app.js
===============Working time===================
Express server listening on port 3000
15:02:24 A(s1)=> {G:0} Cache G
15:02:25 A(s1)=> {G:1} Cache G
15:02:26 A(s1)=> {G:2} Cache G
15:02:27 A(s1)=> {G:3} Cache G
15:02:28 A(s1)=> {G:4} Cache G
15:02:28 C(s5)=> {G:1} TO client
15:02:29 A(s1)=> {G:0} Cache G
15:02:30 A(s1)=> {G:1} Cache G
15:02:31 A(s1)=> {G:2} Cache G
15:02:32 A(s1)=> {G:3} Cache G
15:02:33 A(s1)=> {G:4} Cache G
15:02:33 C(s5)=> {G:1} TO client
15:02:33 B(s10)=> {B:10} TO client
15:02:34 A(s1)=> {G:0} Cache G
15:02:35 A(s1)=> {G:1} Cache G
15:02:36 A(s1)=> {G:2} Cache G
15:02:37 A(s1)=> {G:3} Cache G
15:02:38 A(s1)=> {G:4} Cache G
15:02:38 C(s5)=> {G:1} TO client
15:02:39 A(s1)=> {G:0} Cache G
15:02:40 A(s1)=> {G:1} Cache G
15:02:41 A(s1)=> {G:2} Cache G
15:02:42 A(s1)=> {G:3} Cache G

程序A,每1秒運行一次,給G+1。

程序B,每10秒運行一次,輸出到客戶端。

程序C,每5秒運行一次,取G的平均值,給G賦值爲0,輸出到客戶端。

雖然完成了功能需求,可是代碼不美觀!若是再增長任務D,E,F….代碼很差維護。

##5.3 Async多任務時間管理## 下面咱們用async包,對上面的瀏覽進行封裝。

//moment,twix時間工具
var  moment = require('moment')
    ,twix = require('twix');

//判斷程序啓動時間
function isTime(){
    var hms = 'HHmmss';
    return moment("091500",hms).twix(moment("160000",hms)).contains(moment());
}

//打印日誌
if(isTime()){
    console.log("===============Working time===================");
}

//日誌時間格式化
function now() {
    return moment().format("HH:mm:ss");
}

//全局變量G
var G = 0;

//模擬程序A
function A() {
    console.log(now() + " A(s1)=> {G:" + (G++) + "} Cache G");
}

//模擬程序B
function B() {
    console.log(now() + " B(s10)=> {B:10} TO client");
}

//模擬程序C
function C() {
    console.log(now() + " C(s5)=> {G:" + (G / 5) + "} TO client");
    G = 0;
}

var async = require('async');

var arr = [
    {fun: A, delay: 1000, test: isTime},
    {fun: B, delay: 10 * 1000, test: isTime},
    {fun: C, delay: 5 * 1000, test: isTime}
];

async.each(arr, function (item, callback) {
    async.whilst(item.test,function(cb) {
            item.fun();
            setTimeout(cb, item.delay);
        },function(err) {
            console.log("Not working time!");
        }
    );
}, function (err) {
    log('Error: ' + err);
});

構建一個arr數組,封裝調用的A,B,C的參數。

使用async.each函數,對arr的item異步並行

使用async.whilst函數,對任務啓動時間進行判斷,並根據delay運行任務

程序運行結果與setInterval同樣。而代碼更利於維護,一旦須要增減任務,簡單地修改arr的數組就好了,其餘的代碼都不用動!!

相關文章
相關標籤/搜索