在RabbitMQ中,若是消息路由失敗了,通常會有兩種狀況。要麼是把消息回退給客戶端處理,要麼就把消息丟棄。java
處理邏輯是根據basicPublish方法的mandatory和immediate兩個參數來控制。git
一、mandatory:當mandatory=true時,若是交換器沒法根據自身類型和路由鍵匹配到符合條件的隊列,便會調用Basic.Return命令將消息會推給生產者;當mandatory=false時,不知足條件則丟棄此條消息。github
1 channel.addReturnListener(new ReturnListener() { 2 public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 3 AMQP.BasicProperties properties, byte[] body) throws IOException { 4 // 具體處理邏輯 5 } 6 });
二、immediate:當immediate=true時,交換器將消息路由到隊列後,發現此隊列上不存在任何消費者,那麼這條消息將不會放入到隊列中。當路由鍵匹配的全部隊列都沒有消費者時,改消息將會經過Basic.Return返回給生產者。redis
備份交換器能夠將未被路由到的消息存儲在RabbitMQ中,在須要它的時候再去使用。api
1 public class AlternateProduct { 2 3 private static final String EXCHANGE_NAME = "alternate.exchange"; 4 private static final String EXCHANGE_BAK_NAME = "alternate-bak.exchange"; 5 6 private static final String QUEUE_NAME = "alternate.queue"; 7 private static final String QUEUE_BAK_NAME = "alternate-bak.queue"; 8 9 private static final String ROUTING_KEY_NAME = "alternate.routing.key"; 10 11 public static void main(String[] args) throws IOException, TimeoutException { 12 Connection connection = RabbitMqUtils.getConnection(); 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, false, getExchangeDeclareArgs()); 16 // fanout類型,放款路由限制 17 channel.exchangeDeclare(EXCHANGE_BAK_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); 18 19 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20 channel.queueDeclare(QUEUE_BAK_NAME, false, false, false, null); 21 22 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 23 // 由於交換器QUEUE_BAK_NAME設置fanout類型,因此能夠沒必要關心路由鍵,故隨便寫可能將消息路由到對應的隊列中 24 channel.queueBind(QUEUE_BAK_NAME, EXCHANGE_BAK_NAME, "123"); 25 26 // 發消息時路由鍵設置一個不存在的"",讓其路由不到,從而把消息發到備份隊列中 27 channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, 28 "alternate".getBytes()); 29 30 RabbitMqUtils.close(connection, channel); 31 } 32 33 private static Map<String, Object> getExchangeDeclareArgs() { 34 Map<String, Object> result = new HashMap<String, Object>(1); 35 result.put("alternate-exchange", EXCHANGE_BAK_NAME); 36 return result; 37 } 38 }
關於備份交換器的注意點:服務器
一、若是備份交換器不存在,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。dom
二、若是備份交換器沒有綁定任何隊列,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。異步
三、若是備份交換器沒有匹配到任何隊列,客戶端和RabbitMQ客戶端都不會出現異常,可是消息會丟失。async
四、若是備份交換器和mandatory一塊兒使用,且備份交換器有效,此時mandatory將無效。分佈式
一、TTL:過時時間,有隊列過時時間和消息過時時間。
)隊列過時時間:
經過設置隊列的過時時間,來使隊列中因此的消息都具備過時時間。
)消息過時時間:
設置消息的BasicProperties props屬性值來控制消息的過時時間。
1 AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder(); 2 // expiration單位ms 3 publishBuilder.expiration("10000"); 4 5 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(), 6 "ttl".getBytes());
對於第一種TTL來講,隊列一但過時就會刪除調;但對於第二種TTL來講,隊列過時不會立刻刪除,而是等隊列要被消費時再判斷是否要刪除。
那爲何會不同呢,咱們都知道mq對性能的要求是很是高的,若是第二種ddl的方式也要及時刪除的話勢必要掃描整個隊列,這樣的話,若隊列長度較大是性能便會很是的差。
而第一種爲何能夠作到及時刪除呢,咱們知道隊列具備先進先出的特性,因此先入隊的確定要比後入隊的要先過時,因此只要刪除頭部的就好啦。
而第二種的消息過時時間都是不固定的,考慮到MQ的性能,因此採用了上述的方式。
二、DLX:死信交換器,全稱Dead Letter Exchange
變爲死信隊列的有如下幾種狀況:
注意:DLX也是一個正常的交換器,和通常隊列沒有區別,它能在任何的隊列上被指定。
本模塊講述RabbitMQ,僅提供RabbitMQ的實現,大佬們有興趣能夠實現其它幾種方式。
延遲隊列是指將消息發送到隊列後,等待一段時間後再進行消費。場景:餓了麼外賣下單後,超過15分鐘訂單失效。
延遲隊列場景的時間方式有四種:
一、DB輪詢:經過job或其它邏輯將訂單表的必要字段查出(如:orderId、createTime、status),當訂單超過xx時間,將狀態置爲失效。
)優勢:實現簡單、無技術難點、異常恢復、支持分佈式/集羣環境
)缺點:影響DB性能、時效性查、效率低
二、JDK DelayQueue:java api提供的延遲隊列的實現,經過poll()、take()方法獲取超時任務
)優勢:實現簡單、性能較好
)缺點:異常恢復困難、分佈式/集羣實現困難(基於JVM內存)
三、Redis sortedSet:經過zset類型的score來實現
)優勢:解耦、異常恢復、擴展性強、支持分佈式/集羣環境
)缺點:增長了redis維護成本、佔用帶寬
四、RabbitMQ TTL + DLX:使用RabbitMQ的過時時間和死信隊列實現
)優勢:解耦、異常恢復、擴展性強、支持分佈式/集羣環境
)缺點:增長了RabbitMQ維護成本、佔用帶寬
RabbitMQ也能夠實現RPC,客戶端發送消息,服務端接收消息。
replayTo:設置回調隊列,用於客戶端響應服務端的回調消息。
correlationId:RPC請求和響應的關聯id。
1 public class RpcServer { 2 3 private static final String QUEUE_NAME = "rpc.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 Connection connection = RabbitMqUtils.getRpcConnection(); 7 final Channel channel = connection.createChannel(); 8 // 建立請求處理隊列,用於服務端接收客戶端RPC請求 9 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 10 11 System.out.println("等待RPC請求..."); 12 13 // 服務端監聽客戶端發送的RPC請求 14 channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) { 15 @Override 16 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 17 throws IOException { 18 String correlationId = properties.getCorrelationId(); 19 String message = ""; 20 21 try { 22 message = new String(body); 23 System.err.println(format("service recv message:{0}, corrId:{1}", message, correlationId)); 24 } catch (Exception e) { 25 e.printStackTrace(); 26 } finally { 27 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() 28 .correlationId(correlationId) 29 .build(); 30 31 // 使用默認exchange,容許經過routingKey指定message將被髮送給哪一個queue 32 channel.basicPublish("", properties.getReplyTo(), props, (message + "--is done.").getBytes("UTF-8")); 33 channel.basicAck(envelope.getDeliveryTag(), false); 34 } 35 } 36 }); 37 } 38 }
1 public class RpcClient { 2 3 private static final String QUEUE_NAME = "rpc.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 6 final Connection connection = RabbitMqUtils.getConnection(); 7 Channel channel = connection.createChannel(); 8 9 // 隨機建立corrId 10 final String collId = UUID.randomUUID().toString(); 11 // 客戶端建立匿名隊列,用於響應服務端請求 12 String callbackQueueName = channel.queueDeclare().getQueue(); 13 14 // 客戶端發送消息;使用默認exchange(exchange=""),容許經過routingKey指定message將被髮送給哪一個queue 15 channel.basicPublish("", QUEUE_NAME, getBasicPublishProperties(collId, callbackQueueName), 16 "hello world".getBytes()); 17 // 客戶端接收服務端響應的消息 18 channel.basicConsume(callbackQueueName, new DefaultConsumer(channel) { 19 @Override 20 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 if (collId.equals(properties.getCorrelationId())) { 22 System.out.println(format("client recv message:{0}, corrId:{1}", new String(body), collId)); 23 } else { 24 System.out.println("不是本次請求的消息"); 25 } 26 } 27 }); 28 29 TimeUnit.SECONDS.sleep(1); 30 31 RabbitMqUtils.close(connection, channel); 32 } 33 34 private static AMQP.BasicProperties getBasicPublishProperties(String corrId, String callbackQueueName) { 35 return new AMQP.BasicProperties().builder() 36 .correlationId(corrId) 37 .replyTo(callbackQueueName).build(); 38 } 39 }
在RabbitMQ中交換器、隊列、消息都設置爲持久化就能保持消息不丟失了嘛?
固然不,狀況以下:
一、當autoAck設置爲true的時候,消費者接收到消息後還沒來得及處理就宕機了。
解決:autoAck設爲false,消費者處理完消息後再通知服務端刪除消息。
二、再RabbitMQ持久化到磁盤中的這段時間,RabbitMQ服務器宕機了。
解決:服務端確認機制、鏡像隊列(後面章節會描述)。
一、開啓事務:channel.txSelect()
二、提交事務:channel.txCommit()
三、回滾事務:channel.txRollback()
事務和db的事務很類似,不細說。
AMQP協議提供了事務機制來保證消息能真正成功的到達RabbitMQ,但事務機制會嚴重的影響到RabbitMQ的吞吐量,因此RabbitMQ引入了一種輕量的方式,發送方確認機制。
客戶端使用方式:
一、將信道設置發送方確認方式:channel.confirmSelect()。
二、確認消息是否發送成功:
)boolean waitForConfirms() throws InterruptedException;
)boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;
)void waitForConfirmsOrDie() throws IOException, InterruptedException;
)void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;
發送方確認消息成功的三種方式:
1 public class PublisherConfirmProduct { 2 3 private static final String EXCHANGE_NAME = "demo.exchange"; 4 private static final String ROUTING_KEY = "demo.routingkey"; 5 private static final String QUEUE_NAME = "demo.queue"; 6 private static final String MESSAGE = "Hello World!"; 7 8 /** 9 * 單條確認 10 */ 11 public static void commonConfirm() throws Exception { 12 Connection connection = RabbitMqUtils.getConnection(); 13 Channel channel = initChannel(connection); 14 15 channel.confirmSelect(); 16 for (int i = 0; i < 100; i++) { 17 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 18 if (channel.waitForConfirms()) { 19 // 逐條確認是否發送成功 20 System.out.println("send success!"); 21 } 22 } 23 24 RabbitMqUtils.close(connection, channel); 25 } 26 27 /** 28 * 批量確認 29 */ 30 public static void batchConfirm() throws Exception { 31 Connection connection = RabbitMqUtils.getConnection(); 32 Channel channel = initChannel(connection); 33 34 channel.confirmSelect(); 35 for (int i = 0; i < 100; i++) { 36 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 37 } 38 39 // 批量確認是否發送成功,若是某一次確認失敗這一批都要從新發送 40 if (channel.waitForConfirms()) { 41 System.out.println("send success!"); 42 } 43 44 RabbitMqUtils.close(connection, channel); 45 } 46 47 /** 48 * 異步確認 49 */ 50 public static void asyncConfirm() throws Exception { 51 Connection connection = RabbitMqUtils.getConnection(); 52 Channel channel = initChannel(connection); 53 channel.basicQos(1); 54 55 channel.confirmSelect(); 56 57 // 定義一個未確認消息集合 58 final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>()); 59 for (int i = 0; i < 100; i++) { 60 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes()); 61 unConfirmSet.add(channel.getNextPublishSeqNo()); 62 } 63 64 channel.addConfirmListener(new ConfirmListener() { 65 @Override 66 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 67 System.err.println(format("拒絕消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); 68 } 69 70 @Override 71 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 72 System.err.println(format("確認消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple)); 73 if (multiple) { 74 // multiple爲true,則deliveryTag以前的全部消息所有被確認 75 unConfirmSet.headSet(deliveryTag + 1).clear(); 76 } else { 77 // 不然只確認一條消息 78 unConfirmSet.remove(deliveryTag); 79 } 80 } 81 }); 82 83 TimeUnit.SECONDS.sleep(5); 84 System.out.println(unConfirmSet.size()); 85 86 RabbitMqUtils.close(connection, channel); 87 } 88 89 private static Channel initChannel(Connection connection) throws IOException { 90 Channel channel = connection.createChannel(); 91 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null); 92 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 93 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); 94 return channel; 95 } 96 97 public static void main(String[] args) throws Exception { 98 // commonConfirm(); 99 // batchConfirm(); 100 asyncConfirm(); 101 } 102 }