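In the previous article (www.zifangsky.cn/1355.html) I introduced several ways to use WebSocket in a Spring project. However, that article only covered the server broadcasting messages to all clients, whereas sometimes we need the server to send a message to the client of one specific user (for example: sending web notifications, printing a user's task log in real time, or point-to-point chat between two users).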
As for how the server can send a message to the client of a specific user, this can generally be achieved through the following approaches:
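Read the logged-in user's username from the HttpSession, then store the pair "username + that user's WebSocket connection" in a ConcurrentHashMap. To send a message to a specific user, look up the connection the receiver has already established by the receiver's username and send the message over it.
Customize the HandshakeHandler class and override its determineUser method, so that the username of the logged-in user serves as the credential of the WebSocket connection when it is established. After that, the messagingTemplate.convertAndSendToUser method can be used to send a message to a specific user.
Note: the complete source code for this article is available at github.com/zifangsky/W…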
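The first approach above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: `Connection`, `RecordingConnection` and `ConnectionRegistry` are hypothetical names that do not appear in the article's source, and a real implementation would store the actual WebSocket session object instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a WebSocket session; only what the sketch needs. */
interface Connection {
    void send(String text);
}

/** Records sent text in memory so the sketch runs without a server. */
class RecordingConnection implements Connection {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(String text) {
        sent.add(text);
    }
}

/** Maps a logged-in username to that user's open connection. */
class ConnectionRegistry {
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    /** Call when a connection opens, with the username read from the HttpSession. */
    void register(String username, Connection connection) {
        connections.put(username, connection);
    }

    /** Call when the connection closes. */
    void unregister(String username) {
        connections.remove(username);
    }

    /** Returns true if the receiver was online and the message was handed over. */
    boolean sendTo(String receiver, String text) {
        Connection connection = connections.get(receiver);
        if (connection == null) {
            return false;
        }
        connection.send(text);
        return true;
    }
}
```

A ConcurrentHashMap is used because connections open and close on different threads; the boolean result lets the caller decide what to do when the receiver is offline.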
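The org.springframework.messaging.simp.SimpMessagingTemplate class can be used to send messages to clients from anywhere on the server side. In addition, once Spring is configured to support STOMP, a SimpMessagingTemplate bean is automatically registered in the Spring context, so we only need to inject it with the @Autowired annotation wherever we want to use it.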
Note that the SimpMessagingTemplate class has two important methods:
public void convertAndSend(D destination, Object payload): sends the message payload to all clients subscribed to the path destination.
public void convertAndSendToUser(String user, String destination, Object payload): sends the message payload to the user named user who is subscribed to the path destination.
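To make the difference between the two methods more concrete, the following is a simplified model of how a user destination is formed. It is a sketch, not Spring's actual code: `UserDestinations` and its methods are hypothetical names, and the "/user" prefix is assumed to be left at Spring's default value.

```java
/** Simplified model of user destinations; not Spring's actual classes. */
final class UserDestinations {
    private UserDestinations() {
    }

    /** Destination that a convertAndSendToUser call is turned into on the server. */
    static String serverSide(String user, String destination) {
        // A "/" inside the user name is escaped so it is not read as a path separator
        String safeUser = user.replace("/", "%2F");
        return "/user/" + safeUser + destination;
    }

    /** Destination the client subscribes to; the server resolves the user itself. */
    static String clientSide(String destination) {
        return "/user" + destination;
    }
}
```

Under this model, a client that wants messages sent with destination "/topic/reply" subscribes to "/user/topic/reply", while the server addresses "/user/{username}/topic/reply" and delivers only to the sessions of that user.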
package cn.zifangsky.stompwebsocket.controller;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
/**
 * Tests the basic usage of {@link org.springframework.messaging.simp.SimpMessagingTemplate}
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Controller
@RequestMapping("/wsTemplate")
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * A simple test of SimpMessagingTemplate
     */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        // Broadcast to every client subscribed to /topic/greeting
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));
        return "ok";
    }
}
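Clearly, the destination used here is the one that the last example in the previous article subscribes to. After the client page has established a connection, we call the method above with Postman; the result is as follows:
The page then receives the message as well: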
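For readers without Postman, the same request could be built in code. This is a minimal sketch that assumes Java 11 or later; the `GreetingRequestSketch` class is hypothetical, the base URL is whatever address the application is actually deployed at, and the JSON field "name" is inferred from the getName() accessor of Greeting.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class GreetingRequestSketch {
    private GreetingRequestSketch() {
    }

    /** Builds the POST request; baseUrl (for example http://localhost:8080) is an assumption. */
    static HttpRequest build(String baseUrl, String name) {
        // Minimal JSON body; assumes name contains no characters that need JSON escaping
        String json = "{\"name\":\"" + name + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/wsTemplate/greeting"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` while a client page is connected.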
Sending messages to a specific user:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.Map;
/**
 * Custom {@link org.springframework.web.socket.server.HandshakeInterceptor} that only
 * allows logged-in users to open a WebSocket connection
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class AuthHandshakeInterceptor implements HandshakeInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                                   WebSocketHandler webSocketHandler, Map<String, Object> map) throws Exception {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if (loginUser != null) {
            logger.debug(MessageFormat.format("User {0} requests a WebSocket connection", loginUser.getUsername()));
            return true;
        } else {
            // Returning false rejects the handshake
            logger.error("Not logged in, WebSocket connection is forbidden");
            return false;
        }
    }

    @Override
    public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse,
                               WebSocketHandler webSocketHandler, Exception e) {
    }
}
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Map;
/**
 * Custom {@link org.springframework.web.socket.server.support.DefaultHandshakeHandler} that
 * creates a custom {@link java.security.Principal}
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
@Component
public class MyHandshakeHandler extends DefaultHandshakeHandler {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        if (loginUser != null) {
            logger.debug(MessageFormat.format("Creating the Principal for the WebSocket connection, user: {0}", loginUser.getUsername()));
            // The username becomes the name that convertAndSendToUser addresses
            return new MyPrincipal(loginUser.getUsername());
        } else {
            logger.error("Not logged in, WebSocket connection is forbidden");
            return null;
        }
    }
}
Correspondingly, the MyPrincipal used here implements the java.security.Principal interface:
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import java.security.Principal;
/**
 * Custom {@link java.security.Principal}
 *
 * @author zifangsky
 * @date 2018/10/11
 * @since 1.0.0
 */
public class MyPrincipal implements Principal {
    private String loginName;

    public MyPrincipal(String loginName) {
        this.loginName = loginName;
    }

    @Override
    public String getName() {
        return loginName;
    }
}
package cn.zifangsky.stompwebsocket.interceptor.websocket;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import java.security.Principal;
import java.text.MessageFormat;
/**
 * Custom {@link org.springframework.messaging.support.ChannelInterceptor} that handles
 * disconnections
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Component
public class MyChannelInterceptor implements ChannelInterceptor {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getCommand();

        // The user has disconnected
        if (StompCommand.DISCONNECT.equals(command)) {
            String user = "";
            Principal principal = accessor.getUser();
            if (principal != null && StringUtils.isNotBlank(principal.getName())) {
                user = principal.getName();
            } else {
                // Fall back to the session id when no Principal is attached
                user = accessor.getSessionId();
            }

            logger.debug(MessageFormat.format("The WebSocket connection of user {0} has been closed", user));
        }
    }
}
package cn.zifangsky.stompwebsocket.config;
import cn.zifangsky.stompwebsocket.interceptor.websocket.AuthHandshakeInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyChannelInterceptor;
import cn.zifangsky.stompwebsocket.interceptor.websocket.MyHandshakeHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
 * WebSocket configuration
 *
 * @author zifangsky
 * @date 2018/9/30
 * @since 1.0.0
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Autowired
    private AuthHandshakeInterceptor authHandshakeInterceptor;

    @Autowired
    private MyHandshakeHandler myHandshakeHandler;

    @Autowired
    private MyChannelInterceptor myChannelInterceptor;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp-websocket").withSockJS();

        registry.addEndpoint("/chat-websocket")
                .addInterceptors(authHandshakeInterceptor)
                .setHandshakeHandler(myHandshakeHandler)
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Clients must send messages to addresses of the form /message/xxx
        registry.setApplicationDestinationPrefixes("/message");
        // Path prefix for messages pushed by the server; clients subscribe to addresses of the form /topic/yyy
        registry.enableSimpleBroker("/topic");
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(myChannelInterceptor);
    }
}
package cn.zifangsky.stompwebsocket.controller;
import cn.zifangsky.stompwebsocket.common.Constants;
import cn.zifangsky.stompwebsocket.common.SpringContextUtils;
import cn.zifangsky.stompwebsocket.enums.ExpireEnum;
import cn.zifangsky.stompwebsocket.model.User;
import cn.zifangsky.stompwebsocket.model.websocket.Greeting;
import cn.zifangsky.stompwebsocket.model.websocket.HelloMessage;
import cn.zifangsky.stompwebsocket.service.RedisService;
import cn.zifangsky.stompwebsocket.utils.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Tests the basic usage of {@link org.springframework.messaging.simp.SimpMessagingTemplate}
 *
 * @author zifangsky
 * @date 2018/10/10
 * @since 1.0.0
 */
@Controller
@RequestMapping("/wsTemplate")
public class MessageTemplateController {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private SimpUserRegistry userRegistry;

    @Resource(name = "redisServiceImpl")
    private RedisService redisService;

    /**
     * A simple test of SimpMessagingTemplate
     */
    @PostMapping("/greeting")
    @ResponseBody
    public String greeting(@RequestBody Greeting greeting) {
        this.messagingTemplate.convertAndSend("/topic/greeting", new HelloMessage("Hello," + greeting.getName() + "!"));
        return "ok";
    }

    /**
     * Sends a WebSocket message to a specific user
     */
    @PostMapping("/sendToUser")
    @ResponseBody
    public String chat(HttpServletRequest request) {
        // Message receiver
        String receiver = request.getParameter("receiver");
        // Message content
        String msg = request.getParameter("msg");

        HttpSession session = SpringContextUtils.getSession();
        User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

        HelloMessage resultData = new HelloMessage(MessageFormat.format("{0} say: {1}", loginUser.getUsername(), msg));
        this.sendToUser(loginUser.getUsername(), receiver, "/topic/reply", JsonUtils.toJson(resultData));
        return "ok";
    }

    /**
     * Sends a message to a specific user and handles the case where the receiver is offline
     *
     * @param sender      message sender
     * @param receiver    message receiver
     * @param destination destination
     * @param payload     message body
     */
    private void sendToUser(String sender, String receiver, String destination, String payload) {
        SimpUser simpUser = userRegistry.getUser(receiver);

        // If the receiver is connected, send the message
        if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) {
            this.messagingTemplate.convertAndSendToUser(receiver, destination, payload);
        }
        // Otherwise store the message in Redis; the user pulls unread messages after coming online
        else {
            // Name of the Redis list that stores the messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + receiver + ":" + destination;
            logger.info(MessageFormat.format("Receiver {0} has not established a WebSocket connection yet, so the message [{2}] sent by {1} will be stored in the Redis list [{3}]", receiver, sender, payload, listKey));

            // Store the message in Redis
            redisService.addToListRight(listKey, ExpireEnum.UNREAD_MSG, payload);
        }
    }

    /**
     * Pulls the unread WebSocket messages for the given subscription path
     *
     * @param destination the subscription path
     * @return java.util.Map<java.lang.String,java.lang.Object>
     */
    @PostMapping("/pullUnreadMessage")
    @ResponseBody
    public Map<String, Object> pullUnreadMessage(String destination) {
        Map<String, Object> result = new HashMap<>();
        try {
            HttpSession session = SpringContextUtils.getSession();
            // Currently logged-in user
            User loginUser = (User) session.getAttribute(Constants.SESSION_USER);

            // Name of the Redis list that stores the messages
            String listKey = Constants.REDIS_UNREAD_MSG_PREFIX + loginUser.getUsername() + ":" + destination;
            // Pull all unread messages from Redis
            List<Object> messageList = redisService.rangeList(listKey, 0, -1);

            result.put("code", "200");
            if (messageList != null && messageList.size() > 0) {
                // Delete this unread-message list from Redis
                redisService.delete(listKey);
                // Add the data to the result for the front-end page to display
                result.put("result", messageList);
            }
        } catch (Exception e) {
            result.put("code", "500");
            result.put("msg", e.getMessage());
        }

        return result;
    }
}
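The "deliver if online, otherwise store and pull later" flow of sendToUser and pullUnreadMessage can be modelled in memory as follows. This is a simplified sketch, not the article's implementation: `OfflineAwareSender` is a hypothetical class, a Set stands in for SimpUserRegistry, and a HashMap of lists stands in for the Redis lists.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory model of "deliver if online, otherwise store until pulled". */
class OfflineAwareSender {
    // Stand-in for SimpUserRegistry: names of users with an open connection
    private final Set<String> onlineUsers = new HashSet<>();
    // Stand-in for the Redis lists, keyed by "receiver:destination"
    private final Map<String, List<String>> unread = new HashMap<>();
    // Stand-in for messages pushed over WebSocket
    final List<String> delivered = new ArrayList<>();

    synchronized void connect(String user) {
        onlineUsers.add(user);
    }

    synchronized void disconnect(String user) {
        onlineUsers.remove(user);
    }

    synchronized void sendToUser(String receiver, String destination, String payload) {
        if (onlineUsers.contains(receiver)) {
            delivered.add(receiver + " <- " + payload);
        } else {
            // Append at the tail so messages are later read in arrival order
            unread.computeIfAbsent(receiver + ":" + destination, key -> new ArrayList<>()).add(payload);
        }
    }

    /** Returns the stored messages in arrival order and clears them in the same step. */
    synchronized List<String> pullUnread(String user, String destination) {
        List<String> messages = unread.remove(user + ":" + destination);
        return messages == null ? new ArrayList<>() : messages;
    }
}
```

The model reads and clears the stored messages inside one synchronized method; the controller above does the same work as two separate Redis calls.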
Note: the Redis operations used here are implemented as follows:
@Override
public boolean delete(String key) {
    return redisTemplate.delete(key);
}

@Override
public void addToListLeft(String listKey, ExpireEnum expireEnum, Object... values) {
    // Bind the operations to the key
    BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
    // Insert the data at the head of the list
    boundValueOperations.leftPushAll(values);
    // Set the expiration time
    boundValueOperations.expire(expireEnum.getTime(), expireEnum.getTimeUnit());
}

@Override
public void addToListRight(String listKey, ExpireEnum expireEnum, Object... values) {
    // Bind the operations to the key
    BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
    // Insert the data at the tail of the list
    boundValueOperations.rightPushAll(values);
    // Set the expiration time
    boundValueOperations.expire(expireEnum.getTime(), expireEnum.getTimeUnit());
}

@Override
public List<Object> rangeList(String listKey, long start, long end) {
    // Bind the operations to the key
    BoundListOperations<String, Object> boundValueOperations = redisTemplate.boundListOps(listKey);
    // Query the data
    return boundValueOperations.range(start, end);
}
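The call rangeList(listKey, 0, -1) in the controller relies on the index rules of the Redis LRANGE command: both ends are inclusive and negative indexes count from the tail, so 0 to -1 means the whole list. The sketch below emulates those rules on an ordinary Java list; `ListRange` is a hypothetical helper for illustration and is not part of the article's source or of Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.List;

final class ListRange {
    private ListRange() {
    }

    /** Emulates LRANGE index rules: inclusive ends, negative indexes count from the tail. */
    static <T> List<T> range(List<T> source, long start, long end) {
        int size = source.size();
        // A negative start before the head is clamped to the first element
        long from = start < 0 ? Math.max(size + start, 0) : start;
        // An end beyond the tail is clamped to the last element
        long to = end < 0 ? size + end : Math.min(end, size - 1);
        if (from > to || from >= size) {
            return new ArrayList<>();
        }
        return new ArrayList<>(source.subList((int) from, (int) to + 1));
    }
}
```

Because addToListRight appends at the tail, reading with 0 and -1 returns the unread messages in the order they were stored.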
<html xmlns:th="http://www.thymeleaf.org">
<head>
<meta content="text/html;charset=UTF-8"/>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=edge"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Chat With STOMP Message</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.1.4/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
<script th:src="@{/layui/layui.js}"></script>
<script th:src="@{/layui/lay/modules/layer.js}"></script>
<link th:href="@{/layui/css/layui.css}" rel="stylesheet">
<link th:href="@{/layui/css/modules/layer/default/layer.css}" rel="stylesheet">
<link th:href="@{/css/style.css}" rel="stylesheet">
<style type="text/css"> #connect-container { margin: 0 auto; width: 400px; } #connect-container div { padding: 5px; margin: 0 7px 10px 0; } .message input { padding: 5px; margin: 0 7px 10px 0; } .layui-btn { display: inline-block; } </style>
<script type="text/javascript">
    var stompClient = null;

    $(function () {
        var target = $("#target");
        if (window.location.protocol === 'http:') {
            target.val('http://' + window.location.host + target.val());
        } else {
            target.val('https://' + window.location.host + target.val());
        }
    });

    function setConnected(connected) {
        var connect = $("#connect");
        var disconnect = $("#disconnect");
        var echo = $("#echo");

        if (connected) {
            connect.addClass("layui-btn-disabled");
            disconnect.removeClass("layui-btn-disabled");
            echo.removeClass("layui-btn-disabled");
        } else {
            connect.removeClass("layui-btn-disabled");
            disconnect.addClass("layui-btn-disabled");
            echo.addClass("layui-btn-disabled");
        }

        connect.attr("disabled", connected);
        disconnect.attr("disabled", !connected);
        echo.attr("disabled", !connected);
    }

    // Connect
    function connect() {
        var target = $("#target").val();

        var ws = new SockJS(target);
        stompClient = Stomp.over(ws);

        stompClient.connect({}, function () {
            setConnected(true);
            log('Info: STOMP connection opened.');

            // After connecting, proactively pull unread messages
            pullUnreadMessage("/topic/reply");

            // Subscribe to the server's /topic/reply address as the current user
            stompClient.subscribe("/user/topic/reply", function (response) {
                log(JSON.parse(response.body).content);
            })
        }, function () {
            // Handle disconnection
            setConnected(false);
            log('Info: STOMP connection closed.');
        });
    }

    // Disconnect
    function disconnect() {
        if (stompClient != null) {
            stompClient.disconnect();
            stompClient = null;
        }
        setConnected(false);
        log('Info: STOMP connection closed.');
    }

    // Send a message to a specific user
    function sendMessage() {
        if (stompClient != null) {
            var receiver = $("#receiver").val();
            var msg = $("#message").val();
            log('Sent: ' + JSON.stringify({'receiver': receiver, 'msg': msg}));

            $.ajax({
                url: "/wsTemplate/sendToUser",
                type: "POST",
                dataType: "json",
                async: true,
                data: {
                    "receiver": receiver,
                    "msg": msg
                },
                success: function (data) {
                }
            });
        } else {
            layer.msg('STOMP connection not established, please connect.', {
                offset: 'auto'
                , icon: 2
            });
        }
    }

    // Pull unread messages from the server
    function pullUnreadMessage(destination) {
        $.ajax({
            url: "/wsTemplate/pullUnreadMessage",
            type: "POST",
            dataType: "json",
            async: true,
            data: {
                "destination": destination
            },
            success: function (data) {
                if (data.result != null) {
                    $.each(data.result, function (i, item) {
                        log(JSON.parse(item).content);
                    })
                } else if (data.code != null && data.code == "500") {
                    layer.msg(data.msg, {
                        offset: 'auto'
                        , icon: 2
                    });
                }
            }
        });
    }

    // Log output
    function log(message) {
        console.debug(message);
    }
</script>
</head>
<body>
<noscript><h2 style="color: #ff0000">Seems your browser doesn't support Javascript! Websockets rely on Javascript being
enabled. Please enable
Javascript and reload this page!</h2></noscript>
<div>
<div id="connect-container" class="layui-elem-field">
<legend>Chat With STOMP Message</legend>
<div>
<input id="target" type="text" class="layui-input" size="40" style="width: 350px" value="/chat-websocket"/>
</div>
<div>
<button id="connect" class="layui-btn layui-btn-normal" onclick="connect();">Connect</button>
<button id="disconnect" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="disconnect();">Disconnect
</button>
</div>
<div class="message">
<input id="receiver" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Receiver name" value=""/>
<input id="message" type="text" class="layui-input" size="40" style="width: 350px" placeholder="Message content" value=""/>
</div>
<div>
<button id="echo" class="layui-btn layui-btn-normal layui-btn-disabled" disabled="disabled" onclick="sendMessage();">Send Message
</button>
</div>
</div>
</div>
</body>
</html>
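After starting the project, log in with a different account in each of two browsers and send messages from one to the other; the result is as follows:
Screen one:
Screen two: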