RabbitMQ學習筆記(4、RabbitMQ隊列)

目錄:

  • 消息路由失敗了會怎樣
  • 備份交換器
  • TTL與DLX
  • 如何實現延遲隊列
  • RabbitMQ的RPC實現
  • 持久化
  • 事務
  • 發送方確認機制

消息路由失敗了會怎樣:

在RabbitMQ中,若是消息路由失敗了,通常會有兩種狀況。要麼是把消息回退給客戶端處理,要麼就把消息丟棄。java

處理邏輯是根據basicPublish方法的mandatoryimmediate兩個參數來控制。git

一、mandatory:當mandatory=true時,若是交換器沒法根據自身類型和路由鍵匹配到符合條件的隊列,便會調用Basic.Return命令將消息會推給生產者;當mandatory=false時,不知足條件則丟棄此條消息。github

1 channel.addReturnListener(new ReturnListener() { 2     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 3                              AMQP.BasicProperties properties, byte[] body) throws IOException { 4         // 具體處理邏輯
5  } 6 });

二、immediate:當immediate=true時,交換器將消息路由到隊列後,發現此隊列上不存在任何消費者,那麼這條消息將不會放入到隊列中。當路由鍵匹配的全部隊列都沒有消費者時,改消息將會經過Basic.Return返回給生產者。redis

備份交換器:

備份交換器能夠將未被路由到的消息存儲在RabbitMQ中,在須要它的時候再去使用。api

 1 public class AlternateProduct {  2 
 3     private static final String EXCHANGE_NAME = "alternate.exchange";  4     private static final String EXCHANGE_BAK_NAME = "alternate-bak.exchange";  5     
 6     private static final String QUEUE_NAME = "alternate.queue";  7     private static final String QUEUE_BAK_NAME = "alternate-bak.queue";  8 
 9     private static final String ROUTING_KEY_NAME = "alternate.routing.key"; 10 
11     public static void main(String[] args) throws IOException, TimeoutException { 12         Connection connection = RabbitMqUtils.getConnection(); 13         Channel channel = connection.createChannel(); 14 
15         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, false, getExchangeDeclareArgs()); 16         // fanout類型,放款路由限制
17         channel.exchangeDeclare(EXCHANGE_BAK_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); 18 
19         channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20         channel.queueDeclare(QUEUE_BAK_NAME, false, false, false, null); 21 
22  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 23         // 由於交換器QUEUE_BAK_NAME設置fanout類型,因此能夠沒必要關心路由鍵,故隨便寫可能將消息路由到對應的隊列中
24         channel.queueBind(QUEUE_BAK_NAME, EXCHANGE_BAK_NAME, "123"); 25 
26         // 發消息時路由鍵設置一個不存在的"",讓其路由不到,從而把消息發到備份隊列中
27         channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, 28                 "alternate".getBytes()); 29 
30  RabbitMqUtils.close(connection, channel); 31  } 32 
33     private static Map<String, Object> getExchangeDeclareArgs() { 34         Map<String, Object> result = new HashMap<String, Object>(1); 35         result.put("alternate-exchange", EXCHANGE_BAK_NAME); 36         return result; 37  } 38 }

關於備份交換器的注意點:服務器

一、若是備份交換器不存在,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。dom

二、若是備份交換器沒有綁定任何隊列,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。異步

三、若是備份交換器沒有匹配到任何隊列,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。async

四、若是備份交換器和mandatory一塊兒使用,且備份交換器有效,此時mandatory將無效。分佈式

TTL與DLX:

一、TTL:過時時間,有隊列過時時間消息過時時間

隊列過時時間

經過設置隊列的過時時間,來使隊列中因此的消息都具備過時時間。

消息過時時間

設置消息的BasicProperties props屬性值來控制消息的過時時間。

1 AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder(); 2 // expiration單位ms
3 publishBuilder.expiration("10000"); 4 
5 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(), 6         "ttl".getBytes());

對於第一種TTL來講,隊列一但過時就會刪除調;但對於第二種TTL來講,隊列過時不會立刻刪除,而是等隊列要被消費時再判斷是否要刪除。

那爲何會不同呢,咱們都知道mq對性能的要求是很是高的,若是第二種ddl的方式也要及時刪除的話勢必要掃描整個隊列,這樣的話,若隊列長度較大是性能便會很是的差。

而第一種爲何能夠作到及時刪除呢,咱們知道隊列具備先進先出的特性,因此先入隊的確定要比後入隊的要先過時,因此只要刪除頭部的就好啦。

而第二種的消息過時時間都是不固定的,考慮到MQ的性能,因此採用了上述的方式。

二、DLX:死信交換器,全稱Dead Letter Exchange

變爲死信隊列的有如下幾種狀況:

  • 消息被拒,且requeue=false
  • 隊列過時或隊列達到最大長度

注意:DLX也是一個正常的交換器,和通常隊列沒有區別,它能在任何的隊列上被指定。

如何實現延遲隊列:

本模塊講述RabbitMQ,僅提供RabbitMQ的實現,大佬們有興趣能夠實現其它幾種方式。

延遲隊列是指將消息發送到隊列後,等待一段時間後再進行消費。場景:餓了麼外賣下單後,超過15分鐘訂單失效。

延遲隊列場景的時間方式有四種:

一、DB輪詢:經過job或其它邏輯將訂單表的必要字段查出(如:orderId、createTime、status),當訂單超過xx時間,將狀態置爲失效。

)優勢:實現簡單、無技術難點、異常恢復、支持分佈式/集羣環境

)缺點:影響DB性能、時效性查、效率低

二、JDK DelayQueue:java api提供的延遲隊列的實現,經過poll()、take()方法獲取超時任務

)優勢:實現簡單、性能較好

)缺點:異常恢復困難、分佈式/集羣實現困難(基於JVM內存)

三、Redis sortedSet:經過zset類型的score來實現

)優勢:解耦、異常恢復、擴展性強、支持分佈式/集羣環境

)缺點:增長了redis維護成本、佔用帶寬

四、RabbitMQ TTL + DLX:使用RabbitMQ的過時時間和死信隊列實現

 實現:delay-message

)優勢:解耦、異常恢復、擴展性強、支持分佈式/集羣環境

)缺點:增長了RabbitMQ維護成本、佔用帶寬

RabbitMQ的RPC實現:

RabbitMQ也能夠實現RPC,客戶端發送消息,服務端接收消息。

replayTo:設置回調隊列,用於客戶端響應服務端的回調消息。

correlationId:RPC請求和響應的關聯id。

 1 public class RpcServer {  2 
 3     private static final String QUEUE_NAME = "rpc.queue";  4 
 5     public static void main(String[] args) throws IOException, TimeoutException {  6         Connection connection = RabbitMqUtils.getRpcConnection();  7         final Channel channel = connection.createChannel();  8         // 建立請求處理隊列,用於服務端接收客戶端RPC請求
 9         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 10 
11         System.out.println("等待RPC請求..."); 12 
13         // 服務端監聽客戶端發送的RPC請求
14         channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) { 15  @Override 16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 17                     throws IOException { 18                 String correlationId = properties.getCorrelationId(); 19                 String message = ""; 20 
21                 try { 22                     message = new String(body); 23                     System.err.println(format("service recv message:{0}, corrId:{1}", message, correlationId)); 24                 } catch (Exception e) { 25  e.printStackTrace(); 26                 } finally { 27                     AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() 28  .correlationId(correlationId) 29  .build(); 30 
31                     // 使用默認exchange,容許經過routingKey指定message將被髮送給哪一個queue
32                     channel.basicPublish("", properties.getReplyTo(), props, (message + "--is done.").getBytes("UTF-8")); 33                     channel.basicAck(envelope.getDeliveryTag(), false); 34  } 35  } 36  }); 37  } 38 }
 1 public class RpcClient {  2 
 3     private static final String QUEUE_NAME = "rpc.queue";  4 
 5     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  6         final Connection connection = RabbitMqUtils.getConnection();  7         Channel channel = connection.createChannel();  8 
 9         // 隨機建立corrId
10         final String collId = UUID.randomUUID().toString(); 11         // 客戶端建立匿名隊列,用於響應服務端請求
12         String callbackQueueName = channel.queueDeclare().getQueue(); 13 
14         // 客戶端發送消息;使用默認exchange(exchange=""),容許經過routingKey指定message將被髮送給哪一個queue
15         channel.basicPublish("", QUEUE_NAME, getBasicPublishProperties(collId, callbackQueueName), 16                 "hello world".getBytes()); 17         // 客戶端接收服務端響應的消息
18         channel.basicConsume(callbackQueueName, new DefaultConsumer(channel) { 19  @Override 20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21                 if (collId.equals(properties.getCorrelationId())) { 22                     System.out.println(format("client recv message:{0}, corrId:{1}", new String(body), collId)); 23                 } else { 24                     System.out.println("不是本次請求的消息"); 25  } 26  } 27  }); 28 
29         TimeUnit.SECONDS.sleep(1); 30 
31  RabbitMqUtils.close(connection, channel); 32  } 33 
34     private static AMQP.BasicProperties getBasicPublishProperties(String corrId, String callbackQueueName) { 35         return new AMQP.BasicProperties().builder() 36  .correlationId(corrId) 37  .replyTo(callbackQueueName).build(); 38  } 39 }

持久化:

在RabbitMQ中交換器、隊列、消息都設置爲持久化就能保持消息不丟失了嘛?

固然不,狀況以下:

一、當autoAck設置爲true的時候,消費者接收到消息後還沒來得及處理就宕機了。

解決:autoAck設爲false,消費者處理完消息後再通知服務端刪除消息。

二、再RabbitMQ持久化到磁盤中的這段時間,RabbitMQ服務器宕機了。

解決:服務端確認機制、鏡像隊列(後面章節會描述)。

事務:

一、開啓事務:channel.txSelect()

二、提交事務:channel.txCommit()

三、回滾事務:channel.txRollback()

事務和db的事務很類似,不細說。

發送方確認機制:

AMQP協議提供了事務機制來保證消息能真正成功的到達RabbitMQ,但事務機制會嚴重的影響到RabbitMQ的吞吐量,因此RabbitMQ引入了一種輕量的方式,發送方確認機制。

客戶端使用方式:

一、將信道設置發送方確認方式:channel.confirmSelect()。

二、確認消息是否發送成功

)boolean waitForConfirms() throws InterruptedException;

)boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

)void waitForConfirmsOrDie() throws IOException, InterruptedException;

)void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

發送方確認消息成功的三種方式:

 1 public class PublisherConfirmProduct {  2 
 3     private static final String EXCHANGE_NAME = "demo.exchange";  4     private static final String ROUTING_KEY = "demo.routingkey";  5     private static final String QUEUE_NAME = "demo.queue";  6     private static final String MESSAGE = "Hello World!";  7 
 8     /**
 9  * 單條確認  10      */
 11     public static void commonConfirm() throws Exception {  12         Connection connection = RabbitMqUtils.getConnection();  13         Channel channel = initChannel(connection);  14 
 15  channel.confirmSelect();  16         for (int i = 0; i < 100; i++) {  17  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  18             if (channel.waitForConfirms()) {  19                 // 逐條確認是否發送成功
 20                 System.out.println("send success!");  21  }  22  }  23 
 24  RabbitMqUtils.close(connection, channel);  25  }  26 
 27     /**
 28  * 批量確認  29      */
 30     public static void batchConfirm() throws Exception {  31         Connection connection = RabbitMqUtils.getConnection();  32         Channel channel = initChannel(connection);  33 
 34  channel.confirmSelect();  35         for (int i = 0; i < 100; i++) {  36  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  37  }  38 
 39         // 批量確認是否發送成功,若是某一次確認失敗這一批都要從新發送
 40         if (channel.waitForConfirms()) {  41             System.out.println("send success!");  42  }  43 
 44  RabbitMqUtils.close(connection, channel);  45  }  46 
 47     /**
 48  * 異步確認  49      */
 50     public static void asyncConfirm() throws Exception {  51         Connection connection = RabbitMqUtils.getConnection();  52         Channel channel = initChannel(connection);  53         channel.basicQos(1);  54 
 55  channel.confirmSelect();  56 
 57         // 定義一個未確認消息集合
 58         final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>());  59         for (int i = 0; i < 100; i++) {  60  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  61  unConfirmSet.add(channel.getNextPublishSeqNo());  62  }  63 
 64         channel.addConfirmListener(new ConfirmListener() {  65  @Override  66             public void handleNack(long deliveryTag, boolean multiple) throws IOException {  67                 System.err.println(format("拒絕消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));  68  }  69 
 70  @Override  71             public void handleAck(long deliveryTag, boolean multiple) throws IOException {  72                 System.err.println(format("確認消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));  73                 if (multiple) {  74                     // multiple爲true,則deliveryTag以前的全部消息所有被確認
 75                     unConfirmSet.headSet(deliveryTag + 1).clear();  76                 } else {  77                     // 不然只確認一條消息
 78  unConfirmSet.remove(deliveryTag);  79  }  80  }  81  });  82 
 83         TimeUnit.SECONDS.sleep(5);  84  System.out.println(unConfirmSet.size());  85 
 86  RabbitMqUtils.close(connection, channel);  87  }  88 
 89     private static Channel initChannel(Connection connection) throws IOException {  90         Channel channel = connection.createChannel();  91         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);  92         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  93  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  94         return channel;  95  }  96 
 97     public static void main(String[] args) throws Exception {  98 // commonConfirm();  99 // batchConfirm();
100  asyncConfirm(); 101  } 102 }
相關文章
相關標籤/搜索