RabbitMQ學習(二):Java使用RabbitMQ要點知識

轉  https://blog.csdn.net/leixiaotao_java/article/details/78924863java

一、maven依賴

 

  1. <dependency>
  2. <groupId>commons-lang</groupId>
  3. <artifactId>commons-lang</artifactId>
  4. <version> 2.3</version>
  5. </dependency>
  6.  
  7. <dependency>
  8. <groupId>com.rabbitmq</groupId>
  9. <artifactId>amqp-client</artifactId>
  10. <version> 3.4.1</version>
  11. </dependency>

 

二、RabbitMQ重要方法介紹(基本經常使用的)

2.一、建立鏈接

  1. // 建立鏈接工廠
  2. ConnectionFactory cf = new ConnectionFactory();
  3. // 設置rabbitmq服務器IP地址
  4. cf.setHost( "*.*.*.*");
  5. // 設置rabbitmq服務器用戶名
  6. cf.setUsername( "***");
  7. // 設置rabbitmq服務器密碼
  8. cf.setPassword( "***");
  9. // 指定端口,默認5672
  10. cf.setPort(AMQP.PROTOCOL.PORT);
  11. // 獲取一個新的鏈接
  12. connection = cf.newConnection();
  13. // 建立一個通道
  14. channel = connection.createChannel();
  15.  
  16. //關閉管道和鏈接
  17. channel.close();
  18. connection.close();

 

2.二、聲明隊列

  1. /**
  2. * 申明一個隊列,若是這個隊列不存在,將會被建立
  3. * @param queue 隊列名稱
  4. * @param durable 持久性:true隊列會再重啓事後存在,可是其中的消息不會存在。
  5. * @param exclusive 是否只能由建立者使用,其餘鏈接不能使用。
  6. * @param autoDelete 是否自動刪除(沒有鏈接自動刪除)
  7. * @param arguments 隊列的其餘屬性(構造參數)
  8. * @return Queue.DeclareOk:宣告隊列的聲明確認方法已成功聲明。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.queueDeclare( "testQueue", true, false, false, null);

 

此方法通常由Producer調用建立消息隊列。若是由Consumer建立隊列,有可能Producer發佈消息的時候Queue尚未被建立好,會形成消息丟失的狀況。web

 

2.三、聲明Exchange

  1. /**
  2. * 聲明一個 exchange.
  3. * @param exchange 名稱
  4. * @param type exchange type:direct、fanout、topic、headers
  5. * @param durable 持久化
  6. * @param autoDelete 是否自動刪除(沒有鏈接自動刪除)
  7. * @param arguments 隊列的其餘屬性(構造參數)
  8. * @return 成功地聲明瞭一個聲明確認方法來指示交換。
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.exchangeDeclare( "leitao","topic", true,false,null);

 

2.四、將queue和Exchange進行綁定(Binding)

  1. /**
  2. * 將隊列綁定到Exchange,不須要額外的參數。
  3. * @param queue 隊列名稱
  4. * @param exchange 交換機名稱
  5. * @param routingKey 路由關鍵字
  6. * @return Queue.BindOk:若是成功建立綁定,則返回綁定確認方法。
  7. * @throws java.io.IOException if an error is encountered
  8. */
  9. channel.queueBind( "testQueue", "leitao", "testRoutingKey");

 

2.五、發佈消息

  1. /**
  2. * 發佈一條不用持久化的消息,且設置兩個監聽。
  3. * @param exchange 消息交換機名稱,空字符串將使用直接交換器模式,發送到默認的Exchange=amq.direct。此狀態下,RoutingKey默認和Queue名稱相同
  4. * @param routingKey 路由關鍵字
  5. * @param mandatory 監聽是否有符合的隊列
  6. * @param immediate 監聽符合的隊列上是有至少一個Consumer
  7. * @param BasicProperties 設置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化。
  8. * @param body 消息對象轉換的byte[]
  9. * @throws java.io.IOException if an error is encountered
  10. */
  11. channel.basicPublish( "",queueName,true,false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));

exchange的值爲空字符串或者是amq.direct時,此時的交換器類型默認是direct類型能夠不用單獨聲明Exchange,也不用單獨進行Binding,系統默認將queue名稱做爲RoutingKey進行了綁定。服務器

 

兩個傳入參數的含義網絡

mandatory異步

當mandatory標誌位設置爲true時,若是exchange根據自身類型和消息routeKey沒法找到一個符合條件的queue,那麼會調用basic.return方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設置爲false時,出現上述情形broker會直接將消息扔掉。maven

immediateide

當immediate標誌位設置爲true時,若是exchange在將消息路由到queue(s)時發現對於的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的全部queue(一個或者多個)都沒有消費者時,該消息會經過basic.return方法返還給生產者。函數

歸納來講,mandatory標誌告訴服務器至少將該消息route到一個隊列中,不然將消息返還給生產者;immediate標誌告訴服務器若是該消息關聯的queue上有消費者,則立刻將消息投遞給它,若是全部queue都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。工具

注意:在RabbitMQ3.0之後的版本里,去掉了immediate參數的支持,發送帶immediate=true標記的publish會返回以下錯誤:性能

com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。

爲何取消支持:immediate標記會影響鏡像隊列性能,增長代碼複雜性,並建議採用「TTL」和「DLX」等方式替代。

 

2.6、接收消息

  1. /**
  2. * 設置消費批量投遞數目,一次性投遞10條消息。當消費者未確認消息累計達到10條時,rabbitMQ將不會向此Channel上的消費者投遞消息,直到未確認數小於10條再投遞
  3. * @param prefetchCount 投遞數目
  4. * @param global 是否針對整個Channel。true表示此投遞數是給Channel設置的,false是給Channel上的Consumer設置的。
  5. * @throws java.io.IOException if an error is encountered
  6. */
  7. channel.basicQos( 10,false);
  8. //整個傳輸管道最多15條,具體分到每一個消費者身上又不能大於10條
  9. channel.basicQos( 15,true);
  10.  
  11. /**
  12. * 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  13. * 執行這個方法會回調handleConsumeOk方法
  14. * @param queue 隊列名稱
  15. * @param autoAck 是否自動應答。false表示consumer在成功消費事後必需要手動回覆一下服務器,若是不回覆,服務器就將認爲此條消息消費失敗,繼續分發給其餘consumer。
  16. * @param callback 回調方法類,通常爲本身的Consumer類
  17. * @return 由服務器生成的consumertag
  18. * @throws java.io.IOException if an error is encountered
  19. */
  20. channel.basicConsume(queueName, false, Consumer);

 

2.七、Consumer處理消息

  1. /**
  2. * 消費者收到消息的回調函數
  3. * @param consumerTag 消費者標籤
  4. * @param envelope 消息的包裝數據
  5. * @param properties 消息的內容頭數據
  6. * @param body 消息對象的byte[]
  7. * @throws IOException
  8. */
  9. void handleDelivery(String consumerTag,
  10. Envelope envelope,
  11. AMQP.BasicProperties properties,
  12. byte[] body)
  13. throws IOException;

 

三、Producer消息確認機制

3.一、什麼是生產者消息確認機制?

沒有消息確認模式時,生產者不知道消息是否是已經到達了Broker服務器,這對於一些業務嚴謹的系統來講將是災難性的。消息確認模式能夠採用AMQP協議層面提供的事務機制實現(此文沒有這種實現方式),可是會下降RabbitMQ的吞吐量。RabbitMQ自身提供了一種更加高效的實現方式:confirm模式。

消息生產者經過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。一旦信道被設置成confirm模式,該信道上的全部消息都會被指派一個惟一的ID(從1開始),一旦消息被對應的Exchange接收,Broker就會發送一個確認給生產者(其中deliveryTag就是此惟一的ID),這樣消息生產者就知道消息已經成功到達Broker。

confirm模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack消息,生產者應用程序一樣能夠在回調方法中處理該nack消息。

在channel 被設置成 confirm 模式以後,全部被 publish 的後續消息都將被 confirm(即 ack) 或者被nack一次。可是沒有對消息被 confirm 的快慢作任何保證,而且同一條消息不會既被 confirm又被nack 。

3.二、開啓confirm模式

如上所說生產者經過調用Channel.confirmSelect()方法將Channel信道設置成confirm模式。

注意:已經在transaction事務模式的channel是不能再設置成confirm模式的,即這兩種模式是不能共存的。

3.三、普通confirm模式

普通confirm模式是串行的,即每次發送了一次消息,生產者都要等待Broker的確認消息,而後根據確認標記權衡消息重發仍是繼續發下一條。因爲是串行的,在效率上是比較低下的。

 

(1)重點方法

  1. /**
  2. * 等待Broker返回消息確認標記
  3. * 注意,在非肯定的通道,waitforconfirms拋出IllegalStateException。
  4. * @return 是否發送成功
  5. * @throws java.lang.IllegalStateException
  6. */
  7. boolean waitForConfirms() throws InterruptedException;


(2部分使用代碼以下:

 

  1. //注意:返回的時候Return在前,Confirm在後
  2. channel.confirmSelect();
  3. int i=1;
  4. while (i<=50) {
  5. //發佈消息
  6. channel.basicPublish( "",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. //等待Broker的確認回調
  8. if(channel.waitForConfirms())
  9. System.out.println( "send success!");
  10. else
  11. System.out.println( "send error!");
  12. i++;
  13. }

 

 

3.四、批量confirm模式

批量confirm模式是異步的方式,效率要比普通confirm模式高許多,可是此種方式也會形成線程阻塞,想要進行失敗重發就必需要捕獲異常。網絡上還有采用waitForConfirms()實現批量confirm模式的,可是隻要一條失敗了,就必須把這批次的消息通通再重發一次,很是的消耗性能,所以此文不予考慮。

 

(1)重點代碼

  1. /**
  2. * 等待直到全部消息被確認或者某個消息發送失敗。若是消息發送確認失敗了,
  3. * waitForConfirmsOrDie 會拋出IOException異常。當在非確認通道上調用時
  4. * ,會拋出IllegalStateException異常。
  5. * @throws java.lang.IllegalStateException
  6. */
  7. void waitForConfirmsOrDie() throws IOException, InterruptedException;


(2)部分代碼以下:

 

  1. //注意:返回的時候Return在前,Confirm在後
  2. channel.confirmSelect();
  3. int i=1;
  4. while (i<=50) {
  5. //發佈消息
  6. channel.basicPublish( "",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
  7. i++;
  8. }
  9. channel.waitForConfirmsOrDie();

 

 

3.五、ConfirmListener監聽器模式

RabbitMQ提供了一個ConfirmListener接口專門用來進行確認監聽,咱們能夠實現ConfirmListener接口來建立本身的消息確認監聽。ConfirmListener接口中包含兩個回調方法:

  1. /**
  2. * 生產者發送消息到exchange成功的回調方法
  3. */
  4. void handleAck(long deliveryTag, boolean multiple) throws IOException;
  5. /**
  6. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  7. * 注意,丟失的消息仍然能夠傳遞給消費者,但broker不能保證這一點。
  8. */
  9. void handleNack(long deliveryTag, boolean multiple) throws IOException;

其中deliveryTagBroker給每條消息指定的惟一ID(從1開始);multiple表示是否接收全部的應答消息,好比multiple=true時,發送100條消息成功事後,咱們並不會收到100次handleAck方法調用。

 

(1)重要方法

  1. //註冊消息確認監聽器
  2. channel.addConfirmListener( new MyConfirmListener());

(2)部分使用代碼以下:

  1. //注意:返回的時候Return在前,Confirm在後
  2. channel.confirmSelect();
  3. //註冊消息確認監聽器
  4. channel.addConfirmListener( new MyConfirmListener());
  5. //註冊消息結果返回監聽器
  6. channel.addReturnListener( new MyReturnListener());
  7. int i=1;
  8. while (i<=50) {
  9. //發佈消息
  10. channel.basicPublish( "",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.
  11. serialize(object));
  12. i++;
  13. }

 

 

  1. //自定義的消息確認監聽器
  2. public class MyConfirmListener implements ConfirmListener{
  3. /**
  4. * 生產者發送消息到exchange成功的回調方法
  5. * 消息被Exchange接受之後,若是沒有匹配的Queue,則會被丟棄。可是能夠設置ReturnListener監聽來監聽有沒有匹配的隊列。
  6. * 所以handleAck執行了,並不能徹底表示消息已經進入了對應的隊列,只能表示對應的exchange成功的接收了消息。
  7. * 消息被exchange接收事後,還須要經過必定的匹配規則分發到對應的隊列queue中。
  8. */
  9. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  10. //注意:deliveryTag是broker給消息指定的惟一id(從1開始)
  11. System.out.println( "Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
  12. }
  13. /**
  14. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  15. * 注意,丟失的消息仍然能夠傳遞給消費者,但broker不能保證這一點。(不明白,既然丟失了,爲啥還能發送)
  16. */
  17. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  18. System.out.println( "Exchange接收消息:"+deliveryTag+"(deliveryTag)失敗!服務器broker丟失了消息");
  19. }
  20. }

 

 

  1. //自定義的結果返回監聽器
  2. /**
  3. * 實現此接口以通知交付basicpublish失敗時,「mandatory」或「immediate」的標誌監聽(源代碼註釋翻譯)。
  4. * 在發佈消息時設置mandatory等於true,監聽消息是否有相匹配的隊列,
  5. * 沒有時ReturnListener將執行handleReturn方法,消息將返給發送者
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  9. BasicProperties properties, byte[] body) throws IOException {
  10. System.out.println( "消息發送到隊列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文本:"+replyText+";失敗消息對象:"+SerializationUtils.deserialize(body));
  11. }
  12. }

 

四、Consumer消息確認機制

爲了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在註冊消費者時,能夠指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號後才從內存(或磁盤,若是是持久化消息的話)中移去消息。不然,RabbitMQ會在隊列中消息被消費後當即刪除它。

當noAck=false時,對於RabbitMQ服務器端而言,隊列中的消息分紅了兩部分:一部分是等待投遞給消費者的消息(web管理界面上的Ready狀態);一部分是已經投遞給消費者,可是尚未收到消費者ack信號的消息(web管理界面上的Unacked狀態)。若是服務器端一直沒有收到消費者的ack信號,而且消費此消息的消費者已經斷開鏈接,則服務器端會安排該消息從新進入隊列,等待投遞給下一個消費者(也可能仍是原來的那個消費者)。

 

(1)重要方法

  1. /**
  2. *1. 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  3. * 注意:執行這個方法會回調handleConsumeOk方法,在此方法中處理消息。
  4. * @param queue 隊列名稱
  5. * @param autoAck 是否自動應答。false表示consumer在成功消費事後必需要手動回覆一下服務器,若是不回覆,服務器就將認爲此條消息消費失敗,繼續分發給其餘consumer。
  6. * @param callback 回調方法類
  7. * @return 由服務器生成的consumertag
  8. * @throws java.io.IOException if an error is encountered
  9. */
  10. String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  11.  
  12.  
  13. /**
  14. *2
  15. consumer處理成功後,通知broker刪除隊列中的消息,若是設置multiple=true,表示支持批量確認機制以減小網絡流量。
  16. 例如:有值爲5,6,7,8 deliveryTag的投遞
  17. 若是此時channel.basicAck(8, true);則表示前面未確認的5,6,7投遞也一塊兒確認處理完畢。
  18. 若是此時channel.basicAck(8, false);則僅表示deliveryTag=8的消息已經成功處理。
  19. */
  20. void basicAck(long deliveryTag, boolean multiple) throws IOException;
  21.  
  22. /**3
  23. consumer處理失敗後,例如:有值爲5,6,7,8 deliveryTag的投遞。
  24. 若是channel.basicNack(8, true, true);表示deliveryTag=8以前未確認的消息都處理失敗且將這些消息從新放回隊列中。
  25. 若是channel.basicNack(8, true, false);表示deliveryTag=8以前未確認的消息都處理失敗且將這些消息直接丟棄。
  26. 若是channel.basicNack(8, false, true);表示deliveryTag=8的消息處理失敗且將該消息從新放回隊列。
  27. 若是channel.basicNack(8, false, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
  28. */
  29. void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  30.  
  31. /**4
  32. 相比channel.basicNack,除了沒有multiple批量確認機制以外,其餘語義徹底同樣。
  33. 若是channel.basicReject(8, true);表示deliveryTag=8的消息處理失敗且將該消息從新放回隊列。
  34. 若是channel.basicReject(8, false);表示deliveryTag=8的消息處理失敗且將該消息直接丟棄。
  35. */
  36. void basicReject(long deliveryTag, boolean requeue) throws IOException;

 

(2)部分使用代碼以下:

 

  1. //this表示本身的Consumer
  2. channel.basicConsume(queueName, false, this);
  3. ...
  4. @Override
  5. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  6. if (body == null)
  7. return;
  8. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  9. /**
  10. * 專門處理奇數消息的消費者
  11. */
  12. int tagId = (Integer) map.get("tagId");
  13. if (tagId % 2 != 0) {
  14. //處理消息
  15. System.out.println( "接收並處理消息:"+tagId);
  16. //通知服務器此消息已經被處理了
  17. channel.basicAck(envelope.getDeliveryTag(), false);
  18. } else{
  19. //通知服務器消息處理失敗,從新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  20. channel.basicReject(envelope.getDeliveryTag(), true);
  21. }
  22. }

 

五、Demo項目總體代碼

此demo就是向RabbitMQ服務器上面發送20個消息,消息體是map,裏面裝的是tagId=數字。而後註冊了兩個消費者,分別處理奇數和偶數。

5.一、鏈接工具類

 

  1. /**
  2. * 鏈接工具類
  3. */
  4. public class ConnectionUtil {
  5.  
  6. Channel channel;
  7. Connection connection;
  8. String queueName;
  9.  
  10. public ConnectionUtil(String queueName) throws IOException {
  11. this.queueName = queueName;
  12. // 建立鏈接工廠
  13. ConnectionFactory cf = new ConnectionFactory();
  14. // 設置rabbitmq服務器IP地址
  15. cf.setHost( "*.16.0.*");
  16. // 設置rabbitmq服務器用戶名
  17. cf.setUsername( "*");
  18. // 設置rabbitmq服務器密碼
  19. cf.setPassword( "*");
  20. cf.setPort(AMQP.PROTOCOL.PORT);
  21. // 獲取一個新的鏈接
  22. connection = cf.newConnection();
  23. // 建立一個通道
  24. channel = connection.createChannel();
  25. /**
  26. *申明一個隊列,若是這個隊列不存在,將會被建立
  27. * @param queue 隊列名稱
  28. * @param durable 持久性:true隊列會再重啓事後存在,可是其中的消息不會存在。
  29. * @param exclusive 是否只能由建立者使用
  30. * @param autoDelete 是否自動刪除(沒有鏈接自動刪除)
  31. * @param arguments 隊列的其餘屬性(構造參數)
  32. * @return 宣告隊列的聲明確認方法已成功聲明。
  33. * @throws java.io.IOException if an error is encountered
  34. */
  35. channel.queueDeclare(queueName, true, false, false, null);
  36. }
  37.  
  38. public void close() throws IOException{
  39. channel.close();
  40. connection.close();
  41. }
  42. }

 

 

 

5.二、具體生產者

 

  1. /**
  2. * 消息生產者
  3. */
  4. public class MessageProducer {
  5.  
  6. private ConnectionUtil connectionUtil;
  7.  
  8. public MessageProducer(ConnectionUtil connectionUtil){
  9. this.connectionUtil=connectionUtil;
  10. }
  11. /**
  12. * 發送消息到隊列中
  13. */
  14. public void sendMessage(Serializable object) throws IOException{
  15. /**
  16. * Publish a message
  17. * @param exchange 消息交換機名稱,空字符串將使用直接交換器模式,發送到默認的Exchange=amq.direct
  18. * @param routingKey 路由關鍵字
  19. * @param mandatory 監聽是否有符合的隊列
  20. * @param BasicProperties 設置消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化
  21. * @param body 消息對象
  22. * @throws java.io.IOException if an error is encountered
  23. */
  24. connectionUtil.channel.basicPublish( "", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
  25. System.out.println( "MessageProducer發送了一條消息:"+object);
  26. }
  27. }

 

 

5.三、公共消費者父類

 

  1. /**
  2. * 消息消費者基礎類
  3. */
  4. public class MessageConsumer implements Consumer {
  5. //消費者標籤,註冊成功時由rabbitmq服務器自動生成
  6. protected String consumerTag;
  7.  
  8. protected ConnectionUtil connectionUtil;
  9.  
  10. public MessageConsumer(ConnectionUtil connectionUtil){
  11. this.connectionUtil=connectionUtil;
  12. }
  13.  
  14. public void basicConsume(){
  15. try {
  16. /**
  17. * 設置消費投遞數目,一次性投遞10條消息。當消費者未確認消息達到10條時,rabbitMQ將不會向此消費者投遞消息,直到未確認數小於10條再投遞
  18. * @param prefetchCount 投遞數目
  19. * @param global 是否針對整個Channel。true表示此投遞數是給Channel設置的,false是給Channel上的Consumer設置的。
  20. * @throws java.io.IOException if an error is encountered
  21. */
  22. connectionUtil.channel.basicQos( 10,false);//表示每一個消費者最多10條
  23. connectionUtil.channel.basicQos( 15,true);//整個傳輸管道最多15條,具體分到每一個消費者身上又不能大於10條
  24. /**
  25. * 開始一個非局部、非排他性消費, with a server-generated consumerTag.
  26. * 執行這個方法會回調handleConsumeOk方法
  27. * @param queue 隊列名稱
  28. * @param autoAck 是否自動應答。false表示consumer在成功消費事後必需要手動回覆一下服務器,若是不回覆,服務器就將認爲此條消息消費失敗,繼續分發給其餘consumer。
  29. * @param callback 回調方法類
  30. * @return 由服務器生成的consumertag
  31. * @throws java.io.IOException if an error is encountered
  32. */
  33. connectionUtil.channel.basicConsume(connectionUtil.queueName, false, this);
  34. } catch (IOException e) {
  35. e.printStackTrace();
  36. }
  37. }
  38.  
  39. /**
  40. * 收到消息時的回調函數
  41. */
  42. public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
  43. //子類重寫覆蓋具體操做
  44. }
  45.  
  46. /**
  47. * 消費者註冊成功回調函數
  48. */
  49. public void handleConsumeOk(String consumerTag) {
  50. this.consumerTag=consumerTag;
  51. System.out.println( "消費者:"+consumerTag+",註冊成功!");
  52. }
  53.  
  54. /**
  55. * 手動取消消費者註冊成功回調函數
  56. * 當調用Channel類的void basicCancel(String consumerTag) throws IOException;方法觸發此回調函數
  57. */
  58. public void handleCancelOk(String consumerTag) {
  59. System.out.println(consumerTag+ " 手動取消消費者註冊成功!");
  60. }
  61.  
  62. /**
  63. * 當消費者由於其餘緣由被動取消註冊時調用,好比queue被刪除了。
  64. */
  65. public void handleCancel(String consumerTag) throws IOException {
  66. System.out.println( "由於外部緣由消費者:"+consumerTag+" 取消註冊!");
  67. }
  68.  
  69. /**
  70. * 當通道或基礎鏈接被關閉時調用
  71. */
  72. public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
  73. System.out.println( "通道或基礎鏈接被關閉");
  74. }
  75.  
  76. /**
  77. * Called when a <code><b>basic.recover-ok</b></code> is received
  78. * in reply to a <code><b>basic.recover</b></code>. All messages
  79. * received before this is invoked that haven't been <i>ack</i>'ed will be
  80. * re-delivered. All messages received afterwards won't be.
  81. * @param consumerTag the <i>consumer tag</i> associated with the consumer
  82. */
  83. public void handleRecoverOk(String consumerTag) {
  84.  
  85. }
  86. }

 

5.四、具體的消費者

 

  1. /**
  2. * 專門處理偶數消息的消費者
  3. */
  4. public class EvenConsumer extends MessageConsumer {
  5.  
  6. public EvenConsumer(ConnectionUtil connectionUtil) {
  7. super(connectionUtil);
  8. }
  9.  
  10. @Override
  11. public void handleConsumeOk(String consumerTag) {
  12. this.consumerTag=consumerTag;
  13. System.out.println( "EvenConsumer消費者:"+consumerTag+",註冊成功!");
  14. }
  15.  
  16. @Override
  17. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  18. if (body == null)
  19. return;
  20. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  21. int tagId = (Integer) map.get("tagId");
  22. if (tagId % 2 == 0) {
  23. //處理消息
  24. System.out.println( "EvenConsumer接收並處理消息:"+tagId);
  25. //通知服務器此消息已經被處理了
  26. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  27. } else{
  28. //通知服務器消息處理失敗,從新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  29. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  30. }
  31. }
  32. }

 

 

  1. /**
  2. * 專門處理奇數消息的消費者
  3. */
  4. public class OddConsumer extends MessageConsumer {
  5.  
  6. public OddConsumer(ConnectionUtil connectionUtil) {
  7. super(connectionUtil);
  8. }
  9.  
  10. @Override
  11. public void handleConsumeOk(String consumerTag) {
  12. this.consumerTag=consumerTag;
  13. System.out.println( "OddConsumer消費者:"+consumerTag+",註冊成功!");
  14. }
  15.  
  16. @Override
  17. public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
  18. if (body == null)
  19. return;
  20. Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
  21. int tagId = (Integer) map.get("tagId");
  22. if (tagId % 2 != 0) {
  23. //處理消息
  24. System.out.println( "OddConsumer接收並處理消息:"+tagId);
  25. //通知服務器此消息已經被處理了
  26. connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
  27. } else{
  28. //通知服務器消息處理失敗,從新放回隊列。false表示處理失敗消息不放會隊列,直接刪除
  29. connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
  30. }
  31. }
  32. }

 

5.五、監聽器

 

  1. /**
  2. *producer發送確認事件。
  3. */
  4. public class MyConfirmListener implements ConfirmListener{
  5. /**
  6. * 生產者發送消息到exchange成功的回調方法
  7. * 消息被Exchange接受之後,若是沒有匹配的Queue,則會被丟棄。可是能夠設置ReturnListener監聽來監聽有沒有匹配的隊列。
  8. * 所以handleAck執行了,並不能徹底表示消息已經進入了對應的隊列,只能表示對應的exchange成功的接收了消息。
  9. * 消息被exchange接收事後,還須要經過必定的匹配規則分發到對應的隊列queue中。
  10. */
  11. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  12. //注意:deliveryTag是broker給消息指定的惟一id(從1開始)
  13. System.out.println( "Exchange接收消息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
  14. }
  15. /**
  16. * 生產者發送消息到服務器broker失敗的回調方法,服務器丟失了此消息。
  17. * 注意,丟失的消息仍然能夠傳遞給消費者,但broker不能保證這一點。
  18. */
  19. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  20. System.out.println( "Exchange接收消息:"+deliveryTag+"(deliveryTag)失敗!服務器broker丟失了消息");
  21. }
  22. }

 

 

  1. /**
  2. * 實現此接口以通知交付basicpublish失敗時,「mandatory」或「immediate」的標誌監聽(源代碼註釋翻譯)。
  3. * 在發佈消息時設置mandatory等於true,監聽消息是否有相匹配的隊列,
  4. * 沒有時ReturnListener將執行handleReturn方法,消息將返給發送者 。
  5. * 因爲3.0版本事後取消了支持immediate,此處不作過多的解釋。
  6. */
  7. public class MyReturnListener implements ReturnListener {
  8.  
  9. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
  10. BasicProperties properties, byte[] body) throws IOException {
  11. System.out.println( "消息發送到隊列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文本:"+replyText+";失敗消息對象:"+SerializationUtils.deserialize(body));
  12. }
  13. }

 

5.六、客戶端

 

  1. public class Client {
  2.  
  3. public static void main(String[] args) {
  4. new Client();
  5. }
  6.  
  7. public Client(){
  8. try {
  9. //發消息
  10. publishMessage();
  11. //註冊消費者
  12. addConsumer();
  13. } catch (IOException e) {
  14. e.printStackTrace();
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19.  
  20. public void publishMessage() throws IOException, InterruptedException{
  21. ConnectionUtil connectionUtil= new ConnectionUtil("testqueue");
  22. MessageProducer producer= new MessageProducer(connectionUtil);
  23. connectionUtil.channel.confirmSelect();
  24. //注意:返回的時候Return在前,Confirm在後
  25. connectionUtil.channel.addConfirmListener( new MyConfirmListener());
  26. connectionUtil.channel.addReturnListener( new MyReturnListener());
  27. int i=1;
  28. while (i<=10) {
  29. HashMap<String, Object> map= new HashMap<String, Object>();
  30. map.put( "tagId", i);
  31. producer.sendMessage(map);
  32. i++;
  33. }
  34. }
  35.  
  36. public void addConsumer() throws IOException{
  37. ConnectionUtil connectionUtil= new ConnectionUtil("testqueue");
  38. OddConsumer odd= new OddConsumer(connectionUtil);
  39. odd.basicConsume();
  40. EvenConsumer even= new EvenConsumer(connectionUtil);
  41. even.basicConsume();
  42. }
  43.  
  44. }

 

 

 

5.七、測試結果

  1. MessageProducer發送了一條消息:{tagId= 1}
  2. MessageProducer發送了一條消息:{tagId= 2}
  3. MessageProducer發送了一條消息:{tagId= 3}
  4. Exchange接收消息: 1(deliveryTag)成功!multiple=false
  5. Exchange接收消息: 2(deliveryTag)成功!multiple=false
  6. MessageProducer發送了一條消息:{tagId= 4}
  7. Exchange接收消息: 3(deliveryTag)成功!multiple=false
  8. MessageProducer發送了一條消息:{tagId= 5}
  9. Exchange接收消息: 4(deliveryTag)成功!multiple=false
  10. MessageProducer發送了一條消息:{tagId= 6}
  11. Exchange接收消息: 5(deliveryTag)成功!multiple=false
  12. MessageProducer發送了一條消息:{tagId= 7}
  13. Exchange接收消息: 6(deliveryTag)成功!multiple=false
  14. MessageProducer發送了一條消息:{tagId= 8}
  15. Exchange接收消息: 7(deliveryTag)成功!multiple=false
  16. Exchange接收消息: 8(deliveryTag)成功!multiple=false
  17. MessageProducer發送了一條消息:{tagId= 9}
  18. Exchange接收消息: 9(deliveryTag)成功!multiple=false
  19. MessageProducer發送了一條消息:{tagId= 10}
  20. Exchange接收消息: 10(deliveryTag)成功!multiple=false
  21. OddConsumer消費者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,註冊成功!
  22. OddConsumer接收並處理消息: 1
  23. OddConsumer接收並處理消息: 3
  24. OddConsumer接收並處理消息: 5
  25. OddConsumer接收並處理消息: 7
  26. OddConsumer接收並處理消息: 9
  27. EvenConsumer消費者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,註冊成功!
  28. EvenConsumer接收並處理消息: 4
  29. EvenConsumer接收並處理消息: 8
  30. EvenConsumer接收並處理消息: 2
  31. EvenConsumer接收並處理消息: 10
  32. EvenConsumer接收並處理消息: 6

 

六、Demo完整源碼下載地址

Java使用RabbitMQ完整項目源碼.rar

相關文章
相關標籤/搜索