消息中間件-RabbitMq(可靠性方案&死信隊列&監控)

消息中間件-RabbitMq(可靠性方案&死信隊列&監控)

上一章節聊到,他有三個重要的部分,【生產者】、【blocker(rabbit節點】、【消費者】 ,換言之,咱們保障了發送可靠性、存儲可靠性、消費可靠性,也就保證了消息可靠性。下面會出一個消息可靠性的方案,有時候咱們須要對一個超時訂單作處理,咱們可使用rabbit的死信隊列作這個,下面咱們也會聊聊死信隊列,已經對rabbit進行監控。html

可靠性思考&方案

通常消息可靠性分爲三個級別:node

【最多一次(rabbit支持):消息可能會丟失,可是不會重複傳輸。那須要->spring

  •   消費者就須要開啓事務機制或者confirm機制(rabbit提供的,當消息發送到消費者的時候,生產端有會收到異步回調,從而知道正確投遞),以此保障能夠傳遞到mq中
  •        消費者須要備份交換機,肯定消息能夠從交換機路由到隊列中,這樣就可讓消息不丟失
  •   或者經過mandatory屬性(rabbit提供,當消息沒法找到相關的隊列,那就mq就返回沒有投遞成功的消息給生產者)

【最少一次rabbit支持;消息絕對不會丟失,可是可能重複傳輸 那須要->數據庫

  •   無需考慮最多一次所須要考慮的東西,消費者發一次,可是這樣很難保證成功。

【剛好一次rabbit不支持:每條消息會傳輸一次且只傳輸一次json

下面結合給出一個解決方案(這個性能不太好,由於要走下面的流程,並且還要對數據庫進行操做):api

  • 【發送消息的時候】:把發送的消息保存在數據庫中
  • 【當消息發送到rabbit上】:在rabbit上有一個監聽器
  • 【當把消息發送到生產者這裏的時候】:消費者給一個回覆(ack)給mq(mq通知生產者,這樣就知道消息投遞成功)
  • 【更新消息表中的消息狀態 】
    •   這個時候咱們要有一個定時任務去檢查數據庫中的消息狀態(查看是不是發送成功狀態(發送成功會在消費端對數據庫的數據狀態進行修改))
      • 消息狀態爲【未知:把消息拿過來進行重發,而且記錄消息重發的次數(由於不能一直重發,這樣服務器壓力大),同時修改狀態爲定時任務干預,你能夠設置一個閾值,到達必定次數後,改變狀態爲人工干預。
      •  消息狀態爲【人工干預:這個時候就要人工進行處理異常狀態的信息

springBoot中confirm的實現->【生產端服務器

這裏是監聽功能異步

@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /*** @param correlationData
     *  相關配置信息
     *  @param ack exchange交換機 是否成功收到了消息。true 成功,false表明失敗
     *  @param cause 失敗緣由 */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            System.out.println("成功發送");
        }else {
            System.out.println("失敗"+cause);
            //這裏進行重發,或者本身的業務處理
        }
    }
}

交換機和隊列進行綁定tcp

/*** 隊列與交換機綁定 */
@Configuration
public class QueueConfig {
    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }


    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(@Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}
View Code

生產者發送消息ide

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootForkJoinApplication.class)
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate; //注入rabbitmq對象
    @Autowired
    private ConfirmCallbackService confirmCallbackService; //注入 ConfirmCallback對象

    @Test
    public void test() {
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        //發送消息
        rabbitTemplate.convertAndSend("dlq_exchange1", "", "hello,ConfirmCallback你好");
    }
}
View Code

自動回調生產者

 springBoot配置文件

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 開啓confirm確認模式
 
 

消費端(進行手動簽收)

@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage {
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {System.out.println("消息內容===>" + new String(message.getBody()));
            //TODO 具體業務邏輯 //手動簽收[參數1:消息投遞序號,參數2:批量簽收]
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //拒絕簽收[參數1:消息投遞序號,參數2:批量拒絕,參數3:是否從新加入隊列]
            channel.basicNack(deliveryTag, true, true); }
    }

}

死信交換機(DLX【dead-letter-exchange】)

就是,當消息成爲死消息後,能夠被從新發送到另外一個隊列,這個隊列就是死信隊列(他和別的隊列沒有區別),綁定這個隊列的交換機就是DLX,當存在有一個死信消息的時候,rebbit會將這個消息發送到死信交換機上,從而路由到死信隊列上

消息變成死消息通常有這幾種狀況:

  • 消息被拒絕(basic.reject / basic.nack),而且requeue = false
  • 消息TTL過時
  • 隊列達到最大長度

TTL(Time-To-Live):表示的是你發送消息的有效期,通常有兩種設置方式

  • 聲明隊列的時候,在隊列的屬性中設置
  • 發送消息時給消息設置屬性
    •  
  • 死信交換機綁定(對於死信隊列,他是一個正常隊列,咱們按照以前的章節中設置的方法設置就行)

模擬訂單支付流程【使用死信隊列】

 一個正常的支付消息發送到交換機->交換機路由到,支付消息應有的隊列中->這個時候用戶超時尚未進行消費(TTL)->支付消息變成了死信消息->死信消息進入了死信交換機->死信交換機把死信消息路由到死信隊列中->對死信隊列進行監聽->發信息通知用戶,或者修改訂單狀態 

進行監控

在真正的環境中,咱們須要實時瞭解到咱們mq的狀態和狀況,從而保障他能夠完美的提供服務,如下給出3中常見方案:

使用management】:這個通常在小項目中進行使用,【簡單,可是麻煩,由於若是是集羣可能要開多個窗口】

使用他給的api:】 他本身能把數據展現出來到management上,確定調用了http接口,那咱們也能夠這樣幹【須要開發,而且要看相關文檔】,經常使用的api以下:

  • 歸納信息:http://localhost:15672/api/overview
  • channel 列表:http://localhost:15672/api/channels節點信息:http://localhost:15672/api/nodes
  • 交換機信息:http://localhost:15672/api/exchanges
  • 隊列信息:http://localhost:15672/api/queues
  • vhost 列表:http://localhost:15672/api/vhosts

prometheus + grafana 監控rabbitmq

zabbix不一樣,zabbix是主動接受消息,而普羅米修斯(prometheus )是定時對我們的消息進行拉取的,他制定了一些規範【exporter】,各個想讓他監控的中間件都要實現本身的exporter,而且他要求傳遞給他的都是json的數據,

咱們把rabbit提供的普羅米修斯的插件啓動:rabbitmq-plugins enable rabbitmq_prometheus

同時開啓端口:firewall-cmd --zone=public --add-port=15692/tcp --permanent

訪問普羅米修斯能夠認識的數據:http://你的ip:15692/metrics

剩下的就交給普羅米修斯了

grafana 中搜索相關的模板進行導入(https://grafana.com/grafana/dashboards?search=rabbitmq)

 

 最終效果(中間的過程太繁瑣了,我們只是聊聊這個東西,具體能夠參考別的博文。)

 這裏是官方對Prometheus集成獲取指標的文檔 https://www.rabbitmq.com/prometheus.html

常見問題

【延遲隊列】

實際上仍是能夠用上面的死信隊列的流程進行實現,設置隊列過時時間給你想要的延遲時間,當延遲時間事後,進入死信隊列,這個時候執行真正的邏輯

【消息積壓】

前面說到當內存到必定程度(0.4)的時候,rabbit不會接受消息,咱們說過能夠經過配置文件對這個數據臨時增大,這個時候rabbit運行正常,咱們就能夠增長消費者對數據進行消費,這樣就能夠解決積壓問題。

【不重複消費】:

每一個消費者給一個惟一id,使用id進行判斷

 

相關文章
相關標籤/搜索