咱們的項目是基於 ThinkJS + Vue 開發的,最近實現了一個多端實時同步數據的功能,因此想寫一篇文章來介紹下如何在 ThinkJS 的項目中利用 WebSocket 實現多端的實時通訊。ThinkJS 是基於 Koa 2 開發的企業級 Node.js 服務端框架,文章中會從零開始實現一個簡單的聊天室,但願讀者們能有所收穫。javascript
WebSocket 是 HTML5 中提出的一種協議。它的出現是爲了解決客戶端和服務端的實時通訊問題。在 WebSocket 出現以前,若是想實現實時消息傳遞通常有兩種方式:html
能夠看到,這兩種實現方式的本質仍是客戶端向服務端「Pull」的過程,並無一個服務端主動「Push」到客戶端的方式,全部的方式都是依賴客戶端先發起請求。爲了知足兩方的實時通訊, WebSocket 應運而生。
java
首先,WebSocket 是基於 HTTP 協議的,或者說借用了 HTTP 協議來完成鏈接的握手部分。其次,WebSocket 是一個持久化協議,相對於 HTTP 這種非持久的協議來講,一個 HTTP 請求在收到服務端回覆後會直接斷開鏈接,下次獲取消息須要從新發送 HTTP 請求,而 WebSocket 在鏈接成功後能夠保持鏈接狀態。下圖應該能體現二者的關係:git
在發起 WebSocket 請求時須要先經過 HTTP 請求告訴服務端需求將協議升級爲 WebSocket。github
瀏覽器先發送請求:web
GET / HTTP/1.1 Host: localhost:8080 Origin: [url=http://127.0.0.1:3000]http://127.0.0.1:3000[/url] Connection: Upgrade Upgrade: WebSocket Sec-WebSocket-Version: 13 Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
服務端迴應請求:redis
HTTP/1.1 101 Switching Protocols Connection:Upgrade Upgrade: WebSocket Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
在請求頭中核心的部分是 Connection 和 Upgrade ,經過這兩個字段服務端會將 HTTP 升級爲 WebSocket 協議。服務端返回對應信息後鏈接成功,客戶端和服務端就能夠正常通訊了。json
隨着新標準的推動,WebSocket 已經比較成熟了,而且各個主流瀏覽器對 WebSocket 的支持狀況比較好(不兼容低版本 IE,IE 10 如下)bootstrap
Socket.io 是一個徹底由 JavaScript 實現、基於 Node.js、支持 WebSocket 協議的用於實時通訊、跨平臺的開源框架。它包括了客戶端的 JavaScript 和服務器端的 Node.js,而且有着很好的兼容性,會根據瀏覽器的支持狀況選擇不一樣的方式進行通信,如上面介紹的輪詢和 HTTP 長鏈接。api
對於 WebSocket 目前 ThinkJS 支持了 Socket.io 並對其進行了一些簡單的包裝,只須要進行一些簡單的配置就可
以使用 WebSocket 了。
ThinkJS 默認採用了多進程模型,每次請求會根據策略輸送到不一樣的進程中執行,關於其多進程模型能夠參考《細談 ThinkJS 多進程模型》。 而 WebSocket 鏈接前須要使用 HTTP 請求來完成握手升級,多個請求須要保證命中相同的進程,才能保證握手成功。這個時候就須要開啓 StickyCluster 功能,使客戶端全部的請求命中同一進程。修改配置文件 src/config/config.js
便可。
module.exports = { stickyCluster: true, // ... }
在 src/config/extend.js
引入 WebSocket:
const websocket = require('think-websocket'); module.exports = [ // ... websocket(think.app), ];
在 src/config/adapter.js
文件中配置 WebSocket
const socketio = require('think-websocket-socket.io'); exports.websocket = { type: 'socketio', common: { // common config }, socketio: { handle: socketio, messages: { open: '/websocket/open', //創建鏈接時處理對應到 websocket Controller 下的 open Action close: '/websocket/close', // 關閉鏈接時處理的 Action room: '/websocket/room' // room 事件處理的 Action } } }
配置中的 message
對應着事件的映射關係。好比上述的例子,客戶端觸發 room 事件,服務端須要在 websocket controller 下的 roomAction
中處理消息。
建立處理消息的 controller 文件。上面的配置是 /websocket/xxx
,因此直接在項目根目錄 src/controller
下建立 websocket.js 文件。
module.exports = class extends think.Controller { // this.socket 爲發送消息的客戶端對應的 socket 實例, this.io 爲Socket.io 的一個實例 constructor(...arg) { super(...arg); this.io = this.ctx.req.io; this.socket = this.ctx.req.websocket; } async openAction() { this.socket.emit('open', 'websocket success') } closeAction() { this.socket.disconnect(true); } };
這時候服務端代碼就已經配置完了。
客戶端代碼使用比較簡單,只須要引入 socket.io.js 就能夠直接使用了。
<script src="https://lib.baomitu.com/socket.io/2.0.1/socket.io.js"></script>
引入後在初始化代碼建立 WebSocket 鏈接:
this.socket = io(); this.socket.on('open', data => { console.log('open', data) })
這樣一個最簡單的 WebSocket 的 demo 就完成了,打開頁面的時候會自動建立一個 WebSocket 鏈接,建立成功後服務端會觸發 open 事件,客戶端在監聽的 open 事件中會接收到服務端返回的 websocket success 字符串。
接下來咱們開始實現一個簡單的聊天室。
從剛纔的內容中咱們知道每一個 WebSocket 鏈接的建立會有一個 Socket 句柄建立,對應到代碼中的 this.socket
變量。因此本質上聊天室人與人的通訊能夠轉換成每一個人對應的 Socket 句柄的通訊。我只須要找到這我的對應的 Socket 句柄,就能實現給對方發送消息了。
簡單來實現咱們能夠設置一個全局變量來存儲鏈接到服務端的 WebSocket 的一些信息。在 src/bootstrap/global.js 中設置全局變量:
global.$socketChat = {};
而後在 src/bootstrap/worker.js 中引入global.js,使全局變量生效。
require('./global');
而後在服務端 controller 增長 roomAction
和 messageAction
, messageAction
用來接收客戶端用戶的聊天信息,並將信息發送給全部的客戶端成員。 roomAction
用來接收客戶端進入/離開聊天室的信息。這兩個的區別是聊天消息是須要同步到全部的成員因此使用 this.io.emit
,聊天室消息是同步到全部除當前客戶端外的全部成員因此使用this.socket.broadcast.emit
module.exports = class extends think.Controller { constructor(...arg) { super(...arg); this.io = this.ctx.req.io; this.socket = this.ctx.req.websocket; global.$socketChat.io = this.io; } async messageAction() { this.io.emit('message', { nickname: this.wsData.nickname, type: 'message', message: this.wsData.message, id: this.socket.id }) } async roomAction() { global.$socketChat[this.socket.id] = { nickname: this.wsData.nickname, socket: this.socket } this.socket.broadcast.emit('room', { nickname: this.wsData.nickname, type: 'in', id: this.socket.id }) } async closeAction() { const closeSocket = global.$socketChat[this.socket.id]; const nickname = closeSocket && closeSocket.nickname; this.socket.disconnect(true); this.socket.removeAllListeners(); this.socket.broadcast.emit('room', { nickname, type: 'out', id: this.socket.id }) delete global.$socketChat[this.socket.id] } }
客戶端經過監聽服務端 emit 的事件來處理信息
this.socket.on('message', data => { // 經過socket的id的對比,判斷消息的發送方 data.isMe = (data.id === this.socket.id); this.chatData.push(data); }) this.socket.on('room', (data) => { this.chatData.push(data); })
經過 emit 服務端對應的 action 來發送消息
this.socket.emit('room', { nickname: this.nickname }) this.socket.emit('message', { message: this.chatMsg, nickname: this.nickname })
根據發送/接收消息的type判斷消息類型
<div class="chat-box"> <div v-for="(item, index) in chatData" :key="index"> <p v-if="item.type == 'in'" class="enter-tip">{{item.nickname}}進入聊天室</p> <p v-if="item.type == 'out'" class="enter-tip">{{item.nickname}}離開聊天室</p> <p v-else-if="item.type == 'message'" :class="['message',{'me':item.isMe}]"> {{item.nickname}}:{{item.message}} </p> </div> </div>
至此一個簡單的聊天室就完成了。
剛纔咱們說了通訊的本質實際上是 Socket 句柄查詢使用的過程,本質上咱們是利用全局變量存儲全部的 WebSocket 句柄的方式解決了 WebSocket 鏈接查找的問題。可是當咱們的服務端擴容後,會出現多個服務器都有 WebSocket 鏈接,這個時候跨節點的 WebSocket 鏈接查找使用全局變量的方式就無效了。此時咱們就就須要換一種方式來實現跨服務器的通訊同步,通常有如下幾種方式:
發送消息不直接執行 emit
事件,而是將消息發送到消息隊列中,而後全部的節點對這條消息進行消費。拿到數據後查看接收方的 WebSocket 鏈接是否在當前節點上,不在的話就忽略這條數據,在的話則執行發送的動做。
經過外部存儲服務例如 Redis 充當以前的「全局變量」的角色,全部的節點建立 WebSocket 鏈接後都向 Redis 中註冊一下,告訴你們有個叫 「A」 傢伙的鏈接在 「192.168.1.1」 這。當 B 要向 A 發送消息的時候它去 Redis 中查找到 A 的鏈接所處的節點後,通知 192.168.1.1 這個節點 B 要向 A 發送消息,而後節點會執行發送的動做。
Redis 的 pub/sub 是一種消息通訊模式:發送者(pub)發送消息,訂閱者(sub)接收消息。WebSocket 的一個節點接收到消息後,經過 Redis 發佈(pub),其餘節點做爲訂閱者(sub)接收消息再進行後續處理。
此次咱們將在聊天室的 demo 上實現節點通訊的功能。
首先,在 websocket controller 文件中增長接口調用
const ip = require('ip'); const host = ip.address(); module.exports = class extends think.Controller { async openAction() { // 記錄當前 WebSocket 鏈接到的服務器ip await global.rediser.hset('-socket-chat', host, 1); } emit(action, data) { if (action === 'message') { this.io.emit(action, data) } else { this.socket.broadcast.emit(action, data); } this.crossSync(action, data) } async messageAction() { const data = { nickname: this.wsData.nickname, type: 'message', message: this.wsData.message, id: this.socket.id }; this.emit('message', data); } async closeAction() { const connectSocketCount = Object.keys(this.io.sockets.connected).length; this.crossSync(action, data); if (connectSocketCount <= 0) { await global.rediser.hdel('-socket-chat', host); } } async crossSync(action, params) { const ips = await global.rediser.hkeys('-socket-chat').filter(ip => ip !== host); ips.forEach(ip => request({ method: 'POST', uri: `http://${ip}/api/websocket/sync`, form: { action, data: JSON.stringify(params) }, json: true }); ); } }
而後在 src/controller/api/websocket
實現通訊接口
const Base = require('../base'); module.exports = class extends Base { async syncAction() { const {action, data} = this.post(); const blackApi = ['room', 'message', 'close', 'open']; if (!blackApi.includes(action)) return this.fail(); // 因爲是跨服務器接口,因此直接使用io.emit發送給當前全部客戶端 const io = global.$socketChat.io; io && io.emit(action, JSON.parse(data)); } };
這樣就實現了跨服務的通訊功能,固然這只是一個簡單的 demo ,可是基本原理是相同的。
第二種 Redis (sub/pub) 的方式,socket.io 提供了一種官方的庫 socket.io-redis 來實現。它在 Redis 的 pub/sub 功能上進行了封裝,讓開發者能夠忽略 Redis 相關的部分,方便了開發者使用。使用時只須要傳入 Redis 的配置便可。
// Thinkjs socket.io-redis 配置 const redis = require('socket.io-redis'); exports.websocket = { ... socketio: { adapter: redis({ host: 'localhost', port: 6379 }), message: { ... } } } // then controller websocket.js this.io.emit('hi', 'all sockets');
若是想經過非 socket.io 進程向 socket.io 服務通訊,例如:HTTP,可使用官方的 socket.io-emitter 庫。使用方式以下:
var io = require('socket.io-emitter')({ host: '127.0.0.1', port: 6379 }); setInterval(function(){ io.emit('time', new Date); }, 5000);
整個聊天室的代碼已經上傳到github,你們能夠直接下載體驗聊天室示例