rabbit - producer的confirm和consumer的ack模式

本篇和你們分享的是關於rabbit的生產和消費方的一些實用的操做;正如文章標題,主要內容如producer的confirm和consumer的ack,這二者使用的模式都是用來保證數據完整性,防止數據丟失。spring

  • producer的confirm模式
  • consumer的ack模式

producer的confirm模式

首先,有這樣一種業務場景1:a系統在作活動前,須要給用戶的手機發送一條活動內容短信但願用戶來參加,由於用戶量有點大,因此經過往短信mq中插入數據方式,讓短信服務來消費mq發短信;編程

此時插入mq消息的服務爲了保證給全部用戶發消息,而且要在短期內插入完成(所以用到了異步插入方式(快速)),咱們就須要知道每次插入mq是否成功,若是不成功那咱們能夠收集失敗的信息後補發(所以confirm模式排上了用場);如圖設計:springboot

 在springboot中可使用基於amqp封裝的工廠類來開啓confirm模式,而後經過RabbitTemplate模板來設置回調函數,以下代碼:網絡

 1     ///region producer生產 - confirm模式
 2 
 3     public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) {
 4         return this.getRabbitTemplate(this.connectionFactory(), confirmCallback);
 5     }
 6 
 7     public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) {
 8         RabbitTemplate template = new RabbitTemplate(connectionFactory);
 9         //product開啓confirm模式
10         connectionFactory.setPublisherConfirms(true);
11         //設置confirm回調處理
12         template.setConfirmCallback(confirmCallback);
13         return template;
14     }
15     ///endregion

這裏經過RabbitTemplate.ConfirmCallback函數編程來傳遞咱們自定義的回調方法,以下收集confirm返回的結果信息:異步

1         RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink());
2         RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> {
3             System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";狀態:" + b + ";信息:" + c);
4         });

最後再經過RabbitTemplate實例的convertAndSend方法發送mq信息,咱們可以在日誌中看到以下記錄的信息:函數

這裏的狀態true:表示send成功,false:表示send失敗;一般false的時候信息c會有響應的錯誤提示,這裏把網絡斷開,以下錯誤提示:fetch

consumer的ack模式

再來,有這樣一種場景2:短信服務去消費mq隊列信息時,假若服務調用的運營商發送短信接口異常了(短信運營商接口欠費),咱們此時的短信是發送失敗的,用戶也收不到短信,可是在默認(默認開啓ack)前提下mq消息已經被消費了rabbit中沒有記錄了(kafka例外);想要mq消息在業務邏輯異常時還存在,那麼可使用ack方式;this

在springboot中可使用基於amqp封裝的工廠類關閉自動ack模式,改成手動ack方式;只有當業務代碼流程走完後,最後經過代碼設置ack標識,來通知rabbit消息能夠丟棄了;若是設置了手動模式後,又沒有提交ack標識,那麼mq中的消息一直存在沒法釋放(每次consumer消費後,rabbit會把noack的消息重複放入隊列中):spa

 1     ///region consumer監聽 - 手動ack
 2     public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
 3         return this.listenerContainerFactory(this.connectionFactory());
 4     }
 5 
 6     public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
 7         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
 8         factory.setConnectionFactory(connectionFactory);
 9         //代碼手動ack
10         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
11         //開啓消費者數量
12         factory.setConcurrentConsumers(2);
13         //每次接受數據量,默認250
14         factory.setPrefetchCount(300);
15         return factory;
16     }
17     ///endregion

經過鏈接工廠設置手動ack方式,而後獲取mq消息後,走完正常業務邏輯,最後再手動通知ack釋放消息,以下:設計

1     @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"})
2     private void firstNodeListener(String msg, Channel channel, Message message) {
3         try {
4             long deliverTag = message.getMessageProperties().getDeliveryTag();
5             System.out.println("firstNodeListener - 消費消息 [" + deliverTag + "] - " + msg);
6             channel.basicAck(deliverTag, true);
7         } catch (Exception ex) {
8         }
9     }

這裏ack主要根據mq消息的惟一編號(deliverTag)來通知;若是咱們不設置ack確認,那麼消息狀態會是這樣以下rabbit管理後臺:

相關文章
相關標籤/搜索