WebSocket集羣分佈式改造:實現多人在線聊天室

前言

書接上文,咱們開始對咱們的小小聊天室進行集羣化改造。html

上文地址:前端

[WebSocket入門]手把手搭建WebSocket多人在線聊天室(SpringBoot+WebSocket)java

本文內容摘要:git

  • 爲什麼要改造爲分佈式集羣
  • 如何改造爲分佈式集羣
    • 用戶在聊天室集羣如何發消息
    • 用戶在聊天室集羣如何接收消息
  • 補充知識點:STOMP 簡介
  • 功能一:向聊天室集羣中的全體用戶發消息——Redis的訂閱/發佈
  • 功能二:集羣集羣用戶上下線通知——Redis訂閱發佈
  • 功能三:集羣用戶信息維護——Redis集合
  • WebSocket集羣還有哪些可能性

本文源碼:(媽媽不再用擔憂我沒法復現文章代碼啦)github

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集羣版web

若是您以爲這個教程對您有用,請關注個人技術公衆號:Rude3Knife,不定時更新技術點滴。redis

正文

WebSocket集羣/分佈式改造:實現多人在線聊天室

爲什麼要改造爲分佈式集羣

分佈式就是爲了解決單點故障問題,想象一下,若是一個服務器承載了1000個大佬同時聊天,服務器忽然掛了,1000個大佬瞬間所有掉線,大概明天你就被大佬們吊起來打了。算法

當聊天室改成集羣后,就算服務器A掛了,服務器B上聊天的大佬們還能夠愉快的聊天,而且在前端還能經過代碼,讓鏈接A的大佬們快速重連至存活的服務器B,繼續和你們愉快的聊天,豈不美哉!spring

總結一下:實現了分佈式WebSocket後,咱們能夠將流量負載均衡到不一樣的服務器上並提供一種通訊機制讓各個服務器能進行消息同步(否則用戶A連上服務器A,用戶B臉上服務器B,它們發消息的時候對方都無法收到)。json

如何改造爲分佈式集羣

當咱們要實現分佈式的時候,咱們則須要在各個機器上共享這些信息,因此咱們須要一個Publish/Subscribe的中間件。咱們如今使用Redis做爲咱們的解決方案。

1. 用戶在聊天室集羣如何發消息

假設咱們的聊天室集羣有服務器A和B,用戶Alice鏈接在A上,Bob鏈接在B上、

Alice向聊天室的服務器A發送消息,A服務器必需要將收到的消息轉發到Redis,才能保證聊天室集羣的全部服務器(也就是A和B)可以拿到消息。不然,只有Alice在的服務器A可以讀到消息,用戶Bob在的服務器B並不能收到消息,A和B也就沒法聊天了。

2. 用戶在聊天室集羣如何接收消息

說完了發送消息,那麼如何保證Alice發的消息,其餘全部人都能收到呢,前面咱們知道了Alice發送的消息已經被傳到了Redis的頻道,那麼全部服務器都必須訂閱這個Redis頻道,而後把這個頻道的消息轉發到本身的用戶那裏,這樣本身服務器所管轄的用戶就能收到消息。

補充知識點:STOMP 簡介

上期咱們搭建了個websocket聊天室demo,而且使用了STOMP協議,可是我並無介紹到底什麼是STOMP協議,同窗們會有疑惑,這裏對於STOMP有很好地總結:

當直接使用WebSocket時(或SockJS)就很相似於使用TCP套接字來編寫Web應用。由於沒有高層級的線路協議(wire protocol),所以就須要咱們定義應用之間所發送消息的語義,還須要確保鏈接的兩端都能遵循這些語義。

就像HTTP在TCP套接字之上添加了請求-響應模型層同樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。

與HTTP請求和響應相似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,以下就是發送數據的一個STOMP幀:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}

好了,介紹完了概念,讓咱們開始動手改造!

功能一:向聊天室集羣中的全體用戶發消息——Redis的訂閱/發佈

若是你不熟悉Redis的sub/pub(訂閱/發佈)功能,請看這裏進行簡單瞭解它的用法,很簡單:

https://redisbook.readthedocs.io/en/latest/feature/pubsub.html

在咱們上篇文章的Demo基礎上,咱們進行集羣改造。上一篇文章的源碼見下方:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/單機版

1. 添加Redis依賴pom

<!-- redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2. application.properties新增redis配置

固然首先要確保你安裝了Redis,windows下安裝redis比較麻煩,你能夠搜索redis-for-windows下載安裝。

# redis 鏈接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
# 空閒鏈接最大數
spring.redis.jedis.pool.max-idle=10
# 獲取鏈接最大等待時間(s)
spring.redis.jedis.pool.max-wait=60000

3. 在application.properties添加頻道名定義

# Redis定義
redis.channel.msgToAll = websocket.msgToAll

4. 新建redis/RedisListenerBean

package cn.monitor4all.springbootwebsocketdemo.redis;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import java.net.Inet4Address;
import java.net.InetAddress;

/**
 * Redis訂閱頻道屬性類
 * @author yangzhendong01
 */
@Component
public class RedisListenerBean {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);

    @Value("${server.port}")
    private String serverPort;

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    /**
     * redis消息監聽器容器
     * 能夠添加多個監聽不一樣話題的redis監聽器,只須要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
     * 經過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        // 監聽msgToAll
        container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
        LOGGER.info("Subscribed Redis channel: " + msgToAll);
        return container;
    }
}

能夠看到,咱們在代碼裏監聽了redis頻道msgToAll,這個是在application.properties定義的,固然若是你懶得定義,這裏能夠寫死。

5. 聊天室集羣:發消息改造

咱們單機聊天室的發送消息Controller是這樣的:

@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
    public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
        return chatMessage;

咱們前端發給咱們消息後,直接給/topic/public轉發這個消息,讓其餘用戶收到。

在集羣中,咱們須要把消息轉發給Redis,而且不轉發給前端,而是讓服務端監聽Redis消息,在進行消息發送。

將Controller改成:

@Value("${redis.channel.msgToAll}")
private String msgToAll;

@Autowired
private RedisTemplate<String, String> redisTemplate;
    
@MessageMapping("/chat.sendMessage")
    public void sendMessage(@Payload ChatMessage chatMessage) {
        try {
            redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

你會發現咱們在代碼中使用了JsonUtil將實體類ChatMessage轉爲了Json發送給了Redis,這個Json工具類須要使用到FaskJson依賴:

  1. pom添加FastJson依賴
<!-- json -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
</dependency>
  1. 添加Json解析工具類JsonUtil,提供對象轉Json,Json轉對象的能力
package cn.monitor4all.springbootwebsocketdemo.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JSON 轉換
 */
public final class JsonUtil {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);

    /**
     * 把Java對象轉換成json字符串
     *
     * @param object 待轉化爲JSON字符串的Java對象
     * @return json 串 or null
     */
    public static String parseObjToJson(Object object) {
        String string = null;
        try {
            string = JSONObject.toJSONString(object);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return string;
    }

    /**
     * 將Json字符串信息轉換成對應的Java對象
     *
     * @param json json字符串對象
     * @param c    對應的類型
     */
    public static <T> T parseJsonToObj(String json, Class<T> c) {
        try {
            JSONObject jsonObject = JSON.parseObject(json);
            return JSON.toJavaObject(jsonObject, c);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        return null;
    }
}

這樣,咱們接收到用戶發送消息的請求時,就將消息轉發給了redis的頻道websocket.msgToAll

6. 聊天室集羣:接收消息改造

單機的聊天室,咱們接收消息是經過Controller直接把消息轉發到全部人的頻道上,這樣就能在全部人的聊天框顯示。

在集羣中,咱們須要服務器把消息從Redis中拿出來,而且推送到本身管的用戶那邊,咱們在Service層實現消息的推送。

  • 在處理消息以後發送消息:
    正如前面看到的那樣,使用 @MessageMapping 或者 @SubscribeMapping 註解能夠處理客戶端發送過來的消息,並選擇方法是否有返回值。
    若是 @MessageMapping註解的控制器方法有返回值的話,返回值會被髮送到消息代理,只不過會添加上"/topic"前綴。可使用@SendTo 重寫消息目的地;
    若是 @SubscribeMapping註解的控制器方法有返回值的話,返回值會直接發送到客戶端,不通過代理。若是加上@SendTo 註解的話,則要通過消息代理。
  • 在應用的任意地方發送消息:
    spring-websocket 定義了一個 SimpMessageSendingOperations 接口(或者使用SimpMessagingTemplate ),能夠實現自由的向任意目的地發送消息,而且訂閱此目的地的全部用戶都能收到消息。

咱們在service實現發送,須要使用上述第二種方法。

新建類service/ChatService:

package cn.monitor4all.springbootwebsocketdemo.service;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;

@Service
public class ChatService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);

    @Autowired
    private SimpMessageSendingOperations simpMessageSendingOperations;

    public void sendMsg(@Payload ChatMessage chatMessage) {
        LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

}

咱們在哪裏調用這個service呢,咱們須要在監聽到消息後調用,因此咱們就要有下面的Redis監聽消息處理專用類

新建類redis/RedisListenerHandle:

package cn.monitor4all.springbootwebsocketdemo.redis;

import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

/**
 * Redis訂閱頻道處理類
 * @author yangzhendong01
 */
@Component
public class RedisListenerHandle extends MessageListenerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);

    @Value("${redis.channel.msgToAll}")
    private String msgToAll;

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private ChatService chatService;

    /**
     * 收到監聽消息
     * @param message
     * @param bytes
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        String rawMsg;
        String topic;
        try {
            rawMsg = redisTemplate.getStringSerializer().deserialize(body);
            topic = redisTemplate.getStringSerializer().deserialize(channel);
            LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return;
        }


        if (msgToAll.equals(topic)) {
            LOGGER.info("Send message to all users:" + rawMsg);
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            // 發送消息給全部在線Cid
            chatService.sendMsg(chatMessage);
        } else {
            LOGGER.warn("No further operation with this topic!");
        }
    }
}

7. 看看效果

這樣,咱們的改造就基本完成了!咱們看一下效果

咱們將服務器運行在8080上,而後打開localhost:8080,起名Alice進入聊天室

隨後,咱們在application.properties中將端口server.port=8081

再次運行程序(別忘了開啓IDEA的「容許啓動多個並行服務」設置,否則會覆蓋掉你的8080服務,以下圖),在8081啓動一個聊天室,起名Bob進入聊天室。

以下兩圖,咱們已經能夠在不一樣端口的兩個聊天室,互相聊天了!(注意看url)

在互相發送消息是,咱們還可使用命令行監聽下Redis的頻道websocket.msgToAll,能夠看到雙方傳送的消息。以下圖:

咱們還能夠打開Chrome的F12控制檯,查看前端的控制檯發送消息的log,以下圖:

大功告成了嗎?

功能實現了,可是並不完美!你會發現,Bob的加入並無提醒Bob進入了聊天室(在單機版是有的),這是由於咱們在「加入聊天室」的代碼尚未修改,在加入時,只有Bob的服務器B裏的其餘用戶知道Bob加入了聊天室。咱們還能再進一步!

功能二/功能三:集羣用戶上下線通知,集羣用戶信息存儲

咱們須要彌補上面的不足,將用戶上線下線的廣播發送到全部服務器上。

此外,我還但願之後可以查詢集羣中全部的在線用戶,咱們在redis中添加一個set,來保存用戶名,這樣就能夠隨時獲得在線用戶的數量和名稱。

1. 在application.properties添加頻道名定義

# Redis定義
redis.channel.userStatus = websocket.userStatus
redis.set.onlineUsers = websocket.onlineUsers

咱們增長兩個定義

  • 第一個是新增redis頻道websocket.userStatus用來廣播用戶上下線消息

  • 第二個是redis的set,用來保存在線用戶信息

2. 在RedisListenerBean添加新頻道監聽

container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));

3. 在ChatService中添加

public void alertUserStatus(@Payload ChatMessage chatMessage) {
        LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
        simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
    }

在service中咱們向本服務器的用戶廣播消息,用戶上線或者下線的消息都經過這裏傳達。

4. 修改ChatController中的addUser方法

@MessageMapping("/chat.addUser")
    public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {

        LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
        try {
            headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
            redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
            redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }

咱們修改了addUser方法,在這裏往redis中廣播用戶上線的消息,並把用戶名username寫入redis的set中(websocket.onlineUsers)

5. 修改WebSocketEventListener中的handleWebSocketDisconnectListener方法

@EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {

        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String username = (String) headerAccessor.getSessionAttributes().get("username");

        if(username != null) {
            LOGGER.info("User Disconnected : " + username);
            ChatMessage chatMessage = new ChatMessage();
            chatMessage.setType(ChatMessage.MessageType.LEAVE);
            chatMessage.setSender(username);
            try {
                redisTemplate.opsForSet().remove(onlineUsers, username);
                redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }

        }
    }

在用戶關閉網頁時,websocket會調用該方法,咱們在這裏須要把用戶從redis的在線用戶set裏刪除,而且向集羣發送廣播,說明該用戶退出聊天室。

6. 修改Redis監聽類RedisListenerHandle

else if (userStatus.equals(topic)) {
            ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
            if (chatMessage != null) {
                chatService.alertUserStatus(chatMessage);
            }

在監聽類中咱們接受了來自userStatus頻道的消息,並調用service

7. 看看效果

此外,咱們還能夠在Reids中查詢到用戶信息:

WebSocket集羣還有哪些可能性

有了這兩篇文章的基礎, 咱們固然還能實現如下的功能:

  • 某用戶A單獨私信給某用戶B,或者私信給某用戶羣(用戶B和C)
  • 系統提供外部調用接口,給指定用戶/用戶羣發送消息,實現消息推送
  • 系統提供外部接口,實時獲取用戶數據(人數/用戶信息)

感興趣的同窗能夠本身試試看。

參考文獻

深刻淺出Websocket(二)分佈式Websocket集羣

https://juejin.im/post/6844903584929153032

Spring消息之STOMP:

http://www.javashuo.com/article/p-mugrohor-nt.html

總結

咱們在本文中把單機版的聊天室改成了分佈式聊天室,大大提升了聊天室可用性。

本文工程源代碼:

單機版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/單機版

集羣版:

https://github.com/qqxx6661/springboot-websocket-demo/releases/tag/集羣版

若是您以爲這個教程對您有用,請關注個人技術公衆號:Rude3Knife,不定時更新技術點滴。

關注我

我目前是一名後端開發工程師。主要關注後端開發,數據安全,爬蟲,邊緣計算等方向。

微信:yangzd1102(請註明來意)

Github:@qqxx6661

我的博客:

原創博客主要內容

  • Java知識點複習全手冊
  • Leetcode算法題解析
  • 劍指offer算法題解析
  • SpringCloud菜鳥入門實戰系列
  • SpringBoot菜鳥入門實戰系列
  • 爬蟲相關技術文章
  • 後端開發相關技術文章

我的公衆號:後端技術漫談

我的公衆號:後端技術漫談

若是文章對你有幫助,不妨收藏起來並轉發給您的朋友們~

相關文章
相關標籤/搜索