接到的需求是:後臺定向給指定的 web 登陸用戶推送消息,且同一帳號可能同時登陸多個客戶端,每個客戶端的前端都要接收到消息。
/** * 使用tomcat啓動無需配置 */ //@Configuration //@ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
# RabbitMQ connection settings (spring.rabbitmq.* properties).
# Broker address in host:port form.
spring.rabbitmq.addresses = i.tzxylao.com:5672
# NOTE(review): the credentials below are stored in plain text — consider externalizing them.
spring.rabbitmq.username = admin
spring.rabbitmq.password = 123456
# Virtual host to connect to ("/" here).
spring.rabbitmq.virtual-host = /
# Connection timeout; presumably milliseconds — TODO confirm against the Spring Boot version in use.
spring.rabbitmq.connection-timeout = 15000
/** * @author laoliangliang * @date 2019/3/29 11:41 */ @Configuration @ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class RabbitmqConfig { final public static String EXCHANGENAME = "websocketExchange"; /** * 建立交換器 */ @Bean FanoutExchange exchange() { return new FanoutExchange(EXCHANGENAME); } @Bean public Queue queue(){ return new Queue(orderQueueName()); } @Bean Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); // 監聽隊列的名稱 container.setQueueNames(orderQueueName()); container.setExposeListenerChannel(true); // 設置每一個消費者獲取的最大消息數量 container.setPrefetchCount(100); // 消費者的個數 container.setConcurrentConsumers(1); // 設置確認模式爲自動確認 container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(orderReceiver); return container; } /** * 在這裏寫獲取訂單隊列名的具體過程 * @return */ public String orderQueueName(){ return "orderChannel"; } }
/** * @author laoliangliang * @date 2019/3/29 11:38 */ @Component @Slf4j @ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class OrderReceiver implements ChannelAwareMessageListener { @Autowired private MyWebSocket myWebSocket; @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); log.info("接收到消息:" + new String(body)); try { myWebSocket.sendMessage(new String(body)); } catch (IOException e) { log.error("send rabbitmq message error", e); } } }
/**
 * Declares the {@link ServerEndpointExporter} bean.
 * Only active when the property {@code websocket.enabled} is {@code true}.
 */
@Configuration
@ConditionalOnProperty(name = "websocket.enabled", havingValue = "true")
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
/** * @author laoliangliang * @date 2019/3/28 14:40 */ public abstract class AbstractWebSocket { protected static Map<String, CopyOnWriteArraySet<Session>> sessionStore = new HashMap<>(); public void sendMessage(String message) throws IOException { List<String> userCodes = beforeSendMessage(); for (String userCode : userCodes) { CopyOnWriteArraySet<Session> sessions = sessionStore.get(userCode); //阻塞式的(同步的) if (sessions !=null && sessions.size() != 0) { for (Session s : sessions) { if (s != null) { s.getBasicRemote().sendText(message); } } } } } /** * 刪選給誰發消息 * @return */ protected abstract List<String> beforeSendMessage(); protected void clearSession(Session session) { Collection<CopyOnWriteArraySet<Session>> values = sessionStore.values(); for (CopyOnWriteArraySet<Session> sessions : values) { for (Session session1 : sessions) { if (session.equals(session1)) { sessions.remove(session); } } } } }
@ServerEndpoint(value = "/websocket") @Component @ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class MyWebSocket extends AbstractWebSocket { private static Logger log = LogManager.getLogger(MyWebSocket.class); @Autowired private AmqpTemplate amqpTemplate; @PostConstruct public void init() { /*ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); executorService.scheduleAtFixedRate(new Runnable() { int i = 0; @Override public void run() { amqpTemplate.convertAndSend(RabbitFanout.EXCHANGENAME, "",("msg num : " + i).getBytes()); i++; } }, 50, 1, TimeUnit.SECONDS);*/ } /** * 鏈接創建成功調用的方法 * * @param session 可選的參數。session爲與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據 */ @OnOpen public void onOpen(Session session) throws TimeoutException { log.info("websocket connect"); //10M session.setMaxTextMessageBufferSize(10485760); } /** * 鏈接關閉調用的方法 */ @OnClose public void onClose(Session session) { clearSession(session); } /** * 收到客戶端消息後調用的方法 * * @param message 客戶端發送過來的消息 * @param session 可選的參數 */ @OnMessage public void onMessage(String message, Session session) { log.info("from client request:" + message); CopyOnWriteArraySet<Session> sessions = sessionStore.get(message); if (sessions == null) { sessions = new CopyOnWriteArraySet<>(); } sessions.add(session); sessionStore.put(message, sessions); } /** * 發生錯誤時調用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { clearSession(session); } /** * 這裏返回須要給哪些用戶發送消息 * @return */ @Override protected List<String> beforeSendMessage() { //TODO 給哪些用戶發送消息 return Lists.newArrayList("6"); } }
// Most recently created WebSocket, or null if none has been created yet.
var websocket = null;
// Number of consecutive automatic reconnect attempts since the last successful open.
var reconnectCount = 0;

/**
 * Opens the websocket described by the server's basic configuration and wires its handlers.
 * Does nothing when websockets are disabled, unsupported by the browser, or when the server
 * did not supply an address and port.
 */
function connectSocket() {
    var data = basicConfig();
    if (data.websocketEnable !== "true") {
        return;
    }

    // Check whether the current browser supports WebSocket.
    if (!('WebSocket' in window)) {
        alert('當前瀏覽器 不支持WebSocket');
        // Stop here: there is no socket to attach handlers to.
        return;
    }
    if (!(data.localIp && data.localIp !== "" && data.serverPort && data.serverPort !== "")) {
        return;
    }

    // Bind handlers to this specific socket rather than to the reassignable global.
    var socket = new WebSocket("ws://" + data.localIp + ":" + data.serverPort + data.serverContextPath + "/websocket");
    websocket = socket;

    // Connection error callback.
    socket.onerror = function () {
        console.log("鏈接發生錯誤");
    };

    // Connection established callback.
    socket.onopen = function () {
        reconnectCount = 0;
        console.log("鏈接成功");
    };

    // Message callback; currently just logs the received data.
    socket.onmessage = function (event) {
        console.log("receive message:" + event.data);
    };

    // Connection closed callback: retry up to three times, then give up and reset the counter.
    socket.onclose = function () {
        console.log("鏈接關閉,如需登陸請刷新頁面。");
        if (reconnectCount >= 3) {
            reconnectCount = 0;
            return;
        }
        reconnectCount++;
        // connectSocket() fetches the configuration itself, so no separate basicConfig() call.
        connectSocket();
    };

    // Register with the server by sending the user code once the connection is open.
    socket.addEventListener('open', function () {
        socket.send(data.userCode);
    });

    // Close the websocket when the window is closing, so the server does not see the
    // connection drop abruptly.
    window.onbeforeunload = function () {
        console.log("closeWebSocket");
        if (websocket) {
            // Detach the close handler first so closing does not trigger a reconnect.
            websocket.onclose = null;
            websocket.close();
        }
    };
}

connectSocket();

/**
 * Fetches the basic configuration from the server with a synchronous POST.
 *
 * @returns {Object} the response data, or an empty object if the request did not succeed
 */
function basicConfig() {
    var result = {};
    $.ajax({
        type: "post",
        // Synchronous on purpose: callers use the return value immediately.
        async: false,
        url: "${request.contextPath}/basicConfig",
        data: {},
        success: function (data) {
            result = data;
        }
    });
    return result;
}
@ApolloConfig
private Config config;

/**
 * Returns the settings the page needs to open its websocket: the current user code from the
 * HTTP session plus the websocket switch, context path, host and port read from {@code config}.
 *
 * <p>NOTE(review): when the session has no {@code userCode} attribute,
 * {@code String.valueOf} yields the literal string {@code "null"} — confirm this is intended.
 *
 * @param session the caller's HTTP session
 * @return a map with the keys {@code userCode}, {@code websocketEnable},
 *         {@code serverContextPath}, {@code localIp} and {@code serverPort}
 */
@RequestMapping(value = {"/basicConfig"})
@ResponseBody
public Map<String, Object> getUserCode(HttpSession session) {
    // Gather all values first; each configuration lookup falls back to the default shown.
    Object userCode = session.getAttribute("userCode");
    String websocketEnable = config.getProperty("websocket.enabled", "false");
    String serverContextPath = config.getProperty("server.context-path", "");
    String localIp = config.getProperty("local.ip", "");
    String serverPort = config.getProperty("server.port", "80");

    Map<String, Object> payload = new HashMap<>(2);
    payload.put("userCode", String.valueOf(userCode));
    payload.put("websocketEnable", websocketEnable);
    payload.put("serverContextPath", serverContextPath);
    payload.put("localIp", localIp);
    payload.put("serverPort", serverPort);
    return payload;
}