在上篇文章Spring Boot系列十六 WebSocket簡介和spring boot集成簡單消息代理中咱們使用的消息代理是spring內置的簡單消息代理,簡單消息代理很是適合入門,可是隻支持STOMP命令的子集(如不支持acks, receipts),依賴於消息發送循環,而且不支持集羣。咱們可使用外部的消息代理(如RabbitMQ, ActiveMQ),來實現全功能消息代理。本文以集成RabbitMQ爲例。本文的主要內容以下:java
關於RabbitMQ的用法,能夠參考本做者的RabbitMQ系列文章react
咱們選擇相似RabbitMQ全功能的消息代理。安裝消息代理後,以支持STOMP的狀況狀況運行服務。 咱們在RabbitMQ上啓動rabbitmq_web_stomp插件git
此圖和使用簡單消息最大的不一樣是"broker relay"用於經過TCP將消息傳遞給外部STOMP代理(如這裏是RabbitMQ),並將消息從代理傳遞給訂閱客戶github
首先在上一篇文章的基礎上增長以下jarweb
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-net -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-net</artifactId>
<version>2.0.8.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.22.Final</version>
</dependency>
複製代碼
和上文BroadcastCtl相似,這裏略spring
配置外部Rabibitmq替代Simple Broker作消息代理:在configureMessageBroker()方法中配置外部RabbitMQ的地址、賬號密碼鏈接到RabbitMQbash
@Configuration
// 此註解開使用STOMP協議來傳輸基於消息代理的消息,此時能夠在@Controller類中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
/**
* 註冊 Stomp的端點
*
* addEndpoint:添加STOMP協議的端點。這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址
* withSockJS:指定端點使用SockJS協議
*/
registry.addEndpoint("/websocket-rabbitmq").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
/**
* 配置消息代理
* 使用RabbitMQ作爲消息代理,替換默認的Simple Broker
*/
registry
// "STOMP broker relay"處理全部消息將消息發送到外部的消息代理
.enableStompBrokerRelay("/exchange","/topic","/queue","/amq/queue")
.setRelayHost("192.168.0.113")
.setClientLogin("hry")
.setClientPasscode("hry")
.setSystemLogin("hry")
.setSystemPasscode("hry")
.setSystemHeartbeatSendInterval(5000)
.setSystemHeartbeatReceiveInterval(4000);
;
}
}
複製代碼
這裏的jsp和上面的jsp相似,這裏略websocket
執行啓動類: WebSocketRabbitMQApplication 若是鏈接RabbitMQ,會打印以下信息:session
2018-03-26 23:22:04.354 [reactor-tcp-io-1] INFO o.s.m.s.s.StompBrokerRelayMessageHandler - "System" session connected.
2018-03-26 23:22:04.358 [reactor-tcp-io-1] INFO o.s.m.s.s.StompBrokerRelayMessageHandler - BrokerAvailabilityEvent[available=true, StompBrokerRelay[192.168.0.113:61613]]
複製代碼
測試請求: http://127.0.0.1:8080//broadcast-rabbitmq/index 具體測試的配置見下方mvc
WebSocketRabbitMQMessageBrokerConfigurer中咱們須要配置消息代理的前綴。在RabbitMQ中合法的目的前綴:/temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/. 咱們這裏演示以上後4個的用法
經過交換機訂閱/發佈消息,交換機須要手動建立,參數說明 a. /exchange:固定值 b. exchangename:交換機名稱 c. [routing_key]:路由鍵,可選
對於接收者端,該 destination 會建立一個惟一的、自動刪除的隨機queue, 並根據 routing_key將該 queue 綁定到所給的 exchangename,實現對該隊列的消息訂閱。 對於發送者端,消息就會被髮送到定義的 exchangename中,而且指定了 routing_key。
在本文的代碼基礎進行以下修改
在RabbitMQ上建立名爲rabbitmq交換機
在BroadcastRabbitMQCtl中修改發送者代碼
SendTo("/exchange/rabbitmq/get-response")
public ResponseMessage broadcast(RequestMessage requestMessage){
…
}
複製代碼
在ws-broadcast-rabbitmq.jsp中修改接收者的代碼
stompClient.subscribe('/exchange/rabbitmq/get-response', function(respnose){
showResponse(JSON.parse(respnose.body).responseMessage);
})
複製代碼
測試: 打開兩個頁面,其中一個頁面發送3次,這3個消息被兩個都收到
使用默認交換機訂閱/發佈消息,默認由stomp自動建立一個持久化隊列,參數說明 a. /queue:固定值 b. queuename:自動建立一個持久化隊列
對於接收者端,訂閱隊列queuename的消息 對於接收者端,向queuename發送消息 [對於 SEND frame,destination 只會在第一次發送消息的時候會定義的共享 queue]
在本文的代碼基礎進行以下修改
在BroadcastRabbitMQCtl中修改代碼
@SendTo("/queue/rabbitmq")
public ResponseMessage broadcast(RequestMessage requestMessage){
…
}
複製代碼
在ws-broadcast-rabbitmq.jsp中修改接收者的代碼
stompClient.subscribe(
'/queue/rabbitmq',
function(respnose){
showResponse(JSON.parse(respnose.body).responseMessage);
});
複製代碼
測試: 打開兩個頁面,其中一個頁面發送7次,這7個消息被兩個頁面輪流接收
和上文的"/queue/queuename"類似,二者的區別是 a. 與/queue/queuename的區別在於隊列不禁stomp自動進行建立,隊列不存在失敗
這種狀況下不管是發送者仍是接收者都不會產生隊列。 但若是該隊列不存在,接收者會報錯。
在本文的代碼基礎進行以下修改
在RabbitMQ上手動建立名爲rabbitmq2的隊列
在BroadcastRabbitMQCtl中修改代碼
@SendTo("/amq/queue/rabbitmq2")
public ResponseMessage broadcast(RequestMessage requestMessage){
..
}
複製代碼
在ws-broadcast-rabbitmq.jsp中修改接收者的代碼
stompClient.subscribe(
'/amq/queue/rabbitmq2',
function(respnose){
showResponse(JSON.parse(respnose.body).responseMessage);
});
複製代碼
測試: 對於 SUBCRIBE frame,destination 會實現對隊列的消息訂閱。 對於 SEND frame,消息會經過默認的 exhcange 直接被髮送到隊列中。
打開兩個頁面,其中一個頁面發送7次,這7個消息被兩個頁面輪流接收
經過amq.topic交換機訂閱/發佈消息,訂閱時默認建立一個臨時隊列,經過routing_key與topic進行綁定 a. /topic:固定前綴 b. routing_key:路由鍵
對於發送者端,會建立出自動刪除的、非持久的隊列並根據 routing_key路由鍵綁定到 amq.topic 交換機 上,同時實現對該隊列的訂閱。 對於發送者端,消息會被髮送到 amq.topic 交換機中。
在本文的代碼基礎進行以下修改
在BroadcastRabbitMQCtl中修改代碼
@SendTo("/topic/get-response")
public ResponseMessage broadcast(RequestMessage requestMessage){
…
}
複製代碼
在ws-broadcast-rabbitmq.jsp中修改接收者的代碼
stompClient.subscribe(
'/topic/get-response',
function(respnose){
showResponse(JSON.parse(respnose.body).responseMessage);
});
複製代碼
測試: 打開兩個頁面,其中一個頁面發送4次,這4個消息同時被兩個都收到
全部的詳細代碼見github代碼,請儘可能使用tag v0.20,不要使用master,由於master一直在變,不能保證文章中代碼和github上的代碼一直相同