原本想用websocket作一個消息推送 但是分佈式環境下不支持session共享由於服務器不一樣html
因此採用 rabbitMQ+webSocket實現分佈式消息推送前端
生產者將消息 發送給 rabbitMQ 的 virtual-host:/(頂極路由) 再由它路由到交換機 最終由交換機經過路由鍵指定具體的管道html5
消費者監聽指定的管道獲取消息web
最終將獲取的消息 交給 webSocket 被@OnMessage註解標識的方法spring
每次消費一條消息交給 被@OnMessage註解標識的方法 返回給前臺 json
實現分佈式實時推送瀏覽器
1.配置rabbitMQ安全
消息生產者服務器
1.1pom.xmlwebsocket
1 <!--引入rabbitmq依賴--> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-amqp</artifactId> 5 </dependency>
1 server: 2 port: 5002 3 4 spring: 5 rabbitmq: 6 host: localhost 7 #帳號密碼 默認有的 8 username: guest 9 password: guest 10 #rbbitmq虛擬主機路徑 11 virtual-host: / 12 #rabbitmq的端口號 也是默認的 13 port: 5672
1 @SpringBootApplication 2 @MapperScan(basePackages = "com.supplychain.dao") 3 @EnableRabbit/**開啓rabbitmq*/ 4 public class ThumbsupServer5002_App { 5 6 public static void main(String[]args){ 7 8 SpringApplication.run(ThumbsupServer5002_App.class,args); 9 10 } 11 12 /**消息的轉換器 13 * 設置成json 並放入到Spring中 14 * */ 15 @Bean 16 public MessageConverter messageConverter(){ 17 18 return new Jackson2JsonMessageConverter(); 19 20 } 21 }
測試發送消息
1 @RunWith(SpringRunner.class) 2 @SpringBootTest 3 public class ThumbsupServer5002_AppTest { 4 5 6 @Autowired 7 private RabbitTemplate rabbitTemplate; 8 9 @Test 10 public void contextLoads() { 11 12 UserTest userTest = new UserTest("hao", "651238730@qq.com"); 13 14 /**1.指定發送的交換機 15 * 發送的消息會先發送給 virtual-host: /(頂級路由) 再由它到交換機 16 * 由交換機經過路由鍵指定給具體的管道 17 * 18 * 2.路由鍵 19 * 有的交換機須要路由鍵 有的不須要(發送給交換機的消息會被髮送給全部管道) 20 * 21 * 3.發送的消息 22 * 若是是對象的話必須實現序列化接口由於網絡傳輸只能傳二進制 23 * 24 * */ 25 rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest); 26 } 27 28 }
2.消息消費者
一樣是pom.xml須要引入rabbitMQ依賴
1 <!--引入rabbitmq依賴--> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-amqp</artifactId> 5 </dependency>
一樣須要配置application.yml
1 spring: 2 rabbitmq: 3 host: 127.0.0.1 4 #帳號密碼 默認有的 5 username: guest 6 password: guest 7 #rbbitmq虛擬主機路徑 8 virtual-host: / 9 #rabbitmq的端口號 也是默認的 10 port: 5672 11 listener: 12 simple: 13 acknowledge-mode: manual #手動接受數據 14 #max-concurrency: 10 #最大併發 15 #prefetch: 1 #限流
一樣主啓動類中須要開啓RabbitMQ
1 @SpringBootApplication 2 @EnableRabbit 3 public class MessageServer5003_App { 4 5 public static void main(String[]args){ 6 7 SpringApplication.run(MessageServer5003_App.class,args); 8 9 } 10 11 /**這裏也須要設置消息轉換類型 12 * 和發送的消息類型必定要對應 13 * 否則對象接受json啓動主程序類時就會報錯 14 * */ 15 @Bean 16 public MessageConverter messageConverter(){ 17 18 return new Jackson2JsonMessageConverter(); 19 20 } 21 22 }
下面到了整合的環節了
1 @ServerEndpoint(value = "/websocket") 2 @Component 3 public class WebSocketServer { 4 5 //靜態變量 用於記錄當前在線鏈接數 應該把它設計成線程安全 6 private static int onlineCount=0; 7 8 /**Concurrent包下的 寫時複製Set 用它做於存儲客戶端對應的MyWebSocket對象*/ 9 private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>(); 10 11 12 /**與某個客戶端的連接會話,須要經過它來給客戶端發送數據*/ 13 14 private Session session; 15 /** 16 * 參數1:Message 能夠得到消息的內容字節 還能夠得到消息的其餘屬性 17 * 參數2:能夠寫肯定接受的參數類型好比User 18 * 參數3:Channel 通道 19 * com.rabbitmq.client.Channel必須是這個包下 20 * 經過這個參數能夠拒絕消息 21 * 讓rabbitmq再發給別的消費者 22 * 23 * 使用@RabbitListener 能夠綁定交換機 路由鍵 管道 24 * 25 */ 26 @RabbitListener(bindings = @QueueBinding( 27 value = @Queue(value = "userTest-queue",durable = "true"), 28 exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"), 29 key = "userTest-key" 30 ) 31 ) 32 @RabbitHandler//註解意思:若是有消息過來 須要消費的時候纔會調用該方法 33 /**若是已知傳遞的參數是 UserTest對象能夠經過該註解 34 * 消息頭須要用map接受 35 * 既然是手動接受消息 就須要設置channel 36 * */ 37 public void receiveUserMessage(@Payload UserTest userTest, @Headers Map<String,Object> headers, Channel channel) throws IOException { 38 //sendMessage(message.toString()); 39 System.out.println("UserTest對象"+userTest); 40 onMessage(userTest.toString());//調用消息方法將數據船體給他 41 42 Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG); 43 //手動接受並告訴rabbitmq消息已經接受了 deliverTag記錄接受消息 false不批量接受 44 channel.basicAck(deliveryTag,false); 45 46 /** 47 * basicReject() 48 * 參數1: 消息標籤 49 * 參數2: true 將消息重新放入隊列 false 接受到並將消息拋棄 50 * 51 * 52 try { 53 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); 54 System.out.println(message); 55 } catch (IOException e) { 56 e.printStackTrace(); 57 } 58 */ 59 60 } 61 62 /**服務器端推送消息*/ 63 public void sendMessage(String message){ 64 try { 65 System.out.println("session能否顯示出來"+session); 66 this.session.getBasicRemote().sendText(message); 67 } catch (IOException e) { 68 e.printStackTrace(); 69 } 70 } 71 72 /** 73 * 鏈接創建成功調用的方法 74 * */ 75 @OnOpen 76 public void onOpen(Session session){ 77 this.session=session; 78 webSocketSet.add(this); 79 System.out.println("有新的鏈接加入!當前在線人數爲"+getOnlineCount()); 80 System.out.println(session); 81 } 82 83 /** 84 * 鏈接關閉調用的方法 85 * */ 86 @OnClose 87 public void onClose(){ 88 /**從安全Set中 移除當前鏈接對象*/ 89 webSocketSet.remove(this); 90 subOnlineCount(); 91 System.out.println("有一鏈接關閉!當前在線人數爲"+getOnlineCount()); 92 } 93 94 95 96 @OnMessage 97 public void onMessage(String message){ 98 99 System.out.println("來自客戶端的消息:"+message); 100 101 for (WebSocketServer webSocketServer:webSocketSet){ 102 webSocketServer.sendMessage(message); 103 } 104 105 } 106 107 108 public static int getOnlineCount() { 109 return onlineCount; 110 } 111 112 public static synchronized void addOnlineCount() { 113 WebSocketServer.onlineCount++; 114 } 115 116 public static synchronized void subOnlineCount() { 117 WebSocketServer.onlineCount--; 118 } 119 120 121 122 }
websocket前端
websocket是html5提出的協議屬於雙工通訊 前端發送一次請求告訴服務器須要將http協議升級成tcp長鏈接
後面服務端直接給前端推送消息就能夠了 從之前的一次請求一次響應 服務端被動式 變成 一次請求服務端能夠無限響應
1 <script> 2 var socket; 3 console.log(typeof socket) 4 if (typeof(WebSocket)=="undefined"){ 5 alert("您的瀏覽器不支持WebSocket"); 6 }else{ 7 alert("您的瀏覽器支持WebSocket"); 8 9 socket=new WebSocket("ws://localhost:5003/websocket"); 10 11 socket.onopen=function () { 12 console.log("Socket 已打開"); 13 }; 14 15 //得到消息事件 16 socket.onmessage = function(msg) { 17 console.log(msg.data); 18 //發現消息進入 調後臺獲取 19 //getCallingList(); 20 }; 21 22 //關閉事件 23 socket.onclose = function() { 24 console.log("Socket已關閉"); 25 }; 26 //發生了錯誤事件 27 socket.onerror = function() { 28 alert("Socket發生了錯誤"); 29 }; 30 /** 31 $(window).unload(function(){ 32 socket.close(); 33 }); 34 */ 35 } 36 </script>