書接上文,咱們開始對咱們的小小聊天室進行集羣化改造。html
上文地址:前端
[WebSocket入門]手把手搭建WebSocket多人在線聊天室(SpringBoot+WebSocket)java
本文內容摘要:git
本文源碼:(媽媽不再用擔憂我沒法復現文章代碼啦)github
若是您以爲這個教程對您有用,請關注個人技術公衆號:Rude3Knife,不定時更新技術點滴。redis
分佈式就是爲了解決單點故障問題,想象一下,若是一個服務器承載了1000個大佬同時聊天,服務器忽然掛了,1000個大佬瞬間所有掉線,大概明天你就被大佬們吊起來打了。算法
當聊天室改成集羣后,就算服務器A掛了,服務器B上聊天的大佬們還能夠愉快的聊天,而且在前端還能經過代碼,讓鏈接A的大佬們快速重連至存活的服務器B,繼續和你們愉快的聊天,豈不美哉!spring
總結一下:實現了分佈式WebSocket後,咱們能夠將流量負載均衡到不一樣的服務器上並提供一種通訊機制讓各個服務器能進行消息同步(否則用戶A連上服務器A,用戶B臉上服務器B,它們發消息的時候對方都無法收到)。json
當咱們要實現分佈式的時候,咱們則須要在各個機器上共享這些信息,因此咱們須要一個Publish/Subscribe的中間件。咱們如今使用Redis做爲咱們的解決方案。
假設咱們的聊天室集羣有服務器A和B,用戶Alice鏈接在A上,Bob鏈接在B上、
Alice向聊天室的服務器A發送消息,A服務器必需要將收到的消息轉發到Redis,才能保證聊天室集羣的全部服務器(也就是A和B)可以拿到消息。不然,只有Alice在的服務器A可以讀到消息,用戶Bob在的服務器B並不能收到消息,A和B也就沒法聊天了。
說完了發送消息,那麼如何保證Alice發的消息,其餘全部人都能收到呢,前面咱們知道了Alice發送的消息已經被傳到了Redis的頻道,那麼全部服務器都必須訂閱這個Redis頻道,而後把這個頻道的消息轉發到本身的用戶那裏,這樣本身服務器所管轄的用戶就能收到消息。
上期咱們搭建了個websocket聊天室demo,而且使用了STOMP協議,可是我並無介紹到底什麼是STOMP協議,同窗們會有疑惑,這裏對於STOMP有很好地總結:
當直接使用WebSocket時(或SockJS)就很相似於使用TCP套接字來編寫Web應用。由於沒有高層級的線路協議(wire protocol),所以就須要咱們定義應用之間所發送消息的語義,還須要確保鏈接的兩端都能遵循這些語義。
就像HTTP在TCP套接字之上添加了請求-響應模型層同樣,STOMP在WebSocket之上提供了一個基於幀的線路格式(frame-based wire format)層,用來定義消息的語義。
與HTTP請求和響應相似,STOMP幀由命令、一個或多個頭信息以及負載所組成。例如,以下就是發送數據的一個STOMP幀:
>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20
{"message":"Marco!"}
複製代碼
好了,介紹完了概念,讓咱們開始動手改造!
若是你不熟悉Redis的sub/pub(訂閱/發佈)功能,請看這裏進行簡單瞭解它的用法,很簡單:
redisbook.readthedocs.io/en/latest/f…
在咱們上篇文章的Demo基礎上,咱們進行集羣改造。上一篇文章的源碼見下方:
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
複製代碼
固然首先要確保你安裝了Redis,windows下安裝redis比較麻煩,你能夠搜索redis-for-windows下載安裝。
# redis 鏈接配置
spring.redis.database=0
spring.redis.host=127.0.0.1
spring.redis.password=
spring.redis.port=6379
spring.redis.ssl=false
# 空閒鏈接最大數
spring.redis.jedis.pool.max-idle=10
# 獲取鏈接最大等待時間(s)
spring.redis.jedis.pool.max-wait=60000
複製代碼
# Redis定義
redis.channel.msgToAll = websocket.msgToAll
複製代碼
package cn.monitor4all.springbootwebsocketdemo.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.net.Inet4Address;
import java.net.InetAddress;
/**
* Redis訂閱頻道屬性類
* @author yangzhendong01
*/
@Component
public class RedisListenerBean {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);
@Value("${server.port}")
private String serverPort;
@Value("${redis.channel.msgToAll}")
private String msgToAll;
/**
* redis消息監聽器容器
* 能夠添加多個監聽不一樣話題的redis監聽器,只須要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
* 經過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 監聽msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
LOGGER.info("Subscribed Redis channel: " + msgToAll);
return container;
}
}
複製代碼
能夠看到,咱們在代碼裏監聽了redis頻道msgToAll,這個是在application.properties定義的,固然若是你懶得定義,這裏能夠寫死。
咱們單機聊天室的發送消息Controller是這樣的:
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
複製代碼
咱們前端發給咱們消息後,直接給/topic/public轉發這個消息,讓其餘用戶收到。
在集羣中,咱們須要把消息轉發給Redis,而且不轉發給前端,而是讓服務端監聽Redis消息,在進行消息發送。
將Controller改成:
@Value("${redis.channel.msgToAll}")
private String msgToAll;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@MessageMapping("/chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
try {
redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
複製代碼
你會發現咱們在代碼中使用了JsonUtil將實體類ChatMessage轉爲了Json發送給了Redis,這個Json工具類須要使用到FaskJson依賴:
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
複製代碼
package cn.monitor4all.springbootwebsocketdemo.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JSON 轉換
*/
public final class JsonUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);
/**
* 把Java對象轉換成json字符串
*
* @param object 待轉化爲JSON字符串的Java對象
* @return json 串 or null
*/
public static String parseObjToJson(Object object) {
String string = null;
try {
string = JSONObject.toJSONString(object);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return string;
}
/**
* 將Json字符串信息轉換成對應的Java對象
*
* @param json json字符串對象
* @param c 對應的類型
*/
public static <T> T parseJsonToObj(String json, Class<T> c) {
try {
JSONObject jsonObject = JSON.parseObject(json);
return JSON.toJavaObject(jsonObject, c);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return null;
}
}
複製代碼
這樣,咱們接收到用戶發送消息的請求時,就將消息轉發給了redis的頻道websocket.msgToAll
單機的聊天室,咱們接收消息是經過Controller直接把消息轉發到全部人的頻道上,這樣就能在全部人的聊天框顯示。
在集羣中,咱們須要服務器把消息從Redis中拿出來,而且推送到本身管的用戶那邊,咱們在Service層實現消息的推送。
咱們在service實現發送,須要使用上述第二種方法。
新建類service/ChatService:
package cn.monitor4all.springbootwebsocketdemo.service;
import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.simp.SimpMessageSendingOperations;
import org.springframework.stereotype.Service;
@Service
public class ChatService {
private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);
@Autowired
private SimpMessageSendingOperations simpMessageSendingOperations;
public void sendMsg(@Payload ChatMessage chatMessage) {
LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
}
}
複製代碼
咱們在哪裏調用這個service呢,咱們須要在監聽到消息後調用,因此咱們就要有下面的Redis監聽消息處理專用類
新建類redis/RedisListenerHandle:
package cn.monitor4all.springbootwebsocketdemo.redis;
import cn.monitor4all.springbootwebsocketdemo.model.ChatMessage;
import cn.monitor4all.springbootwebsocketdemo.service.ChatService;
import cn.monitor4all.springbootwebsocketdemo.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
/**
* Redis訂閱頻道處理類
* @author yangzhendong01
*/
@Component
public class RedisListenerHandle extends MessageListenerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);
@Value("${redis.channel.msgToAll}")
private String msgToAll;
@Value("${server.port}")
private String serverPort;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ChatService chatService;
/**
* 收到監聽消息
* @param message
* @param bytes
*/
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String rawMsg;
String topic;
try {
rawMsg = redisTemplate.getStringSerializer().deserialize(body);
topic = redisTemplate.getStringSerializer().deserialize(channel);
LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return;
}
if (msgToAll.equals(topic)) {
LOGGER.info("Send message to all users:" + rawMsg);
ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
// 發送消息給全部在線Cid
chatService.sendMsg(chatMessage);
} else {
LOGGER.warn("No further operation with this topic!");
}
}
}
複製代碼
這樣,咱們的改造就基本完成了!咱們看一下效果
咱們將服務器運行在8080上,而後打開localhost:8080,起名Alice進入聊天室
隨後,咱們在application.properties中將端口server.port=8081
再次運行程序(別忘了開啓IDEA的「容許啓動多個並行服務」設置,否則會覆蓋掉你的8080服務,以下圖),在8081啓動一個聊天室,起名Bob進入聊天室。
以下兩圖,咱們已經能夠在不一樣端口的兩個聊天室,互相聊天了!(注意看url)
在互相發送消息是,咱們還可使用命令行監聽下Redis的頻道websocket.msgToAll,能夠看到雙方傳送的消息。以下圖:
咱們還能夠打開Chrome的F12控制檯,查看前端的控制檯發送消息的log,以下圖:
大功告成了嗎?
功能實現了,可是並不完美!你會發現,Bob的加入並無提醒Bob進入了聊天室(在單機版是有的),這是由於咱們在「加入聊天室」的代碼尚未修改,在加入時,只有Bob的服務器B裏的其餘用戶知道Bob加入了聊天室。咱們還能再進一步!
咱們須要彌補上面的不足,將用戶上線下線的廣播發送到全部服務器上。
此外,我還但願之後可以查詢集羣中全部的在線用戶,咱們在redis中添加一個set,來保存用戶名,這樣就能夠隨時獲得在線用戶的數量和名稱。
# Redis定義
redis.channel.userStatus = websocket.userStatus
redis.set.onlineUsers = websocket.onlineUsers
複製代碼
咱們增長兩個定義
第一個是新增redis頻道websocket.userStatus用來廣播用戶上下線消息
第二個是redis的set,用來保存在線用戶信息
container.addMessageListener(listenerAdapter, new PatternTopic(userStatus));
複製代碼
public void alertUserStatus(@Payload ChatMessage chatMessage) {
LOGGER.info("Alert user online by simpMessageSendingOperations:" + chatMessage.toString());
simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
}
複製代碼
在service中咱們向本服務器的用戶廣播消息,用戶上線或者下線的消息都經過這裏傳達。
@MessageMapping("/chat.addUser")
public void addUser(@Payload ChatMessage chatMessage, SimpMessageHeaderAccessor headerAccessor) {
LOGGER.info("User added in Chatroom:" + chatMessage.getSender());
try {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
redisTemplate.opsForSet().add(onlineUsers, chatMessage.getSender());
redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
複製代碼
咱們修改了addUser方法,在這裏往redis中廣播用戶上線的消息,並把用戶名username寫入redis的set中(websocket.onlineUsers)
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String username = (String) headerAccessor.getSessionAttributes().get("username");
if(username != null) {
LOGGER.info("User Disconnected : " + username);
ChatMessage chatMessage = new ChatMessage();
chatMessage.setType(ChatMessage.MessageType.LEAVE);
chatMessage.setSender(username);
try {
redisTemplate.opsForSet().remove(onlineUsers, username);
redisTemplate.convertAndSend(userStatus, JsonUtil.parseObjToJson(chatMessage));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
}
複製代碼
在用戶關閉網頁時,websocket會調用該方法,咱們在這裏須要把用戶從redis的在線用戶set裏刪除,而且向集羣發送廣播,說明該用戶退出聊天室。
else if (userStatus.equals(topic)) {
ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
if (chatMessage != null) {
chatService.alertUserStatus(chatMessage);
}
複製代碼
在監聽類中咱們接受了來自userStatus頻道的消息,並調用service
此外,咱們還能夠在Reids中查詢到用戶信息:
有了這兩篇文章的基礎, 咱們固然還能實現如下的功能:
感興趣的同窗能夠本身試試看。
深刻淺出Websocket(二)分佈式Websocket集羣
Spring消息之STOMP:
咱們在本文中把單機版的聊天室改成了分佈式聊天室,大大提升了聊天室可用性。
本文工程源代碼:
單機版:
集羣版:
若是您以爲這個教程對您有用,請關注個人技術公衆號:Rude3Knife,不定時更新技術點滴。
我目前是一名後端開發工程師。主要關注後端開發,數據安全,爬蟲,邊緣計算等方向。
微信:yangzd1102(請註明來意)
Github:@qqxx6661
我的博客:
若是文章對你有幫助,不妨收藏起來並轉發給您的朋友們~