Spring Boot中使用WebSocket總結(二):向指定用戶發送WebSocket消息並處理對方不在線的狀況

在上一篇文章(www.zifangsky.cn/1355.html)中我介紹了在Spring項目中使用WebSocket的幾種實現方式。可是,上篇文章中只介紹了服務端採用廣播模式給全部客戶端發送消息,然而咱們有時須要服務端給指定用戶的客戶端發送消息(好比:發送Web通知、實時打印用戶任務的日誌、兩個用戶點對點聊天等)。javascript

關於服務端如何給指定用戶的客戶端發送消息,通常能夠經過如下三種方案來實現:css

  • 方案一:WebSocket使用「Java提供的@ServerEndpoint註解」實現或者使用「Spring低層級API」實現,在創建鏈接時從HttpSession中獲取用戶登陸後的用戶名,而後把「用戶名+該WebSocket鏈接」存儲到ConcurrentHashMap。給指定用戶發送消息,只須要根據接收者的用戶名獲取對方已經創建的WebSocket鏈接,接着給他發送消息便可
  • 方案二:在頁面的監聽路徑前面動態添加當前登陸的「用戶ID/用戶名」,這樣給指定用戶發送消息,只須要發送廣播消息到監聽了前面那個路徑的客戶端便可。
  • 方案三:這種方案相似於方案一。使用Spring的高級API實現WebSocket,而後自定義HandshakeHandler類並重寫determineUser方法,其目的是爲了在創建鏈接時使用用戶登陸後的用戶名做爲這次WebSocket的憑證,最後咱們就能夠使用messagingTemplate.convertAndSendToUser方法給指定用戶發送消息了。

注:本篇文章的完整源碼能夠參考:github.com/zifangsky/W…html

使用SimpMessagingTemplate發送消息

使用org.springframework.messaging.simp.SimpMessagingTemplate類能夠在服務端的任意地方給客戶端發送消息。此外,在咱們配置Spring支持STOMP後SimpMessagingTemplate類就會被自動裝配到Spring的上下文中,所以咱們只須要在想要使用的地方使用@Autowired註解注入SimpMessagingTemplate便可使用。java

須要說明的是,SimpMessagingTemplate類有兩個重要的方法,它們分別是:jquery

  • public void convertAndSend(D destination, Object payload):給監聽了路徑destination的全部客戶端發送消息payload
  • public void convertAndSendToUser(String user, String destination, Object payload):給監聽了路徑destination的用戶user發送消息payload

一個簡單示例:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;

/** * 測試{@link org.springframework.messaging.simp.SimpMessagingTemplate}類的基本用法 * @author zifangsky * @date 2018/10/10 * @since 1.0.0 */
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** * 簡單測試SimpMessagingTemplate的用法 */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok";
    }

}
複製代碼

很顯然,這裏發送的地址是上篇文章中最後那個示例監聽的地址,在客戶端頁面創建鏈接後,咱們使用Postman請求一下上面這個方法,效果以下:git

而後咱們能夠發現頁面中也收到消息了:github

向指定用戶發送WebSocket消息並處理對方不在線的狀況

給指定用戶發送消息:web

  • 若是接收者在線,則直接發送消息;
  • 不然將消息存儲到redis,等用戶上線後主動拉取未讀消息。

(1)自定義HandshakeInterceptor,用於禁止未登陸用戶鏈接WebSocket:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;

import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;

/** * 自定義{@link org.springframework.web.socket.server.HandshakeInterceptor},實現「須要登陸才容許鏈接WebSocket」 * * @author zifangsky * @date 2018/10/11 * @since 1.0.0 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser != null){
            logger.debug(MessageFormat.format("用戶{0}請求創建WebSocket鏈接", loginUser.getUsername()));
            return true;
        }else{
            logger.error("未登陸系統,禁止鏈接WebSocket");
            return false;
        }

    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

    }

}
複製代碼

(2)自定義HandshakeHandler,用於在創建WebSocket的時候使用自定義的Principal:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;

/** * 自定義{@link org.springframework.web.socket.server.support.DefaultHandshakeHandler},實現「生成自定義的{@link java.security.Principal}」 * * @author zifangsky * @date 2018/10/11 * @since 1.0.0 */
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if(loginUser != null){
            logger.debug(MessageFormat.format("WebSocket鏈接開始建立Principal,用戶:{0}", loginUser.getUsername()));
            return new MyPrincipal(loginUser.getUsername());
        }else{
            logger.error("未登陸系統,禁止鏈接WebSocket");
            return null;
        }
    }

}
複製代碼

相應地,這裏的MyPrincipal繼承了java.security.Principal類:ajax

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import java.security.Principal;

/** * 自定義{@link java.security.Principal} * * @author zifangsky * @date 2018/10/11 * @since 1.0.0 */
public class MyPrincipal implements Principal {
    private String loginName;

    public MyPrincipal(String loginName) {
        this.loginName = loginName;
    }

    @Override
    public String getName() {
        return loginName;
    }
}
複製代碼

(3)自定義ChannelInterceptor,用於在用戶斷開鏈接的時候記錄日誌:

package cn.zifangsky.stompwebsocket.interceptor.websocket;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import java.security.Principal;
import java.text.MessageFormat;

/** * 自定義{@link org.springframework.messaging.support.ChannelInterceptor},實現斷開鏈接的處理 * * @author zifangsky * @date 2018/10/10 * @since 1.0.0 */
@Component
public class MyChannelInterceptor implements ChannelInterceptor{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        //用戶已經斷開鏈接
        if(StompCommand.DISCONNECT.equals(command)){
            String user = "";
            Principal principal = accessor.getUser();
            if(principal != null && StringUtils.isNoneBlank(principal.getName())){
                user = principal.getName();
            }else{
                user = accessor.getSessionId();
            }

            logger.debug(MessageFormat.format("用戶{0}的WebSocket鏈接已經斷開", user));
        }
    }

}
複製代碼

(4)WebSocket相關的完整配置:

package cn.zifangsky.stompwebsocket.config;

import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/** * WebSocket相關配置 * * @author zifangsky * @date 2018/9/30 * @since 1.0.0 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer{
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();

        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //客戶端須要把消息發送到/message/xxx地址
        registry.setApplicationDestinationPrefixes("/message");
        //服務端廣播消息的路徑前綴,客戶端須要相應訂閱/topic/yyy這個地址的消息
        registry.enableSimpleBroker("/topic");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }

}
複製代碼

(5)Controller中的消息處理以下:

package cn.zifangsky.stompwebsocket.controller;

import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
import cn.zifangsky.stompwebsocket.model.User;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import cn.zifangsky.stompwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** * 測試{@link org.springframework.messaging.simp.SimpMessagingTemplate}類的基本用法 * @author zifangsky * @date 2018/10/10 * @since 1.0.0 */
@Controller
@RequestMapping(("/wsTemplate"))
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /** * 簡單測試SimpMessagingTemplate的用法 */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));

        return "ok";
    }

    /** * 給指定用戶發送WebSocket消息 */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        //消息接收者
        String receiver = request.getParameter("receiver");
        //消息內容
        String msg = request.getParameter("msg");
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));

        return "ok";
    }

    /** * 給指定用戶發送消息,並處理接收者不在線的狀況 * @param sender 消息發送者 * @param receiver 消息接收者 * @param destination 目的地 * @param payload 消息正文 */
    private void sendToUser(String sender, String receiver, String destination, String payload){
        SimpUser simpUser = userRegistry.getUser(receiver);

        //若是接收者存在,則發送消息
        if(simpUser != null && StringUtils.isNoneBlank(simpUser.getName())){
            this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        }
        //不然將消息存儲到redis,等用戶上線後主動拉取未讀消息
        else{
            //存儲消息的Redis列表名
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("消息接收者{0}還未創建WebSocket鏈接,{1}發送的消息【{2}】將被存儲到Redis的【{3}】列表中", receiver, sender, payload, listKey));

            //存儲消息到Redis中
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }

    }


    /** * 拉取指定監聽路徑的未讀的WebSocket消息 * @param destination 指定監聽路徑 * @return java.util.Map<java.lang.String,java.lang.Object> */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination){
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            //當前登陸用戶
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            //存儲消息的Redis列表名
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            //從Redis中拉取全部未讀消息
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

            result.put("code", "200");
            if(messageList !=null && messageList.size() > 0){
                //刪除Redis中的這個未讀消息列表
                redisService.delete(listKey);
                //將數據添加到返回集,供前臺頁面展現
                result.put("result", messageList);
            }
        }catch (Exception e){
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }

        return result;
    }

}
複製代碼

注:這裏對應的幾個Redis操做的方法以下:redis

@Override
public boolean delete(String key) {
	return redisTemplate.delete(key);
}

@Override
public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
	//綁定操做
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//插入數據
	boundValueOperations.leftPushAll(values);
	//設置過時時間
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
	//綁定操做
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//插入數據
	boundValueOperations.rightPushAll(values);
	//設置過時時間
	boundValueOperations.expire(expireEnum.getTime(),expireEnum.getTimeUnit());
}

@Override
public List<Object> rangeList(String listKey, long start, long end) {
	//綁定操做
	BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
	//查詢數據
	return boundValueOperations.range(start, end);
}
複製代碼

(6)示例頁面:

<html xmlns:th="http://www.thymeleaf.org">
<head>
    <meta content="text/html;charset=UTF-8"/>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <meta http-equiv="X-UA-Compatible" content="IE=edge"/>
    <meta name="viewport" content="width=device-width, initial-scale=1"/>
    <title>Chat With STOMP Message</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
    <script th:src="@{/layui/layui.js}"></script>
    <script th:src="@{/layui/lay/modules/layer.js}"></script>
    <link th:href="@{/layui/css/layui.css}" rel="stylesheet">
    <link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
    <link th:href="@{/css/style.css}" rel="stylesheet">
    <style type="text/css"> #connect-container { margin: 0 auto; width: 400px; } #connect-container div { padding: 5px; margin: 0 7px 10px 0; } .message input { padding: 5px; margin: 0 7px 10px 0; } .layui-btn { display: inline-block; } </style>
    <script type="text/javascript"> var stompClient = null; $(function () { var target = $("#target"); if (window.location.protocol === 'http:') { target.val('http://' + window.location.host + target.val()); } else { target.val('https://' + window.location.host + target.val()); } }); function setConnected(connected) { var connect = $("#connect"); var disconnect = $("#disconnect"); var echo = $("#echo"); if (connected) { connect.addClass("layui-btn-disabled"); disconnect.removeClass("layui-btn-disabled"); echo.removeClass("layui-btn-disabled"); } else { connect.removeClass("layui-btn-disabled"); disconnect.addClass("layui-btn-disabled"); echo.addClass("layui-btn-disabled"); } connect.attr("disabled", connected); disconnect.attr("disabled", !connected); echo.attr("disabled", !connected); } //鏈接 function connect() { var target = $("#target").val(); var ws = new SockJS(target); stompClient = Stomp.over(ws); stompClient.connect({}, function () { setConnected(true); log('Info: STOMP connection opened.'); //鏈接成功後,主動拉取未讀消息 pullUnreadMessage("/topic/reply"); //訂閱服務端的/topic/reply地址 stompClient.subscribe("/user/topic/reply", function (response) { log(JSON.parse(response.body).content); }) },function () { //斷開處理 setConnected(false); log('Info: STOMP connection closed.'); }); } //斷開鏈接 function disconnect() { if (stompClient != null) { stompClient.disconnect(); stompClient = null; } setConnected(false); log('Info: STOMP connection closed.'); } //向指定用戶發送消息 function sendMessage() { if (stompClient != null) { var receiver = $("#receiver").val(); var msg = $("#message").val(); log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg':msg})); $.ajax({ url: "/wsTemplate/sendToUser", type: "POST", dataType: "json", async: true, data: { "receiver": receiver, "msg": msg }, success: function (data) { } }); } else { layer.msg('STOMP connection not established, please connect.', { offset: 'auto' ,icon: 2 }); } } //從服務器拉取未讀消息 function pullUnreadMessage(destination) { $.ajax({ url: "/wsTemplate/pullUnreadMessage", type: "POST", dataType: "json", async: true, data: { "destination": destination }, success: function (data) { if (data.result != null) { $.each(data.result, function (i, item) { log(JSON.parse(item).content); }) } else if (data.code !=null && data.code == "500") { layer.msg(data.msg, { offset: 'auto' ,icon: 2 }); } } }); } //日誌輸出 function log(message) { console.debug(message); } </script>
</head>
<body>
    <noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
        enabled. Please enable
        Javascript and reload this page!</h2></noscript>
    <div>
        <div id="connect-container" class="layui-elem-field">
            <legend>Chat With STOMP Message</legend>
            <div>
                <input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
            </div>
            <div>
                <button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
                <button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="disconnect();">Disconnect
                </button>

            </div>
            <div class="message">
                <input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="接收者姓名" value=""/>
                <input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="消息內容" value=""/>
            </div>
            <div>
                <button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="sendMessage();">Send Message
                </button>
            </div>
        </div>
    </div>
</body>
</html>
複製代碼

啓動項目後,分別在兩個瀏覽器中使用不一樣的帳號登陸,接着互相給對方發送消息,效果以下:

界面一:

img

界面二:

img
相關文章
相關標籤/搜索