【JavaScript】吃飽了撐的系列之JavaScript模擬多線程併發

前言
最近,明學是一個火熱的話題,而我,卻也想當那麼一回明學家,那就是,把JavaScript和多線程併發這兩個八竿子打不找的東西,給硬湊了起來,還寫了一個併發庫concurrent-thread-js。尷尬的是,當我發現其中的不合理之處,即這個東東的應用場景到底是什麼時,我發現我已經把代碼寫完了。
 
 
 
⚠️注意! 本文中的線程指的都是用JS異步函數模擬的「假線程」,不是真正意義上的多線程,請不要誤解⚠️
 
 

github地址

https://github.com/penghuwan/concurrent-thread.js
node

本文的目的

事實上,這個庫用處很小,可是在寫的過程當中,我對Promise,Async函數以及event事件流的使用產生了新的認識,同時也逐漸去學習和了解怎麼去從零開始去寫一個非業務的,通用的npm模塊,因此但願拿出來和你們分享一下,這纔是本文的真正的目的。
 
好,咱們從一個故事開始。
場景一
場景二
 

github地址

https://github.com/penghuwan/concurrent-thread.js​github.com
注意!假若不考慮webworker這種解決方案,咱們通常都認爲JS是單線程的。

concurrent-thread-js功能簡介

爲單線程的JavaScript實現併發協調的功能,語意,命名和做用性質上參考Java的實現,提sleep/join/interupt等API以及鎖和條件變量等內容,並提供線程間通訊的功能,依賴ES6語法,基於Promise和Async函數實現,故須要Babel編譯才能運行。JavaScrpt原本就是單線程的,因此這只是在API的層面實現了模擬,在下文的介紹中,每條所謂的線程其實就是普通的異步函數,並在此基礎上實現不一樣線程的協調配合。
 

爲何不選用webworker實現?

沒錯,通常來講JS中模擬多線程咱們也許會選用webworker,可是它必需要求你手動建立額外的webworker腳本文件,並經過new work('work.js')這種方式使用,這並不能達到我項目中想要的API的效果,並且注意:webwork中的環境不是window!不少方法你調用不了的。你只能採起這種方案,也即在主線程完成該功能,這是我沒有選擇webworker的另外一個緣由。
說是這樣說,但其實在大多數時候仍是用webworker就夠了
 

何時使用concurrent-thread-js

這個問題真是靈魂拷問,但是既然代碼寫都寫了,我怎麼也得編一個理由出來啊!額。。。讓我想一想哈
它的做用是:當JS工程須要讓兩個函數在執行上不互相干擾,同時也不但願它們會阻塞主線程,與此同時,還但願這兩個函數實現相似併發多線程之間的協調的需求的時候,你可使用這個併發模擬庫,實際上這種應用場景。。。這尼瑪有這種應用場景嗎?!(扎心了呀)
 

API總覽

  • submit(function,[namespace]): 接收一個函數,普通函數或Async函數都可,並異步執行"線程"
  • sleep(ms): "線程"休眠,可指定休眠時間ms,以毫秒計算
  • join(threadName): "線程"同步,調用此方法的"線程"函數將在threadName執行結束後繼續執行
  • interupt(threadName): "線程"中斷,影響"線程"內部調this.isInterrupted()的返回值
  • Lock.lock: 加鎖,一個時刻只能有一個"線程"函數進入臨界區,其餘"線程"函數須要等待,鎖是非公平的,也就是說後面排隊的線程函數沒有前後,以隨機的方式進行競爭。
  • Lock.unlock:解除非公平鎖
  • Condition.wait:不具有執行條件,"線程"進入waiting狀態,等待被喚醒
  • Condition.notify:隨機喚醒一個wait的"線程"
  • Condition.notifyAll: 還沒有編寫,喚醒全部wait的"線程"
  • getState: 還沒寫完 獲取"線程"狀態,包括RUNNALE(運行),WAITING(等待),BLOCKED(阻塞),TERMINATED(終止)
三個類:ThreadPool,Lock和Condition
咱們的API分別寫入三個類中,分別是
  • ThreadPool類:包含submit/sleep/join/interrupt/getState方法
  • Lock類:包含Lock.lock和Lock.unLock方法
  • Condition類:包含Condition.wait和Condition.notify方法
注:如下所說的"線程"都是指JS中模擬的異步函數

A1.submit方法

submit模擬提交線程至線程池
// 備註:爲按部就班介紹,如下爲簡化代碼
// 存儲每一個線程函數的狀態,例如是否中斷,以及線程狀態等
const threadMap = {};

class ThreadPool {
    // 模擬線程中斷
    interrupt(threadName) {   }
    // 模擬線程同步
    join(threadName, targetThread) {   }
    // 模擬線程休眠
    sleep(ms) { }
};
function submit(func, name) {
    if (!func instanceof Function) return;
    // 方式1:傳入一個具名函數;方式2:傳入第二個參數,即線程命名空間
    const threadName = func.name || name;
    // threadMap負責存儲線程狀態數據
    threadMap[threadName] = { state: RUNNABLE, isInterrupted: false };
    // 讓func異步調用,同時將傳入函數的做用域綁定爲 ThreadPool原型
    Promise.resolve({
        then: func.bind(ThreadPool.prototype);
   })
}

 

首先,咱們作了三件事情:
  1. 獲取線程函數的命名空間,並初始化線程初始數據,不一樣線程狀態由threadMap全局存儲
  2. 將提交的函數func做爲Promise.resolve方法中的一個thenable對象的then參數,這至關於當即"完成"一個Promise,同時在then方法中執行func,func會以異步而不是同步的方式進行執行,你也能夠簡單的理解成相似於執行了setTimeOut(func,0);
  3. 將func的做用域綁定爲新生成的ThreadPool實例,ThreadPool中定義了咱們上面咱們介紹到的方法,如sleep/join/interupt等,這有什麼好處呢?這意味着咱們能夠直接在函數中經過調用this.interrupt的方式去調用咱們定義的API了,符合咱們的使用習慣(注意,class中定義的除箭頭函數外的普通函數實際上都存放在原型中)
submit(async function example() {
    this.interrupt();
});

 

但問題在於:如今由於全部的函數經過this調用的都是ThreadPool原型中的方法,咱們要在調用惟一的interrupt方法,須要在異步函數中傳入"線程"標識,如線程名。這顯然不方便,也不優雅,例以下面的命名爲example的線程函數
submit(async function example() {
    this.interrupt('example');
});

 

使用這個模塊用戶會感到奇怪:我明明在example函數中,爲何還要給調用方法傳example這個名字參數??難道不能在模塊內部把這事情幹了嗎?
對!咱們下面作的就是這件事情,咱們編寫一個delegateThreadPool方法,由它爲ThreadPool代理處理不一樣「線程「函數的函數名
// 返回代理後的ThreadPool
function delegateThreadPool(threadName) { // threadName爲待定的線程名,在submit方法調用時候傳入
    // 代理後的ThreadPool
    const proxyClass = {};
    // 獲取ThreadPool原來的全部的方法,賦給props數組
    var props = Object.getOwnPropertyNames(ThreadPool.prototype);
    for (let prop of props) {
        // 代理ThreadPool,爲其全部方法增長threadName這個參數
        let fnName = prop;
        proxyClass[fnName] = (...args) => {
            const fn = baseClass[fnName];
            return fn(threadName, ...args);
        };
    }
    return proxyClass;
}
function submit(func, name) {
    // 省略其餘代碼 。。。
    const proxyScope = delegateThreadPool(threadName);
    // 讓func異步調用,不阻塞主線程,同時實現併發
    Promise.resolve({
        then: function () {
            // 給func綁定this爲代理後的ThreadPool對象,以便調用方法
            func.call(proxyScope);
        }
    });
}
// 調用this.sleep方法時,已經無需增長函數命名做爲參數了
submit(async function example() {
    this.interrupt();
});

 

也就是說,咱們的線程函數func綁定的已經不是ThreadPool.prototype了,而是delegateThreadPool處理後返回的對象:proxyScope。這時候,咱們在「線程」函數體裏調用this.interrupt方法時,已經無需增長函數命名做爲參數了,由於這個工做,proxyScope對象幫咱們作了,其實它的工做很簡單——就是它的每一個函數,都在一個返回的閉包裏面調用ThreadPool的同名函數,並傳遞線程名做爲第一個參數。

A2. sleep方法

做用:線程休眠
sleep方法很簡單,無非就是返回一個Promise實例,在Promise的函數裏面調setTimeOut,等時間到了執行resolve函數,這段時間裏修飾Promise的await語句會阻塞一段時間,resolve後又await語句又繼續向下執行了,能知足咱們想要的休眠效果
// 模擬「線程」休眠
sleep(ms) {
  return new Promise(function (resolve) {
    setTimeout(resolve, ms);
  })
}
// 提交「線程」
submit(async function example() {
    // 阻塞停留3秒,而後才輸出1
    await this.sleep(3000);
    console.log(1);
});

 

A3. interrupt方法

做用:線程中斷,可用於處理線程中止等操做
這裏要先介紹一下Java裏面的interrupt方法:在JAVA裏,你不能經過調用terminate方法停掉一個線程,由於這有可能會由於處理邏輯忽然中斷而致使數據不一致的問題,因此要經過interrupt方法把一箇中斷標誌位置爲true,而後經過isInterrupted方法做爲判斷條件跳出關鍵代碼。
因此爲了模擬,我在JS中處理「線程」中斷也是這麼去作的,可是咱們這樣作的根本緣由是:咱們壓根沒有能夠停掉一個線程函數的方法!(JAVA是有可是不許用,即廢棄了而已)
    // 模擬線程中斷
    interrupt(threadName) {
        if (!threadName) { throw new Error('Miss function parameters') }
        if (threadMap[threadName]) {
            threadMap[threadName].isInterrupted = true;
        }
    }
    // 獲取線程中斷狀態
    isInterrupted(threadName) {
        if (!threadName) { throw new Error('Miss function parameters') }
        // !!的做用是:將undefined轉爲false
        return !!threadMap[threadName].isInterrupted;
    }

 

A4. join方法
join(threadName): "線程"同步,調用此方法的"線程"函數將在threadName執行結束後繼續執行
join方法和上面的sleep方法是同樣的道理,咱們讓它返回一個Promise,只要咱們不調resolve,那麼外部修飾Promise的await語句就會一直暫停,等到join的那個另外一個線程執行完了,咱們看準時機!把這個Promise給resolve,這時候外部修飾Promise的await語句不就又能夠向下執行了嗎?
 
 
但問題在於:咱們如何實現這個「一個函數執行完通知另外一個函數的功能呢」?沒錯!那就是咱們JavaScript最喜歡的套路: 事件流! 咱們下面使用event-emitter這個先後端通用的模塊實現事件流。
咱們只要在任何一個函數結束的時候觸發結束事件(join-finished),同時傳遞該線程的函數名做爲參數,而後在join方法內部監聽該事件,並在響應時候調用resolve方法不就能夠了嘛。
 
首先是在join方法內部監聽線程函數的結束事件
import ee from 'event-emitter';
const emitter = ee();
// 模擬線程同步
join(threadName, targetThread) {
  return new Promise((resolve) => {
    // 監聽其餘線程函數的結束事件
    emitter.on('join-finished', (finishThread) => {
      // 根據結束線程的線程名finishThread作判斷
      if (finishThread === targetThread) {
        resolve();
      }
    })
  })
}

 

同時在線程函數執行結束時觸發join-finished事件,傳遞線程名作參數
import ee from 'event-emitter';
const emitter = ee();
function submit(func, name) {
   // ...
    Promise.resolve({
        then: func().then(() => {
          emitter.emit('join-finished', threadName);
        })
    });
}
使用以下:
submit(async function thread1 () {
  this.join('thread2');
  console.log(1);
});
submit(async function thread2 () {
  this.sleep(3000);
  console.log(2)
})
// 3s後,依次輸出 2 1

A5. Lock.lock & Lock.unlock(非公平鎖)

咱們主要是要編寫兩個方法:lock和unlock方法。咱們須要設置一個Boolean屬性isLock
  • lock方法:lock方法首先會判斷isLock是否爲false,若是是,則表明沒有線程佔領臨界區,那麼容許該線程進入臨界區,同時把isLock設置爲true,不容許其餘線程函數進入。其餘線程進入時,因爲判斷isLock爲true,會setTimeOut每隔一段時間遞歸調用判斷isLock是否爲false,從而以較低性能消耗的方式模擬while死循環。當它們檢測到isLock爲false時候,則會進入臨界區,同時設置isLock爲true。由於後面的線程沒有前後順序,因此這是一個非公平鎖
  • unLock方法:unlock則是把isLock屬性設置爲false,解除鎖定就能夠了
// 這是一個非公平鎖
class Lock {
    constructor() {
        this.isLock = false;
    }
    //加鎖
    lock() {
        if (this.isLock) {
            const self = this;
            // 循環while死循環,不停測試isLock是否等於false
            return new Promise((resolve) => {
                (function recursion() {
                    if (!self.isLock) {
                        // 佔用鎖
                        self.isLock = true;
                        // 使外部await語句繼續往下執行
                        resolve();
                        return;
                    }
                    setTimeout(recursion, 100);
                })();
            });
        } else {
            this.isLock = true;
            return Promise.resolve();
        }
    }
    // 解鎖
    unLock() {
        this.isLock = false;
    }
}
const lockObj = new Lock();
export default lockObj;

 

運行示例以下:
async function commonCode() {
    await Lock.lock();
    await Executor.sleep(3000);
    Lock.unLock();
}

submit(async function example1() {
    console.log('example1 start')
    await commonCode();
    console.log('example1 end')
});
submit(async function example2() {
    console.log('example2 start')
    await commonCode();
    console.log('example2 end')
});
 
輸出
// 當即輸出
example1 start
example2 start
// 3秒後輸出
example1 end
// 再3秒後輸出
example2 end

 

A6. Condition.wait & Condition.notify(條件變量)

  • Condition.wait:不具有執行條件,線程進入waiting狀態,等待被喚醒
  • Condition.notify: 喚醒線程
對不起!寫到這裏,我實在是口乾舌燥,寫不下去了,可是道理和前面是同樣的:
無非是:事件監聽 + Promise + Async函數組合拳,一套搞定
import ee from 'event-emitter';
const ev = ee();

class Condition {
    constructor() {
        this.n = 0;
        this.list = [];
    }
    // 當不知足條件時,讓線程處於等待狀態
    wait() {
        return new Promise((resolve) => {
            const eventName = `notify-${this.n}`;
            this.n++;
            const list = this.list;
            list.push(eventName);
            ev.on(eventName, () => {
                // 從列表中刪除事件名
                const i = list.indexOf(eventName);
                list.splice(i, 1);
                // 讓外部函數恢復執行
                debugger;
                resolve();
            })
        })
    }
    // 選擇一個線程喚醒
    notify() {
        const list = this.list;
        let i = Math.random() * (this.list.length - 1);
        i = Math.floor(i);
        ev.emit(list[i])
    }
}

 

測試代碼
async function testCode() {
    console.log('i will be wait');
    if (true) {
        await Condition.wait();
    };
    console.log('i was notified ');
}

submit(async function example() {
    testCode();
    setTimeout(() => {
        Condition.notify();
    }, 3000);
});
輸出
i will be wait
// 3秒後輸出
i was notified

 

最後的大總結

其實說到底,我想和你們分享的不是什麼併發啊,什麼多線程啦。
其實我想表達的是:事件監聽 + Promise + Async函數這套組合拳很好用啊
  • 你想讓一段代碼停一下?OK!寫個返回Promise的函數,用await修飾,它就停啦!
  • 你想控制它(await)不要停了,繼續往下走?OK! 把Promise給resolve掉,它就往下走啦
  • 你說你不知道怎麼控制它停,由於監聽和發射事件的代碼分佈在兩個地方?OK!那就使用事件流
 
本文完,下面是所有項目代碼(剛寫了文章才發現有bug,待會改改)
相關文章
相關標籤/搜索