本文RabbitMQ版本爲rabbitmq-server-3.7.17,erlang爲erlang-22.0.7.請各位去官網查看版本匹配和下載,也能夠留言,我發安裝包spring
過時時間TTL表示能夠對消息設置預期的時間,在這個時間內均可以被消費者接收穫取;過了以後消息將自動被刪除。RabbitMQ能夠對消息和隊列設置TTL。目前有兩種方法能夠設置。數據庫
若是上述兩種方法同時使用,則消息的過時時間以二者之間TTL較小的那個數值爲準。消息在隊列的生存時間一旦超過設置的TTL值,就稱爲dead message被投遞到死信隊列, 消費者將沒法再收到該消息。單元測試
配置resources\spring\spring-rabbitmq.xml
文件測試
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--定義過時隊列及其屬性,不存在則自動建立--> <rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true"> <rabbit:queue-arguments> <!--投遞到該隊列的消息若是沒有消費都將在6秒以後被刪除--> <entry key="x-message-ttl" value-type="long" value="6000"/> </rabbit:queue-arguments> </rabbit:queue>
<rabbit:connection-factory id="connectionFactory" host="192.168.75.163"
port="5672"
username="test01"
password="test01"
virtual-host="/hello"
/>
</beans>
參數 x-message-ttl 的值 必須是非負 32 位整數 (0 <= n <= 2^32-1) ,以毫秒爲單位表示 TTL 的值。這樣,值 6000 表示存在於 隊列 中的當前 消息 將最多隻存活 6 秒鐘spa
若是不設置TTL,則表示此消息不會過時。若是將TTL設置爲0,則表示除非此時能夠直接將消息投遞到消費者,不然該消息會被當即丟棄。插件
而後在測試類中編寫以下方法發送消息到上述定義的隊列:3d
@RunWith(SpringRunner.class) @SpringBootTest public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; /** * 過時隊列消息 * 投遞到該隊列的消息若是沒有消費都將在6秒以後被刪除 */ @Test public void ttlQueueTest(){ //路由鍵與隊列同名 rabbitTemplate.convertAndSend("my_ttl_queue", "發送到過時隊列my_ttl_queue,6秒內不消費則不能再被消費。"); } }
啓動類中導入配置文件調試
//導入配置文件 @ImportResource("classpath:/spring/spring-rabbitmq.xml") @SpringBootApplication public class ProducerApplication { public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }
執行單元測試,看結果日誌
6秒後,再看結果code
消息的過時時間;只須要在發送消息(能夠發送到任何隊列,無論該隊列是否屬於某個交換機)的時候設置過時時間便可。
在測試類中編寫以下方法發送消息並設置過時時間到隊列:
/** * 過時消息 * 該消息投遞任何交換機或隊列中的時候;若是到了過時時間則將從該隊列中刪除 */ @Test public void ttlMessageTest(){ MessageProperties messageProperties = new MessageProperties(); //設置消息的過時時間,3秒 messageProperties.setExpiration("3000"); Message message = new Message("測試過時消息,3秒鐘過時".getBytes(), messageProperties); //路由鍵與隊列同名 rabbitTemplate.convertAndSend("my_ttl_queue", message); }
expiration 字段以微秒爲單位表示 TTL 值。且與 x-message-ttl 具備相同的約束條件。由於 expiration 字段必須爲字符串類型,broker 將只會接受以字符串形式表達的數字。
當同時指定了 queue 和 message 的 TTL 值,則二者中較小的那個纔會起做用。
DLX,全稱爲Dead-Letter-Exchange , 能夠稱之爲死信交換機,也有人稱之爲死信郵箱。
當消息在一個隊列中變成死信(dead message)以後,它能被從新發送到另外一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之爲死信隊列。
消息變成死信,多是因爲如下的緣由:
DLX也是一個正常的交換機,和通常的交換機沒有區別,它能在任何的隊列上被指定,實際上就是設置某一個隊列的屬性。
當這個隊列中存在死信時,Rabbitmq就會自動地將這個消息從新發布到設置的DLX上去,進而被路由到另外一個隊列,即死信隊列。
要想使用死信隊列,只須要在定義隊列的時候設置隊列參數 x-dead-letter-exchange
指定交換機便可。
具體步驟以下
<!--定義定向交換機中的持久化死信隊列,不存在則自動建立-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定義廣播類型交換機;並綁定上述隊列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
<rabbit:bindings>
<!--綁定路由鍵my_ttl_dlx、my_max_dlx,能夠將過時的消息轉移到my_dlx_queue隊列-->
<rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定義過時隊列及其屬性,不存在則自動建立-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊列的消息若是沒有消費都將在6秒以後被投遞到死信交換機-->
<entry key="x-message-ttl" value-type="long" value="6000"/>
<!--設置當消息過時後投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義限制長度的隊列及其屬性,不存在則自動建立-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
<rabbit:queue-arguments>
<!--投遞到該隊列的消息最多2個消息,若是超過則最先的消息被刪除投遞到死信交換機-->
<entry key="x-max-length" value-type="long" value="2"/>
<!--設置當消息過時後投遞到對應的死信交換機-->
<entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!--定義定向交換機 根據不一樣的路由key投遞消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
<rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
/** * 過時消息投遞到死信隊列 * 投遞到一個正常的隊列,可是該隊列有設置過時時間,到過時時間以後消息會被投遞到死信交換機(隊列) */ @Test public void dlxTTLMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_ttl_dlx", "測試過時消息;6秒過時後會被投遞到死信交換機2222"); }
運行看結果
6秒後
/** * 消息長度超過2,會投遞到死信隊列中 */ @Test public void dlxMaxMessageTest(){ rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "發送消息4:消息長度超過2,會被投遞到死信隊列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "發送消息5:消息長度超過2,會被投遞到死信隊列中!"); rabbitTemplate.convertAndSend( "my_normal_exchange", "my_max_dlx", "發送消息6:消息長度超過2,會被投遞到死信隊列中!"); }
運行,看結果
上面發送的3條消息中的第1條消息會被投遞到死信隊列中
延遲隊列存儲的對象是對應的延遲消息;所謂「延遲消息」 是指當消息被髮送之後,並不想讓消費者馬上拿到消息,而是等待特定時間後,消費者才能拿到這個消息進行消費。
在RabbitMQ中延遲隊列能夠經過 過時時間
+ 死信隊列
來實現
延遲隊列的應用場景;如:
具體代碼不演示了
確認而且保證消息被送達,提供了兩種方式:發佈確認和事務。(二者不可同時使用)在channel爲事務時,不可引入確認模式;一樣channel爲確認模式下,不可以使用事務。
消息發送成功確認
connectionFactory 中啓用消息確認:
<rabbit:connection-factory id="connectionFactory" host="192.168.75.163" port="5672" username="test01" password="test01" virtual-host="/hello" publisher-confirms="true" />
配置消息確認回調方法以下:
<!-- 消息回調處理類 --> <bean id="confirmCallback" class="com.itheima.rabbitmq.MsgSendConfirmCallBack"/> <!--定義rabbitTemplate對象操做能夠在代碼中方便發送消息--> <!-- confirm-callback="confirmCallback" 表示:消息失敗回調 --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallback"/>
消息確認回調方法以下:
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息確認成功...."); } else { //處理丟失的消息 System.out.println("消息確認失敗," + cause); } } }
咱們手動建一個spring_queue隊列.並測試以下:
@Test public void queueTest(){ //路由鍵與隊列同名 rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息。"); }
查看結果
消息發送失敗回調
connectionFactory 中啓用回調:
!-- publisher-returns="true" 表示:啓用了失敗回調 --> <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}" virtual-host="${rabbitmq.virtual-host}" publisher-returns="true" />
配置消息失敗回調方法以下:
<!-- 消息失敗回調類 --> <bean id="sendReturnCallback" class="com.itheima.rabbitmq.MsgSendReturnCallback"/> <!-- return-callback="sendReturnCallback" 表示:消息失敗回調 ,同時需配置mandatory="true",不然消息則丟失--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" confirm-callback="confirmCallback" return-callback="sendReturnCallback" mandatory="true"/>
注意:同時需配置mandatory="true",不然消息則丟失
消息失敗回調方法以下:
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback { public void returnedMessage(Message message, int i, String s, String s1, String s2) { String msgJson = new String(message.getBody()); System.out.println("Returned Message:"+msgJson); } }
模擬消息發送失敗,功能測試以下:
@Test public void testFailQueueTest() throws InterruptedException { //exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTE amqpTemplate.convertAndSend("test_fail_exchange", "", "測試消息發送失敗進行確認應答。"); }
失敗回調結果以下:
場景:業務處理伴隨消息的發送,業務處理失敗(事務回滾)後要求消息不發送。rabbitmq 使用調用者的外部事務,一般是首選,由於它是非侵入性的(低耦合)。
外部事務的配置
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" mandatory="true" channel-transacted="true" /> <!--平臺事務管理器--> <bean id="transactionManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> <property name="connectionFactory" ref="connectionFactory"/> </bean>
測試類或者測試方法上加入@Transactional註解
@Test @Transactional //開啓事務 //@Rollback(false)//在測試的時候,須要手動的方式制定回滾的策略 public void queueTest2(){ //路由鍵與隊列同名 rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。"); System.out.println("----------------dosoming:能夠是數據庫的操做,也能夠是其餘業務類型的操做---------------"); //模擬業務處理失敗 //System.out.println(1/0); rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。"); }
運行看結果
由於是測試類,因此spring自動回滾了,須要咱們手動禁止回滾
@Test @Transactional //開啓事務 @Rollback(false)//在測試的時候,須要手動的方式制定回滾的策略 public void queueTest2(){ //路由鍵與隊列同名 rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。"); System.out.println("----------------dosoming:能夠是數據庫的操做,也能夠是其餘業務類型的操做---------------"); //模擬業務處理失敗 //System.out.println(1/0); rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。"); }
再執行一次測試類,查看結果
咱們手動弄個異常,再試一次
@Test @Transactional //開啓事務 @Rollback(false)//在測試的時候,須要手動的方式制定回滾的策略 public void queueTest2(){ //路由鍵與隊列同名 rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02222222222222222222。"); System.out.println("----------------dosoming:能夠是數據庫的操做,也能夠是其餘業務類型的操做---------------"); //模擬業務處理失敗 System.out.println(1/0); rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--02。"); }
看結果
這裏4條是由於我手動禁止了回滾
結果應該是沒問題的,就不測試了
消息中心的消息追蹤須要使用Trace實現,Trace是Rabbitmq用於記錄每一次發送的消息,方便使用Rabbitmq的開發者調試、排錯。可經過插件形式提供可視化界面。
Trace啓動後會自動建立系統Exchange:amq.rabbitmq.trace ,每一個隊列會自動綁定該Exchange,綁定後發送到隊列的消息都會記錄到Trace日誌。
查看插件列表
rabbitmq-plugins list
rabbitmq啓用trace插件
rabbitmq-plugins enable rabbitmq_tracing
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq啓用trace插件 |
rabbitmqctl trace_on | 打開trace的開關 |
rabbitmqctl trace_on -p test01 | 打開trace的開關(itcast爲須要日誌追蹤的vhost) |
rabbitmqctl trace_off | 關閉trace的開關 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq關閉Trace插件 |
rabbitmqctl set_user_tags test01 administrator | 只有administrator的角色才能查看日誌界面 |
安裝插件並開啓 trace_on 以後,會發現多個 exchange:amq.rabbitmq.trace ,類型爲:topic。
首先從新登陸管理控制檯
第一步:發送消息
@Test public void queueTest3() throws InterruptedException { rabbitTemplate.convertAndSend("spring_queue", "只發隊列spring_queue的消息--01。"); }
第二步:查看trace
第三步:點擊Tracing查看Trace log files
第四步:點擊itest-log.log確認消息軌跡正確性