最近在公司項目中接到個需求。就是後臺跟前端瀏覽器要保持長鏈接,後臺主動往前臺推數據。javascript
網上查了下,websocket stomp協議處理這個很簡單。尤爲是跟springboot 集成。前端
可是因爲開始是單機玩的,很順利。java
可是後面部署到生產搞集羣的話,就會出問題了。web
假如集羣兩個節點,瀏覽器A與節點A創建鏈接,A節點發的消息瀏覽器A節點確定能收到。可是B節點因爲沒有跟瀏覽器A創建鏈接。B節點發的消息瀏覽器就收不到了。redis
網上也查了好多,可是沒有一個說的很清楚的,也不少都是理論層面的。spring
還有不少思路都是經過session獲取信息的。可是這都不是我須要的。我須要的是從前臺傳遞參數,鏈接的時候每一個節點保存下。而後經過SimpleUserRegistry.getUser獲取。瀏覽器
話很少說,直接上代碼。springboot
<script type="text/javascript" src="${request.contextPath}/scripts/sockjs.min.js"></script> <script type="text/javascript" src="${request.contextPath}/scripts/stomp.min.js"></script> var WEB_SOCKET = { topic : "", url : "", stompClient : null, connect : function(url, topic, callback,userid) { this.url = url; this.topic = topic; var socket = new SockJS(url); //鏈接SockJS的endpoint名稱爲"endpointOyzc" WEB_SOCKET.stompClient = Stomp.over(socket);//使用STMOP子協議的WebSocket客戶端 WEB_SOCKET.stompClient.connect({userid:userid},function(frame){//鏈接WebSocket服務端 // console.log('Connected:' + frame); //經過stompClient.subscribe訂閱/topic/getResponse 目標(destination)發送的消息 WEB_SOCKET.stompClient.subscribe(topic, callback); }); } };
這是響應的前端代碼。只須要引入兩個js。調用new SockJS(url) 就表明跟服務器創建鏈接了。服務器
@Configuration //註解開啓使用STOMP協議來傳輸基於代理(message broker)的消息,這時控制器支持使用@MessageMapping,就像使用@RequestMapping同樣 @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @Autowired private GetHeaderParamInterceptor getHeaderParamInterceptor; @Override //註冊STOMP協議的節點(endpoint),並映射指定的url public void registerStompEndpoints(StompEndpointRegistry registry) { //註冊一個STOMP的endpoint,並指定使用SockJS協議 registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .withSockJS(); /* registry.addEndpoint("/endpointOyzc") .setAllowedOrigins("*") .setHandshakeHandler(xlHandshakeHandler) .withSockJS();*/ } @Override //配置消息代理(Message Broker) public void configureMessageBroker(MessageBrokerRegistry registry) { //點對點應配置一個/user消息代理,廣播式應配置一個/topic消息代理 registry.enableSimpleBroker("/topic", "/user"); // 全局使用的消息前綴(客戶端訂閱路徑上會體現出來) //registry.setApplicationDestinationPrefixes("/app"); //點對點使用的訂閱前綴(客戶端訂閱路徑上會體現出來),不設置的話,默認也是/user/ registry.setUserDestinationPrefix("/user"); } /** * 採用自定義攔截器,獲取connect時候傳遞的參數 * * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(getHeaderParamInterceptor); } }
注:上面的endpointOyzc就是前端的url。後面註冊端點,前臺連接。websocket
而後注意下configureClientInboundChannel這個方法,這個方法裏面注入攔截器就是爲了連接時候接收參數的。
/** * @author : hao * @description : websocket創建連接的時候獲取headeri裏認證的參數攔截器。 * @time : 2019/7/3 20:42 */ @Component public class GetHeaderParamInterceptor extends ChannelInterceptorAdapter { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { Object name = ((Map) raw).get("userid"); if (name instanceof LinkedList) { // 設置當前訪問的認證用戶 accessor.setUser(new JqxxPrincipal(((LinkedList) name).get(0).toString())); } } } return message; } }
/** * @author : hao * @description : 自定義的java.security.Principal * @time : 2019/7/3 20:42 */ public class JqxxPrincipal implements Principal { private String loginName; public JqxxPrincipal(String loginName) { this.loginName = loginName; } @Override public String getName() { return loginName; } }
這樣就存入的前臺傳的參數。
後臺發消息的時候怎麼發呢?
/** * @author : hao * @description : websocket發送代理,負責發送消息 * @time : 2019/7/4 11:01 */ @Component @Slf4j public class WebsocketSendProxy<T> { @Autowired private SimpMessagingTemplate template; @Autowired private SimpUserRegistry userRegistry; @Resource(name = "redisServiceImpl") private RedisService redisService; @Value("spring.redis.message.topic-name") private String topicName; public void sendMsg(RedisWebsocketMsg<T> redisWebsocketMsg) { SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver()); log.info("發送消息前獲取接收方爲{},根據Registry獲取本節點上這個用戶{}", redisWebsocketMsg.getReceiver(), simpUser); if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) { //2. 獲取WebSocket客戶端的訂閱地址 WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if (channelEnum != null) { //3. 給WebSocket客戶端發送消息 template.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); } } else { //給其餘訂閱了主題的節點發消息,由於本節點沒有 redisService.convertAndSend(topicName, redisWebsocketMsg); } } }
能夠發現上面代碼利用了redis監聽模型,也就是redis模型的消息隊列
/** * @author : hao * @description : redis消息監聽實現類,接收處理類 * @time : 2019/7/3 14:00 */ @Component @Slf4j public class MessageReceiver { @Autowired private SimpMessagingTemplate messagingTemplate; @Autowired private SimpUserRegistry userRegistry; /** * 處理WebSocket消息 */ public void receiveMessage(RedisWebsocketMsg redisWebsocketMsg) { log.info(MessageFormat.format("Received Message: {0}", redisWebsocketMsg)); //1. 取出用戶名並判斷是否鏈接到當前應用節點的WebSocket SimpUser simpUser = userRegistry.getUser(redisWebsocketMsg.getReceiver()); if (simpUser != null && StringUtils.isNotBlank(simpUser.getName())) { //2. 獲取WebSocket客戶端的訂閱地址 WebSocketChannelEnum channelEnum = WebSocketChannelEnum.fromCode(redisWebsocketMsg.getChannelCode()); if (channelEnum != null) { //3. 給WebSocket客戶端發送消息 messagingTemplate.convertAndSendToUser(redisWebsocketMsg.getReceiver(), channelEnum.getSubscribeUrl(), redisWebsocketMsg.getContent()); } } } }
redis消息模型只貼部分代碼就行了
/** * 消息監聽器 */ @Bean MessageListenerAdapter messageListenerAdapter(MessageReceiver messageReceiver, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer){ //消息接收者以及對應的默認處理方法 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageReceiver, "receiveMessage"); //消息的反序列化方式 messageListenerAdapter.setSerializer(jackson2JsonRedisSerializer); return messageListenerAdapter; } /** * message listener container */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory , MessageListenerAdapter messageListenerAdapter){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //添加消息監聽器 container.addMessageListener(messageListenerAdapter, new PatternTopic(topicName)); return container; }
上面的思路大致以下:客戶端簡歷連接時候,傳過來userid保存起來。發消息的時候 經過userRegistry獲取,能獲取到就證實是跟本節點創建的連接,直接用本節點發消息就行了。
若是不是就利用redis消息隊列,把消息推出去。每一個節點去判斷獲取看下是否是本節點的userid。這樣就實現了集羣的部署。