結合promise與websocket的發佈/訂閱模式實踐

結合promise與websocket的發佈/訂閱模式實踐

本文初衷

最近剛好在公司作了一個聊天室系統,因此在系統中作了一下對websocket進行的promise化改造,因此想寫篇文章總結一下,若是你們有什麼更好的方法或者心得感悟,歡迎交流前端

技術棧

dva + protobuf
考慮到protobuf對websocket並沒什麼本質影響,因此本文就不涉及了web

業務場景

基於websocket的聊天室系統redux

開發痛點

  1. 可能存在按順序觸發的請求
    eg. 刪除req---確認刪除rsp---刷新頁面req---刷新頁面rsp
    但因爲並不是全部的刪除操做後都會刷新頁面,因此考慮是否是可使用發佈訂閱模式來模擬相似promise的流式操做
  2. 存在同一類型請求短期內屢次觸發時,如何尋找每一條回覆信息的發射源,考慮可使用promise池+惟一識別碼token來實現
  3. 因爲dva的異步操做是基於redux-saga的,因此若是能夠用promise完成與websocket的互動,那麼對於effects中使用yield控制異步流程,也會是一個很好的體驗

實現原理

首先,這一套系統的一切前提是請求的惟一標識符token,前端發送給服務器以後,服務器必需要把這個token跟數據放在一塊兒發回來才行數組

本系統的實現原理是promise

對websocket的send方法進行封裝
發送階段
1. send執行時,先生成一個promise,及其惟一token
2. 將promise的resolve, reject,token,及其餘須要的信息放入一個對象,而後推入一個promise池中
3. 執行websocket的send方法
4. return 這個promise

接收階段
1. 收到回覆消息時,先從promise池中對token進行匹配
2. 根據回覆的狀態決定執行resolve或reject
3. 其餘須要的操做

實現代碼

// 每個實例都只能open一條socket線路,用鎖機制防止重複open
// 本例中不使用心跳檢測,爲了方便,只要close是非主動觸發且前端能捕捉到的(如瀏覽器主動斷開,服務器主動斷開),都會進行自動重連
export class MyWebSocket {
    constructor(url) {
        this.url = url;
    
        // close來源判斷及後續操做
        this.closeConfig = {
            resolve: null,
            closing: false
        }
        // promise池
        this.promisePool = [];
    }
    
    tokenCheck(req, rsp) {
    // 此處根據本身的數據結構進行tokenCheck的判斷,返回一個boolean
    }
    
    open() {
        return new Promise((resolve, reject) => {
            if (typeof this._websocket === 'undefined') {
                this._websocket = new WebSocket(this.url);
                this._websocket.open = (e) => {
                    resolve({e, ws: this});
                };
                this._websocket.onerror = (e) => {
                    reject(e);
                }
            }
            this._websocket.onclose = (e) => {
                // 非主動close
                if (!this.closeConfig.closing) {
                    console.log('reconnect');     
                    // 對應的重連操做     
                }
                // 若手動close,恢復初始狀態
                this.closeConfig.closing = false;
            }
            
            this._websocket.onmessage = (e) => {
                this.promisePool = this.promisePool.filter((item) => {
                if (this.tokenCheck(req, rsp) {
                  req.resolve(rsp);
                  return false;
                }
                return true;
              })
            };
        });
    }
    
    close() {
        this.closeConfig.closing = true;
        this._websocket.close();        
    }
    // token包含在content中
    send(name, content) {
        return new Promise((resolve, reject) => {
          this.promisePool.push({
            content,
            resolve,
            reject,
            name
          });
          this._websocket.send({name, content});
    });
}

大概流程就是這樣,具體的樣例以下瀏覽器

*test () {
    const ws = new MyWebSocket('www.mywebsocket.com');
    yield ws.open();
    yield ws.send(.....).then(()=>{...});
    yield ws.send(.....).then(()=>{...});
}

本文呢大概就是這麼多了,若是有什麼錯誤或者遺漏的地方還請你們多多指教服務器


v0.0.2

採起了評論大佬的建議,將promise池從數組改成對象,直接將token作爲key,查詢起來也很是方便websocket

export class MyWebSocket {
    constructor(url) {
        this.url = url;
    
        // close來源判斷及後續操做
        this.closeConfig = {
            resolve: null,
            closing: false
        }
        // promise池
        this.promisePool = {};
    }
    
    tokenCheck(req, rsp) {
    // 此處根據本身的數據結構進行tokenCheck的判斷,返回一個boolean
    }
    
    open() {
        return new Promise((resolve, reject) => {
            if (typeof this._websocket === 'undefined') {
                this._websocket = new WebSocket(this.url);
                this._websocket.open = (e) => {
                    resolve({e, ws: this});
                };
                this._websocket.onerror = (e) => {
                    reject(e);
                }
            }
            this._websocket.onclose = (e) => {
                // 非主動close
                if (!this.closeConfig.closing) {
                    console.log('reconnect');     
                    // 對應的重連操做     
                }
                // 若手動close,恢復初始狀態
                this.closeConfig.closing = false;
            }
            
            this._websocket.onmessage = (e) => {         
                const key = e.content.token;    
                const req = this.promisePool[key]
                req.resolve(e);
                delete this.promisePool[key];
            };
        });
    }
    
    close() {
        this.closeConfig.closing = true;
        this._websocket.close();
    }
    // token包含在content中
    send(name, content) {
        return new Promise((resolve, reject) => {
          this.promisePool[content.token] = {
            content,
            resolve,
            reject,
            name
          };
          this._websocket.send({name, content});
    });
}
相關文章
相關標籤/搜索