最近公司裏遇到一個問題,在集羣中一些websocket的消息丟失了。
產生問題的原理很簡單,發送消息的服務和接收者鏈接的服務不是同一個服務。java
用中間件(mq, redis etc.)來在服務之間進行通訊。web
不直接發送websocket消息,而是將消息放在mq或者redis的list中。
並在redis中維護鏈接信息,服務根據鏈接信息來判斷本身是否須要處理消息,或者將消息發給接收者鏈接的服務。redis
咱們的項目中使用的是Spring WebSocket,而且使用了STOMP協議,能夠去官網查看文檔。spring
代碼示例只作維護鏈接信息的代碼示例,其餘部分就不放上來了。apache
想要在維護STOMP協議的鏈接信息,能夠查看文檔的這一部分Listening To ApplicationContext Events and Intercepting Messageswebsocket
這裏的鏈接信息只要是可以標識出不一樣的服務就OK。app
一下是監聽了訂閱事件的Listener的部分代碼:socket
package cn.fjhdtp.websocket.interceptor; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; public class LoginInfoInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { //握手前,往attributes中增長所需信息 Object loginBean = ...;//獲取登陸的用戶信息(或其餘信息) attributes.put(WebSocketConstant.WEBSOKET_LOGINBEAN,loginBean); return super.beforeHandshake(request, response, wsHandler, attributes); } }
package cn.fjhdtp.listener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationListener; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionSubscribeEvent; import java.util.Map; @Component public class SessionSubscribeEventListener implements ApplicationListener<SessionSubscribeEvent> { @Autowired @Qualifier("serversideMessageTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired private IMessageHandler messageHandler; @Override public void onApplicationEvent(SessionSubscribeEvent event) { //獲取訂閱的destination String destination = (String) event.getMessage().getHeaders().get("simpDestination"); //獲取登陸信息 Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN); //TODO 向redis中增長鏈接信息 } }
package cn.fjhdtp.message.listener; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionDisconnectEvent; import java.util.Map; @Component public class SessionDisconnectEventListener implements ApplicationListener<SessionDisconnectEvent> { @Override public void onApplicationEvent(SessionDisconnectEvent event) { // stomp鏈接斷開,清除鏈接信息 //從attributes中獲取登陸信息(或其餘信息) Object loginBean = ((Map) event.getMessage().getHeaders().get("simpSessionAttributes")).get(WebSocketConstant.WEBSOKET_LOGINBEAN); //從redis中移除鏈接信息 } }
固然,有些狀況下可能不會正常的觸發斷開鏈接的事件(在was下就不會有這個事件),所以還會須要HeartBeat。ide