基於node+socket.io+redis的多房間多進程聊天室

1、相關技術介紹:

消息實時推送,指的是將消息實時地推送到瀏覽器,用戶不須要刷新瀏覽器就能夠實時獲取最新的消息,實時聊天室的技術原理也是如此。傳統的Web站點爲了實現推送技術,所用的技術都是輪詢,這種傳統的模式帶來很明顯的缺點,即瀏覽器須要不斷的向服務器發出請求。
短輪詢(Polling)

客戶端須要定時往瀏覽器輪詢發送請求,且只有當服務有數據更新後,客戶端的下一次輪詢請求才能拿到更新後的數據,在數據更新前的屢次請求至關於無效。這對帶寬資源形成了極大的浪費,若提升輪詢定時器時間,又會有數據更新不及時的煩惱。
commet
爲了解決短輪詢的弊端,一種基於http長鏈接的"服務器推"方式被hack出來。其與短輪詢的區別主要是,採用commet時,客戶端與服務端保持一個長鏈接,當數據發生改變時,服務端主動將數據推送到客戶端。Comet 又能夠被細分爲兩種實現方式,一種是長輪詢機制,一種是流技術。html

  • 長輪詢
    html5

長輪詢跟短輪詢不一樣的地方是,客戶端往服務端發送請求後,服務端判斷是否有數據更新,若沒有,則將請求hold住,等待數據更新時,才返回響應。這樣則避免了大量無效的http請求,但即便採用長輪詢方式,接受數據更新的最小時間間隔仍是爲2*RTT(往返時間)。node

  • 流技術
    nginx

流技術(http stream)基於iframe實現。經過HTML標籤iframe src指向服務端,創建一個長鏈接。當有數據推送,則往客戶端返回,無須再請求。但流技術有個缺點就是,在瀏覽器頂部會一直出現頁面未加載完成的loading標示。git

websocket

爲了解決服務端如何更快地實時推送數據到客戶端以及以上推送方式技術的不足,HTML5中定義了Websocket協議,它是一種在單個TCP鏈接上進行全雙工通信的協議。與http協議不一樣的請求/響應模式不一樣,Websocket在創建鏈接以前有一個Handshake(Opening Handshake)過程,創建鏈接以後,雙方便可雙向通訊。固然,因爲websocket是html5新特性,在部分瀏覽器(IE10如下)是不支持的。
咱們來看下websocket的握手報文:github

請求報文:web

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Protocol: chat
Sec-WebSocket-Version: 13
Origin: http://example.com
  • "Upgrade "、"Connection": 告訴服務器這個請求是一個websocket協議,須要區別處理redis

  • "Upgrade: websocket": 代表這是一個 WebSocket 類型請求,意在告訴 server 須要將通訊協議切換到 WebSocket瀏覽器

  • "Sec-WebSocket-Key": 是 client 發送的一個 base64 編碼的密文,要求 server 必須返回一個對應加密的 "Sec-WebSocket-Accept" 應答,不然 client 會拋出 "Error during WebSocket handshake" 錯誤,並關閉鏈接服務器

  • "Sec-WebSocket-Protocol":一個用戶定義的字符串,用來區分同URL下,不一樣的服務所須要的協議

  • "Sec-WebSocket-Version":Websocket Draft (協議版本)

響應報文:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
  • "Sec-WebSocket-Accept": 這個則是通過服務器確認,而且加密事後的 Sec-WebSocket-Key。加密方式爲將Sec-WebSocket-Key與一段固定的 GUID 字符串進行鏈接,而後進行SHA-1 hash,接着base64編碼獲得。

socket.io(http://socket.io)

是一個徹底由JavaScript實現,基於Node.js、支持WebSocket的協議用於實時通訊、跨平臺的開源框架。Socket.IO除了支持WebSocket通信協議外,還支持許多種輪詢機制以及其它實時通訊方式,並封裝成了通用的接口,並可以根據瀏覽器對通信機制的支持狀況自動地選擇最佳的方式來實現網絡實時應用。

首先,咱們建立一個socket.io server對象,指定監聽80端口。而且指定收到message消息,以及socket端口的監聽方法。接着,當socket創建鏈接後,經過socket.emit方法,能夠往客戶端發送消息。

var io = require('socket.io')();
 io.on('connection', function(socket) {
    //接受消息
    socket.on('message', function (msg) {
        console.log('receive messge : ' + msg );
    });
    
    //發送消息
    socket.emit('message', 'hello');
    
    //斷開鏈接回調
    socket.on('disconnect', function () { 
        console.log('socket disconnect');
    });
});
io.listen(80);

客戶端的代碼也很是簡單,只要引入socket.io對應的客戶端庫(https://github.com/socketio/s...
在socket創建鏈接的回調中,使用socket.emit以及socket.on就能夠分別作消息的發送以及監聽了。

<script>
  var socket = io('http://localhost/');
  socket.on('connect', function () {
    socket.emit('message', 'hi, i am client!');

    socket.on('message', function (msg) {
      console.log('msg received from server');
    });
  });
</script>

2、多節點集羣架構設計

若只是單機部署應用,單純使用socket.io的消息事件監聽處理便可知足咱們的需求。但隨着業務的擴大,咱們須要考慮多機集羣部署,客戶端能夠鏈接到任一節點,併發送消息。如何作到多節點的同時推送,咱們須要創建一套多節點之間的消息分發/訂閱架構。這時咱們引入redis的pub/sub功能。

redis
redis是一個key-value存儲系統,在該項目中主要起到一個消息分發中心(publish/subscribe)的做用。用戶經過socket.io namespace 訂閱房間號後,socket.io server則往redis訂閱(subscribe)該房間號channel。當在該房間中的某一用戶發送消息時,則經過redis的publish功能往redis該房間號channel publish消息。這樣全部訂閱該房間號channel的websocket鏈接則會收到消息回調,而後推送給客戶端。

nginx
因爲採用了集羣架構,則須要nginx來作反向代理。須要注意的是,websocket的支持須要nginx1.3以上版本。而且咱們須要經過配置ip_hash作粘性會話(ip_hash)處理,避免在低版本瀏覽器socket.io使用兼容方案輪詢請求,請求到不一樣機器,形成session異常。

####3、架構設計圖

客戶端經過socket.io namespace 指定對應roomid,請求到nginx。nginx根據ip_hash反向代理到對應機器的某一端口的socket.io server 進程。創建websocket鏈接,並往redis訂閱對應到房間(roomid)channel。到這個時候,一個訂閱了某一房間的websocket通道創建完成。
當用戶發送消息時,socket.io server捕獲到該房間到消息後,即往redis對應房間id的channel publish消息。這時全部訂閱了該房間id channel的socket.io server就會收到訂閱響應,接着找到對應房間id的webscoket通道,並將消息推送到客戶端。

4、代碼示例(多房間實時聊天室):

nginx配置(nginx版本須>1.3):
在http{}裏配置定義upstream,並設置ip_hash。使同一個ip的請求可以落在同一個機器同一個進程中。 若是改節點掛了,則自動重連到另一個節點,該方案對於後期擴容也很是方便。

upstream io_nodes {
 ip_hash;
 server 127.0.0.1:6001;
 server 127.0.0.1:6002;
 server 127.0.0.1:6003;
 server 127.0.0.1:6004;
 server 127.0.0.1:6005;
 server 127.0.0.1:6006;
 server 127.0.0.1:6007;
 server 127.0.0.1:6008;
 server 10.x.x.x:6001;
 server 10.x.x.x:6002;
 server 10.x.x.x:6003;
 server 10.x.x.x:6004;
 server 10.x.x.x:6005;
 server 10.x.x.x:6006;
 server 10.x.x.x:6007;
 server 10.x.x.x:6008;
 }

在server中,配置location:

location / {
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header X-Forwarded-For  $proxy_add_x_forwarded_for;
    proxy_set_header Host $host;
    proxy_http_version 1.1;
    proxy_pass http://io_nodes;
    proxy_redirect off;
}

cluster.js
咱們採用了多進程的設計,充分利用cpu多核優點。經過主進程統一管理維護子進程,每一個進程監聽一個端口。

var cupNum = require('os').cpus().length,
    workerArr = [],
    roomInfo = [];
var connectNum = 0;

for (var i = 0; i < cupNum; i++) {
    workerArr.push(fork('./fork_server.js', [6001 + i]));

    workerArr[i].on('message', function(msg) {
        if (msg.cmd && msg.cmd === 'client connect') {
            connectNum++;
            console.log('socket server connectnum:' + connectNum);
        }
        if (msg.cmd && msg.cmd === 'client disconnect') {
            connectNum--;
            console.log('socket server connectnum:' + connectNum);
        }
    });

fork_server.js

var process = require('process');

var io = require('socket.io')();

var num = 0;

var redis = require('redis');
var redisClient = redis.createClient;

//創建redis pub、sub鏈接
var pub = redisClient({port:13800, host: '127.0.0.1', password:'xxxx'});
var sub = redisClient({port: 13800, host:'127.0.0.1', password:'xxxx'});

var roomSet = {};

//獲取父進程傳遞端口
var port = parseInt(process.argv[2]);

//當websocket鏈接時
io.on('connection', function(socket) {

    //客戶端請求ws URL:  http://127.0.0.1:6001?roomid=k12_webcourse_room_1
    var roomid = socket.handshake.query.roomid;

    console.log('worker pid: ' + process.pid  + ' join roomid: '+ roomid);
    
    socket.on('join', function (data) {

        socket.join(roomid);    //加入房間
         
        // 往redis訂閱房間id
        if(!roomSet[roomid]){
            roomSet[roomid] = {};
            console.log('sub channel ' + roomid);
            sub.subscribe(roomid);
        }

      roomSet[roomid][socket.id] = {};
      reportConnect();
      console.log(data.username + ' join, IP: ' + socket.client.conn.remoteAddress);
      roomSet[roomid][socket.id].username = data.username;
      // 往該房間id的reids channel publish用戶進入房間消息
      pub.publish(roomid, JSON.stringify({"event":'join',"data": data}));
  });
  
  //用戶發言 推送消息到redis
  socket.on('say', function (data) {
    console.log("Received Message: " + data.text);
    pub.publish(roomid, JSON.stringify({"event":'broadcast_say',"data": {
      username: roomSet[roomid][socket.id].username,
      text: data.text
    }}));
  });


    socket.on('disconnect', function() {
        num--;
        console.log('worker pid: ' + process.pid + ' clien disconnection num:' + num);
        process.send({
            cmd: 'client disconnect'
        });

        if (roomSet[roomid] && roomSet[roomid][socket.id] && roomSet[roomid][socket.id].username) {
      console.log(roomSet[roomid][socket.id].username + ' quit');
      pub.publish(roomid, JSON.stringify({"event":'broadcast_quit',"data": {
        username: roomSet[roomid][socket.id].username
      }}));
    }
    roomSet[roomid] && roomSet[roomid][socket.id] && (delete roomSet[roomid][socket.id]);

    });
});

/**
 * 訂閱redis 回調
 * @param  {[type]} channel [頻道]
 * @param  {[type]} count   [數量]  
 * @return {[type]}         [description]
 */
sub.on("subscribe", function (channel, count) {
    console.log('worker pid: ' + process.pid + ' subscribe: ' + channel);
});

/**
 * 收到redis publish 對應channel的消息
 * @param  {[type]} channel  [description]
 * @param  {[type]} message
 * @return {[type]}          [description]
 */
sub.on("message", function (channel, message) {
    console.log("message channel " + channel + ": " + message);
    //往對應房間廣播消息
    io.to(channel).emit('message', JSON.parse(message));
});

/**
 * 上報鏈接到master進程 
 * @return {[type]} [description]
 */
var reportConnect = function(){
    num++;
    console.log('worker pid: ' + process.pid + ' client connect connection num:' + num);
    process.send({
        cmd: 'client connect'
    });
};


io.listen(port);

console.log('worker pid: ' + process.pid + ' listen port:' + port);

客戶端:

<script src="static/socket.io.js"></script>
<script>
    var roomid = (function () {
        return prompt('請輸入房間號','')
    })();

    var userInfo = {
        username: (function () {
            return prompt('請輸入rtx暱稱', '');
        })()
    };

    if(roomid != null && roomid != "") {
        var socket = io.connect('http://10.244.146.2?roomid='+ roomid);

        socket.emit('join', {
            username: userInfo.username
        });

        socket.on('message', function(msg){ 
            switch (msg.event) {
                case 'join':
                if (msg.data.username) {
                    console.log(msg.data.username + '加入了聊天室');
                    var data = {
                        text: msg.data.username + '加入了聊天室'
                    };
                    showNotice(data);
                }
                break;
                /*收到消息廣播後,顯示消息*/
                case 'broadcast_say':
                    if(msg.data.username!==userInfo.username) {
                        console.log(msg.data.username + '說: ' + msg.data.text);
                        showMessage(msg.data);
                    }
                break;
/*離開聊天室廣播後,顯示消息*/
                case 'broadcast_quit':
                    if (msg.data.username) {
                        console.log(msg.data.username + '離開了聊天室');
                        var data = {
                            text: msg.data.username + '離開了聊天室'
                        };
                        showNotice(data);
                    }
                    break;
            }
        })

    }



    /*點擊發送按鈕*/
    document.getElementById('send').onclick = function () {
        var keywords = document.getElementById('keywords');
        if (keywords.value === '') {
            keywords.focus();
            return false;
        }
        var data = {
            text: keywords.value,
            type: 0,
            username: userInfo.username
        };
        /*向服務器提交一個say事件,發送消息*/
        socket.emit('say', data);

        showMessage(data);
        keywords.value = "";
        keywords.focus();
    };
    /*展現消息*/
    function showMessage(data) {
        var itemArr = [];
        itemArr.push('<dd class="'+(data.type === 0 ? "me" : "other")+'">');
        itemArr.push('<ul>');
        itemArr.push('<li class="nick-name">' + data.username + '</li>');
        itemArr.push('<li class="detail">');
        itemArr.push('<div class="head-icon"></div>');
        itemArr.push('<div class="text">' + data.text + '</div>');
        itemArr.push('</li>');
        itemArr.push('</ul>');
        itemArr.push('</dd>');

        document.getElementById('list').innerHTML += itemArr.join('');
    }
    /*展現通知*/
    function showNotice(data) {
        var item = '<dd class="tc"><span>' + data.text + '</span><dd>';
        document.getElementById('list').innerHTML += item;
    }

    /*回車事件*/
    document.onkeyup = function (e) {
        if (!e) e = window.event;
        if ((e.keyCode || e.which) == 13) {
            document.getElementById('send').click();
        }
    }

</script>

gihub源碼地址:https://github.com/493326889/...

相關文章
相關標籤/搜索