一個簡單的HTML5 Web Worker 多線程與線程池應用

筆者最近對項目進行優化,順帶就改了些東西,先把請求方式優化了,使用到了web worker。發現目前尚未太多對web worker實際使用進行介紹使用的文章,大可能是一些API類的講解,除了涉及到一些WebGL的文章,因此總結了這個文章,給你們參考參考。如下內容以默認你們對web worker已經有了初步瞭解,不會講解基礎知識。前端

 

1、爲何要開子線程node

筆者這個項目是一個存儲系統的中後臺管理GUI,一些數據須要經過CronJob定時地去獲取,而且業務對數據的即時性要求高,大量的和持久的HTTP請求是不可避免的,而且該項目部署了HTTP/2,帶寬和併發數能夠極大。並且不須要兼容IE系列,哈哈哈,針對這些點因而決定優(瞎)化(弄)。筆者一開始想到的就是使用HTML5的新特性web worker,而後將HTTP的請求工做從主線程放到子線程裏面去作,工做完成後,返回子線程數據便可。這樣能夠下降主線程中的負荷,使主線程能夠不勞而獲。一旦子線程中發起的請求成功或錯誤後,子線程返回給主線程請求的response對象或者直接返回請求獲得的數據或錯誤信息。最終的方案裏,選擇的是直接返回請求獲得的數據,而不是response對象,這個在後面會詳細說明爲何這樣作。子線程對於處於複雜運算,特別是搭配wasm,對於處理WebGL幀等有極大的性能優點。以往的純JS視頻解碼,筆者只看到過可以解碼MPEG1(大概240P畫面)的canvas庫,由於要達到60幀的畫面流暢度,就必須保證1幀的計算時間要小於16ms,若是要解碼1080P的畫面甚至4K,JS可能跑不過來了,並且長時間的計算會嚴重阻塞主線程,影響頁面性能,若是能開啓子線程把計算任務交給子線程作,並經過wasm加快計算速度,這將在前端領域創造極大的可能性。webpack

 

2、爲何要設計線程池ios

若是隻開一個線程,工做都在這一個子線程裏作,不能保證它不阻塞。若是無止盡的開啓而不進行控制,可能致使運行管理平臺應用時,瀏覽器的內存消耗極高:一個web worker子線程的開銷大概在5MB左右。git

不管這5MB內存是否已被這個子線程徹底使用,仍是說僅僅是給這個子線程預規劃的內存空間,但這個空間確實是被佔用了。而且頻繁地建立和終止線程,對性能的消耗也是極大的。因此咱們須要經過線程池來根據瀏覽器所在計算機的硬件資源對子線程的工做進行規劃和調度,以及對殭屍線程的清理、新線程的開闢等等。根據測試,在頁面關閉之後,主線程結束,子線程的內存佔用會被一併釋放,這點不須要作額外的處理。web

 

3、設計線程池json

對於線程池,咱們須要實現的功能有以下這些,代碼中的英文註釋爲筆者項目上的需求,由於以前有外國同事在一塊兒開發項目,爲了他們閱讀代碼方便,因此統一使用英文註釋,能夠直接忽略,筆者會對重要地方直接給出說明。canvas

1. 初始化線程api

經過 Navagitor 對象的 HardWareConcurrecy 屬性能夠獲取瀏覽器所屬計算機的CPU核心數量,若是CPU有超線程技術,這個值就是實際核心數量的兩倍。固然這個屬性存在兼容性問題,若是取不到,則默認爲4個。咱們默認有多少個CPU線程數就開多少個子線程。線程池最大線程數量就這麼肯定了,簡單而粗暴:promise

class FetchThreadPool {
    constructor (option = {}){
        const {
            inspectIntervalTime = 10 * 1000,
            maximumWorkTime = 30 * 1000
        } = option;
        this.maximumThreadsNumber = window.navigator.hardwareConcurrency || 4;
        this.threads = [];
        this.inspectIntervalTime = inspectIntervalTime;
        this.maximumWorkTime = maximumWorkTime;
        this.init();
    }
   ......
}

 

獲取到最大線程數量後,咱們就能夠根據這個數量來初始化全部的子線程了,並給它們額外加上一個咱們須要的屬性:

  init (){
        for (let i = 0; i < this.maximumThreadsNumber; i ++){
            this.createThread(i);
        }
        setInterval(() => this.inspectThreads(), this.inspectIntervalTime);
    }

    createThread (i){
        // Initialize a webWorker and get its reference.
        const thread = work(require.resolve('./fetch.worker.js'));
        // Bind message event.
        thread.addEventListener('message', event => {
            this.messageHandler(event, thread);
        });
        // Stick the id tag into thread.
        thread['id'] = i;
        // To flag the thread working status, busy or idle.
        thread['busy'] = false;
        // Record all fetch tasks of this thread, currently it is aimed to record reqPromise.
        thread['taskMap'] = {};
        // The id tag mentioned above is the same with the index of this thread in threads array.
        this.threads[i] = thread;
    }

其中:

id爲數字類型,表示這個線程的惟一標識,

busy爲布爾類型,表示這個線程當前是否處於工做繁忙狀態,

taskMap爲對象類型,存有這個線程當前的全部工做任務的key/value對,key爲任務的ID taskId,value爲這個任務的promise的resolve和reject回調對象。

由上圖還能夠看出,在初始化每一個子線程時咱們還給這個子線程在主線程裏綁定了接收它消息的事件回調。在這個回調裏面咱們能夠針對子線程返回的消息,在主線程裏作對應的處理:

  messageHandler (event, thread){
        let {channel, threadCode, threadData, threadMsg} = event.data;
        // Thread message ok.
        if (threadCode === 0){
            switch (channel){
                case 'fetch':
                    let {taskId, code, data, msg} = threadData;
                    let reqPromise = thread.taskMap[taskId];
                    if (reqPromise){
                        // Handle the upper fetch promise call;
                        if (code === 0){
                            reqPromise.resolve(data);
                        } else {
                            reqPromise.reject({code, msg});
                        }
                        // Remove this fetch task from taskMap of this thread.
                        thread.taskMap[taskId] = null;
                    }
                    // Set the thread status to idle.
                    thread.busy = false;
                    this.redirectRouter();
                    break;

                case 'inspection':
                    // console.info(`Inspection info from thread, details: ${JSON.stringify(threadData)}`);
                    // Give some tips about abnormal worker thread.
                    let {isWorking, workTimeElapse} = threadData;
                    if (isWorking && (workTimeElapse > this.maximumWorkTime)){
                        console.warn(`Fetch worker thread ID: ${thread.id} is hanging up, details: ${JSON.stringify(threadData)}, it will be terminated.`);
                        fetchThreadPool.terminateZombieThread(thread);
                    }
                    break;

                default:
                    break;
            }
        } else {
            // Thread message come with error.
            if (threadData){
                let {taskId} = threadData;
                // Set the thread status to idle.
                thread.busy = false;
                let reqPromise = thread.taskMap[taskId];
                if (reqPromise){
                    reqPromise.reject({code: threadCode, msg: threadMsg});
                }
            }
        }
    }

這裏處理的邏輯其實挺簡單的:

1). 首先規定了子線程和主線程之間通訊的數據格式:

{
     threadCode: 0,
     threadData: {taskId, data, code, msg}, 
     threadMsg:  'xxxxx',
     channel: 'fetch',
}

其中:

threadCode: 表示這個消息是否正確,也就是子線程在post此次message的時候,是不是由於報錯而發過來,由於咱們在子線程中會有這個設計機制,用來區分任務完成後的正常的消息和執行過程當中因報錯而發送的消息。若是爲正常消息,咱們約定爲0,錯誤消息爲1,暫定只有1。

threadData: 表示消息真正的數據載體對象,若是threadCode爲1,只返回taskId,以幫助主線程銷燬找到調用上層promise的reject回調函數。Fecth取到的數據放在data內部。

threadMsg: 表示消息錯誤的報錯信息。非必須的。

channel: 表示數據頻道,由於咱們可能經過子線程作其餘工做,在咱們這個設計裏至少有2個工做,一個是發起fetch請求,另一個是響應主線程的檢查(inspection)請求。因此須要一個額外的頻道字段來確認不一樣工做。

這個數據格式在第4步的子線程的設計中,也會有對應的體現。

2). 若是是子線程回覆的檢查消息,那麼根據子線程返回的狀態決定這個子線程是否已經掛起了,若是是就把它當作一個殭屍線程殺掉。並從新建立一個子線程,替換它原來的位置。

3). 在任務結束後,這個子線程的busy被設置成了false,表示它從新處於閒置狀態。

4). 在給子線程派發任務的時候,咱們post了taskId,在子線程的回覆信息中,咱們能夠拿到這個taskId,並經過它找到對應的promise的resolve或者reject回調函數,就能夠響應上層業務中Fetch調用,返回從服務端獲取的數據了。

 

二、執行主線程中Fetch調用的工做

首先,咱們在主線程中封裝了統一調用Fetch的收口,頁面全部請求均走這個惟一入口,對外暴露Get和Post方法,裏面的業務有關的部分代碼能夠忽略:

const initRequest = (url, options) => {
    if (checkRequestUnInterception(url)){
        return new Promise(async (resolve, reject) => {
            options.credentials = 'same-origin';
            options.withCredentials = true;
            options.headers = {'Content-Type': 'application/json; charset=utf-8'};
            fetchThreadPool.dispatchThread({url, options}, {resolve, reject});
        });
    }
};

const initSearchUrl = (url, param) => (param ? url + '?' + stringify(param) : url);

export const fetchGet = (url, param) => (initRequest(initSearchUrl(url, param), {method: 'GET'}));

export const fetchPost = (url, param) => (initRequest(url, {method: 'POST', body: JSON.stringify(param)}));

在線程池中,咱們實現了對應的方法來執行Fetch請求:

    dispatchThread ({url, options}, reqPromise){
        // Firstly get the idle thread in pools.
        let thread = this.threads.filter(thread => !thread.busy)[0];
        // If there is no idle thread, fetch in main thread.
        if (!thread){
            thread = fetchInMainThread({url, options});
        }
        // Stick the reqPromise into taskMap of thread.
        let taskId = Date.now();
        thread.taskMap[taskId] = reqPromise;
        // Dispatch fetch work to thread.
        thread.postMessage({
            channel: 'fetch',
            data: {url, options, taskId}
        });
        thread.busy = true;
    }

這裏調度的邏輯是:

1). 首先遍歷當前全部的子線程,過濾出閒置中的子線程,取第一個來下發任務。

2). 若是沒有閒置的子線程,就直接在主線程發起請求。後面能夠優化的地方:能夠在當前子線程中隨機找一個,來下發任務。這也是爲何每一個子線程不直接使用task屬性,而給它一個taskMap,就是由於一個子線程可能同時擁有兩個及以上的任務。

 

三、定時輪訓檢查線程與終結殭屍線程

   inspectThreads (){
        if (this.threads.length > 0){
            this.threads.forEach(thread => {
                // console.info(`Inspection thread ${thread.id} starts.`);
                thread.postMessage({
                    channel: 'inspection',
                    data: {id: thread.id}
                });
            });
        }
    }

    terminateZombieThread (thread){
        let id = thread.id;
        this.threads.splice(id, 1, null);
        thread.terminate();
        thread = null;
        this.createThread(id);
    }

從第1步的代碼中咱們能夠得知初始化定時檢查 inspectThreads 是在整個線程池init的時候執行的。對於檢查殭屍線程和執行 terminateZombieThread 也是在第1步中的處理子線程信息的回調函數中進行的。

 

4. 子線程的設計

子線程的設計,相對於線程池來講就比較簡單了:

export default self => {
    let isWorking = false;
    let startWorkingTime = 0;
    let tasks = [];
    self.addEventListener('message', async event => {
        const {channel, data} = event.data;
        switch (channel){
            case 'fetch':
                isWorking = true;
                startWorkingTime = Date.now();
                let {url, options, taskId} = data;
                tasks.push({url, options, taskId});
                try {
                    // Consider to web worker thread post data to main thread uses data cloning
                    // not change the reference. So, here we don't post the response object directly,
                    // because it is un-cloneable. If we persist to post id, we should use Transferable
                    // Objects, such as ArrayBuffer, ImageBitMap, etc. And this way is just like to
                    // change the reference(the control power) of the object in memory.
                    let response = await fetch(self.origin + url, options);
                    if (response.ok){
                        let {code, data, msg} = await response.json();
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code, data, msg},
                        });
                    } else {
                        const {status, statusText} = response;
                        self.postMessage({
                            threadCode: 0,
                            channel: 'fetch',
                            threadData: {taskId, code: status, msg: statusText || `http error, code: ${status}`},
                        });
                        console.info(`%c HTTP error, code: ${status}`, 'color: #CC0033');
                    }
                } catch (e){
                   self.postMessage({
                       threadCode: 1,
                       threadData: {taskId},
                       threadMsg: `Fetch Web Worker Error: ${e}`
                   });
                }
                isWorking = false;
                startWorkingTime = 0;
                tasks = tasks.filter(task => task.taskId !== taskId);
                break;

            case 'inspection':
                // console.info(`Receive inspection thread ${data.id}.`);
                self.postMessage({
                    threadCode: 0,
                    channel: 'inspection',
                    threadData: {
                        isWorking,
                        startWorkingTime,
                        workTimeElapse: isWorking ? (Date.now() - startWorkingTime) : 0,
                        tasks
                    },
                });
                break;

            default:
                self.postMessage({
                    threadCode: 1,
                    threadMsg: `Fetch Web Worker Error: unknown message channel: ${channel}}.`
                });
                break;
        }
    });
};

首先,在每一個子線程聲明瞭 taksk 用來保存收到的任務,是爲後期一個子線程同時作多個任務作準備的,當前並不須要,子線程一旦收到請求任務,在請求完後以前, isWorking 狀態一直都爲 true 。全部子線程有任務之後,會直接在主線程發起請求,不會隨機派發給某個子線程。

而後,咱們在正常的Fecth成功後的數據通訊中,post的是對response處理之後的結構化數據,而不是直接post這個response對象,這個在第一章節中有提到,這裏詳細說一下:

Fetch請求的response對象並不是單純的Object對象。在子線程和主線程之間使用postMessage等方法進行數據傳遞,數據傳遞的方式是克隆一個新的對象來傳遞,而非直接傳遞引用,但response對象做爲一個非普通的特殊對象是不能夠被克隆的......。要傳遞response對象只有就須要用到HTML5裏的一些新特性好比  Transferable object 的 ArrayBuffer  、 ImageBitmap  等等,經過它們能夠直接傳遞對象的引用,這樣作的話就不須要克隆對象了,進而避免因對response對象進行克隆而報錯,以及克隆含有大量數據的對象帶來的高額開銷。這裏咱們選擇傳遞一個普通的結構化Object對象來現實基本的功能。

對於子線程中每次給主線程post的message,也是嚴格按照第1步中說明的那樣定義的。

還有一點須要說明:筆者的項目都是基於webpack的模塊化開發,要直接使用一個web worker的js文件,筆者選了"webworkify-webpack"這個庫來處理模塊化的,這個庫還執行在子線程中隨意import其餘模塊,使用比較方便:

import work from 'webworkify-webpack';

因此,在第1步中才出現了這樣的建立子線程的方式: const thread = work(require.resolve('./fetch.worker.js')); 

該庫把web worker的js文件經過  createObjectURL 方法把js文件內容轉成了二進制格式,這裏請求的是一個二進制數據的連接(引用),將會到內存中去找到這個數據,因此這裏並非一個js文件的連接:

若是你的項目形態和筆者不一樣,大可沒必要如此,按照常規的web worker教程中的指導方式走就行。

筆者這個項目在主線程和子線程之間只傳遞了不多量的數據,速度很是快,一旦你的項目須要去傳遞大量數據,好比說一個異常複雜的大對象,若是直接傳遞結構化對象,速度會很慢,能夠先字符串化了之後再發送,避免了在post的過程當中時間消耗過大。

筆者捕捉到的一個postMessage的消耗,若是數據量小的話,還算正常:

 

5. 經過子線程發起請求

// ...
@catchError
async getNodeList (){
    let data = await fetchGet('/api/getnodelist');
    !!data && store.dispatch(nodeAction.setNodeList(data));
},
// ...

數據回來了:

 

 

從截圖中能夠看出,和直接在主線程中發起的Fetch請求不一樣的是,在子線程中發起的請求,在Name列裏會增長一個齒輪在開頭以區分。

須要注意的一點是:若是子線程被終結,沒法查看返回信息等,由於這些數據的佔用內存已經隨子線程的終結而被回收了。

咱們在子線程中寫一個明顯的錯誤,也會回調reject,並在控制檯報錯:

從開發者工具裏能夠檢測到這8個子線程:

 

大概的設計就是如此,目前這個線程池只針對Fetch的任務,後續還須要在業務中進行優化和加強,已適配更多的任務。針對其餘的任務,在這裏架子其實已基本實現,須要增長對不一樣channel的處理。

 

4、Web Worker的兼容性

從caniuse給出的數據來看,兼容性異常的好,甚至連IE系列都在好幾年前就已經支持:

可是...,這個兼容性只能說明可否使用Web Woker,這裏的兼容並不能代表能在其中作其餘操做。好比標準規定,能夠在子線程作作計算、發起XHR請求等,但不能操做DOM對象。筆者在項目中使用的Fetch,而非Ajax,而後Fecth在IE系列(包括Edge)瀏覽器中並不支持,會直接報錯。在近新版本的Chrome、FireFox、Opera中均無任何問題。後來做者換成了Axios這種基於原生的XHR封裝的庫,在IE系列中仍是要報錯。後來又換成了純原生的XmlHttpRequest,依舊報錯。這就和標準有出入了......。同窗們能夠試試,不知到筆者的方法是否百分百正確。但欣慰的是前幾天的新聞說微軟將來在Edge瀏覽器開發中將使用Chromium內核。

至於Web Woker衍生出來的其餘新特性,好比 Shared Web Woker等,或者在子線程中再開子線程,這些特性的使用在各個瀏覽器中並不統一,有些支持,有些不支持,或者部分支持,因此對於這些特性暫時就不要去考慮它們了。

 

5、展望

在前端開發這塊(沒用Web前端了,是筆者認爲如今的前端開發已經不只限於Web平臺了,也不只限於前端了),迄今爲止活躍度是很是之高了。新技術、新標準、新協議、新框(輪)架(子)的出現是很是快速的。技術跌該更新頻率極高,好比這個Web Worker,四年前就定稿了,筆者如今針對它寫博客......。一個新技術的出現可能不能形成什麼影響,可是多種新技術的出現和搭配使用將帶來翻天覆地的變化。前端的發展愈來愈多地融入了曾經只在Client、Native端出現的技術。特別是近年來的WebGL、wasm等新技術的推出,都是具備意義的。

相關文章
相關標籤/搜索