RabbitMQ特性

使用默認的exchange

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

 

若是用空字符串去申明一個exchange,那麼系統就會使用"amq.direct"這個exchange。咱們在建立一個queue的時候,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去服務器

在方法中的第一個參數是須要輸入一個exchange。在RabbitMQ中,全部的消息都必需要經過exchange發送到各個queue裏面去。發送者發送消息,其實也就是把消息放到exchange中去。而exchange知道應該把消息放到哪裏去。在這個方法中,咱們沒有輸入exchange的名稱,只是定義了一個空的echange,而在第二個參數routeKey中輸入了咱們目標隊列的名稱。RabbitMQ會幫我定義一個默認的exchange,這個exchange會把消息直接投遞到咱們輸入的隊列中,這樣服務端只須要直接去這個定義了的隊列中獲取消息就能夠了網絡

 

信息確認

RabbitMQ有兩種應答模式,自動和手動。這也是AMQP協議所推薦的。這在point-to-point和broadcast都是同樣的。負載均衡

自動應答-當RabbitMQ把消息發送到接收端,接收端把消息出隊列的時候就自動幫你發應答消息給服務。測試

手動應答-須要咱們開發人員手動去調用ack方法去告訴服務已經收到。大數據

文檔推薦在大數據傳輸中,若是對個別消息的丟失不是很敏感的話選用自動應答比較理想,而對於那些一個消息都不能丟的場景,須要選用手動應答,也就是說在正確處理完之後才應答。若是選擇了自動應答,那麼消息重發這個功能就沒有了。atom

點對點模式spa

也就是一發一接的模式,不適用發佈/訂閱這種廣播模式設計

//autoAck 設置false,消費端掛掉,信息不會丟失,server會re-queue
    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
 //向服務器發送應答
          channel.basicAck(envelope.getDeliveryTag(), false);

 

在RabbitMQ中,爲了避免讓消息丟失,它提供了消息應答的概念。當消費者獲取到了一個消息之後,須要給RabbitMQ服務一個應答的消息,告知服務我已經收到或正確處理了該消息。那麼RabbitMQ能夠放心的在隊列中刪除該消息code

 

隊列持久化

 

//durable 設置true,queue持久化,server重啓,此queue不丟失
    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

方法的第四的參數autoDelete,通常都會輸入false。文檔描述這個參數若是是true的話,意思是:若是這個queue再也不使用(沒有被訂閱)的話,server就會刪除它。在個人測試過程當中,只要是鏈接改queue的全部接收者都斷開鏈接的話,該queue就會被刪除,即便裏面還有沒有處理的消息。RabbitMQ的重啓也一樣會刪除他們。若是輸入的是false,那與之相連的客戶端都斷開鏈接的話,服務是不會刪除這個隊列的,隊列中的消息也就會存在。發送端在沒有客戶端鏈接的時候也能夠把消息放入改隊列,客戶端起來的時候,就會獲得這些消息。可是若是RabbitMQ服務重啓的話,該隊列就沒有了,裏面的消息天然也就沒有了。server

第三個參數是exclusive,文檔描述說,若是是true,那麼申明這個queue的connection斷了,那麼這個隊列就被刪除了,包括裏面的消息。

第二個參數durable,文檔描述說,若是是true,則表明是一個持久的隊列,那麼在服務重啓後,也會存在。由於服務會把持久化的queue存放在硬盤上,放服務重啓的時候,會從新申明這個queue。固然必須是在autoDelete和exclusive都爲false的時候。隊列是能夠被持久化,可是裏面的消息是否爲持久化那還要看消息的持久化設置。也就是說,若是重啓以前那個queue裏面還有沒有發出去的消息的話,重啓以後那隊列裏面是否是還存在原來的消息,這個就要取決於發送者在發送消息時對消息的設置了(信息持久化)。

信息持久化

  //BasicProperties設置MessageProperties.PERSISTENT_TEXT_PLAIN,信息持久化
    channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
channel.BasicPublish("", "TaskQueue", properties, bytes);

 

DeliveryMode等於2就說明這個消息是persistent的。1是默認是,不是持久的。在接收者接收消息並處理的時候會出現各類各樣的問題:拋出異常致使與RabbitMQ鏈接斷開,程序掛掉,網絡問題等等。每每在出現這些問題的時候咱們一般都但願隊列能保存這些消息,並在程序再次起來的時候可以從新處理,或若是是負載均衡的模式下,可以把這個消息從新分配給其餘的同等的接受者來處理。這一樣也是RabbitMQ對消息持久化的一種功能。這咱們在消息的傳輸控制中作詳細的說明

消息的拒收

拒收,是接收端在收到消息的時候響應給RabbitMQ服務的一種命令,告訴服務器不該該由我處理,或者拒絕處理,扔掉。接收端在發送reject命令的時候能夠選擇是否要從新放回queue中。若是沒有其餘接收者監控這個queue的話,要注意一直無限循環發送的危險。

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);

BasicReject方法第一個參數是消息的DeliveryTag,對於每一個Channel來講,每一個消息都會有一個DeliveryTag,通常用接收消息的順序來表示:1,2,3,4 等等。第二個參數是是否放回queue中,requeue。

BasicReject一次只能拒絕接收一個消息,而BasicNack方法能夠支持一次0個或多個消息的拒收,而且也能夠設置是否requeue。

channel.BasicNack(3, true, false);

在第一個參數DeliveryTag中若是輸入3,則消息DeliveryTag小於等於3的,這個Channel的,都會被拒收。

消息的QoS

QoS = quality-of-service, 顧名思義,服務的質量。一般咱們設計系統的時候不能徹底排除故障或保證說沒有故障,而應該設計有完善的異常處理機制。在出現錯誤的時候知道在哪裏出現什麼樣子的錯誤,緣由是什麼,怎麼去恢復或者處理纔是真正應該去作的。在接收消息出現故障的時候咱們能夠經過RabbitMQ重發機制來處理。重發就有重發次數的限制,有些時候你不可能不限次數的重發,這取決於消息的大小,重要程度和處理方式。

甚至QoS是在接收端設置的。發送端沒有任何變化,接收端的代碼也比較簡單,只須要加以下代碼:

channel.BasicQos(0, 1, false);

代碼第一個參數是可接收消息的大小的,可是彷佛在客戶端2.8.6版本中它必須爲0,即便:不受限制。若是不輸0,程序會在運行到這一行的時候報錯,說尚未實現不爲0的狀況。第二個參數是處理消息最大的數量。舉個例子,若是輸入1,那若是接收一個消息,可是沒有應答,則客戶端不會收到下一個消息,消息只會在隊列中阻塞。若是輸入3,那麼能夠最多有3個消息不該答,若是到達了3個,則發送端發給這個接收方得消息只會在隊列中,而接收方不會有接收到消息的事件產生。總結說,就是在下一次發送應答消息前,客戶端能夠收到的消息最大數量。第三個參數則設置了是否是針對整個Connection的,由於一個Connection能夠有多個Channel,若是是false則說明只是針對於這個Channel的。

這種數量的設置,也爲咱們在多個客戶端監控同一個queue的這種負載均衡環境下提供了更多的選擇。

 //對服務器確認以前,一次只接受一條信息
channel.basicQos(1);

 

mandatory標誌的做用

在生產者經過channel的basicPublish方法發佈消息時,一般有幾個參數須要設置,爲此咱們有必要了解清楚這些參數表明的具體含義及其做用,查看Channel接口,會發現存在3個重載的basicPublish方法

1 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;  
2   
3 void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)  
4             throws IOException;  
5   
6 void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)  
7             throws IOException;  

 

他們共有的參數分別是:​ exchange:交換機名稱​ routingKey:路由鍵​ props:消息屬性字段,好比消息頭部信息等等​ body:消息主體部分

mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標誌位,它們都有當消息傳遞過程當中不可達目的地時將消息返回給生產者的功能。具體區別在於:

mandatory標誌位

當mandatory標誌位設置爲true時,若是exchange根據自身類型和消息routeKey沒法找到一個符合條件的queue,那麼會調用basic.return方法將消息返還給生產者;當mandatory設爲false時,出現上述情形broker會直接將消息扔掉。

immediate標誌位

當immediate標誌位設置爲true時,若是exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的全部queue(一個或多個)都沒有消費者時,該消息會經過basic.return方法返還給生產者。

歸納來講,mandatory標誌告訴服務器至少將該消息route到一個隊列中,不然將消息返還給生產者;immediate標誌告訴服務器若是該消息關聯的queue上有消費者,則立刻將消息投遞給它,若是全部queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。

相關文章
相關標籤/搜索