距離上一次發文章已經好久了,其實這段時間一直也沒有停筆,只不過在忙着找工做還有學校結課的事情,從新弄了一下博客,後面也會陸陸續續會把文章最近更新出來~java
過時時間(TTL)就是對消息或者隊列設置一個時效,只有在時間範圍內才能夠被被消費者接收穫取,超過過時時間後消息將自動被刪除。spring
注:咱們主要講消息過時,在消息過時的第一種方式中,順便也就會提到隊列過時的設置方式shell
兩種方法同時被使用時,以二者過時時間 TTL 較小的那個數值爲準。消息在隊列的生存時間一旦超過設置的 TTL 值,就稱爲 Dead Message 被投遞到死信隊列,消費者將沒法再收到該消息(死信隊列是咱們下一點要講的)服務器
@Configuration public class RabbitMqConfiguration { public static final String TOPIC_EXCHANGE = "topic_order_exchange"; public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1"; public static final String TOPIC_ROUTINGKEY_1 = "test.*"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Queue topicQueue1() { // 建立參數 Map 容器 Map<String, Object> args = new HashMap<>(); // 設置消息過時時間 注意此處是數值 5000 不是字符串 args.put("x-message-ttl", 5000); // 設置隊列過時時間 args.put("x-expires", 8000); // 在最後傳入額外參數 即這些過時信息 return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args); } @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueue1()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_1); } }
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testTopicSendMessage() { rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !"); } }
不要配置消費者,而後就能夠在 Web 管理器中看到效果了網絡
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired @Test public void testTopicSendMessage2() { MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 注意此處是 字符串 「5000」 message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order", "This is a message 002 !",messagePostProcessor); } }
死信官方原文爲 Dead letter ,它是RabbitMQ中的一種消息機制,當你在消費消息時,若是隊列以及隊列裏的消息出現如下狀況,說明當前消息就成爲了 「死信」,若是配置了死信隊列,這些數據就會傳送到其中,若是沒有配置就會直接丟棄。運維
不過死信隊列並非什麼很特殊的存在,咱們只須要配置一個交換機,在消費的那個隊列中配置,出現死信就從新發送到剛纔配置的交換機中去,進而被路由到與交換機綁定的隊列中去,這個隊列也就是死信隊列,因此從建立上看,它和普通的隊列沒什麼區別。異步
好比在一些比較重要的業務隊列中,未被正確消費的消息,每每咱們並不想丟棄,由於丟棄後若是想恢復這些數據,每每須要運維人員從日誌獲取到原消息,而後從新投遞消息,而配置了死信隊列,至關於給了未正確消費消息一個暫存的位置,往後須要恢復的時候,只須要編寫對應的代碼就能夠了。分佈式
@Configuration public class DeadRabbitMqConfiguration{ @Bean public DirectExchange deadDirect(){ return new DirectExchange("dead_direct_exchange");} @Bean public Queue deadQueue(){ return new Queue("dead_direct_queue");} @Bean public Binding deadBinds(){ return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead"); } }
@Configuration public class RabbitMqConfiguration { public static final String TOPIC_EXCHANGE = "topic_order_exchange"; public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1"; public static final String TOPIC_ROUTINGKEY_1 = "test.*"; @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Queue topicQueue1() { // 設置過時時間 Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 設置死信隊列交換器 args.put("x-dead-letter-exchange","dead_direct_exchange"); // 設置交換路由的路由key fanout 模式不須要配置此條 args.put("x-dead-letter-routing-key","dead"); return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args); } @Bean public Binding bindingTopic1() { return BindingBuilder.bind(topicQueue1()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_1); } }
爲了防止避免服務器因內存不夠而崩潰,因此 RabbitMQ 設定了一個閾值,當內存使用量超過閾值的時候,RabbitMQ 會暫時阻塞全部客戶端的鏈接,而且中止繼續接受新消息。ide
有兩種方式能夠修改這個閾值post
經過命令(二選一便可)
# 經過百分比設置的命令 <fraction> 處表明百分比小數例如 0.6 rabbitmqctl set_vm_memory_high_watermark <fraction> # 經過絕對值設置的命令 <value> 處表明設置的一個固定值例如 700MB rabbitmqctl set_vm_memory_high_watermark absolute <value>
經過修改配置文件 rabbitmq.conf
# 百分比設置 默認值爲 0.4 推薦 0.4-0.7 之間 vm_memory_high_watermark.relative = 0.5 # 固定值設置 vm_memory_high_watermark.absolute = 2GB
在客戶端鏈接和生產者被阻塞以前,它會嘗試將隊列中的消息換頁到磁盤中,這種思想在操做系統中其實很是常見,以最大程度的知足消息的正常處理。
當內存換頁發生後,不管持久化仍是非持久化的消息,都會被轉移到磁盤,而因爲持久化的消息原本就在磁盤中有一個持久化的副本,因此會優先移除持久化的消息。
默認狀況下,當內存達到閾值的 50 % 的時候,就會進行換頁處理。
能夠經過設置 vm_memory_high_watermark_paging_ratio 修改
# 值小於 1, 若是大於 1 就沒有意義了 vm_memory_high_watermark_paging_ratio = 0.6
若是無止境的換頁,也頗有可能會致使耗盡磁盤空間致使服務器崩潰,因此 RabbitMQ 又提供了一個磁盤預警的閾值,當低於這個值的時候就會進行報警,默認是 50MB,能夠經過命令的方式修改
# 固定值 rabbitmqctl set_disk_free_limit <disk_limit> # 百分數 rabbitmqctl set_disk_free_limit memory_limit <fraction>
生產者向 RabbitMQ 中發送消息的時候,可能會由於網絡等種種緣由致使發送失敗,因此 RabbitMQ 提供了一系列保證消息可靠傳遞的機制,能夠大體分爲生產者和消費者兩部分的處理
生產者做爲消息的發送者,須要保證本身的消息發送成功,RabbitMQ 提供了兩種方式來保證這一點。-
生產者發送消息後,會異步等待接收一個 ack 應答,收到返回的 ack 確認消息後,根據 ack是 true 仍是 false,調用 confirmCallback 接口進行處理
spring: rabbitmq: # 發送確認 publisher-confirm-type: correlated
@Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { /** * @param correlationData 相關配置信息 * @param ack exchange交換機 是否成功收到了消息。true 成功,false表明失敗 * @param cause 失敗緣由 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { //接收成功 System.out.println("消息成功發送到交換機"); } else { //接收失敗 System.out.println("消息發送到交換機失敗,失敗緣由: " + cause); // TODO 能夠處理失敗的消息,例如再次發送等等 } } }
@Configuration public class RabbitMqConfig { @Bean() public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean() public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirm_test_exchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue() { return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange()); } }
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired /** * 注入 ConfirmCallbackService */ @Autowired private ConfirmCallbackService confirmCallbackService; @Test public void testConfirm() { // 設置確認回調類 rabbitTemplate.setConfirmCallback(confirmCallbackService); // 發送消息 rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !"); } }
當 Exchange 發送到 Queue 失敗時,會調用一個 returnsCallback,咱們能夠經過實現這個接口,而後來處理這種失敗的狀況。
spring: rabbitmq: # 發送回調 publisher-returns: true
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 已經屬於過期方法了 @Component public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println(returned); } }
@Configuration public class RabbitMqConfig { @Bean() public Queue returnsTestQueue() { return new Queue("return_test_queue", true, false, false); } @Bean() public DirectExchange returnsTestExchange() { return new DirectExchange("returns_test_exchange"); } @Bean public Binding returnsTestDirectExchangeAndQueue() { return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info"); } }
@SpringBootTest(classes = RabbitmqSpringbootApplication.class) @RunWith(SpringRunner.class) public class RabbitMqTest { /** * 注入 RabbitTemplate */ @Autowired /** * 注入 ConfirmCallbackService */ @Autowired private ConfirmCallbackService confirmCallbackService; /** * 注入 ReturnCallbackService */ @Autowired private ReturnCallbackService returnCallbackService; @Test public void testReturn() { // 確保消息發送失敗後能夠從新返回到隊列中 rabbitTemplate.setMandatory(true); // 消息投遞到隊列失敗回調處理 rabbitTemplate.setReturnsCallback(returnCallbackService); // 消息投遞確認模式 rabbitTemplate.setConfirmCallback(confirmCallbackService); // 發送消息 rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !"); } }
ack 表示收到消息的確認,默認是自動確認,可是它有三種類型
acknowledge-mode 選項介紹
其中自動確認是指,當消息一旦被消費者接收到,則自動確認收到,並把這個消息從隊列中刪除。
可是在實際業務處理中,正確的接收到的消息可能會由於業務上的問題,致使消息沒有正確的被處理,可是若是設置了 手動確認方式,則須要在業務處理成功後,調用channel.basicAck(),手動簽收,若是出現異常,則調用 channel.basicNack()方法,讓其自動從新發送消息。
spring: rabbitmq: listener: simple: # 手動確認 acknowledge-mode: manual
@Component @RabbitListener(queues = "confirm_test_queue") public class TestConsumer { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消息內容: " + new String(message.getBody())); System.out.println("業務出錯的位置:"); int i = 66 / 0; // 手動簽收 deliveryTag標識表明隊列能夠刪除了 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 拒絕簽收 channel.basicNack(deliveryTag, true, true); } } }
因爲這兩個點篇幅也不短,實在不肯草草簡單寫上了事,放到後面單獨的文章編寫,發佈哇。
關於集羣的搭建暫時可參考:https://blog.csdn.net/belongh...