【RabbitMQ】如何進行消息可靠投遞【上篇】

說明

前幾天,忽然發生線上報警,釘釘連發了好幾條消息,一看是RabbitMQ相關的消息,心頭一緊,難道翻車了?java

u=1091165172,1855706818&fm=26&gp=0.jpg

[橙色報警] 應用[xxx]在[08-15 16:36:04]發生[錯誤日誌異常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]觸發。
應用xxx 可能緣由以下
服務名爲:
 異常爲:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 產生緣由以下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
應用xxx 可能緣由以下
服務名爲:
 異常爲:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 產生緣由以下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:
複製代碼

定睛一看,看樣子像是消費者莫名其妙斷開了鏈接,正逢公司搬家之際,難道是機房又雙叒叕。。。。斷電了?因而趕忙聯繫了運維,諮詢RabbitMQ是否發生了調整。幾分鐘後,獲得了運維的回覆,因爲一些不可描述的緣由,RabbitMQ進行了重啓,emmmm,雖然重啓只持續了10分鐘,可是致使該集羣下全部消費者都掛了,須要將項目重啓後才能正常進行消費。redis

項目重啓後,一切彷佛又正常運轉起來,但好景不長,沒過多久,工單就找上了門來,通過排查,發現是生產者在RabbitMQ重啓期間消息投遞失敗,致使消息丟失,須要手動處理和恢復。spring

因而,我開始思考,如何才能進行RabbitMQ的消息可靠投遞呢?特別是在這樣比較極端的狀況,RabbitMQ集羣不可用的時候,沒法投遞的消息該如何處理呢?sql

可靠投遞

先來講明一個概念,什麼是可靠投遞呢?在RabbitMQ中,一個消息從生產者發送到RabbitMQ服務器,須要經歷這麼幾個步驟:shell

  1. 生產者準備好須要投遞的消息。
  2. 生產者與RabbitMQ服務器創建鏈接。
  3. 生產者發送消息。
  4. RabbitMQ服務器接收到消息,並將其路由到指定隊列。
  5. RabbitMQ服務器發起回調,告知生產者消息發送成功。

所謂可靠投遞,就是確保消息可以百分百從生產者發送到服務器。數據庫

{6582FAF9-A46E-4239-810B-E1D6883ED070}.png.jpg

爲了不爭議,補充說明一下,若是沒有設置Mandatory參數,是不須要先路由消息才發起回調的,服務器收到消息後就會進行回調確認。緩存

二、三、5步都是經過TCP鏈接進行交互,有網絡調用的地方就會有事故,網絡波動隨時都有可能發生,不論是內部機房停電,仍是外部光纜被切,網絡事故沒法預測,雖然這些都是小几率事件,但對於訂單等敏感數據處理來講,這些狀況下致使消息丟失都是不可接受的。服務器

20170716034945131.jpg

RabbitMQ中的消息可靠投遞

默認狀況下,發送消息的操做是不會返回任何信息給生產者的,也就是說,默認狀況下生產者是不知道消息有沒有正確地到達服務器。網絡

那麼如何解決這個問題呢?app

對此,RabbitMQ中有一些相關的解決方案:

  1. 使用事務機制來讓生產者感知消息被成功投遞到服務器。
  2. 經過生產者確認機制實現。

在RabbitMQ中,全部確保消息可靠投遞的機制都會對性能產生必定影響,如使用不當,可能會對吞吐量形成重大影響,只有經過執行性能基準測試,才能在肯定性能與可靠投遞之間的平衡。

在使用可靠投遞前,須要先思考如下問題:

  1. 消息發佈時,保證消息進入隊列的重要性有多高?
  2. 若是消息沒法進行路由,是否應該將該消息返回給發佈者?
  3. 若是消息沒法被路由,是否應該將其發送到其餘地方稍後再從新進行路由?
  4. 若是RabbitMQ服務器崩潰了,是否能夠接受消息丟失?
  5. RabbitMQ在處理新消息時是否應該確認它已經爲發佈者執行了全部請求的路由和持久化?
  6. 消息發佈者是否能夠批量投遞消息?
  7. 在可靠投遞上是否有能夠接受的平衡性?是否能夠接受一部分的不可靠性來提高性能?

只考慮平衡性不考慮性能是不行的,至於這個平衡的度具體如何把握,就要具體狀況具體分析了,好比像訂單數據這樣敏感的信息,對可靠性的要求天然要比通常的業務消息對可靠性的要求高的多,由於訂單數據是跟錢直接相關的,可能會致使直接的經濟損失。

因此不只應該知道有哪些保證消息可靠性的解決方案,還應該知道每種方案對性能的影響程度,以此來進行方案的選擇。

RabbitMQ的事務機制

RabbitMQ是支持AMQP事務機制的,在生產者確認機制以前,事務是確保消息被成功投遞的惟一方法。

在SpringBoot項目中,使用RabbitMQ事務其實很簡單,只須要聲明一個事務管理的Bean,並將RabbitTemplate的事務設置爲true便可。

配置文件以下:

spring:
 rabbitmq:
 host: localhost
 password: guest
 username: guest
 listener:
 type: simple
 simple:
 default-requeue-rejected: false
 acknowledge-mode: manual
複製代碼

先來配置一下交換機和隊列,以及事務管理器。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 聲明業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 聲明業務隊列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 聲明業務隊列綁定關係
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /** * 配置啓用rabbitmq事務 * @param connectionFactory * @return */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
複製代碼

而後建立一個消費者,來監聽消息,用以判斷消息是否成功發送。

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}
複製代碼

而後是消息生產者:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("消息已發送 {}" ,msg);
    }
}
複製代碼

這裏有兩個注意的地方:

  1. 在初始化方法裏,經過使用rabbitTemplate.setChannelTransacted(true); 來開啓事務。
  2. 在發送消息的方法上加上 @Transactional 註解,這樣在該方法中發生異常時,消息將不會發送。

在controller中加一個接口來生產消息:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}
複製代碼

來驗證一下:

msg:1
消息已發送 1
收到業務消息:1
msg:2
消息已發送 2
收到業務消息:2
msg:3
消息已發送 3
收到業務消息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...
複製代碼

msg 的值爲 exception 時, 在調用rabbitTemplate.convertAndSend 方法以後,程序拋出了異常,消息並無發送出去,而是被當前事務回滾了。

固然,你能夠將事務管理器註釋掉,或者將初始化方法的開啓事務註釋掉,這樣事務就不會生效,即便在調用了發送消息方法以後,程序發生了異常,消息也會被正常發送和消費。

RabbitMQ中的事務使用起來雖然簡單,可是對性能的影響是不可忽視的,由於每次事務的提交都是阻塞式的等待服務器處理返回結果,而默認模式下,客戶端是不須要等待的,直接發送就完事了,除此以外,事務消息須要比普通消息多4次與服務器的交互,這就意味着會佔用更多的處理時間,因此若是對消息處理速度有較高要求時,儘可能不要採用事務機制。

RabbitMQ的生產者確認機制

RabbitMQ中的生產者確認功能是AMQP規範的加強功能,當生產者發佈給全部隊列的已路由消息被消費者應用程序直接消費時,或者消息被放入隊列並根據須要進行持久化時,一個Basic.Ack請求會被髮送到生產者,若是消息沒法路由,代理服務器將發送一個Basic.Nack RPC請求用於表示失敗。而後由生產者決定該如何處理該消息。

也就是說,經過生產者確認機制,生產者能夠在消息被服務器成功接收時獲得反饋,並有機會處理未被成功接收的消息。

在Springboot中開啓RabbitMQ的生產者確認模式也很簡單,只多了一行配置:

spring:
 rabbitmq:
 host: localhost
 password: guest
 username: guest
 listener:
 type: simple
 simple:
 default-requeue-rejected: false
 acknowledge-mode: manual
 publisher-confirms: true
複製代碼

publisher-confirms: true 即表示開啓生產者確認模式。

而後將消息生產者的表明進行部分修改:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
// rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);

        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息確認成功, id:{}", id);
        } else {
            log.error("消息未成功投遞, id:{}, cause:{}", id, s);
        }
    }
}
複製代碼

讓生產者繼承自RabbitTemplate.ConfirmCallback 類,而後實現其confirm 方法,便可用其接收服務器回調。

須要注意的是,在發送消息時,代碼也進行了調整:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
複製代碼

這裏咱們爲消息設置了消息ID,以便在回調時經過該ID來判斷是對哪一個消息的回調,由於在回調函數中,咱們是沒法直接獲取到消息內容的,因此須要將消息先暫存起來,根據消息的重要程度,能夠考慮使用本地緩存,或者存入Redis中,或者Mysql中,而後在回調時更新其狀態或者從緩存中移除,最後使用定時任務對一段時間內未發送的消息進行從新投遞。

如下是我盜來的圖,原諒我偷懶不想畫了[手動狗頭]:

5b65729e0001439305000294.jpg

另外,還須要注意的是,若是將消息發佈到不存在的交換機上,那麼發佈用的信道將會被RabbitMQ關閉。

此外,生產者確認機制跟事務是不能一塊兒工做的,是事務的輕量級替代方案。由於事務和發佈者確認模式都是須要先跟服務器協商,對信道啓用的一種模式,不能對同一個信道同時使用兩種模式。

在生產者確認模式中,消息的確承認以是異步和批量的,因此相比使用事務,性能會更好。

使用事務機制和生產者確認機制都能確保消息被正確的發送至RabbitMQ,這裏的「正確發送至RabbitMQ」說的是消息成功被交換機接收,但若是找不到能接收該消息的隊列,這條消息也會丟失。至於如何處理那些沒法被投遞到隊列的消息,將會在下篇進行說明。

結題

因此當公司機房「斷電」時,如何處理那些須要發送的消息呢?相信看完上文以後,你的心中已經有了答案。

通常來講,這種「斷電」不會持續較長時間,通常幾分鐘到半小時之間,很快可以恢復,因此若是是重要消息,能夠保存到數據庫中,若是是非重要消息,可使用redis進行保存,固然,還要根據消息的數量級來進行判斷。

若是消息量比較大,能夠考慮將消息發送到另外一個集羣的死信隊列中,事實上,所在公司就有兩個RabbitMQ集羣,因此當一個集羣不可用時,能夠往另外一個集羣發消息,emmm,若是兩個機房都停電了的話,當我沒說。

111.png.jpg
相關文章
相關標籤/搜索