最近在構建兩個系統的實時通訊部分,總結一下所學。前端
這是一個系列文章,暫時主要構思四個部分git
這個是我在造的玩具的一個簡單架構圖。將實時通訊部分給抽離出來做爲一個Websocket節點,造成了一個簡單的分佈式系統,而後經過Redis的Pub/Sub作Websocket集羣之間的通訊以及Websocket節點與Restful API節點的通訊(好比用戶調用Restful API發表文章以後通知Websocket推送新消息小紅點給前端)。 github
分佈式系統的坑 左耳朵耗子:從亞馬遜的實踐,談分佈式系統的難點web
本文主要介紹下分佈式Websocket集羣解決方案,最後會有個可運行的Demo。redis
在這篇博客裏,咱們最終但願構建一個Websocket集羣來實現與客戶端的實時通訊,好比聊天室。咱們固然能夠經過簡單的demo構建一個Websocket服務器並讓全部客戶端鏈接這臺機器,但當這個聊天室的交互量很是龐大呢?好比鬥魚的直播彈幕,我去鬥魚看了下請求,從命名也能夠看到其創建了一個ws鏈接,叫作danmuproxy.douyu.com
,以下圖。數據庫
那麼問題來了,若是我只使用一臺服務器,如何去支持可能有10萬人同時加入的這個聊天室呢?顯然咱們須要一個解決方案,好比將流量負載均衡到不一樣的服務器上並提供一種通訊機制讓各個服務器能進行消息同步(否則用戶A連上服務器A,用戶B臉上服務器B,它們發消息的時候對方都無法收到)。安全
其實從上圖的名字來看就知道鬥魚鏈接的這個danmuproxy.douyu.com
中的proxy
就大體能推斷出他們也是把流量作了一個分發。服務器
因爲和普通的HTTP服務器的負載均衡不一樣,上一節也說到了這些Websocket服務器須要共享信息(固然,須要作Session共享的服務器也同樣)。這意味着客戶端與Websocket服務器的交互是有狀態(stateful)的,咱們須要把每一個客戶端的鏈接數據保存在內存中。而當咱們要實現分佈式的時候,咱們則須要在各個機器上共享這些信息,因此咱們須要一個Publish/Subscribe broker(其實broker之前上學講軟件設計體系結構的時候學過,但當時太萌新了沒理解)。接下來舉個例子。websocket
假設咱們如今使用Redis做爲咱們的解決方案,而後咱們如今有三臺Websocket服務器WS1
,WS2
和WS3
。而後每臺服務器上連了三個用戶。WS1
機器上的其中一個用戶發送了某個消息到聊天室,在你的Websocket服務器的邏輯中,你首先會把這個消息存入數據庫作一個持久化(好比作歷史消息),而後將這個消息根據channelId之類的東西推送至這個聊天室的channel(Websocket的channel的實現會在下一篇中詳細講),咱們假設這個channelId叫「The☆World」。架構
如今你把數據安全的存入了DB裏,而且你發佈了一個事件給你的Pub/Sub broker(Redis channel)來通知其餘對此感興趣的部分(其餘Websocket或者API服務器等)。因此以前的另外兩個服務器WS2
和WS3
由於對這部分感興趣因此他們也經過腳本監聽了這一個Redis channel,它們就會獲得通知,而後每一個服務器就會對DB請求query獲取更新而後emit消息給Websocket上對應channel。
這就是大家能夠看到的,使用Pub/Sub brooker來實現了一個橫向擴展的Websocket集羣。
從這裏也能夠看到集羣具備的有點,高擴展性以及高可用性。
此次實現使用了個人一臺高配阿里雲國內服務器和一臺比較low的阿里雲9元學生服務器以及高配服務器上的redis。
首先配置Nginx作負載均衡,下圖是個人配置,只是個Demo沒作wss相關的。
代碼都在github上。
Demo的代碼也很短
const WebSocket = require('ws');
const publicIp = require('public-ip');
const uuidv1 = require('uuid/v1');
const redis = require("redis");
const config = require('./config');
const sub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
const pub = redis.createClient(config.DB.REDIS_PORT, config.DB.REDIS_HOST);
if (config.DB.REDIS_PASSWORD) {
sub.auth(config.DB.REDIS_PASSWORD);
pub.auth(config.DB.REDIS_PASSWORD);
}
const wss = new WebSocket.Server({ port: 2333 });
const ip2name = {
'47.94.233.234': '梁王的高配據點',
'115.28.68.89': '梁王的9塊服務器',
}
let sockets = {};
wss.on('connection', function connection(ws) {
const uuid = uuidv1();
ws.uuid = uuid;
sockets[uuid] = ws;
ws.on('message', function incoming(message) {
// publish消息給其餘服務器
pub.publish('channel', `${ws.uuid}>${message}`);
console.log(`publish to channel: ${ws.uuid}>${message}`)
// 向本服務器的socket廣播
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(`來自${ws.from || '???'}的用戶${ws.uuid}發送了: ${message}`);
}
});
});
publicIp.v4().then(ip => {
console.log(ip);
ws.from = ip2name[ip] ? ip2name[ip] : '未知';
ws.send(`你鏈接的服務器爲${ws.from}`);
});
});
// 監聽其餘服務器發送的消息
sub.on('message', function(channel, message) {
console.log(`channel ${channel}, ${message}`)
if (channel == 'channel')
{
var messageArr = message.split('>');
var uuid = messageArr[0]
var wsFrom = sockets[uuid];
var content = messageArr[1];
// 若是socket是非本服務器的
if(!wsFrom) {
wss.clients.forEach(function each(client) {
client.send(`來自其餘服務器的用戶${uuid}發送了: ${content}`);
});
}
}
});
sub.subscribe('channel');
複製代碼
能夠用如下代碼在控制檯中嘗試,服務器後期可能會關。
var socket = new WebSocket('ws://websocket-demo.lwio.me');
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('收到了', event.data);
});
// socket.send('keke')
複製代碼
4月1號更新,媽耶今天阿里雲一直報警,大家就看我redis直接暴露到公網就給我來了一波是吧。學習了學習了,向信安大佬低頭。
參考資料: