活用控制反轉 -- 解決一個棘手信息傳遞問題

在我初學編程的時候,還沒寫過完整點的項目就看過了一些高階概念。在沒有實踐時,這些概念的神奇和強大之處很難被徹底體會的。而一旦本身在摸索中應用了,瞬間以爲打開了一扇大門,技能又提高了一個層次。控制反轉(Inversion of Control)就是這些強大概念之一。一年前在 MPJ 老師的頻道上了解到了,但一直沒本身獨立創造場景用過。直到最近在項目中遇到個坑才用起來。前端

其實控制反轉或者依賴注入(這兩個感受是同一個東西,看你從什麼角度看)在前端框架中已經大量使用了。最先由 Angular 普及,後續的現代框架都有應用。好比 React 開發中目前最火的組件設計模式 Render Props,就是控制反轉的一種應用。離開框架,在平常開發中,應用這種技巧能夠幫助咱們解決不少棘手的問題,今天就講下我在開發中的一次應用。vue

項目場景是這樣的:技術棧是 Nuxt + Vuex。項目須要鏈接 Web Socket,而後根據 Socket 傳來的數據,對 Vuex 裏面相應的數據進行修改。公司爲了節約成本,將 Socket 數據壓縮了,並且不是全量推送,這要求前端收到數據後對數據進行解壓,而後對數據進行遍歷查找,更新,從新計算和排序,總之對 Socket 數據的處理很是複雜。爲了避免影響性能,我把 Socket 鏈接和數據處理放進了 Web Worker。先來看下項目結構。webpack

下面是我封裝的一個 Socket 工廠函數:web

// utils/socket.js
export default function Socket() {
  let heartBeat; // 心跳記錄
  let lost = 0; // 心跳失敗次數

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          // 心跳失敗超過 3 次,斷開重連
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}

Socket 鏈接實現了心跳機制。onopen 以後,每隔 5 秒向服務器發送 2,並把心跳失敗次數加 1;服務器收到 2 以後會返回 3,客戶端收到 3 以後再把心跳失敗次數減 1。工廠函數暴露的 decLost 方法是爲了外部在收到 3 以後把心跳次數減 1.vuex

在 Web Worker 文件裏面,調用 Socket 工廠函數,並鏈接 socket:編程

// workers/socket.js
import Socket from "~/utils/socket";

const socket = Socket();
const socketConnection = socket.connect();

socketConnection.onmessage = ({ data }) => {
  // 處理 socket 接收到的數據,
  // 處理完後經過 web worker 接口發出去
  postMessage(result);
};

// web worker 收到外部的數據後,把數據發給 socket
onmessage = ({ data }) => {
  socketConnection.send(data);
};

而後在一個 Nuxt 插件裏,引入 socket worker,收到 worker 裏傳來的數據後,把數據交給 Vuex Store,反之,監聽到相關 Vuex Mutation 後,把 payload 傳給 worker:設計模式

// plugins/socket.js
// webpack 下導入 web worker 的方式:
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {
    // 監聽到相關 Vuex Mutation
  });

  socketWorker.onmessage = ({ data }) => {
    //監聽 socket 發來的數據,收到數據後,
    // 經過 store.commit() 來把數據存入 vuex store
  };
};

這是我一開始寫的 naive 版本,看起來主要功能實現了,並且封裝和 Separation of concerns 作的也不錯。寫完剛跑起來,問題出現了。promise

挑戰一:等 socket 鏈接成功後再發起訂閱

當應用打開後,須要當即訂閱推送數據,包括用戶登陸狀態下的私有數據和其它基礎數據等。可是當發起訂閱時,socket 可能鏈接成功了,也可能還沒鏈接成功。一開始我想設置個定時器,過兩秒後再發起訂閱。但是想一想這種作法也太挫了。第二個思路是在 socket 鏈接的 onopen 事件裏執行訂閱。但是這樣子會直接把之前的 onopen 覆蓋掉,並且這樣作違反了封裝原則。剩下就一個辦法了,等鏈接成功後再發請求。來看怎麼作的:前端框架

// workers/socket.js
// ...
// const socketConnection ...

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

// ... 其它細節

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};

waitForConnection 函數會每隔 100 ms 檢查 socket 鏈接狀態,若是鏈接狀態是 1(成功),則 resolve Promise,不然一直隔 100 ms 檢查一次,直到鏈接成功或者超過指定時間。服務器

在向 socket 發送數據以前,先調用 waitForConnection,並指定最多等 2 秒,確保鏈接成功後再發送數據。

問題看起來解決了。奇淫技巧都用上了,讓我滿意了一下子。直到……

需求來了!

在 socket 斷開重連後,須要續訂以前的訂閱。而包括用戶 token 等訂閱參數全都在 Vuex Store 裏面。那這下頭疼了,Vuex store 裏面是無法知道斷開重連的,而 worker 裏面則根本無法讀取 vuex store。知道這個需求後我心裏是崩潰的,這根本無法寫下去了啊!就在我都快要打算調整架構重寫時,一拍腦殼靈光一閃,試試控制反轉!

首先要讓 Socket 工廠函數有個判斷重連的機制。這個簡單。

// utils/socket.js
export default function Socket() {
  let connectCount = 0; // 鏈接成功次數
  // ...細節,見文章前面
  socket.onopen = () => {
    // 每次鏈接成功,鏈接次數加1
    connectCount += 1;

    if (connectCount > 1) {
      // 若鏈接次數超過一次,則說明這次是重連
      // 在這裏能夠作些重連以後的操做了
    }
  };
  // ...
}

重連以後具體作什麼事,這能夠用依賴注入來實現。先在 worker 文件裏定義要作的事情,而後在調用 Socket 工廠函數時注入方法:

// worker/socket
// 經過 postMessage 通知外部重連
const notifyReconnect = () => {
  postMessage({ type: "reconnect" });
};

const socket = Socket(notifyReconnect);

而後在 Socket 函數裏接收一下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  // ...細節,見文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect();
    }
  };
  // ...
}

我覺得寫到這裏應該就能夠了的,然而我仍是太天真了。運行後,plugins/socket 文件裏能接收到重連消息,可是一直鏈接失敗。這個問題很詭異,最後發現仍是由於我對 Web Socket 掌握的不深致使的。每次 socket 鏈接後,生成的鏈接實例都是新的。而我在 waitForConnection 方法裏監聽的 socketConnection 在關閉後,readyState 一直是 3(關閉狀態),致使 waitForConnection 方法一直報 timeout 錯誤。

剩下的最後問題是每次重連,都更新鏈接實例。方法以下:

// worker/socket
let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();
// utils/socket
export default function Socket(notifyReconnect) {
  // ...細節,見文章前面
  socket.onopen = () => {
    connectCount += 1;
    if (connectCount > 1) {
      notifyReconnect(socket);
    }
  };
  // ...
}

Socket 函數在調用 notifyReconnect 時,傳入最新的鏈接實例 socket

至此,功能都實現了。完整代碼以下:

// utils/socket.js
export default function Socket(notifyReconnect) {
  let heartBeat;
  let lost = 0;
  let connectCount = 0;

  function decLost() {
    lost -= 1;
  }

  function connect() {
    const socket = new WebSocket("wss://xx.com");

    socket.onopen = () => {
      connectCount += 1;
      if (connectCount > 1) {
        notifyReconnect(socket);
      }
      heartBeat = setInterval(() => {
        socket.send(2);
        lost += 1;
        if (lost === 3) {
          clearInterval(heartBeat);
          socket.close();
          connect();
        }
      }, 5000);
    };

    socket.onerror = () => {
      clearInterval(heartBeat);
      socket.close();
    };

    socket.onclose = () => {
      setTimeout(() => {
        clearInterval(heart);
        connect();
      }, 3000);
    };
    return socket;
  }
  return Object.freeze({ decLost, connect });
}
// workers/socket.js
import Socket from "~/utils/socket";

let socketConnection;

const notifyReconnect = connection => {
  postMessage({ type: "reconnect" });
  socketConnection = connection;
};

const socket = Socket(notifyReconnect);
socketConnection = socket.connect();

const waitForConnection = timeout =>
  new Promise((resolve, reject) => {
    const check = () => {
      if (socketConnection.readyState === 1) {
        resolve();
      } else if ((timeout -= 100) < 0) {
        reject("socket connection timed out");
      } else {
        setTimeout(check, 100);
      }
    };
    setTimeout(check, 100);
  });

socketConnection.onmessage = ({ data }) => {
  // 處理完數據後經過 web worker 接口發出去
  postMessage(result);
};

onmessage = async ({ data }) => {
  try {
    await waitForConnection(2000);
  } catch (e) {
    console.error(e);
    return;
  }
  socketConnection.send(data);
};
// plugins/socket.js
import SocketWorker from "worker-loader!~/workers/socket.js";

const socketWorker = new SocketWorker();

export default ({ store }) => {
  store.subscribe((mutation, state) => {});

  socketWorker.onmessage = ({ data }) => {
    if (data.type === "reconnect") {
      socketWorker.postMessage(/* 訂閱參數 */);
    }
  };
};

Edit: 測試時發現上面寫法有個問題。即重連後,雖然鏈接實例更新了,可是 onmessage 事件沒有更新,致使重連後的 socket 數據無法處理。解決辦法是把重連更新鏈接實例的邏輯放在 plugins/socket.js 文件。Socket 工廠函數的 connect 方法返回一個 Promise,在新的鏈接實例的 onopen 事件裏 resolve promise。這樣調整以後,就用不到 waitForConnection 函數了。

相關文章
相關標籤/搜索