最近在作一個支持多進程請求的 Node 服務,要支持多併發請求,並且請求要按前後順序串聯同步執行返回結果。web
對,這需求就是這麼奇琶,業務場景也是那麼奇琶。sql
需求是完成了,爲了對 Node.js 高併發請求原理有更深一些的理解,特地寫一篇文章來鞏固一下相關的知識點。數組
Node.js 由這些關鍵字組成: 事件驅動、非阻塞I/O、高效、輕量。瀏覽器
因而在咱們剛接觸 Node.js 時,會有所疑問:網絡
下來咱們一塊兒來解祕這是怎麼一回事!多線程
上面的問題,都挺底層的,因此咱們從 Node.js 自己入手,先來看看 Node.js 的結構。架構
注:併發
Nodejs 所謂的單線程,只是主線程是單線程。app
事件循環
被調度能夠抽象爲:主線程對應於老闆,正在工做。一旦發現有任務能夠分配給職員(子線程)來作,將會把任務分配給底下的職員來作。同時,老闆繼續作本身的工做,等到職員(子線程)把任務作完,就會經過事件把結果回調給老闆。老闆又不停重複處理職員(子線程)子任務的完成狀況。koa
老闆(主線程)給職員(子線程)分配任務,當職員(子線程)把任務作完以後,經過事件把結果回調給老闆。老闆(主線程)處理回調結果,執行相應的 JavaScript。
更具體的解釋請看下圖:
一、每一個 Node.js 進程只有一個主線程在執行程序代碼,造成一個執行棧(execution context stack)。
二、Node.js 在主線程裏維護了一個"事件隊列"(Event queue),當用戶的網絡請求或者其它的異步操做到來時,Node 都會把它放到 Event Queue之中,此時並不會當即執行它,代碼也不會被阻塞,繼續往下走,直到主線程代碼執行完畢。
三、主線程代碼執行完畢完成後,而後經過 Event Loop,也就是事件循環機制,檢查隊列中是否有要處理的事件,這時要分兩種狀況:若是是非 I/O 任務,就親自處理,並經過回調函數返回到上層調用;若是是 I/O 任務,就從 線程池
中拿出一個線程來處理這個事件,並指定回調函數,當線程中的 I/O 任務完成之後,就執行指定的回調函數,並把這個完成的事件放到事件隊列的尾部,線程歸還給線程池,等待事件循環。當主線程再次循環到該事件時,就直接處理並返回給上層調用。 這個過程就叫 事件循環 (Event Loop)
。
四、期間,主線程不斷的檢查事件隊列中是否有未執行的事件,直到事件隊列中全部事件都執行完了,此後每當有新的事件加入到事件隊列中,都會通知主線程按順序取出交 Event Loop 處理。
Nodejs 的優勢:I/O 密集型處理是 Nodejs 的強項,由於 Nodejs 的 I/O 請求都是異步的(如:sql 查詢請求、文件流操做操做請求、http 請求...)
Nodejs 的缺點:不擅長 cpu 密集型的操做(複雜的運算、圖片的操做)
一、Nodejs 與操做系統交互,咱們在 JavaScript 中調用的方法,最終都會經過 process.binding 傳遞到 C/C++ 層面,最終由他們來執行真正的操做。Node.js 即這樣與操做系統進行互動。
二、Nodejs 所謂的單線程,只是主線程是單線程,全部的網絡請求或者異步任務都交給了內部的線程池去實現,自己只負責不斷的往返調度,由事件循環不斷驅動事件執行。
三、Nodejs 之因此單線程能夠處理高併發的緣由,得益於 libuv 層的事件循環機制,和底層線程池實現。
四、Event loop 就是主線程從主線程的事件隊列裏面不停循環的讀取事件,驅動了全部的異步回調函數的執行,Event loop 總共 7 個階段,每一個階段都有一個任務隊列,當全部階段被順序執行一次後,event loop 完成了一個 tick。
就像上面說的:Node.js 在主線程裏維護了一個"事件隊列"(Event queue),當用戶的網絡請求或者其它的異步操做到來時,Node 都會把它放到 Event Queue之中,此時並不會當即執行它,代碼也不會被阻塞,繼續往下走,直到主線程代碼執行完畢。
因此要串聯同步執行併發請求的關鍵在於維護一個隊列,隊列的特色是 先進先出
,按隊列裏面的順序執行就能夠達到串聯同步執行併發請求的目的。
方案
令牌與結果
存儲在結果數組裏面代碼:
class Recorder { private list: any[]; private queueList: any[]; private intervalTimer; constructor() { this.list = []; this.queueList = []; this.intervalTimer = null; } // 根據 id 獲取任務結果 public get(id: string) { let data; console.log('this.list: ', this.list); let index; for (let i = 0; i < this.list.length; i++) { const item = this.list[i]; if (id === item.id) { data = item.data; index = i; break; } } // 刪除獲取到結果的項 if (index !== undefined) { this.list.splice(index, 1); } return data; } public clear() { this.list = []; this.queueList = []; } // 添加項 public async addQueue(item: any) { this.queueList.push(item); } public async runQueue() { clearInterval(this.intervalTimer); if (!this.queueList.length) { // console.log('隊列執行完畢'); return; } // 取出隊列裏面的最後一項 const item = this.queueList.shift(); console.log('item: ', item); // 執行隊列的回調 const data = await item.callback(); console.log('回調執行完成: ', data); // 把結果放進 結果數組 this.list.push({ id: item.id, data }); } public interval() { clearInterval(this.intervalTimer); this.intervalTimer = setInterval(async () => { clearInterval(this.intervalTimer); // 一直執行裏面的任務 await this.runQueue(); this.interval(); }, 200); } } const recorder = new Recorder(); recorder.interval(); export default recorder;
下面模擬一個請求端口的的 Node 服務。
代碼:
const Koa = require('koa') const Router = require('koa-router') const cuid = require('cuid'); const bodyParser = require('koa-bodyparser') import recorder from "./libs/recorder"; const MAX_WAITING_TIME = 60 * 5; // 最大等待時長 // web服務端口 const SERVER_PORT: number = 3000; const app = new Koa(); app.use(bodyParser()); const router = new Router(); /** * 程序睡眠 * @param time 毫秒 */ const timeSleep = (time: number) => { return new Promise((resolve) => { setTimeout(() => { resolve(""); }, time); }); }; /** * 程序睡眠 * @param second 秒 */ const sleep = (second: number) => { return timeSleep(second * 1000); }; router.post("/getPort", async (ctx, next) => { const { num } = ctx.request.body; const uniqueId = cuid(); console.log('uniqueId: ', uniqueId); recorder.addQueue({ id: uniqueId, callback: getPortFun(num) }); let waitTime = 0; while (!ctx.body) { await sleep(0.2); console.log('1'); const data: any = recorder.get(uniqueId); if (data) { ctx.body = { code: 0, data: data, msg: 'success' }; } waitTime++; // 超過最大時間就返回一個結果 if (waitTime > MAX_WAITING_TIME) { ctx.body = {}; } } }); // 返回一個函數 function getPortFun(num) { return () => { return new Promise((resolve) => { // 模擬異步程序 setTimeout(() => { console.log(`num${num}: `, num); resolve(num * num); }, num * 1000); }); }; } app.use(router.routes()).use(router.allowedMethods()); app.listen(SERVER_PORT);
最近狀態不好勁,因此最近的原創技術文章有點難產了 😥
心態急需調整,週末想出去玩,放鬆一下本身,找回那個鬥志滿滿的真我才行,唉。
推薦閱讀
支持一下下👇