1.生產者丟數據spring
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。api
生產者的消息沒有投遞到MQ中怎麼辦?從生產者弄丟數據這個角度來看,RabbitMQ提供transaction和confirm模式來確保生產者不丟消息。 transaction機制就是說,發送消息前,開啓事物(channel.txSelect()),而後發送消息,若是發送過程當中出現什麼異常,事物就會回滾(channel.txRollback()),若是發送成功則提交事 物(channel.txCommit())。 然而缺點就是吞吐量降低了。所以,按照博主的經驗,生產上用confirm模式的居多。一旦channel進入confirm模式,全部在該信道上面發佈的消息都將會被指派一個惟一的ID(從1開始),一旦 消息被投遞到全部匹配的隊列以後,rabbitMQ就會發送一個Ack給生產者(包含消息的惟一ID),這就使得生產者知道消息已經正確到達目的隊列了.若是rabiitMQ沒能處理該消息,則會發送一個N ack消息給你,你能夠進行重試操做。
下面演示一下confirm模式:服務器
//測試確認後回調 @Service public class HelloSender1 implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "你好如今是 " + new Date() +""; System.out.println("HelloSender發送內容 : " + context); this.rabbitTemplate.setConfirmCallback(this); //exchange,queue 都正確,confirm被回調, ack=true //this.rabbitTemplate.convertAndSend("exchange","topic.message", context); //exchange 錯誤,queue 正確,confirm被回調, ack=false //this.rabbitTemplate.convertAndSend("fasss","topic.message", context); //exchange 正確,queue 錯誤 ,confirm被回調, ack=true; return被回調 replyText:NO_ROUTE //this.rabbitTemplate.convertAndSend("exchange","", context); //exchange 錯誤,queue 錯誤,confirm被回調, ack=false this.rabbitTemplate.convertAndSend("fasss","fass", context); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause); } }
2.消息隊列丟數據分佈式
處理消息隊列丟數據的狀況,通常是開啓持久化磁盤的配置。這個持久化配置能夠和confirm機制配合使用,你能夠在消息持久化磁盤後,再給生產者發送一個Ack信號。這樣,若是消息持久化磁盤 以前,rabbitMQ陣亡了,那麼生產者收不到Ack信號,生產者會自動重發。 那麼如何持久化呢,這裏順便說一下吧,其實也很容易,就下面兩步 ①、將queue的持久化標識durable設置爲true,則表明是一個持久的隊列 ②、發送消息的時候將deliveryMode=2 這樣設置之後,rabbitMQ就算掛了,重啓後也能恢復數據。在消息尚未持久化到硬盤時,可能服務已經死掉,這種狀況能夠經過引入mirrored-queue即鏡像隊列,但也不能保證消息百分百不丟 失(整個集羣都掛掉)
/** * 第二個參數:queue的持久化是經過durable=true來實現的。 * 第三個參數:exclusive:排他隊列,若是一個隊列被聲明爲排他隊列,該隊列僅對首次申明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點: 1. 排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一鏈接建立的排他隊列; 2.「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣; 3.即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。 * 第四個參數:自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。 * @param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自動刪除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }
MessageProperties properties=new MessageProperties(); properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE); properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化設置 properties.setExpiration("2018-12-15 23:23:23");//設置到期時間 Message message=new Message("hello".getBytes(),properties); this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);
3.消費者丟數據ide
啓用手動確認模式能夠解決這個問題 ①自動確認模式,消費者掛掉,待ack的消息迴歸到隊列中。消費者拋出異常,消息會不斷的被重發,直處處理成功。不會丟失消息,即使服務掛掉,沒有處理完成的消息會重回隊列,可是異常會讓 消息不斷重試。 ②手動確認模式 ③不確認模式,acknowledge="none" 不使用確認機制,只要消息發送完成會當即在隊列移除,不管客戶端異常仍是斷開,只要發送完就移除,不會重發。
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。微服務
指定Acknowledge的模式: spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示該監聽器手動應答消息 針對手動確認模式,有如下特色: 1.使用手動應答消息,有一點須要特別注意,那就是不能忘記應答消息,由於對於RabbitMQ來講處理消息沒有超時,只要不該答消息,他就會認爲仍在正常處理消息,致使消息隊列出現阻塞,影響 業務執行。 2.若是消費者來不及處理就死掉時,沒有響應ack時,會項目啓動後會重複發送一條信息給其餘消費者; 3.能夠選擇丟棄消息,這其實也是一種應答,以下,這樣就不會再次收到這條消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 4.若是消費者設置了手動應答模式,而且設置了重試,出現異常時不管是否捕獲了異常,都是不會重試的 5.若是消費者沒有設置手動應答模式,而且設置了重試,那麼在出現異常時沒有捕獲異常會進行重試,若是捕獲了異常不會重試。
重試機制:源碼分析
spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重試次數 spring.rabbitmq.listener.simple.retry.enabled=true 是否開啓消費者重試(爲false時關閉消費者重試,這時消費端代碼異常會一直重複收到消息) spring.rabbitmq.listener.simple.retry.initial-interval=5000 重試間隔時間(單位毫秒) spring.rabbitmq.listener.simple.default-requeue-rejected=false 重試次數超過上面的設置以後是否丟棄(false不丟棄時須要寫相應代碼將該消息加入死信隊列)
若是設置了重試模式,那麼在出現異常時沒有捕獲異常會進行重試,若是捕獲了異常不會重試。性能
當出現異常時,咱們須要把這個消息回滾到消息隊列,有兩種方式:學習
//ack返回false,並從新回到隊列,api裏面解釋得很清楚測試
//ack返回false,並從新回到隊列,api裏面解釋得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒絕消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
通過開發中的實際測試,當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是還是在隊列頭部,這時消費者會立馬又接收到這條消息進行處理,接着拋出異常,進行 回滾,如此反覆進行。這種狀況會致使消息隊列處理出現阻塞,消息堆積,致使正常消息也沒法運行。對於消息回滾到消息隊列,咱們但願比較理想的方式時出現異常的消息到 達消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行,所以咱們採起的解決方案是,將消息進行應答,這時消息隊列會刪除該消息,同時咱們再次發送該消息 到消息隊列,這時就實現了錯誤消息進行消息隊列尾部的方案。
//手動進行應答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //從新發送消息到隊尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(new Object()));
若是一個消息體自己有誤,會致使該消息體,一直沒法進行處理,而服務器中刷出大量無用日誌。解決這個問題能夠採起兩種方案:
1.一種是對於平常細緻處理,分清哪些是能夠恢復的異常,哪些是不能夠恢復的異常。對於能夠恢復的異常咱們採起第三條中的解決方案,對於不能夠處理的異常,咱們採用記錄日誌,直接丟棄該消息方案。
2.另外一種是咱們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,屢次處理仍不能成功時,處理次數到達咱們設置的值時,咱們就丟棄該消息,但須要記錄詳細的日誌。
消息監聽內的異常處理有兩種方式:
1.內部catch後直接處理,而後使用channel對消息進行確認
2.配置RepublishMessageRecoverer將處理異常的消息發送到指定隊列專門處理或記錄。監聽的方法內拋出異常貌似沒有太大用處。由於拋出異常就算是重試也很是有可能會繼續出現異常,當重試次數完了以後消息就只有重啓應用才能接收到了,頗有可能致使消息消費不及時。固然能夠配置RepublishMessageRecoverer來解決,可是萬一RepublishMessageRecoverer發送失敗了呢。。那就可能形成消息消費不及時了。因此即便須要將處理出現異常的消息統一放到另外隊列去處理,我的建議兩種方式:
①catch異常後,手動發送到指定隊列,而後使用channel給rabbitmq確認消息已消費
②給Queue綁定死信隊列,使用nack(requque爲false)確認消息消費失敗
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。