https://github.com/penghuwan/concurrent-thread.jsgithub.com
// 備註:爲按部就班介紹,如下爲簡化代碼 // 存儲每一個線程函數的狀態,例如是否中斷,以及線程狀態等 const threadMap = {}; class ThreadPool { // 模擬線程中斷 interrupt(threadName) { } // 模擬線程同步 join(threadName, targetThread) { } // 模擬線程休眠 sleep(ms) { } }; function submit(func, name) { if (!func instanceof Function) return; // 方式1:傳入一個具名函數;方式2:傳入第二個參數,即線程命名空間 const threadName = func.name || name; // threadMap負責存儲線程狀態數據 threadMap[threadName] = { state: RUNNABLE, isInterrupted: false }; // 讓func異步調用,同時將傳入函數的做用域綁定爲 ThreadPool原型 Promise.resolve({ then: func.bind(ThreadPool.prototype); }) }
submit(async function example() { this.interrupt(); });
submit(async function example() { this.interrupt('example'); });
// 返回代理後的ThreadPool function delegateThreadPool(threadName) { // threadName爲待定的線程名,在submit方法調用時候傳入 // 代理後的ThreadPool const proxyClass = {}; // 獲取ThreadPool原來的全部的方法,賦給props數組 var props = Object.getOwnPropertyNames(ThreadPool.prototype); for (let prop of props) { // 代理ThreadPool,爲其全部方法增長threadName這個參數 let fnName = prop; proxyClass[fnName] = (...args) => { const fn = baseClass[fnName]; return fn(threadName, ...args); }; } return proxyClass; } function submit(func, name) { // 省略其餘代碼 。。。 const proxyScope = delegateThreadPool(threadName); // 讓func異步調用,不阻塞主線程,同時實現併發 Promise.resolve({ then: function () { // 給func綁定this爲代理後的ThreadPool對象,以便調用方法 func.call(proxyScope); } }); } // 調用this.sleep方法時,已經無需增長函數命名做爲參數了 submit(async function example() { this.interrupt(); });
// 模擬「線程」休眠 sleep(ms) { return new Promise(function (resolve) { setTimeout(resolve, ms); }) } // 提交「線程」 submit(async function example() { // 阻塞停留3秒,而後才輸出1 await this.sleep(3000); console.log(1); });
// 模擬線程中斷 interrupt(threadName) { if (!threadName) { throw new Error('Miss function parameters') } if (threadMap[threadName]) { threadMap[threadName].isInterrupted = true; } } // 獲取線程中斷狀態 isInterrupted(threadName) { if (!threadName) { throw new Error('Miss function parameters') } // !!的做用是:將undefined轉爲false return !!threadMap[threadName].isInterrupted; }
import ee from 'event-emitter'; const emitter = ee(); // 模擬線程同步 join(threadName, targetThread) { return new Promise((resolve) => { // 監聽其餘線程函數的結束事件 emitter.on('join-finished', (finishThread) => { // 根據結束線程的線程名finishThread作判斷 if (finishThread === targetThread) { resolve(); } }) }) }
import ee from 'event-emitter'; const emitter = ee(); function submit(func, name) { // ... Promise.resolve({ then: func().then(() => { emitter.emit('join-finished', threadName); }) }); }
submit(async function thread1 () { this.join('thread2'); console.log(1); }); submit(async function thread2 () { this.sleep(3000); console.log(2) }) // 3s後,依次輸出 2 1
// 這是一個非公平鎖 class Lock { constructor() { this.isLock = false; } //加鎖 lock() { if (this.isLock) { const self = this; // 循環while死循環,不停測試isLock是否等於false return new Promise((resolve) => { (function recursion() { if (!self.isLock) { // 佔用鎖 self.isLock = true; // 使外部await語句繼續往下執行 resolve(); return; } setTimeout(recursion, 100); })(); }); } else { this.isLock = true; return Promise.resolve(); } } // 解鎖 unLock() { this.isLock = false; } } const lockObj = new Lock(); export default lockObj;
async function commonCode() { await Lock.lock(); await Executor.sleep(3000); Lock.unLock(); } submit(async function example1() { console.log('example1 start') await commonCode(); console.log('example1 end') }); submit(async function example2() { console.log('example2 start') await commonCode(); console.log('example2 end') });
// 當即輸出 example1 start example2 start // 3秒後輸出 example1 end // 再3秒後輸出 example2 end
import ee from 'event-emitter'; const ev = ee(); class Condition { constructor() { this.n = 0; this.list = []; } // 當不知足條件時,讓線程處於等待狀態 wait() { return new Promise((resolve) => { const eventName = `notify-${this.n}`; this.n++; const list = this.list; list.push(eventName); ev.on(eventName, () => { // 從列表中刪除事件名 const i = list.indexOf(eventName); list.splice(i, 1); // 讓外部函數恢復執行 debugger; resolve(); }) }) } // 選擇一個線程喚醒 notify() { const list = this.list; let i = Math.random() * (this.list.length - 1); i = Math.floor(i); ev.emit(list[i]) } }
async function testCode() { console.log('i will be wait'); if (true) { await Condition.wait(); }; console.log('i was notified '); } submit(async function example() { testCode(); setTimeout(() => { Condition.notify(); }, 3000); });
i will be wait // 3秒後輸出 i was notified