解祕 Node.js 單線程實現高併發請求原理,以及串聯同步執行併發請求的方案

最近在作一個支持多進程請求的 Node 服務,要支持多併發請求,並且請求要按前後順序串聯同步執行返回結果。web

對,這需求就是這麼奇琶,業務場景也是那麼奇琶。sql

需求是完成了,爲了對 Node.js 高併發請求原理有更深一些的理解,特地寫一篇文章來鞏固一下相關的知識點。數組

問題

Node.js 由這些關鍵字組成: 事件驅動、非阻塞I/O、高效、輕量瀏覽器

因而在咱們剛接觸 Node.js 時,會有所疑問:網絡

  • 爲何在瀏覽器中運行的 JavaScript 能與操做系統進行如此底層的交互?
  • Node 真的是單線程嗎?
  • 若是是單線程,他是如何處理高併發請求的?
  • Node 事件驅動是如何實現的?

下來咱們一塊兒來解祕這是怎麼一回事!多線程

架構一覽

上面的問題,都挺底層的,因此咱們從 Node.js 自己入手,先來看看 Node.js 的結構。架構

  • Node.js 標準庫,這部分是由 Javascript編寫的,即咱們使用過程當中直接能調用的 API。在源碼中的 lib 目錄下能夠看到。
  • Node bindings,這一層是 Javascript 與底層 C/C++ 可以溝通的關鍵,前者經過 bindings 調用後者,相互交換數據。
  • 第三層是支撐 Node.js 運行的關鍵,由 C/C++ 實現。
  • V8:Google 推出的 Javascript VM,也是 Node.js 爲何使用的是 JavaScript 的關鍵,它爲 JavaScript 提供了在非瀏覽器端運行的環境,它的高效是 Node.js 之因此高效的緣由之一。
  • Libuv:它爲 Node.js 提供了跨平臺,線程池,事件池,異步 I/O 等能力,是 Node.js 如此強大的關鍵。
  • C-ares:提供了異步處理 DNS 相關的能力。
  • http_parser、OpenSSL、zlib 等:提供包括 http 解析、SSL、數據壓縮等其餘的能力。

單線程、異步

  • 單線程:全部任務須要排隊,前一個任務結束,纔會執行後一個任務。若是前一個任務耗時很長,後一個任務就不得不一直等着。Node 單線程指的是 Node 在執行程序代碼時,主線程是單線程
  • 異步:主線程以外,還維護了一個"事件隊列"(Event queue)。當用戶的網絡請求或者其它的異步操做到來時,Node 都會把它放到 Event Queue 之中,此時並不會當即執行它,代碼也不會被阻塞,繼續往下走,直到主線程代碼執行完畢。

注:併發

  • JavaScript 是單線程的,Node 自己實際上是多線程的,只是 I/O 線程使用的 CPU 比較少;還有個重要的觀點是,除了用戶的代碼沒法並行執行外,全部的 I/O (磁盤 I/O 和網絡 I/O) 則是能夠並行起來的。
  • libuv 線程池默認打開 4 個,最多打開 128 個 線程。

事件循環

Nodejs 所謂的單線程,只是主線程是單線程。app

  • 主線程運行 V8 和 JavaScript
  • 多個子線程經過 事件循環 被調度

能夠抽象爲:主線程對應於老闆,正在工做。一旦發現有任務能夠分配給職員(子線程)來作,將會把任務分配給底下的職員來作。同時,老闆繼續作本身的工做,等到職員(子線程)把任務作完,就會經過事件把結果回調給老闆。老闆又不停重複處理職員(子線程)子任務的完成狀況。koa

老闆(主線程)給職員(子線程)分配任務,當職員(子線程)把任務作完以後,經過事件把結果回調給老闆。老闆(主線程)處理回調結果,執行相應的 JavaScript。

更具體的解釋請看下圖:

一、每一個 Node.js 進程只有一個主線程在執行程序代碼,造成一個執行棧(execution context stack)。

二、Node.js 在主線程裏維護了一個"事件隊列"(Event queue),當用戶的網絡請求或者其它的異步操做到來時,Node 都會把它放到 Event Queue之中,此時並不會當即執行它,代碼也不會被阻塞,繼續往下走,直到主線程代碼執行完畢。

三、主線程代碼執行完畢完成後,而後經過 Event Loop,也就是事件循環機制,檢查隊列中是否有要處理的事件,這時要分兩種狀況:若是是非 I/O 任務,就親自處理,並經過回調函數返回到上層調用;若是是 I/O 任務,就從 線程池 中拿出一個線程來處理這個事件,並指定回調函數,當線程中的 I/O 任務完成之後,就執行指定的回調函數,並把這個完成的事件放到事件隊列的尾部,線程歸還給線程池,等待事件循環。當主線程再次循環到該事件時,就直接處理並返回給上層調用。 這個過程就叫 事件循環 (Event Loop)

四、期間,主線程不斷的檢查事件隊列中是否有未執行的事件,直到事件隊列中全部事件都執行完了,此後每當有新的事件加入到事件隊列中,都會通知主線程按順序取出交 Event Loop 處理。

優缺點

Nodejs 的優勢:I/O 密集型處理是 Nodejs 的強項,由於 Nodejs 的 I/O 請求都是異步的(如:sql 查詢請求、文件流操做操做請求、http 請求...)

Nodejs 的缺點:不擅長 cpu 密集型的操做(複雜的運算、圖片的操做)

總結

一、Nodejs 與操做系統交互,咱們在 JavaScript 中調用的方法,最終都會經過 process.binding 傳遞到 C/C++ 層面,最終由他們來執行真正的操做。Node.js 即這樣與操做系統進行互動。

二、Nodejs 所謂的單線程,只是主線程是單線程,全部的網絡請求或者異步任務都交給了內部的線程池去實現,自己只負責不斷的往返調度,由事件循環不斷驅動事件執行。

三、Nodejs 之因此單線程能夠處理高併發的緣由,得益於 libuv 層的事件循環機制,和底層線程池實現。

四、Event loop 就是主線程從主線程的事件隊列裏面不停循環的讀取事件,驅動了全部的異步回調函數的執行,Event loop 總共 7 個階段,每一個階段都有一個任務隊列,當全部階段被順序執行一次後,event loop 完成了一個 tick。

參考文章:Nodejs探祕:深刻理解單線程實現高併發原理

串聯同步執行併發請求

就像上面說的:Node.js 在主線程裏維護了一個"事件隊列"(Event queue),當用戶的網絡請求或者其它的異步操做到來時,Node 都會把它放到 Event Queue之中,此時並不會當即執行它,代碼也不會被阻塞,繼續往下走,直到主線程代碼執行完畢。

因此要串聯同步執行併發請求的關鍵在於維護一個隊列,隊列的特色是 先進先出,按隊列裏面的順序執行就能夠達到串聯同步執行併發請求的目的。

方案

  • 根據每一個請求的 uniqueId 變量做爲惟一令牌
  • 隊列裏面維護一個結果數組和一個執行隊列,把執行隊列完成的 令牌與結果 存儲在結果數組裏面
  • 根據惟一令牌,一直去獲取執行完成的結果,間隔 200 毫秒,超時等待時間爲 10 分鐘
  • 一直等待並獲取結果,等待到有結果時,才返回給請求;並根據令牌把結果數組裏面相應的項刪除

隊列

代碼:

class Recorder {
    private list: any[];

    private queueList: any[];

    private intervalTimer;

    constructor() {
        this.list = [];
        this.queueList = [];
        this.intervalTimer = null;
    }

    // 根據 id 獲取任務結果
    public get(id: string) {
        let data;
        console.log('this.list: ', this.list);
        let index;
        for (let i = 0; i < this.list.length; i++) {
            const item = this.list[i];
            if (id === item.id) {
                data = item.data;
                index = i;
                break;
            }
        }
        // 刪除獲取到結果的項
        if (index !== undefined) {
            this.list.splice(index, 1);
        }
        return data;
    }

    public clear() {
        this.list = [];
        this.queueList = [];
    }

    // 添加項
    public async addQueue(item: any) {
        this.queueList.push(item);
    }

    public async runQueue() {
        clearInterval(this.intervalTimer);
        if (!this.queueList.length) {
            // console.log('隊列執行完畢');
            return;
        }
        // 取出隊列裏面的最後一項
        const item = this.queueList.shift();
        console.log('item: ', item);
        // 執行隊列的回調
        const data = await item.callback();
        console.log('回調執行完成: ', data);
        // 把結果放進 結果數組
        this.list.push({ id: item.id, data });
    }

    public interval() {
        clearInterval(this.intervalTimer);
        this.intervalTimer = setInterval(async () => {
            clearInterval(this.intervalTimer);
            // 一直執行裏面的任務
            await this.runQueue();
            this.interval();
        }, 200);
    }
}

const recorder = new Recorder();
recorder.interval();

export default recorder;

服務

下面模擬一個請求端口的的 Node 服務。

代碼:

const Koa = require('koa')
const Router = require('koa-router')
const cuid = require('cuid');
const bodyParser = require('koa-bodyparser')
import recorder from "./libs/recorder";

const MAX_WAITING_TIME = 60 * 5; // 最大等待時長
// web服務端口
const SERVER_PORT: number = 3000;
const app = new Koa();
app.use(bodyParser());
const router = new Router();


/**
 * 程序睡眠
 * @param time 毫秒
 */
const timeSleep = (time: number) => {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve("");
        }, time);
    });
};

/**
 * 程序睡眠
 * @param second 秒
 */
const sleep = (second: number) => {
    return timeSleep(second * 1000);
};

router.post("/getPort", async (ctx, next) => {
    const { num } = ctx.request.body;
    const uniqueId = cuid();
    console.log('uniqueId: ', uniqueId);
    recorder.addQueue({
        id: uniqueId,
        callback: getPortFun(num)
    });
    let waitTime = 0;
    while (!ctx.body) {
        await sleep(0.2);
        console.log('1');
        const data: any = recorder.get(uniqueId);
        if (data) {
            ctx.body = {
                code: 0,
                data: data,
                msg: 'success'
            };
        }
        waitTime++;
        // 超過最大時間就返回一個結果
        if (waitTime > MAX_WAITING_TIME) {
            ctx.body = {};
        }
    }
});

// 返回一個函數
function getPortFun(num) {
    return () => {
        return new Promise((resolve) => {
            // 模擬異步程序
            setTimeout(() => {
                console.log(`num${num}: `, num);
                resolve(num * num);
            }, num * 1000);
        });
    };
}

app.use(router.routes()).use(router.allowedMethods());

app.listen(SERVER_PORT);

最後

最近狀態不好勁,因此最近的原創技術文章有點難產了 😥

心態急需調整,週末想出去玩,放鬆一下本身,找回那個鬥志滿滿的真我才行,唉。

推薦閱讀

支持一下下👇

相關文章
相關標籤/搜索