RabbitMQ雖然有對隊列及消息等的一些持久化設置,但其實光光只是這一個是不可以保障數據的可靠性的,下面咱們提出這樣的質疑:服務器
(1)RabbitMQ生產者是不知道本身發佈的消息是否已經正確達到服務器呢,若是中間發生網絡異常等狀況呢?消息必然會丟失!網絡
(2)RabbitMQ若是沒有設置隊列持久化,RabbitMQ服務器重後隊列的元數據會丟失,消息天然也會丟失!分佈式
(3)RabbitMQ若是消費者設置自動確認,即autoAck爲true,那麼無論消費者發生什麼狀況,該消息會自動從隊列中移除,實際上消費者有可能掛掉,消息必然會丟失!微服務
(4)RabbitMQ中的消息若是沒有匹配到隊列時,那麼消息也會丟失!源碼分析
針對前言中的第(4)個問題,咱們能夠經過設置mandotory參數與AE備份交換器來解決性能
一、mandotory參數學習
1)當爲true時,交換器沒法根據自身的類型和路由鍵找到一個符合條件的隊列,此時RabbitMQ會調用Basic.Return命令將消息返回給生產者,消息將不會丟失ui
2)當爲false時,消息將會被直接丟棄。spa
3)RabbitMQ經過addReturnListener添加ReturnLisener監聽器監聽獲取沒有被正確路由到合適隊列的消息。orm
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。
channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); channel.addReturnListener(new ReturnListener(){ public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.Return 返回的結果是: " + message); } });
二、AE備份交換器
Alternate Exchange,簡稱AE,不設置mandatory參數,那麼消息將會被丟失,設置mandatory參數的話,須要添加ReturnListner監聽器,增長複雜代碼,若是既不想增長代碼又不想消息丟失,則使用AE,將沒有被路由的消息存儲於RabbitMQ中。當mandatory參數用AE一塊兒使用時,mandatory將失效。在介紹AE以前,也認識RabbitMQ對於消息的過時時間TTL設置以及隊列的過時時間TTL設置
2.1 TTL過時時間設置
能夠對隊列設置TTL與消息設置TTL,其中消息設置TTL常常用於死信隊列、延遲隊列等高級應用中。
1)設置消息TTL
設置TTL過時時間通常有兩種當時:一是經過隊列屬性,對隊列中全部消息設置相同的TTL。二就是對消息自己單獨設置,每條消息TTL不一樣。若是一塊兒使用時候,TTL小的爲準,當一旦超過設置的TTL時間時,就會變成「死信」。
方式一:針對每條消息設置TTL是經過增長expiration的屬性參數實現的,不可能像方式二同樣掃描整個隊列再判斷是否過時,只有當該消息即將被消費時再斷定是否過時便可刪除,也就是消息即便已通過期,但不必定立馬被刪除!
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); // 持久化消息 builder deliveryMode(2); // 設置 TTL=60000ms builder expiration( 60000 ); AMQP.BasicProperties properties = builder. build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
方式二:經過隊列屬性設置消息TTL是增長x-message-ttl參數實現的,只須要掃描整個隊列頭部便可當即刪除,也就是消息一旦過時就會被刪除!
Map<String, Object> argss = new HashMap<String , Object>(); argss.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;
2)設置隊列TTL
經過在隊列中添加參數x-message-ttl參數實現,設置隊列被自動刪除前處於未被使用狀態的時間,注意是隊列的使用狀態,並非消息是否被消費的狀態
設置ttl=30min的隊列,時間一到RabbitMQ會保證隊列被刪除,可是不會保證刪除的速度有多快。
Map<String, Object> args = new HashMap<String, Object>{); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args);
2.2 AE備份交換器的使用
聲明交換器的時候,添加alternate-exchange參數實現,或經過策略實現。前者優先級高。從代碼角度須要如下三個步驟,具體代碼以下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("a1ternate-exchange", "myAe"); channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args); channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ; channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11); channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key"); channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);
1)聲明normalExchange類型爲direct的交換器、類型爲fanout的myAe備份交換器;而且normalExchange的備份交換器爲myAe(備份交換器建議使用fanout類型交換器)
2)聲明normalQueue隊列,聲明unrouteQueue隊列;
3)經過路由鍵normalKey綁定normalExchange與normalQueue,不適用路由鍵綁定unrouteQueue與myAe
針對前言中第(3)個問題,咱們須要在消費者消費完消息後手動進行確認,保證消息數據不丟失!
一、autoAck參數設置
1) 當autoAck參數爲false時,手動確認:
RabbitMQ會等待消費者顯式地回覆確認信號後從內存中移去消息(其實是先標示刪除標記,以後再刪除),這是通常推薦使用的方式,由於使用手動確認有足夠的時間處理消息,不須要擔憂消費者進程掛掉以後消息丟失問題。此時的消息就會分爲兩個部分:一是等待投遞給消費者的消息;二是已經投遞給消費者但尚未收到消費者確認信號的消息。
2) 當autoAck爲true時,自動確認:
RabbitMQ會自動隱式地回覆確認信號後從內存中移去消息, RabbitMQ不須要管消費者是否真正消費了這些消息,RabbitMQ會自動把發送出去的消息置爲確認,而後直接從內存中刪除。
二、從新投遞
問:若是選擇手動確認,即autoAck爲false時,消費者因爲某些緣由斷開了,那麼消息的確認會受到影響,那麼此時的消息會丟失嗎?
這也就是一開始提出來的問題,實際上是沒必要擔憂消息會被丟失,由於RabbitMQ若是一直沒收到消費者的確認信號,而且消費此消息的消費者已經斷開,則RabbitMQ會從新安排消息進入隊列等待給下一個消費者。也就是RabbitMQ不會設置消息的過時時間(固然也能夠設置過時時間,但與之有關係方式消息丟失的特性是死信隊列),它只判斷是否須要從新安排入隊列從新投遞,而判斷的惟一標準是消費此消息的消費者鏈接是否已經斷開,即RabbitMQ會容許消費一條消息的時間好久好久。
三、消費者拒絕消息
1)使用channel.basicReject方法,但只能拒絕一條。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:消息的惟一標識
requeue:表示是否能夠拒絕的消息從新存入隊列
2)使用channel.basicNack。不一樣於前者,此方法能夠批量拒絕。
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
multiple:設置爲true則表示拒絕deliveryTag編號以前全部未被當前消費者確認的消息。
3)問:關鍵在於,消費者拒絕消費消息後怎麼處理?是丟棄,仍是從新回到隊列呢?
當參數requeue設置爲true時候,能夠從新進入隊列,投遞給下一個消費者。若是爲false,消息就會把隊列中消息立馬移除,再結合啓用「死信隊列」,防止消息丟失而且能夠分析異常狀況的發生。
最後,因爲剩下的兩種方式涉及的內容較多,因此在此將分紅兩篇繼續介紹,請看下篇
若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。