Spring Boot系列十七 Spring Boot 集成 websocket,使用RabbitMQ作爲消息代理

1. 概述

在上篇文章Spring Boot系列十六 WebSocket簡介和spring boot集成簡單消息代理中咱們使用的消息代理是spring內置的簡單消息代理,簡單消息代理很是適合入門,可是隻支持STOMP命令的子集(如不支持acks, receipts),依賴於消息發送循環,而且不支持集羣。咱們可使用外部的消息代理(如RabbitMQ, ActiveMQ),來實現全功能消息代理。本文以集成RabbitMQ爲例。本文的主要內容以下:java

  • 使用RabbitMQ作websocket消息代理的準備工做和消息流程圖
  • Spring Boot使用RabbitMQ作websocket的主要代碼
  • 演示在RabbitMQ不一樣目的的(destination)用法

2. 使用RabbitMQ作websocket消息代理的準備工做和消息流程圖

關於RabbitMQ的用法,能夠參考本做者的RabbitMQ系列文章react

2.1. 使用RabbitMQ作websocket消息代理的準備工做

咱們選擇相似RabbitMQ全功能的消息代理。安裝消息代理後,以支持STOMP的狀況狀況運行服務。 咱們在RabbitMQ上啓動rabbitmq_web_stomp插件git

  1. 在RabbitMQ上啓動rabbitmq_web_stomp插件,在rabbitMQ上執行以下命令:sudo rabbitmq-plugins enable rabbitmq_web_stomp
  2. 登陸RabbitMQ管理平臺,看到以下信息,發現已經開啓stomp代理服務

2.2. 消息流程圖

此圖和使用簡單消息最大的不一樣是"broker relay"用於經過TCP將消息傳遞給外部STOMP代理(如這裏是RabbitMQ),並將消息從代理傳遞給訂閱客戶github

3. Spring Boot使用RabbitMQ作websocket的主要代碼

3.1. pom.xml

首先在上一篇文章的基礎上增長以下jarweb

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-net -->
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-net</artifactId>
	<version>2.0.8.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.22.Final</version>
</dependency>
複製代碼

3.2. BroadcastRabbitMQCtl

和上文BroadcastCtl相似,這裏略spring

3.3. WebSocketRabbitMQMessageBrokerConfigurer 配置

配置外部Rabibitmq替代Simple Broker作消息代理:在configureMessageBroker()方法中配置外部RabbitMQ的地址、賬號密碼鏈接到RabbitMQbash

@Configuration
// 此註解開使用STOMP協議來傳輸基於消息代理的消息,此時能夠在@Controller類中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        /**
         * 註冊 Stomp的端點
         *
         * addEndpoint:添加STOMP協議的端點。這個HTTP URL是供WebSocket或SockJS客戶端訪問的地址
         * withSockJS:指定端點使用SockJS協議
          */
        registry.addEndpoint("/websocket-rabbitmq").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        /**
         * 配置消息代理
         * 使用RabbitMQ作爲消息代理,替換默認的Simple Broker
         */
	registry
		// "STOMP broker relay"處理全部消息將消息發送到外部的消息代理
                .enableStompBrokerRelay("/exchange","/topic","/queue","/amq/queue")
                .setRelayHost("192.168.0.113")
                .setClientLogin("hry")
                .setClientPasscode("hry")
                .setSystemLogin("hry")
                .setSystemPasscode("hry")
                .setSystemHeartbeatSendInterval(5000)
                .setSystemHeartbeatReceiveInterval(4000);
                ;
}
}

複製代碼

3.4. ws-broadcast-rabbitmq.jsp

這裏的jsp和上面的jsp相似,這裏略websocket

3.5. 測試方法

執行啓動類: WebSocketRabbitMQApplication 若是鏈接RabbitMQ,會打印以下信息:session

2018-03-26 23:22:04.354 [reactor-tcp-io-1] INFO  o.s.m.s.s.StompBrokerRelayMessageHandler - "System" session connected.
2018-03-26 23:22:04.358 [reactor-tcp-io-1] INFO  o.s.m.s.s.StompBrokerRelayMessageHandler - BrokerAvailabilityEvent[available=true, StompBrokerRelay[192.168.0.113:61613]]
複製代碼

測試請求: http://127.0.0.1:8080//broadcast-rabbitmq/index 具體測試的配置見下方mvc

4. 演示在RabbitMQ不一樣目的的(destination)用法

WebSocketRabbitMQMessageBrokerConfigurer中咱們須要配置消息代理的前綴。在RabbitMQ中合法的目的前綴:/temp-queue, /exchange, /topic, /queue, /amq/queue, /reply-queue/. 咱們這裏演示以上後4個的用法

4.1. /exchange/exchangename/[routing_key]

經過交換機訂閱/發佈消息,交換機須要手動建立,參數說明 a. /exchange:固定值 b. exchangename:交換機名稱 c. [routing_key]:路由鍵,可選

對於接收者端,該 destination 會建立一個惟一的、自動刪除的隨機queue, 並根據 routing_key將該 queue 綁定到所給的 exchangename,實現對該隊列的消息訂閱。 對於發送者端,消息就會被髮送到定義的 exchangename中,而且指定了 routing_key。

在本文的代碼基礎進行以下修改

  1. 在RabbitMQ上建立名爲rabbitmq交換機

  2. 在BroadcastRabbitMQCtl中修改發送者代碼

    SendTo("/exchange/rabbitmq/get-response")
    public ResponseMessage broadcast(RequestMessage requestMessage){
    …
    }
    複製代碼
  3. 在ws-broadcast-rabbitmq.jsp中修改接收者的代碼

    stompClient.subscribe('/exchange/rabbitmq/get-response', function(respnose){
                    showResponse(JSON.parse(respnose.body).responseMessage);
                })
    複製代碼

測試: 打開兩個頁面,其中一個頁面發送3次,這3個消息被兩個都收到

4.2. /queue/queuename

使用默認交換機訂閱/發佈消息,默認由stomp自動建立一個持久化隊列,參數說明 a. /queue:固定值 b. queuename:自動建立一個持久化隊列

對於接收者端,訂閱隊列queuename的消息 對於接收者端,向queuename發送消息 [對於 SEND frame,destination 只會在第一次發送消息的時候會定義的共享 queue]

在本文的代碼基礎進行以下修改

  1. 在BroadcastRabbitMQCtl中修改代碼

    @SendTo("/queue/rabbitmq")
    public ResponseMessage broadcast(RequestMessage requestMessage){
    …
    }
    複製代碼
  2. 在ws-broadcast-rabbitmq.jsp中修改接收者的代碼

    stompClient.subscribe(
                    '/queue/rabbitmq',
                    function(respnose){
                    showResponse(JSON.parse(respnose.body).responseMessage);
                });
    複製代碼

測試: 打開兩個頁面,其中一個頁面發送7次,這7個消息被兩個頁面輪流接收

4.3. /amq/queue/queuename

和上文的"/queue/queuename"類似,二者的區別是 a. 與/queue/queuename的區別在於隊列不禁stomp自動進行建立,隊列不存在失敗

這種狀況下不管是發送者仍是接收者都不會產生隊列。 但若是該隊列不存在,接收者會報錯。

在本文的代碼基礎進行以下修改

  1. 在RabbitMQ上手動建立名爲rabbitmq2的隊列

  2. 在BroadcastRabbitMQCtl中修改代碼

    @SendTo("/amq/queue/rabbitmq2")
        public ResponseMessage broadcast(RequestMessage requestMessage){
    ..
    }
    複製代碼
  3. 在ws-broadcast-rabbitmq.jsp中修改接收者的代碼

    stompClient.subscribe(
                    '/amq/queue/rabbitmq2',
                    function(respnose){
                    showResponse(JSON.parse(respnose.body).responseMessage);
                });
    複製代碼

測試: 對於 SUBCRIBE frame,destination 會實現對隊列的消息訂閱。 對於 SEND frame,消息會經過默認的 exhcange 直接被髮送到隊列中。

打開兩個頁面,其中一個頁面發送7次,這7個消息被兩個頁面輪流接收

4.4. /topic/routing_key

經過amq.topic交換機訂閱/發佈消息,訂閱時默認建立一個臨時隊列,經過routing_key與topic進行綁定 a. /topic:固定前綴 b. routing_key:路由鍵

對於發送者端,會建立出自動刪除的、非持久的隊列並根據 routing_key路由鍵綁定到 amq.topic 交換機 上,同時實現對該隊列的訂閱。 對於發送者端,消息會被髮送到 amq.topic 交換機中。

在本文的代碼基礎進行以下修改

  1. 在BroadcastRabbitMQCtl中修改代碼

    @SendTo("/topic/get-response")
        public ResponseMessage broadcast(RequestMessage requestMessage){
    …
    }
    複製代碼
  2. 在ws-broadcast-rabbitmq.jsp中修改接收者的代碼

    stompClient.subscribe(
                    '/topic/get-response',
                    function(respnose){
                    showResponse(JSON.parse(respnose.body).responseMessage);
                });
    複製代碼

測試: 打開兩個頁面,其中一個頁面發送4次,這4個消息同時被兩個都收到

5. 代碼

全部的詳細代碼見github代碼,請儘可能使用tag v0.20,不要使用master,由於master一直在變,不能保證文章中代碼和github上的代碼一直相同

相關文章
相關標籤/搜索