SpringBoot+RabbitMQ學習筆記(五)RabbitMQ消息持久化處理

一丶簡介

 

在@Queue和@Exchange註解中都有autoDelete屬性,值是布爾類型的字符串。如:autoDelete=「false」。spring

@Queue:當全部消費客戶端斷開鏈接後,是否自動刪除隊列: true:刪除,false:不刪除。緩存

@Exchange:當全部綁定隊列都不在使用時,是否自動刪除交換器: true:刪除,false:不刪除。tomcat

當全部消費客戶端斷開鏈接時,而咱們對RabbitMQ消息進行了持久化,那麼這時未被消費的消息存於RabbitMQ服務器的內存中,若是RabbitMQ服務器都關閉了,那麼未被消費的數據也都會丟失了。服務器

下面編寫代碼試試RabbitMQ的消息持久化處理。app

二丶配置文件

這裏使用的是前面博客裏寫的error日誌消息隊列來測試的。這裏仍是建立兩個項目,一個做爲生產者,一個做爲消費者。less

生產者配置:ide

server.port=8883 spring.application.name=hello-world spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.thymeleaf.cache=false #設置交換器名稱 mq.config.exchange=log.direct #設置error隊列的路由鍵 mq.config.queue.error.routing.key=log.error.routing.key
View Code

消費者配置學習

server.port=8884 spring.application.name=lesson1 spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #設置交換器名稱 mq.config.exchange=log.direct #設置error隊列名稱 mq.config.queue.error=log.error #設置error路由鍵 mq.config.queue.error.routing.key=log.error.routing.key
View Code

三丶編寫生產者

package com.example.rabbitdurableprovider; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:發送消息 */ @Component public class DurableSender { @Autowired private AmqpTemplate amqpTemplate; //exChange 交換器
    @Value("${mq.config.exchange}") private String exChange; //routingkey 路由鍵
    @Value("${mq.config.queue.error.routing.key}") private String routingKey; /** * 發送消息的方法 * @param msg */
    public void send(String msg){ //向消息隊列發送消息 //參數1:交換器名稱 //參數2:路由鍵 //參數3:消息
        this.amqpTemplate.convertAndSend(exChange,routingKey,msg); } }
View Code

四丶編寫消費者

這裏消費者服務配置中@Queue中的autoDelete屬性設置的是true,即未持久化,一下子測試下看沒有持久化的消息隊列在全部的消費者服務器斷開後是怎樣的。測試

package com.ant.rabbitdurableconsumer; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; /** * Author:aijiaxiang * Date:2020/4/26 * Description:消息接收者 * @RabbitListener bindings:綁定隊列 * @QueueBinding value:綁定隊列的名稱 * exchange:配置交換器 * @Queue : value:配置隊列名稱 * autoDelete:是不是一個可刪除的臨時隊列 * @Exchange value:爲交換器起個名稱 * type:指定具體的交換器類型 */ @Component @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT), key = "${mq.config.queue.error.routing.key}" ) ) public class DurableErrorReceiver { /** * 接收消息的方法,採用消息隊列監聽機制 * @param msg */ @RabbitHandler public void process(String msg){ System.out.println("error-receiver:"+msg); } }
View Code

 

五丶編寫測試類

這裏寫了一個死循環持續向消息隊列中發送消息,用變量falg來記錄發送編號。this

package com.example.amqp; import com.example.helloworld.HelloworldApplication; import com.example.rabbitdurableprovider.DurableSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * Author:aijiaxiang * Date:2020/4/26 * Description: */ @RunWith(SpringRunner.class) @SpringBootTest(classes = HelloworldApplication.class) public class QueueTest { @Autowired private DurableSender durableSender; @Test public void test3() throws InterruptedException { int flag = 0; while (true){ flag++; Thread.sleep(2000); durableSender.send("hello--"+flag); } } }
View Code

先啓動消費者服務器,而後啓動測試類,控制檯輸出以下信息,而後關閉tomcat,模擬消費者服務器故障。這裏能夠看到消費者接受到的消息停留在第81條就出現了「故障」,可是生產者仍是在持續不斷的向消費者發送消息。

這時再重啓tomcat,消費者又接受到了消息,可是是從第111條消息開始的,那麼81-111之間的這些消息就都丟失了。

 

修改消費者服務的代碼,將autoDelete設置爲「false」,將RabbitMQ消息進行持久化處理。

@RabbitListener( bindings = @QueueBinding( value = @Queue(value = "${mq.config.queue.error}",autoDelete = "false"), exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT), key = "${mq.config.queue.error.routing.key}" ) )

修改後重啓消費者服務器,再次調用測試方法。而後關閉消費者服務器,模擬「故障」。這時看到消息接收到第15條服務器就「故障」了。

 

重啓消費者服務器。能夠看到服務器一啓動,消費者就從消息隊列中讀取到了服務器「故障」時緩存在RabbitMQ中的消息,消息並未丟失,RabbitMQ消息持久化處理成功。

 

OK!以上就是今天學習的RabbitMQ消息持久化處理,若有不對之處,歡迎指正!

相關文章
相關標籤/搜索