聊聊分佈式下的WebSocket解決方案

前言java


最近本身搭建了個項目,項目自己很簡單,可是裏面有使用WebSocket進行消息提醒的功能,大致狀況是這樣的。web

發佈消息者在系統中發送消息,實時的把消息推送給對應的一個部門下的全部人。json

這裏面若是是單機應用的狀況時,咱們能夠經過部門的id和用戶的id組成一個惟一的key,與應用服務器創建WebSocket長鏈接,而後就能夠接收到發佈消息者發送的消息了。後端

可是真正把項目應用於生產環境中時,咱們是不可能就部署一個單機應用的,而是要部署一個集羣。瀏覽器

因此我經過Nginx+兩臺Tomcat搭建了一個簡單的負載均衡集羣,做爲測試使用服務器

可是問題出現了,咱們的客戶端瀏覽器只會與一臺服務器創建WebSocket長鏈接,因此發佈消息者在發送消息時,就無法保證全部目標部門的人都能接收到消息(由於這些人鏈接的可能不是一個服務器)。websocket

本篇文章就是針對於這麼一個問題展開討論,提出一種解決方案,固然解決方案不止一種,那咱們開始吧。session

WebSocket單體應用介紹架構


在介紹分佈式集羣以前,咱們先來看一下王子的WebSocket代碼實現,先來看java後端代碼以下:app

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@ServerEndpoint("/webSocket/{key}")
public class WebSocket {

private static int onlineCount = 0;
/**
 * 存儲鏈接的客戶端
 */
private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();
private Session session;
/**
 * 發送的目標科室code
 */
private String key;

@OnOpen
public void onOpen(@PathParam("key") String key, Session session) throws IOException {
    this.key = key;
    this.session = session;
    if (!clients.containsKey(key)) {
        addOnlineCount();
    }
    clients.put(key, this);
    Log.info(key+"已鏈接消息服務!");
}

@OnClose
public void onClose() throws IOException {
    clients.remove(key);
    subOnlineCount();
}

@OnMessage
public void onMessage(String message) throws IOException {
    if(message.equals("ping")){
        return ;
    }
    JSONObject jsonTo = JSON.parseObject(message);
    String mes = (String) jsonTo.get("message");
    if (!jsonTo.get("to").equals("All")){
        sendMessageTo(mes, jsonTo.get("to").toString());
    }else{
        sendMessageAll(mes);
    }
}

@OnError
public void onError(Session session, Throwable error) {
    error.printStackTrace();
}

private void sendMessageTo(String message, String To) throws IOException {
    for (WebSocket item : clients.values()) {
        if (item.key.contains(To) )
            item.session.getAsyncRemote().sendText(message);
    }
}

private void sendMessageAll(String message) throws IOException {
    for (WebSocket item : clients.values()) {
        item.session.getAsyncRemote().sendText(message);
    }
}

public static synchronized int getOnlineCount() {
    return onlineCount;
}

public static synchronized void addOnlineCount() {
    WebSocket.onlineCount++;
}

public static synchronized void subOnlineCount() {
    WebSocket.onlineCount--;
}

public static synchronized Map<String, WebSocket> getClients() {
    return clients;
}

}

示例代碼中並無使用Spring,用的是原生的java web編寫的,簡單和你們介紹一下里面的方法。

onOpen:在客戶端與WebSocket服務鏈接時觸發方法執行

onClose:在客戶端與WebSocket鏈接斷開的時候觸發執行

onMessage:在接收到客戶端發送的消息時觸發執行

onError:在發生錯誤時觸發執行

能夠看到,在onMessage方法中,咱們直接根據客戶端發送的消息,進行消息的轉發功能,這樣在單體消息服務中是沒有問題的。

再來看一下js代碼

var host = document.location.host;

// 得到當前登陸科室
var deptCodes='${sessionScope.$UserContext.departmentID}';
deptCodes=deptCodes.replace(/[[|]|s]+/g, "");
var key = '${sessionScope.$UserContext.userID}'+deptCodes;
var lockReconnect = false;  //避免ws重複鏈接
var ws = null;          // 判斷當前瀏覽器是否支持WebSocket
var wsUrl = 'ws://' + host + '/webSocket/'+ key;
createWebSocket(wsUrl);   //鏈接ws

function createWebSocket(url) {
    try{
        if('WebSocket' in window){
            ws = new WebSocket(url);
        }else if('MozWebSocket' in window){  
            ws = new MozWebSocket(url);
        }else{
              layer.alert("您的瀏覽器不支持websocket協議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10如下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!"); 
        }
        initEventHandle();
    }catch(e){
        reconnect(url);
        console.log(e);
    }     
}

function initEventHandle() {
    ws.onclose = function () {
        reconnect(wsUrl);
        console.log("llws鏈接關閉!"+new Date().toUTCString());
    };
    ws.onerror = function () {
        reconnect(wsUrl);
        console.log("llws鏈接錯誤!");
    };
    ws.onopen = function () {
        heartCheck.reset().start();      //心跳檢測重置
        console.log("llws鏈接成功!"+new Date().toUTCString());
    };
    ws.onmessage = function (event) {    //若是獲取到消息,心跳檢測重置
        heartCheck.reset().start();      //拿到任何消息都說明當前鏈接是正常的//接收到消息實際業務處理

        ...

};
}
// 監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket鏈接,防止鏈接還沒斷開就關閉窗口,server端會拋異常。
window.onbeforeunload = function() {
    ws.close();
}  

function reconnect(url) {
    if(lockReconnect) return;
    lockReconnect = true;
    setTimeout(function () {     //沒鏈接上會一直重連,設置延遲避免請求過多
        createWebSocket(url);
        lockReconnect = false;
    }, 2000);
}

//心跳檢測
var heartCheck = {
    timeout: 300000,        //5分鐘發一次心跳
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function(){
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function(){
        var self = this;
        this.timeoutObj = setTimeout(function(){
            //這裏發送一個心跳,後端收到後,返回一個心跳消息,
            //onmessage拿到返回的心跳就說明鏈接正常
            ws.send("ping");
            console.log("ping!")
            self.serverTimeoutObj = setTimeout(function(){//若是超過必定時間還沒重置,說明後端主動斷開了
                ws.close();     //若是onclose會執行reconnect,咱們執行ws.close()就好了.若是直接執行reconnect 會觸發onclose致使重連兩次
            }, self.timeout)
        }, this.timeout)
    }

  }

js部分使用的是原生H5編寫的,若是爲了更好的兼容瀏覽器,也可使用SockJS,有興趣小夥伴們能夠自行百度。

接下來咱們就手動的優化代碼,實現WebSocket對分佈式架構的支持。

解決方案的思考


如今咱們已經瞭解單體應用下的代碼結構,也清楚了WebSocket在分佈式環境下面臨的問題,那麼是時候思考一下如何可以解決這個問題了。

咱們先來看一看發生這個問題的根本緣由是什麼。

簡單思考一下就能明白,單體應用下只有一臺服務器,全部的客戶端鏈接的都是這一臺消息服務器,因此當發佈消息者發送消息時,全部的客戶端其實已經所有與這臺服務器創建了鏈接,直接羣發消息就能夠了。

換成分佈式系統後,假如咱們有兩臺消息服務器,那麼客戶端經過Nginx負載均衡後,就會有一部分鏈接到其中一臺服務器,另外一部分鏈接到另外一臺服務器,因此發佈消息者發送消息時,只會發送到其中的一臺服務器上,而這臺消息服務器就能夠執行羣發操做,但問題是,另外一臺服務器並不知道這件事,也就沒法發送消息了。

如今咱們知道了根本緣由是生產消息時,只有一臺消息服務器可以感知到,因此咱們只要讓另外一臺消息服務器也能感知到就能夠了,這樣感知到以後,它就能夠羣發消息給鏈接到它上邊的客戶端了。

那麼什麼方法能夠實現這種功能呢,王子很快想到了引入消息中間件,並使用它的發佈訂閱模式來通知全部消息服務器就能夠了。

引入RabbitMQ解決分佈式下的WebSocket問題


在消息中間件的選擇上,王子選擇了RabbitMQ,緣由是它的搭建比較簡單,功能也很強大,並且咱們只是用到它羣發消息的功能。

RabbitMQ有一個廣播模式(fanout),咱們使用的就是這種模式。

首先咱們寫一個RabbitMQ的鏈接類:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtil {

private static Connection connection;

/**
 * 與rabbitmq創建鏈接
 * @return
 */
public static Connection getConnection() {
    if (connection != null&&connection.isOpen()) {
        return connection;
    }

    ConnectionFactory factory = new ConnectionFactory();
    factory.setVirtualHost("/");
    factory.setHost("192.168.220.110"); // 用的是虛擬IP地址
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");

    try {
        connection = factory.newConnection();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }

    return connection;
}

}

這個類沒什麼說的,就是獲取MQ鏈接的一個工廠類。

而後按照咱們的思路,就是每次服務器啓動的時候,都會建立一個MQ的消費者監聽MQ的消息,王子這裏測試使用的是Servlet的監聽器,以下:

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class InitListener implements ServletContextListener {

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
    WebSocket.init();
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {

}

}

記得要在Web.xml中配置監聽器信息

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
     version="4.0">
<listener>
    <listener-class>InitListener</listener-class>
</listener>

</web-app>

WebSocket中增長init方法,做爲MQ消費者部分

public static void init() {

try {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        //交換機聲明(參數爲:交換機名稱;交換機類型)
        channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);
        //獲取一個臨時隊列
        String queueName = channel.queueDeclare().getQueue();
        //隊列與交換機綁定(參數爲:隊列名稱;交換機名稱;routingKey忽略)
        channel.queueBind(queueName,"fanoutLogs","");


        //這裏重寫了DefaultConsumer的handleDelivery方法,由於發送的時候對消息進行了getByte(),在這裏要從新組裝成String
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);
                String message = new String(body,"UTF-8");
                System.out.println(message);

            //這裏可使用WebSocket經過消息內容發送消息給對應的客戶端

}
        };

        //聲明隊列中被消費掉的消息(參數爲:隊列名稱;消息是否自動確認;consumer主體)
        channel.basicConsume(queueName,true,consumer);
        //這裏不能關閉鏈接,調用了消費方法後,消費者會一直鏈接着rabbitMQ等待消費
    } catch (IOException e) {
        e.printStackTrace();
    }
}

同時在接收到消息時,不是直接經過WebSocket發送消息給對應客戶端,而是發送消息給MQ,這樣若是消息服務器有多個,就都會從MQ中得到消息,以後經過獲取的消息內容再使用WebSocket推送給對應的客戶端就能夠了。

WebSocket的onMessage方法增長內容以下:

try {

//嘗試獲取一個鏈接
        Connection connection = RabbitMQUtil.getConnection();
        //嘗試建立一個channel
        Channel channel = connection.createChannel();
        //聲明交換機(參數爲:交換機名稱; 交換機類型,廣播模式)
        channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);
        //消息發佈(參數爲:交換機名稱; routingKey,忽略。在廣播模式中,生產者聲明交換機的名稱和類型便可)
        channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));
        System.out.println("發佈消息");
        channel.close();
    } catch (IOException |TimeoutException e) {
        e.printStackTrace();
    }

增長後刪除掉原來的Websocket推送部分代碼。

這樣一整套的解決方案就完成了。

總結


到這裏,咱們就解決了分佈式下WebSocket的推送消息問題。

咱們主要是引入了RabbitMQ,經過RabbitMQ的發佈訂閱模式,讓每一個消息服務器啓動的時候都去訂閱消息,而不管哪臺消息服務器在發送消息的時候都會發送給MQ,這樣每臺消息服務器就都會感知到發送消息的事件,從而再經過Websocket發送給客戶端。

大致流程就是這樣,那麼小夥伴們有沒有想過,若是RabbitMQ掛掉了幾分鐘,以後重啓了,消費者是否能夠從新鏈接到RabbitMQ?是否還能正常接收消息呢?

生產環境下,這個問題是必須考慮的。

這裏已經測試過,消費者是支持自動重連的,因此咱們能夠放心的使用這套架構來解決此問題。

本文到這裏就結束了,歡迎各位小夥伴點贊文章留言討論,一塊兒學習,一塊兒進步。

相關文章
相關標籤/搜索