rabbitmq+websocket(SpringBoot版)實現分佈式消息推送

原本想用websocket作一個消息推送 但是分佈式環境下不支持session共享由於服務器不一樣html

因此採用 rabbitMQ+webSocket實現分佈式消息推送前端

生產者將消息 發送給 rabbitMQ 的 virtual-host:/(頂極路由) 再由它路由到交換機 最終由交換機經過路由鍵指定具體的管道html5

消費者監聽指定的管道獲取消息web

最終將獲取的消息 交給 webSocket 被@OnMessage註解標識的方法spring

每次消費一條消息交給 被@OnMessage註解標識的方法 返回給前臺 json

實現分佈式實時推送瀏覽器

1.配置rabbitMQ安全

消息生產者服務器

1.1pom.xmlwebsocket

1 <!--引入rabbitmq依賴-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>
 1 server:  2  port: 5002  3 
 4 spring:  5  rabbitmq:  6  host: localhost  7  #帳號密碼 默認有的  8  username: guest  9  password: guest 10  #rbbitmq虛擬主機路徑 11  virtual-host: / 12  #rabbitmq的端口號 也是默認的 13     port: 5672
 1 @SpringBootApplication  2 @MapperScan(basePackages = "com.supplychain.dao")  3 @EnableRabbit/**開啓rabbitmq*/
 4 public class ThumbsupServer5002_App {  5 
 6     public static void main(String[]args){  7 
 8         SpringApplication.run(ThumbsupServer5002_App.class,args);  9 
10  } 11 
12     /**消息的轉換器 13  * 設置成json 並放入到Spring中 14  * */
15  @Bean 16     public MessageConverter messageConverter(){ 17 
18         return new Jackson2JsonMessageConverter(); 19 
20  } 21 }

測試發送消息

 1 @RunWith(SpringRunner.class)  2 @SpringBootTest  3 public class ThumbsupServer5002_AppTest {  4 
 5 
 6  @Autowired  7     private RabbitTemplate rabbitTemplate;  8 
 9  @Test 10     public void contextLoads() { 11 
12         UserTest userTest = new UserTest("hao", "651238730@qq.com"); 13 
14         /**1.指定發送的交換機 15  * 發送的消息會先發送給 virtual-host: /(頂級路由) 再由它到交換機 16  * 由交換機經過路由鍵指定給具體的管道 17  * 18  * 2.路由鍵 19  * 有的交換機須要路由鍵 有的不須要(發送給交換機的消息會被髮送給全部管道) 20  * 21  * 3.發送的消息 22  * 若是是對象的話必須實現序列化接口由於網絡傳輸只能傳二進制 23  * 24  * */
25         rabbitTemplate.convertAndSend("userTest-exchange", "userTest-key", userTest); 26  } 27 
28 }

2.消息消費者

一樣是pom.xml須要引入rabbitMQ依賴

1 <!--引入rabbitmq依賴-->
2         <dependency>
3             <groupId>org.springframework.boot</groupId>
4             <artifactId>spring-boot-starter-amqp</artifactId>
5         </dependency>

一樣須要配置application.yml

 1 spring:  2  rabbitmq:  3     host: 127.0.0.1
 4  #帳號密碼 默認有的  5  username: guest  6  password: guest  7  #rbbitmq虛擬主機路徑  8     virtual-host: /
 9  #rabbitmq的端口號 也是默認的 10     port: 5672
11  listener: 12  simple: 13         acknowledge-mode: manual #手動接受數據 14         #max-concurrency: 10 #最大併發 15         #prefetch: 1 #限流

一樣主啓動類中須要開啓RabbitMQ

 1 @SpringBootApplication  2 @EnableRabbit  3 public class MessageServer5003_App {  4 
 5     public static void main(String[]args){  6 
 7         SpringApplication.run(MessageServer5003_App.class,args);  8 
 9  } 10 
11     /**這裏也須要設置消息轉換類型 12  * 和發送的消息類型必定要對應 13  * 否則對象接受json啓動主程序類時就會報錯 14  * */
15  @Bean 16     public MessageConverter messageConverter(){ 17 
18         return new Jackson2JsonMessageConverter(); 19 
20  } 21 
22 }

下面到了整合的環節了

 1 @ServerEndpoint(value = "/websocket")  2 @Component  3 public class WebSocketServer {  4 
 5     //靜態變量 用於記錄當前在線鏈接數 應該把它設計成線程安全
 6     private static int onlineCount=0;  7 
 8     /**Concurrent包下的 寫時複製Set 用它做於存儲客戶端對應的MyWebSocket對象*/
 9     private static CopyOnWriteArraySet<WebSocketServer> webSocketSet= new CopyOnWriteArraySet<WebSocketServer>();  10 
 11 
 12     /**與某個客戶端的連接會話,須要經過它來給客戶端發送數據*/
 13 
 14     private Session session;  15     /**
 16  * 參數1:Message 能夠得到消息的內容字節 還能夠得到消息的其餘屬性  17  * 參數2:能夠寫肯定接受的參數類型好比User  18  * 參數3:Channel 通道  19  * com.rabbitmq.client.Channel必須是這個包下  20  * 經過這個參數能夠拒絕消息  21  * 讓rabbitmq再發給別的消費者  22  *  23  * 使用@RabbitListener 能夠綁定交換機 路由鍵 管道  24  *  25      */
 26     @RabbitListener(bindings = @QueueBinding(  27          value = @Queue(value = "userTest-queue",durable = "true"),  28          exchange = @Exchange(name = "userTest-exchange",durable = "true",type = "direct"),  29          key = "userTest-key"
 30  )  31  )  32     @RabbitHandler//註解意思:若是有消息過來 須要消費的時候纔會調用該方法
 33     /**若是已知傳遞的參數是 UserTest對象能夠經過該註解  34  * 消息頭須要用map接受  35  * 既然是手動接受消息 就須要設置channel  36  * */
 37     public void receiveUserMessage(@Payload UserTest userTest, @Headers Map<String,Object> headers, Channel channel) throws IOException {  38         //sendMessage(message.toString());
 39         System.out.println("UserTest對象"+userTest);  40  onMessage(userTest.toString());//調用消息方法將數據船體給他  41 
 42         Long deliveryTag= (Long)headers.get(AmqpHeaders.DELIVERY_TAG);  43         //手動接受並告訴rabbitmq消息已經接受了 deliverTag記錄接受消息 false不批量接受
 44         channel.basicAck(deliveryTag,false);  45 
 46         /**
 47  * basicReject()  48  * 參數1: 消息標籤  49  * 參數2: true 將消息重新放入隊列 false 接受到並將消息拋棄  50  *  51  *  52  try {  53  channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);  54  System.out.println(message);  55  } catch (IOException e) {  56  e.printStackTrace();  57  }  58      */
 59 
 60  }  61 
 62     /**服務器端推送消息*/
 63     public void sendMessage(String message){  64         try {  65             System.out.println("session能否顯示出來"+session);  66             this.session.getBasicRemote().sendText(message);  67         } catch (IOException e) {  68  e.printStackTrace();  69  }  70  }  71 
 72     /**
 73  * 鏈接創建成功調用的方法  74  * */
 75  @OnOpen  76     public void onOpen(Session session){  77         this.session=session;  78         webSocketSet.add(this);  79         System.out.println("有新的鏈接加入!當前在線人數爲"+getOnlineCount());  80  System.out.println(session);  81  }  82 
 83     /**
 84  * 鏈接關閉調用的方法  85  * */
 86  @OnClose  87     public void onClose(){  88         /**從安全Set中 移除當前鏈接對象*/
 89         webSocketSet.remove(this);  90  subOnlineCount();  91         System.out.println("有一鏈接關閉!當前在線人數爲"+getOnlineCount());  92  }  93 
 94 
 95 
 96  @OnMessage  97     public void onMessage(String message){  98 
 99         System.out.println("來自客戶端的消息:"+message); 100 
101         for (WebSocketServer webSocketServer:webSocketSet){ 102  webSocketServer.sendMessage(message); 103  } 104 
105  } 106 
107 
108     public static int getOnlineCount() { 109         return onlineCount; 110  } 111 
112     public static synchronized void addOnlineCount() { 113         WebSocketServer.onlineCount++; 114  } 115 
116     public static synchronized void subOnlineCount() { 117         WebSocketServer.onlineCount--; 118  } 119 
120 
121 
122 }

websocket前端

websocket是html5提出的協議屬於雙工通訊 前端發送一次請求告訴服務器須要將http協議升級成tcp長鏈接

後面服務端直接給前端推送消息就能夠了 從之前的一次請求一次響應 服務端被動式 變成 一次請求服務端能夠無限響應

 1 <script>
 2         var socket;  3  console.log(typeof socket)  4         if (typeof(WebSocket)=="undefined"){  5  alert("您的瀏覽器不支持WebSocket");  6  }else{  7  alert("您的瀏覽器支持WebSocket");  8 
 9  socket=new WebSocket("ws://localhost:5003/websocket"); 10 
11  socket.onopen=function () { 12  console.log("Socket 已打開"); 13  }; 14 
15             //得到消息事件
16  socket.onmessage = function(msg) { 17  console.log(msg.data); 18                 //發現消息進入 調後臺獲取
19                 //getCallingList();
20  }; 21 
22             //關閉事件
23  socket.onclose = function() { 24  console.log("Socket已關閉"); 25  }; 26             //發生了錯誤事件
27  socket.onerror = function() { 28  alert("Socket發生了錯誤"); 29  }; 30             /** 31  $(window).unload(function(){ 32  socket.close(); 33  }); 34              */
35  } 36     </script>
相關文章
相關標籤/搜索