在nodejs中,若是要實現sleep的功能主要是經過「setTimeout + promise」實現,也能夠經過「循環空轉」來解決。前者是利用定時器實現任務的延遲執行,並經過promise鏈管理任務間的時序與依賴,本質上nodejs的執行線程並無真正的sleep,事件循環以及v8仍在運行,是僅僅表如今業務邏輯上sleep;然後者的實現則無疑實在浪費CPU性能,有點相似自旋鎖,不符合大多數場景。前端
若要實現引擎層面(運行時)的sleep,事情在ECMAScript Latest Draft (ECMA-262)出現以後開始有了起色。ECMA262規定了 Atomics.wait,它會將調用該方法的代理(引擎)陷入等待隊列並讓其sleep,直到被notify或者超時。該規範在8.10.0以上版本的nodejs上被實現。java
事實上,Atomics.wait 的出現主要解決瀏覽器或nodejs的worker之間數據同步的問題。瀏覽器上的web-worker、正式被nodejs@12歸入的worker-threads模塊,這些都是ECMAScript多線程模型的具體實現。既然出現多線程那麼線程間的同步也就不可避免的被提到,在前端以及nodejs範圍內可使用Atomics.wait和notify來解決。node
說的有些跑題,回到本節,如何實現運行時的sleep呢?很簡單,利用Atomics.wait的等待超時機制:web
let sharedBuf = new SharedArrayBuffer(4); let sharedArr = new Int32Array(sharedBuf); // 睡眠n秒 let sleep = function(n){ Atomics.wait(sharedArr, 0, 0, n * 1000); }
此處的sleep並非異步方法,它會阻塞執行線程直到超時,所以須要根據業務場景來使用該sleep模型。
關於Atomics.wait的具體使用方法,下文會着重講解。promise
雖然nodejs多線程使用場景不是不少,可是一旦涉及到多線程,那麼線程間同步就必不可少,不然沒法解決臨界區的問題。不過nodejs的work_threads對線程的建立不一樣於c或者java,它使用libuv的API建立線程 「uv_thread_create」,可是在此以前須要初始化一些設施如MessagePort、v8實例設置等,所以建立一個thread並非一個輕量級的操做,須要結合場景酌情建立適量的threads。瀏覽器
回到正題,多線程間的同步通常須要依賴鎖,而鎖的實現須要依賴於全局變量。在nodejs的work_threads實現中,主線程沒法設置全局變量,所以能夠經過Atomics實現。正如上例中所示,Atomics.wait依賴 SharedArrayBuffer,這是共享內存的ArrayBuffer,threads之間可經過它共享數據,可真正操做ArrayBuffer時並不直接使用該對象,而是TypeArray。如Atomics.wait,第一個參數必須是Int32Array對象,而該對象指向的緩衝區爲SharedArrayBuffer。當線程A由於Atomics.wait而阻塞後,可經過其它線程B調用Atomics.notify進行喚醒從而讓線程A的v8繼續執行。多線程
let { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); var sab = new SharedArrayBuffer(1024); var int32 = new Int32Array(sab); if (isMainThread) { const worker = new Worker(__filename, { workerData: sab }); worker.on('message', (d) => { console.log('parent receive message:', d); }); worker.on('error', (e) => { console.error('parent receive error', e); }); worker.on('exit', (code) => { if (code !== 0) console.error(new Error(`工做線程使用退出碼 ${code} 中止`)); }); Atomics.wait(int32, 0, 0); // A console.log(int32[0]); // C: 123 } else { let buf = workerData; let arrs = new Int32Array(buf); Atomics.store(arrs, 0, 123); Atomics.notify(arrs, 0); // B }
上例中,主線程建立thread後,在A處進行阻塞;在新線程中,經過原子操做Atomics.store修改SharedArrayBuffer的第一項爲123後,於B處喚醒阻塞在SharedArrayBuffer第一項的其它線程;此時主線程被喚醒,執行console.log(int32[0])
,輸出被新線程修改後的SharedArrayBuffer第一項數據123。app
分析一個公平、排它、不可重入鎖的實現,它使用Atomics.wait/notify/compareExchange完成線程的同步。異步
main-thread.js let Lock = require('./lock').Lock; let { Worker } = require('worker_threads'); const sharedBuffer = new SharedArrayBuffer(1 * Int32Array.BYTES_PER_ELEMENT); const sharedArray = new Int32Array(sharedBuffer); let worker = new Worker('./worker-lock.js', { workerData: sharedBuffer }); Lock.initialize(sharedArray, 0); const lock = new Lock(sharedArray, 0); // 獲取鎖 lock.lock(); // 3s後釋放鎖 setTimeout(() => { lock.unlock(); // (B) }, 3000)
worker-thread.js let Lock = require('./lock').Lock; let { parentPort, workerData } = require('worker_threads'); const sharedArray = new Int32Array(workerData); const lock = new Lock(sharedArray, 0); console.log('Waiting for lock...'); // (A) // 獲取鎖 lock.lock(); // (B) blocks! console.log('Unlocked'); // (C)
主線程初始化互斥鎖,同時建立線程,主線程獲取鎖後三秒鐘釋放;
worker線程嘗試獲取鎖,此時鎖已被主線程獲取,所以worker線程在此阻塞,等待3s後主線程釋放鎖被喚醒,繼續執行輸出。性能
lock.js const UNLOCKED = 0; const LOCKED_NO_WAITERS = 1; const LOCKED_POSSIBLE_WAITERS = 2; const NUMINTS = 1; class Lock { // 'iab' must be a Int32Array mapping shared memory. // 'ibase' must be a valid index in iab, the first of NUMINTS reserved for the lock. constructor(iab, ibase) { if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) { throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`); } this.iab = iab; this.ibase = ibase; } static initialize(iab, ibase) { if (!(iab instanceof Int32Array && ibase|0 === ibase && ibase >= 0 && ibase+NUMINTS <= iab.length)) { throw new Error(`Bad arguments to Lock constructor: ${iab} ${ibase}`); } Atomics.store(iab, ibase, UNLOCKED); return ibase; } // Acquire the lock, or block until we can. Locking is not recursive: lock() { const iab = this.iab; const stateIdx = this.ibase; var c; if ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS)) !== UNLOCKED) { // A do { if (c === LOCKED_POSSIBLE_WAITERS || Atomics.compareExchange(iab, stateIdx, LOCKED_NO_WAITERS, LOCKED_POSSIBLE_WAITERS) !== UNLOCKED) { Atomics.wait(iab, stateIdx, LOCKED_POSSIBLE_WAITERS, Number.POSITIVE_INFINITY); } } while ((c = Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_POSSIBLE_WAITERS)) !== UNLOCKED); // B } } tryLock() { const iab = this.iab; const stateIdx = this.ibase; return Atomics.compareExchange(iab, stateIdx, UNLOCKED, LOCKED_NO_WAITERS) === UNLOCKED; } unlock() { const iab = this.iab; const stateIdx = this.ibase; var v0 = Atomics.sub(iab, stateIdx, 1); // Wake up a waiter if there are any if (v0 !== LOCKED_NO_WAITERS) { Atomics.store(iab, stateIdx, UNLOCKED); Atomics.notify(iab, stateIdx, 1); } } toString() { return "Lock:{ibase:" + this.ibase +"}"; } } exports.Lock = Lock;
當進程A嘗試獲取鎖成功時,A處判斷語句爲false,所以由compareExchange設置狀態爲LOCKED_NO_WAITERS,直接執行其後續邏輯;
若進程B此時執行lock獲取鎖時,A處判斷爲true,進入do while循環體,在wait處sleep;
進程A經過unlock釋放鎖,會將鎖狀態置爲UNLOCKED,同時喚醒阻塞的進程B;
進程B執行循環判斷語句B,此時爲false,跳出循環執行B的邏輯。
固然,也可經過tryLock實現自旋鎖或者其餘邏輯實現非阻塞等待。