本篇和你們分享的是關於rabbit的生產和消費方的一些實用的操做;正如文章標題,主要內容如producer的confirm和consumer的ack,這二者使用的模式都是用來保證數據完整性,防止數據丟失。spring
首先,有這樣一種業務場景1:a系統在作活動前,須要給用戶的手機發送一條活動內容短信但願用戶來參加,由於用戶量有點大,因此經過往短信mq中插入數據方式,讓短信服務來消費mq發短信;編程
此時插入mq消息的服務爲了保證給全部用戶發消息,而且要在短期內插入完成(所以用到了異步插入方式(快速)),咱們就須要知道每次插入mq是否成功,若是不成功那咱們能夠收集失敗的信息後補發(所以confirm模式排上了用場);如圖設計:springboot
在springboot中可使用基於amqp封裝的工廠類來開啓confirm模式,而後經過RabbitTemplate模板來設置回調函數,以下代碼:網絡
1 ///region producer生產 - confirm模式 2 3 public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) { 4 return this.getRabbitTemplate(this.connectionFactory(), confirmCallback); 5 } 6 7 public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) { 8 RabbitTemplate template = new RabbitTemplate(connectionFactory); 9 //product開啓confirm模式 10 connectionFactory.setPublisherConfirms(true); 11 //設置confirm回調處理 12 template.setConfirmCallback(confirmCallback); 13 return template; 14 } 15 ///endregion
這裏經過RabbitTemplate.ConfirmCallback函數編程來傳遞咱們自定義的回調方法,以下收集confirm返回的結果信息:異步
1 RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink()); 2 RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> { 3 System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";狀態:" + b + ";信息:" + c); 4 });
最後再經過RabbitTemplate實例的convertAndSend方法發送mq信息,咱們可以在日誌中看到以下記錄的信息:函數
這裏的狀態true:表示send成功,false:表示send失敗;一般false的時候信息c會有響應的錯誤提示,這裏把網絡斷開,以下錯誤提示:fetch
再來,有這樣一種場景2:短信服務去消費mq隊列信息時,假若服務調用的運營商發送短信接口異常了(短信運營商接口欠費),咱們此時的短信是發送失敗的,用戶也收不到短信,可是在默認(默認開啓ack)前提下mq消息已經被消費了rabbit中沒有記錄了(kafka例外);想要mq消息在業務邏輯異常時還存在,那麼可使用ack方式;this
在springboot中可使用基於amqp封裝的工廠類關閉自動ack模式,改成手動ack方式;只有當業務代碼流程走完後,最後經過代碼設置ack標識,來通知rabbit消息能夠丟棄了;若是設置了手動模式後,又沒有提交ack標識,那麼mq中的消息一直存在沒法釋放(每次consumer消費後,rabbit會把noack的消息重複放入隊列中):spa
1 ///region consumer監聽 - 手動ack 2 public SimpleRabbitListenerContainerFactory listenerContainerFactory() { 3 return this.listenerContainerFactory(this.connectionFactory()); 4 } 5 6 public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) { 7 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 8 factory.setConnectionFactory(connectionFactory); 9 //代碼手動ack 10 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); 11 //開啓消費者數量 12 factory.setConcurrentConsumers(2); 13 //每次接受數據量,默認250 14 factory.setPrefetchCount(300); 15 return factory; 16 } 17 ///endregion
經過鏈接工廠設置手動ack方式,而後獲取mq消息後,走完正常業務邏輯,最後再手動通知ack釋放消息,以下:設計
1 @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"}) 2 private void firstNodeListener(String msg, Channel channel, Message message) { 3 try { 4 long deliverTag = message.getMessageProperties().getDeliveryTag(); 5 System.out.println("firstNodeListener - 消費消息 [" + deliverTag + "] - " + msg); 6 channel.basicAck(deliverTag, true); 7 } catch (Exception ex) { 8 } 9 }
這裏ack主要根據mq消息的惟一編號(deliverTag)來通知;若是咱們不設置ack確認,那麼消息狀態會是這樣以下rabbit管理後臺: